Home >Operation and Maintenance >Safety >How to use sqlmapapi to initiate a scan

How to use sqlmapapi to initiate a scan

WBOY
WBOYforward
2023-05-20 23:13:431263browse

sqlmap can be said to be an artifact of SQL injection detection, but the efficiency of using sqlmap to test SQL injection is very low, and each URL needs to be tested manually. The developers of sqlmap have added sqlmapapi.py, which can be operated directly through interface calls, simplifying the execution of sqlmap commands.

sqlmap api is divided into server and client. sqlmap api has two modes, one is the interface mode based on HTTP protocol, and the other is the interface mode based on command line.

sqlmap source code download address: https://github.com/sqlmapproject/sqlmap/

1. View help

python sqlmapapi.py -hHow to use sqlmapapi to initiate a scan

2. Start the api server

Before using the API service, you need to start the API server first, whether it is based on the HTTP protocol or the command line-based interface mode. To start the API server, just run the following command: python sqlmapapi.py -s

After the command is successful, some information will be returned in the command line. The following command roughly means that the api server runs on the local port 8775, and the admin token is c6bbb0c1f86b7d7bc2ed6ce3e3bbdcb5, etc.

How to use sqlmapapi to initiate a scan However, there is a disadvantage in opening the api server in the above way. When the server and the client If the client is not a host, it will not be connected, so if you want to solve this problem, you can open the api server by entering the following command: python sqlmapapi.py -s -H "0.0.0.0" -p 8775

After the command is successful, the remote client can connect to the API server by specifying the remote host IP and port.

3. Command line-based interface mode

3.1. Open the client and initiate the injection command

python sqlmapapi.py -c

If the client and server are not on the same computer, enter the following command:

python sqlmapapi.py -c -H "192.168.1.101" -p 8775

3.2. Help command, get all commands

help          显示帮助信息
new ARGS      开启一个新的扫描任务
use TASKID    切换
taskid data   获取当前任务返回的数据
log           获取当前任务的扫描日志
status        获取当前任务的扫描状态
option OPTION 获取当前任务的选项
options       获取当前任务的所有配置信息
stop          停止当前任务 
kill          杀死当前任务
list          显示所有任务列表
flush         清空所有任务
exit          退出客户端

How to use sqlmapapi to initiate a scan

3.3.Detect injection

3.3.1.new command

new -u "url"

Example: new -u "http://www.baidu.com"

Although we only specify -u parameter, but it can be seen from the returned information that after entering the new command, first requested /task/new to create a new taskid, and then initiated a request to start the task, so it can be found that the The essence of the mode is also based on the HTTP protocol.

How to use sqlmapapi to initiate a scan

3.3.2. status command

Get the scanning status of the task. If the status field in the returned content is terminated, It means that the scan is completed. If the status field in the returned content is run, it means that the scan is still in progress. The following picture is a screenshot of the scan completed:

How to use sqlmapapi to initiate a scan3.3.3. data command

If the data field in the returned data is not empty, you can get the injection has been successful. This example shows a return content containing SQL injection, which contains information such as database type, payload, and injection parameters.

How to use sqlmapapi to initiate a scan

4. Interface mode based on HTTP protocol

Briefly introduce the main functions of h based on http interface calling mode of sqlmapapi.py, enter lib/ The server class of utils/api.py can be found to interact with the service by submitting data to the server. There are 3 types in total.

Users' methods User method

Admin function Management function

sqlmap core interact functions Core interactive function

Yes The types of submitted data are as follows:

4.1. User method

@get("/task/new")

@get("/task/new")
def task_new():
    """
    Create a new task
    """
    taskid = encodeHex(os.urandom(8), binary=False)
    remote_addr = request.remote_addr
 
    DataStore.tasks[taskid] = Task(taskid, remote_addr)
 
    logger.debug("Created new task: '%s'" % taskid)
    return jsonize({"success": True, "taskid": taskid})

@get ("/task/delete")

@get("/task/<taskid>/delete")
def task_delete(taskid):
    """
    Delete an existing task
    """
    if taskid in DataStore.tasks:
        DataStore.tasks.pop(taskid)
 
        logger.debug("(%s) Deleted task" % taskid)
        return jsonize({"success": True})
    else:
        response.status = 404
        logger.warning("[%s] Non-existing task ID provided to task_delete()" % taskid)
        return jsonize({"success": False, "message": "Non-existing task ID"})</taskid>

4.2, core interactive function

@get("/option/list")

@post("/ option/get")

@post("/option/set")

@post("/option/<taskid>/set")
def option_set(taskid):
    """
    Set value of option(s) for a certain task ID
    """
 
    if taskid not in DataStore.tasks:
        logger.warning("[%s] Invalid task ID provided to option_set()" % taskid)
        return jsonize({"success": False, "message": "Invalid task ID"})
 
    if request.json is None:
        logger.warning("[%s] Invalid JSON options provided to option_set()" % taskid)
        return jsonize({"success": False, "message": "Invalid JSON options"})
 
    for option, value in request.json.items():
        DataStore.tasks[taskid].set_option(option, value)
 
    logger.debug("(%s) Requested to set options" % taskid)
    return jsonize({"success": True})</taskid>

@post("/scan/start")

@post("/scan/<taskid>/start")
def scan_start(taskid):
    """
    Launch a scan
    """
 
    if taskid not in DataStore.tasks:
        logger.warning("[%s] Invalid task ID provided to scan_start()" % taskid)
        return jsonize({"success": False, "message": "Invalid task ID"})
 
    if request.json is None:
        logger.warning("[%s] Invalid JSON options provided to scan_start()" % taskid)
        return jsonize({"success": False, "message": "Invalid JSON options"})
 
    # Initialize sqlmap engine's options with user's provided options, if any
    for option, value in request.json.items():
        DataStore.tasks[taskid].set_option(option, value)
 
    # Launch sqlmap engine in a separate process
    DataStore.tasks[taskid].engine_start()
 
    logger.debug("(%s) Started scan" % taskid)
    return jsonize({"success": True, "engineid": DataStore.tasks[taskid].engine_get_id()})</taskid>

@get ("/scan/stop")

@get("/scan/<taskid>/stop")
def scan_stop(taskid):
    """
    Stop a scan
    """
 
    if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()):
        logger.warning("[%s] Invalid task ID provided to scan_stop()" % taskid)
        return jsonize({"success": False, "message": "Invalid task ID"})
 
    DataStore.tasks[taskid].engine_stop()
 
    logger.debug("(%s) Stopped scan" % taskid)
    return jsonize({"success": True})</taskid>

@get("/scan/kill")

@get("/scan/<taskid>/kill")
def scan_kill(taskid):
    """
    Kill a scan
    """
 
    if (taskid not in DataStore.tasks or DataStore.tasks[taskid].engine_process() is None or DataStore.tasks[taskid].engine_has_terminated()):
        logger.warning("[%s] Invalid task ID provided to scan_kill()" % taskid)
        return jsonize({"success": False, "message": "Invalid task ID"})
 
    DataStore.tasks[taskid].engine_kill()
 
    logger.debug("(%s) Killed scan" % taskid)
    return jsonize({"success": True})</taskid>

@get("/scan/status")

@get("/scan/<taskid>/status")
def scan_status(taskid):
    """
    Returns status of a scan
    """
 
    if taskid not in DataStore.tasks:
        logger.warning("[%s] Invalid task ID provided to scan_status()" % taskid)
        return jsonize({"success": False, "message": "Invalid task ID"})
 
    if DataStore.tasks[taskid].engine_process() is None:
        status = "not running"
    else:
        status = "terminated" if DataStore.tasks[taskid].engine_has_terminated() is True else "running"
 
    logger.debug("(%s) Retrieved scan status" % taskid)
    return jsonize({
        "success": True,
        "status": status,
        "returncode": DataStore.tasks[taskid].engine_get_returncode()
    })</taskid>

@ get("/scan/data")

@get("/scan/<taskid>/data")
def scan_data(taskid):
    """
    Retrieve the data of a scan
    """
 
    json_data_message = list()
    json_errors_message = list()
 
    if taskid not in DataStore.tasks:
        logger.warning("[%s] Invalid task ID provided to scan_data()" % taskid)
        return jsonize({"success": False, "message": "Invalid task ID"})
 
    # Read all data from the IPC database for the taskid
    for status, content_type, value in DataStore.current_db.execute("SELECT status, content_type, value FROM data WHERE taskid = ? ORDER BY id ASC", (taskid,)):
        json_data_message.append({"status": status, "type": content_type, "value": dejsonize(value)})
 
    # Read all error messages from the IPC database
    for error in DataStore.current_db.execute("SELECT error FROM errors WHERE taskid = ? ORDER BY id ASC", (taskid,)):
        json_errors_message.append(error)
 
    logger.debug("(%s) Retrieved scan data and error messages" % taskid)
    return jsonize({"success": True, "data": json_data_message, "error": json_errors_message})</taskid>

@get("/scan/log")

@get("/download/")

4.3. Management Function

@get("/admin/list")

@get("/admin/list")
@get("/admin/<token>/list")
def task_list(token=None):
    """
    Pull task list
    """
    tasks = {}
 
    for key in DataStore.tasks:
        if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr:
            tasks[key] = dejsonize(scan_status(key))["status"]
 
    logger.debug("(%s) Listed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr))
    return jsonize({"success": True, "tasks": tasks, "tasks_num": len(tasks)})</token>

@get("/admin//flush")

@get("/admin/flush")
@get("/admin/<token>/flush")
def task_flush(token=None):
    """
    Flush task spool (delete all tasks)
    """
 
    for key in list(DataStore.tasks):
        if is_admin(token) or DataStore.tasks[key].remote_addr == request.remote_addr:
            DataStore.tasks[key].engine_kill()
            del DataStore.tasks[key]
 
    logger.debug("(%s) Flushed task pool (%s)" % (token, "admin" if is_admin(token) else request.remote_addr))
    return jsonize({"success": True})</token>

Analyze and extract from the sqlmapapi.py file calling relationship. These operations can fully meet our testing needs, so they can be used for batch operations.

5. Use sqlmapapi to initiate scanning

sqlmapapi.py conveniently provides an http request entry, but when used, you can only get the final result of whether to inject or not. What exactly is initiated when each interface performs injection scanning? Such a request can be difficult to obtain with any number of requests. Let me share the flow chart recorded after combing the sqlmap source code. From the figure, you can locate the specific location of the payload-level request. If you want to know what kind of request was initiated and how many requests, you only need to add custom code here.

How to use sqlmapapi to initiate a scan

6. SQL injection automation implementation process

How to use sqlmapapi to initiate a scan

The above is the detailed content of How to use sqlmapapi to initiate a scan. For more information, please follow other related articles on the PHP Chinese website!

Statement:
This article is reproduced at:yisu.com. If there is any infringement, please contact admin@php.cn delete