이 글은 비동기 프록시 크롤러와 프록시 풀을 구현하기 위한 Python 관련 지식을 주로 소개합니다. 참고할 만한 가치가 매우 높습니다.
python asyncio를 사용하여 비동기 프록시 풀을 구현하고 규칙에 따라 프록시 웹사이트에서 무료 프록시를 크롤링하고 유효성을 확인한 후 redis에 저장했습니다. 정기적으로 프록시 수를 늘리고 프록시의 유효성을 확인합니다. 풀에서 잘못된 프록시를 제거하세요. 동시에 aiohttp를 사용하여 서버를 구현하고, 다른 프로그램은 해당 URL에 액세스하여 프록시 풀에서 프록시를 얻을 수 있습니다.
소스코드
https://github.com/arrti/proxypool
환경
Python 3.5+
Redis
PhantomJS(선택 사항)
Supervisord(선택)
코드는 asyncio의 async 및 Wait 구문을 광범위하게 사용하기 때문에 Python 3.5에서만 제공되므로 사용할 수 없습니다. Python 3.5 이상을 사용하는 것이 가장 좋습니다. 저는 Python 3.6을 사용하고 있습니다.
redis
aiohttp
1. 크롤러 부분
핵심 코드
async def start(self): for rule in self._rules: parser = asyncio.ensure_future(self._parse_page(rule)) # 根据规则解析页面来获取代理 logger.debug('{0} crawler started'.format(rule.rule_name)) if not rule.use_phantomjs: await page_download(ProxyCrawler._url_generator(rule), self._pages, self._stop_flag) # 爬取代理网站的页面 else: await page_download_phantomjs(ProxyCrawler._url_generator(rule), self._pages, rule.phantomjs_load_flag, self._stop_flag) # 使用PhantomJS爬取 await self._pages.join() parser.cancel() logger.debug('{0} crawler finished'.format(rule.rule_name))위의 핵심 코드는 실제로 A입니다. asyncio.Queue를 사용하여 구현된 producer-consumer
import asyncio from random import random async def produce(queue, n): for x in range(1, n + 1): print('produce ', x) await asyncio.sleep(random()) await queue.put(x) # 向queue中放入item async def consume(queue): while 1: item = await queue.get() # 等待从queue中获取item print('consume ', item) await asyncio.sleep(random()) queue.task_done() # 通知queue当前item处理完毕 async def run(n): queue = asyncio.Queue() consumer = asyncio.ensure_future(consume(queue)) await produce(queue, n) # 等待生产者结束 await queue.join() # 阻塞直到queue不为空 consumer.cancel() # 取消消费者任务,否则它会一直阻塞在get方法处 def aio_queue_run(n): loop = asyncio.get_event_loop() try: loop.run_until_complete(run(n)) # 持续运行event loop直到任务run(n)结束 finally: loop.close() if name == 'main': aio_queue_run(5)위 코드를 실행하면 가능한 출력은 다음과 같습니다.
produce 1 produce 2 consume 1 produce 3 produce 4 consume 2 produce 5 consume 3 consume 4 consume 5
페이지 크롤링
async def page_download(urls, pages, flag): url_generator = urls async with aiohttp.ClientSession() as session: for url in url_generator: if flag.is_set(): break await asyncio.sleep(uniform(delay - 0.5, delay + 1)) logger.debug('crawling proxy web page {0}'.format(url)) try: async with session.get(url, headers=headers, timeout=10) as response: page = await response.text() parsed = html.fromstring(decode_html(page)) # 使用bs4来辅助lxml解码网页:http://lxml.de/elementsoup.html#Using only the encoding detection await pages.put(parsed) url_generator.send(parsed) # 根据当前页面来获取下一页的地址 except StopIteration: break except asyncio.TimeoutError: logger.error('crawling {0} timeout'.format(url)) continue # TODO: use a proxy except Exception as e: logger.error(e)aiohttp를 사용하여 웹 크롤링
프록시 구문 분석
Chrome을 사용하는 경우xpath를 사용하여 프록시를 구문 분석하는 것이 가장 쉽습니다. 브라우저에서 마우스 오른쪽 버튼을 클릭하여 선택한 페이지 요소의 xpath를 직접 가져올 수 있습니다.
실행하려면 Chrome 확장 프로그램인 "XPath Helper"를 설치하세요. 페이지에서 직접 xpath를 디버깅하는 것이 매우 편리합니다. BeautifulSoup은 xpath를 지원하지 않으며 lxml을 사용하여 페이지를 구문 분석합니다.
async def _parse_proxy(self, rule, page): ips = page.xpath(rule.ip_xpath) # 根据xpath解析得到list类型的ip地址集合 ports = page.xpath(rule.port_xpath) # 根据xpath解析得到list类型的ip地址集合 if not ips or not ports: logger.warning('{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network'. format(len(ips), len(ports), rule.rule_name)) return proxies = map(lambda x, y: '{0}:{1}'.format(x.text.strip(), y.text.strip()), ips, ports) if rule.filters: # 根据过滤字段来过滤代理,如“高匿”、“透明”等 filters = [] for i, ft in enumerate(rule.filters_xpath): field = page.xpath(ft) if not field: logger.warning('{1} crawler could not get {0} field, please check the filter xpath'. format(rule.filters[i], rule.rule_name)) continue filters.append(map(lambda x: x.text.strip(), field)) filters = zip(*filters) selector = map(lambda x: x == rule.filters, filters) proxies = compress(proxies, selector) for proxy in proxies: await self._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中
크롤러 규칙
웹사이트 크롤링, 프록시 구문 분석, 필터링 및 기타 작업에 대한 규칙은 각 프록시 웹사이트의 규칙 클래스와 메타클래스에 의해 정의됩니다. 기본 클래스는 규칙 클래스를 관리하는 데 사용됩니다. 기본 클래스는 다음과 같이 정의됩니다.class CrawlerRuleBase(object, metaclass=CrawlerRuleMeta): start_url = None page_count = 0 urls_format = None next_page_xpath = None next_page_host = '' use_phantomjs = False phantomjs_load_flag = None filters = () ip_xpath = None port_xpath = None filters_xpath = ()각 매개변수의 의미는
(필수)start_url
(필수)ip_xpath
(필수)port_xpath
page_count
urls_format
문자열, urls_format.format(start_url, n)을 통해 페이지 n의 주소를 생성하는 것이 더 일반적입니다. 페이지 주소 형식 .
, next_page_xpath
next_page_host
由xpath规则来获取下一页的url(常见的是相对路径),结合host得到下一页的地址:next_page_host + url。
use_phantomjs
, phantomjs_load_flag
use_phantomjs用于标识爬取该网站是否需要使用PhantomJS,若使用,需定义phantomjs_load_flag(网页上的某个元素,str类型)作为PhantomJS页面加载完毕的标志。
filters
过滤字段集合,可迭代类型。用于过滤代理。
爬取各个过滤字段的xpath规则,与过滤字段按顺序一一对应。
元类CrawlerRuleMeta用于管理规则类的定义,如:如果定义use_phantomjs=True,则必须定义phantomjs_load_flag,否则会抛出异常,不在此赘述。
目前已经实现的规则有西刺代理、快代理、360代理、66代理和 秘密代理。新增规则类也很简单,通过继承CrawlerRuleBase来定义新的规则类YourRuleClass,放在proxypool/rules目录下,并在该目录下的init.py中添加from . import YourRuleClass(这样通过CrawlerRuleBase.subclasses()就可以获取全部的规则类了),重启正在运行的proxy pool即可应用新的规则。
2. 检验部分
免费的代理虽然多,但是可用的却不多,所以爬取到代理后需要对其进行检验,有效的代理才能放入代理池中,而代理也是有时效性的,还要定期对池中的代理进行检验,及时移除失效的代理。
这部分就很简单了,使用aiohttp通过代理来访问某个网站,若超时,则说明代理无效。
async def validate(self, proxies): logger.debug('validator started') while 1: proxy = await proxies.get() async with aiohttp.ClientSession() as session: try: real_proxy = 'http://' + proxy async with session.get(self.validate_url, proxy=real_proxy, timeout=validate_timeout) as resp: self._conn.put(proxy) except Exception as e: logger.error(e) proxies.task_done()
3. server部分
使用aiohttp实现了一个web server,启动后,访问http://host:port即可显示主页:
访问http://host:port/get来从代理池获取1个代理,如:'127.0.0.1:1080';
访问http://host:port/get/n来从代理池获取n个代理,如:"['127.0.0.1:1080', '127.0.0.1:443', '127.0.0.1:80']";
访问http://host:port/count来获取代理池的容量,如:'42'。
因为主页是一个静态的html页面,为避免每来一个访问主页的请求都要打开、读取以及关闭该html文件的开销,将其缓存到了redis中,通过html文件的修改时间来判断其是否被修改过,如果修改时间与redis缓存的修改时间不同,则认为html文件被修改了,则重新读取文件,并更新缓存,否则从redis中获取主页的内容。
返回代理是通过aiohttp.web.Response(text=ip.decode('utf-8'))
实现的,text要求str类型,而从redis中获取到的是bytes类型,需要进行转换。返回的多个代理,使用eval即可转换为list类型。
返回主页则不同,是通过aiohttp.web.Response(body=main_page_cache, content_type='text/html')
,这里body要求的是bytes类型,直接将从redis获取的缓存返回即可,conten_type='text/html'
必不可少,否则无法通过浏览器加载主页,而是会将主页下载下来——在运行官方文档中的示例代码的时候也要注意这点,那些示例代码基本上都没有设置content_type。
这部分不复杂,注意上面提到的几点,而关于主页使用的静态资源文件的路径,可以参考之前的博客《aiohttp之添加静态资源路径》。
4. 运行
将整个代理池的功能分成了3个独立的部分:
proxypool
定期检查代理池容量,若低于下限则启动代理爬虫并对代理检验,通过检验的爬虫放入代理池,达到规定的数量则停止爬虫。
proxyvalidator
用于定期检验代理池中的代理,移除失效代理。
proxyserver
启动server。
这3个独立的任务通过3个进程来运行,在Linux下可以使用supervisod来=管理这些进程,下面是supervisord的配置文件示例:
; supervisord.conf [unix_http_server] file=/tmp/supervisor.sock [inet_http_server] port=127.0.0.1:9001 [supervisord] logfile=/tmp/supervisord.log logfile_maxbytes=5MB logfile_backups=10 loglevel=debug pidfile=/tmp/supervisord.pid nodaemon=false minfds=1024 minprocs=200 [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl=unix:///tmp/supervisor.sock [program:proxyPool] command=python /path/to/ProxyPool/run_proxypool.py redirect_stderr=true stdout_logfile=NONE [program:proxyValidator] command=python /path/to/ProxyPool/run_proxyvalidator.py redirect_stderr=true stdout_logfile=NONE [program:proxyServer] command=python /path/to/ProxyPool/run_proxyserver.py autostart=false redirect_stderr=true stdout_logfile=NONE
因为项目自身已经配置了日志,所以这里就不需要再用supervisord捕获stdout和stderr了。通过supervisord -c supervisord.conf启动supervisord,proxyPool和proxyServer则会随之自动启动,proxyServer需要手动启动,访问http://127.0.0.1:9001即可通过网页来管理这3个进程了:
supervisord의 공식 문서에는 현재(버전 3.3.1)는 python3을 지원하지 않는다고 나와 있는데, 사용 중에 문제를 발견하지 못했습니다. 이는 제가 사용하지 않았기 때문일 수도 있습니다. Supervisord의 복잡한 기능을 간단한 프로세스 상태 모니터링 및 시작-중지 도구로 취급하세요.
위 내용은 Python을 사용하여 비동기 프록시 크롤러 및 프록시 풀 메서드 구현의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!