本文主要介紹了Python實現非同步代理爬蟲及代理池的相關知識,具有很好的參考價值,下面跟著小編一起來看下吧
使用python asyncio實現了一個非同步代理池,根據規則爬取代理網站上的免費代理,在驗證其有效後存入redis中,定期擴展代理的數量並檢驗池中代理的有效性,移除失效的代理。同時用aiohttp實作了一個server,其他的程式可以透過存取對應的url來從代理池取得代理。
原始碼
Github
Redis
lxml
#requests##selenium
下面來說明程式碼。
1. 爬蟲部分
核心程式碼async def start(self):
for rule in self._rules:
parser = asyncio.ensure_future(self._parse_page(rule)) # 根据规则解析页面来获取代理
logger.debug('{0} crawler started'.format(rule.rule_name))
if not rule.use_phantomjs:
await page_download(ProxyCrawler._url_generator(rule), self._pages, self._stop_flag) # 爬取代理网站的页面
else:
await page_download_phantomjs(ProxyCrawler._url_generator(rule), self._pages,
rule.phantomjs_load_flag, self._stop_flag) # 使用PhantomJS爬取
await self._pages.join()
parser.cancel()
logger.debug('{0} crawler finished'.format(rule.rule_name))
上面的核心程式碼其實是一個用asyncio.Queue實現的生產-消費者
,以下是該模型的一個簡單實作:import asyncio
from random import random
async def produce(queue, n):
for x in range(1, n + 1):
print('produce ', x)
await asyncio.sleep(random())
await queue.put(x) # 向queue中放入item
async def consume(queue):
while 1:
item = await queue.get() # 等待从queue中获取item
print('consume ', item)
await asyncio.sleep(random())
queue.task_done() # 通知queue当前item处理完毕
async def run(n):
queue = asyncio.Queue()
consumer = asyncio.ensure_future(consume(queue))
await produce(queue, n) # 等待生产者结束
await queue.join() # 阻塞直到queue不为空
consumer.cancel() # 取消消费者任务,否则它会一直阻塞在get方法处
def aio_queue_run(n):
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(run(n)) # 持续运行event loop直到任务run(n)结束
finally:
loop.close()
if name == 'main':
aio_queue_run(5)
運行上面的程式碼,一個可能的輸出如下: produce 1
produce 2
consume 1
produce 3
produce 4
consume 2
produce 5
consume 3
consume 4
consume 5
爬取頁面
async def page_download(urls, pages, flag): url_generator = urls async with aiohttp.ClientSession() as session: for url in url_generator: if flag.is_set(): break await asyncio.sleep(uniform(delay - 0.5, delay + 1)) logger.debug('crawling proxy web page {0}'.format(url)) try: async with session.get(url, headers=headers, timeout=10) as response: page = await response.text() parsed = html.fromstring(decode_html(page)) # 使用bs4来辅助lxml解码网页:http://lxml.de/elementsoup.html#Using only the encoding detection await pages.put(parsed) url_generator.send(parsed) # 根据当前页面来获取下一页的地址 except StopIteration: break except asyncio.TimeoutError: logger.error('crawling {0} timeout'.format(url)) continue # TODO: use a proxy except Exception as e: logger.error(e)使用aiohttp實作的網頁爬取
函數,大部分代理網站都可以使用上面的方法來爬取,對於使用js動態產生頁面的網站可以使用selenium控制PhantomJS來爬取-本專案對爬蟲的效率要求不高,代理網站的更新頻率是有限的,不需要頻繁的爬取,完全可以使用PhantomJS。
解析代理
最簡單的莫過於用xpath來解析代理程式了,使用Chrome瀏覽器的話,直接透過右鍵就能獲得選取的頁面元素的xpath:
安裝
Chrome的擴充功能「XPath Helper」就可以直接在頁面上運行和調試
xpath,十分方便:
BeautifulSoup不支援xpath,使用lxml來解析頁面,程式碼如下:
async def _parse_proxy(self, rule, page): ips = page.xpath(rule.ip_xpath) # 根据xpath解析得到list类型的ip地址集合 ports = page.xpath(rule.port_xpath) # 根据xpath解析得到list类型的ip地址集合 if not ips or not ports: logger.warning('{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network'. format(len(ips), len(ports), rule.rule_name)) return proxies = map(lambda x, y: '{0}:{1}'.format(x.text.strip(), y.text.strip()), ips, ports) if rule.filters: # 根据过滤字段来过滤代理,如“高匿”、“透明”等 filters = [] for i, ft in enumerate(rule.filters_xpath): field = page.xpath(ft) if not field: logger.warning('{1} crawler could not get {0} field, please check the filter xpath'. format(rule.filters[i], rule.rule_name)) continue filters.append(map(lambda x: x.text.strip(), field)) filters = zip(*filters) selector = map(lambda x: x == rule.filters, filters) proxies = compress(proxies, selector) for proxy in proxies: await self._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中
#爬蟲規則
網站爬取、代理解析、濾等等操作的規則都是由各個代理網站的規則類別定義的,使用元類別和基底類別來管理規則類別。基底類別定義如下:<pre class="brush:py;">class CrawlerRuleBase(object, metaclass=CrawlerRuleMeta):
start_url = None
page_count = 0
urls_format = None
next_page_xpath = None
next_page_host = &#39;&#39;
use_phantomjs = False
phantomjs_load_flag = None
filters = ()
ip_xpath = None
port_xpath = None
filters_xpath = ()</pre>
各個參數的意義如下:
(必要)爬蟲的起始頁。
(必要)爬取IP的xpath規則。
port_xpath(必要)
爬取連接埠號碼的xpath規則。 page_count
爬取的頁面數量。
urls_format
頁面位址的格式
,透過urls_format.format(start_url, n)來產生第n頁的位址,這是比較常見的頁面位址格式。
next_page_xpath
,
由xpath規則來取得下一頁的url(常見的是相對路徑),結合host得到下一頁頁的位址:next_page_host + url。
phantomjs_load_flag#########use_phantomjs用來識別爬取網站是否需要使用PhantomJS,若使用,需定義phantomjs_load_flag(網頁上的某個元素,str類型)作為PhantomJS頁面載入完畢的標誌。 #########filters#########過濾欄位集合,可迭代型別。用於過濾代理。 ######爬取各個過濾欄位的xpath規則,與過濾欄位依序一一對應。 ######元類別CrawlerRuleMeta用於管理規則類別的定義,如:如果定義use_phantomjs=True,則必須定義phantomjs_load_flag,否則會###拋出例外###,不在此贅述。 ###目前已经实现的规则有西刺代理、快代理、360代理、66代理和 秘密代理。新增规则类也很简单,通过继承CrawlerRuleBase来定义新的规则类YourRuleClass,放在proxypool/rules目录下,并在该目录下的init.py中添加from . import YourRuleClass(这样通过CrawlerRuleBase.subclasses()就可以获取全部的规则类了),重启正在运行的proxy pool即可应用新的规则。
2. 检验部分
免费的代理虽然多,但是可用的却不多,所以爬取到代理后需要对其进行检验,有效的代理才能放入代理池中,而代理也是有时效性的,还要定期对池中的代理进行检验,及时移除失效的代理。
这部分就很简单了,使用aiohttp通过代理来访问某个网站,若超时,则说明代理无效。
async def validate(self, proxies): logger.debug('validator started') while 1: proxy = await proxies.get() async with aiohttp.ClientSession() as session: try: real_proxy = 'http://' + proxy async with session.get(self.validate_url, proxy=real_proxy, timeout=validate_timeout) as resp: self._conn.put(proxy) except Exception as e: logger.error(e) proxies.task_done()
3. server部分
使用aiohttp实现了一个web server,启动后,访问http://host:port即可显示主页:
访问host:port/get来从代理池获取1个代理,如:'127.0.0.1:1080';
访问host:port/get/n来从代理池获取n个代理,如:"['127.0.0.1:1080', '127.0.0.1:443', '127.0.0.1:80']";
访问host:port/count来获取代理池的容量,如:'42'。
因为主页是一个静态的html页面,为避免每来一个访问主页的请求都要打开、读取以及关闭该html文件的开销,将其缓存到了redis中,通过html文件的修改时间来判断其是否被修改过,如果修改时间与redis缓存的修改时间不同,则认为html文件被修改了,则重新读取文件,并更新缓存,否则从redis中获取主页的内容。
返回代理是通过aiohttp.web.Response(text=ip.decode('utf-8'))
实现的,text要求str类型,而从redis中获取到的是bytes类型,需要进行转换。返回的多个代理,使用eval即可转换为list类型。
返回主页则不同,是通过aiohttp.web.Response(body=main_page_cache, content_type='text/html')
,这里body要求的是bytes类型,直接将从redis获取的缓存返回即可,conten_type='text/html'
必不可少,否则无法通过浏览器加载主页,而是会将主页下载下来——在运行官方文档中的示例代码的时候也要注意这点,那些示例代码基本上都没有设置content_type。
这部分不复杂,注意上面提到的几点,而关于主页使用的静态资源文件的路径,可以参考之前的博客《aiohttp之添加静态资源路径》。
4. 运行
将整个代理池的功能分成了3个独立的部分:
proxypool
定期检查代理池容量,若低于下限则启动代理爬虫并对代理检验,通过检验的爬虫放入代理池,达到规定的数量则停止爬虫。
proxyvalidator
用于定期检验代理池中的代理,移除失效代理。
proxyserver
启动server。
这3个独立的任务通过3个进程来运行,在Linux下可以使用supervisod来=管理这些进程,下面是supervisord的配置文件示例:
; supervisord.conf [unix_http_server] file=/tmp/supervisor.sock [inet_http_server] port=127.0.0.1:9001 [supervisord] logfile=/tmp/supervisord.log logfile_maxbytes=5MB logfile_backups=10 loglevel=debug pidfile=/tmp/supervisord.pid nodaemon=false minfds=1024 minprocs=200 [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl=unix:///tmp/supervisor.sock [program:proxyPool] command=python /path/to/ProxyPool/run_proxypool.py redirect_stderr=true stdout_logfile=NONE [program:proxyValidator] command=python /path/to/ProxyPool/run_proxyvalidator.py redirect_stderr=true stdout_logfile=NONE [program:proxyServer] command=python /path/to/ProxyPool/run_proxyserver.py autostart=false redirect_stderr=true stdout_logfile=NONE
因为项目自身已经配置了日志,所以这里就不需要再用supervisord捕获stdout和stderr了。通过supervisord -c supervisord.conf启动supervisord,proxyPool和proxyServer则会随之自动启动,proxyServer需要手动启动,访问http://127.0.0.1:9001即可通过网页来管理这3个进程了:
supervisod的官方文档说目前(版本3.3.1)不支持python3,但是我在使用过程中没有发现什么问题,可能也是由于我并没有使用supervisord的复杂功能,只是把它当作了一个简单的进程状态监控和启停工具了。
【相关推荐】
1. Python免费视频教程
3. Python學習手冊
#以上是詳解非同步代理程式和代理程式池的python程式碼的詳細內容。更多資訊請關注PHP中文網其他相關文章!