ホームページ  >  記事  >  バックエンド開発  >  Python を使用して非同期プロキシ クローラーおよびプロキシ プール メソッドを実装する

Python を使用して非同期プロキシ クローラーおよびプロキシ プール メソッドを実装する

高洛峰
高洛峰オリジナル
2017-03-19 09:28:322602ブラウズ

この記事では主に Python非同期プロキシ クローラーとプロキシ プールの実装に関する知識を紹介します。非常に参考になります。エディターで見てみましょう

非同期プロキシ プールとクロールを実装するには、Python を使用します。ルールに従って、プロキシ Web サイトから無料のプロキシを取得し、有効性を確認した後、redis に保存します。プロキシの数を定期的に拡張し、プール内のプロキシの有効性を確認し、無効なプロキシを削除します。同時に、サーバーは aiohttp を使用して実装され、他のプログラムは対応する URL にアクセスすることでプロキシ プールからプロキシを取得できます。

ソースコード

https://github.com/arrti/proxypool

環境

  • Python 3.5+

  • Redis

  • PhantomJS(オプション)

  • Supervisord (オプション)

asyncio の async および await 構文はコード内で広範囲に使用されているため、これらは Python3.5 でのみ提供されているため、Python3.5 以降のバージョンを使用するのが最善です。私は Python3 を使用しています。 .6。依存関係

selenium

    Selenium パッケージは主に PhantomJS を操作するために使用されます。
  • コードについては以下で説明します。

  • 1. クローラー部分

  • コアコード

  • async def start(self):
     for rule in self._rules:
     parser = asyncio.ensure_future(self._parse_page(rule)) # 根据规则解析页面来获取代理
     logger.debug('{0} crawler started'.format(rule.rule_name))
     if not rule.use_phantomjs:
      await page_download(ProxyCrawler._url_generator(rule), self._pages, self._stop_flag) # 爬取代理网站的页面
     else:
      await page_download_phantomjs(ProxyCrawler._url_generator(rule), self._pages,
    rule.phantomjs_load_flag, self._stop_flag) # 使用PhantomJS爬取
     await self._pages.join()
     parser.cancel()
     logger.debug('{0} crawler finished'.format(rule.rule_name))

    上記のコアコードは、実際には、asyncio.Queueを使用して実装されたプロダクション/コンシューマモデル

    です。以下は、このモデルの簡単な実装です。上記のコードを実行すると、次のような出力が得られます。
import asyncio
from random import random
async def produce(queue, n):
 for x in range(1, n + 1):
 print('produce ', x)
 await asyncio.sleep(random())
 await queue.put(x) # 向queue中放入item
async def consume(queue):
 while 1:
 item = await queue.get() # 等待从queue中获取item
 print('consume ', item)
 await asyncio.sleep(random())
 queue.task_done() # 通知queue当前item处理完毕 
async def run(n):
 queue = asyncio.Queue()
 consumer = asyncio.ensure_future(consume(queue))
 await produce(queue, n) # 等待生产者结束
 await queue.join() # 阻塞直到queue不为空
 consumer.cancel() # 取消消费者任务,否则它会一直阻塞在get方法处
def aio_queue_run(n):
 loop = asyncio.get_event_loop()
 try:
 loop.run_until_complete(run(n)) # 持续运行event loop直到任务run(n)结束
 finally:
 loop.close()
if name == 'main':
 aio_queue_run(5)
  • ページのクロール
  • produce 1
    produce 2
    consume 1
    produce 3
    produce 4
    consume 2
    produce 5
    consume 3
    consume 4
    consume 5
    aiohttp によって実装された Web クローリング
  • 関数 を使用すると、ほとんどのプロキシ Web サイトで上記の方法を使用できます。 js を使用してページを動的に生成し、Selenium を使用して PhantomJS のクロールを制御できます。このプロジェクトには、クローラーの効率に対する高い要件はありません。プロキシ Web サイトの更新頻度は制限されており、頻繁なクロールは必要ありません。 PhantomJS を完全に使用します。

    プロキシを解析する

    最も簡単な方法は、

    xpathを使用してプロキシを解析することです。Chrome ブラウザを使用している場合は、右クリックして選択したページ要素の xpath を直接取得できます。

    Chrome の拡張機能「XPath Helper」をインストールすると、ページ上で直接 xpath を実行およびデバッグできます。これは非常に便利です:

    BeautifulSoup は xpath をサポートせず、lxml を使用してページを解析します。 コードは次のとおりです。 :

    async def page_download(urls, pages, flag):
     url_generator = urls
     async with aiohttp.ClientSession() as session:
     for url in url_generator:
      if flag.is_set():
      break
      await asyncio.sleep(uniform(delay - 0.5, delay + 1))
      logger.debug('crawling proxy web page {0}'.format(url))
      try:
      async with session.get(url, headers=headers, timeout=10) as response:
       page = await response.text()
       parsed = html.fromstring(decode_html(page)) # 使用bs4来辅助lxml解码网页:http://lxml.de/elementsoup.html#Using only the encoding detection
       await pages.put(parsed)
       url_generator.send(parsed) # 根据当前页面来获取下一页的地址
      except StopIteration:
      break
      except asyncio.TimeoutError:
      logger.error('crawling {0} timeout'.format(url))
      continue # TODO: use a proxy
      except Exception as e:
      logger.error(e)
    クローラールール

    Web サイトのクローリング、プロキシ解析、フィルタリングおよびその他の操作のルールは、各プロキシ Web サイトのルール クラスによって定義され、ルール クラスの管理には基本クラスが使用されます。基本クラスは次のように定義されます:

    async def _parse_proxy(self, rule, page):
     ips = page.xpath(rule.ip_xpath) # 根据xpath解析得到list类型的ip地址集合
     ports = page.xpath(rule.port_xpath) # 根据xpath解析得到list类型的ip地址集合
     if not ips or not ports:
     logger.warning('{2} crawler could not get ip(len={0}) or port(len={1}), please check the xpaths or network'.
      format(len(ips), len(ports), rule.rule_name))
     return
     proxies = map(lambda x, y: '{0}:{1}'.format(x.text.strip(), y.text.strip()), ips, ports)
     if rule.filters: # 根据过滤字段来过滤代理,如“高匿”、“透明”等
     filters = []
     for i, ft in enumerate(rule.filters_xpath):
      field = page.xpath(ft)
      if not field:
      logger.warning('{1} crawler could not get {0} field, please check the filter xpath'.
       format(rule.filters[i], rule.rule_name))
      continue
      filters.append(map(lambda x: x.text.strip(), field))
     filters = zip(*filters)
     selector = map(lambda x: x == rule.filters, filters)
     proxies = compress(proxies, selector)
    for proxy in proxies:
    await self._proxies.put(proxy) # 解析后的代理放入asyncio.Queue中

    各パラメーターの意味は次のとおりです:

    start_url (必須) クローラーの開始ページ。

    ip_xpath(必須) IP をクロールするための

    xpath ルール。

    port_xpath(必須)

    ポート番号のクロール xpath ルール。

    Python を使用して非同期プロキシ クローラーおよびプロキシ プール メソッドを実装するpage_count

    クロールされたページの数。

    urls_formatPython を使用して非同期プロキシ クローラーおよびプロキシ プール メソッドを実装する

    urls_format によるページ アドレス 文字列

    の形式. format(start_url, n) は、一般的なページ アドレス形式であるページ n のアドレスを生成します。

    next_page_xpathnext_page_host

    由xpath规则来获取下一页的url(常见的是相对路径),结合host得到下一页的地址:next_page_host + url。

    use_phantomjs, phantomjs_load_flag

    use_phantomjs用于标识爬取该网站是否需要使用PhantomJS,若使用,需定义phantomjs_load_flag(网页上的某个元素,str类型)作为PhantomJS页面加载完毕的标志。

    filters

    过滤字段集合,可迭代类型。用于过滤代理。

    爬取各个过滤字段的xpath规则,与过滤字段按顺序一一对应。

    元类CrawlerRuleMeta用于管理规则类的定义,如:如果定义use_phantomjs=True,则必须定义phantomjs_load_flag,否则会抛出异常,不在此赘述。

    目前已经实现的规则有西刺代理、快代理、360代理、66代理和 秘密代理。新增规则类也很简单,通过继承CrawlerRuleBase来定义新的规则类YourRuleClass,放在proxypool/rules目录下,并在该目录下的init.py中添加from . import YourRuleClass(这样通过CrawlerRuleBase.subclasses()就可以获取全部的规则类了),重启正在运行的proxy pool即可应用新的规则。

    2. 检验部分

    免费的代理虽然多,但是可用的却不多,所以爬取到代理后需要对其进行检验,有效的代理才能放入代理池中,而代理也是有时效性的,还要定期对池中的代理进行检验,及时移除失效的代理。

    这部分就很简单了,使用aiohttp通过代理来访问某个网站,若超时,则说明代理无效。

    async def validate(self, proxies):
     logger.debug('validator started')
     while 1:
     proxy = await proxies.get()
     async with aiohttp.ClientSession() as session:
      try:
      real_proxy = 'http://' + proxy
      async with session.get(self.validate_url, proxy=real_proxy, timeout=validate_timeout) as resp:
       self._conn.put(proxy)
      except Exception as e:
      logger.error(e)
     proxies.task_done()

    3. server部分

    使用aiohttp实现了一个web server,启动后,访问http://host:port即可显示主页:

    Python を使用して非同期プロキシ クローラーおよびプロキシ プール メソッドを実装する

    • 访问http://host:port/get来从代理池获取1个代理,如:'127.0.0.1:1080';

    • 访问http://host:port/get/n来从代理池获取n个代理,如:"['127.0.0.1:1080', '127.0.0.1:443', '127.0.0.1:80']";

    • 访问http://host:port/count来获取代理池的容量,如:'42'。

    因为主页是一个静态的html页面,为避免每来一个访问主页的请求都要打开、读取以及关闭该html文件的开销,将其缓存到了redis中,通过html文件的修改时间来判断其是否被修改过,如果修改时间与redis缓存的修改时间不同,则认为html文件被修改了,则重新读取文件,并更新缓存,否则从redis中获取主页的内容。

    返回代理是通过aiohttp.web.Response(text=ip.decode('utf-8'))实现的,text要求str类型,而从redis中获取到的是bytes类型,需要进行转换。返回的多个代理,使用eval即可转换为list类型。

    返回主页则不同,是通过aiohttp.web.Response(body=main_page_cache, content_type='text/html') ,这里body要求的是bytes类型,直接将从redis获取的缓存返回即可,conten_type='text/html'必不可少,否则无法通过浏览器加载主页,而是会将主页下载下来——在运行官方文档中的示例代码的时候也要注意这点,那些示例代码基本上都没有设置content_type。

    这部分不复杂,注意上面提到的几点,而关于主页使用的静态资源文件的路径,可以参考之前的博客《aiohttp之添加静态资源路径》。

    4. 运行

    将整个代理池的功能分成了3个独立的部分:

    proxypool

    定期检查代理池容量,若低于下限则启动代理爬虫并对代理检验,通过检验的爬虫放入代理池,达到规定的数量则停止爬虫。

    proxyvalidator

    用于定期检验代理池中的代理,移除失效代理。

    proxyserver

    启动server。

    这3个独立的任务通过3个进程来运行,在Linux下可以使用supervisod来=管理这些进程,下面是supervisord的配置文件示例:

    ; supervisord.conf
    [unix_http_server]
    file=/tmp/supervisor.sock 
    
    [inet_http_server]  
    port=127.0.0.1:9001 
    
    [supervisord]
    logfile=/tmp/supervisord.log 
    logfile_maxbytes=5MB 
    logfile_backups=10  
    loglevel=debug  
    pidfile=/tmp/supervisord.pid 
    nodaemon=false  
    minfds=1024   
    minprocs=200   
    
    [rpcinterface:supervisor]
    supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
    
    [supervisorctl]
    serverurl=unix:///tmp/supervisor.sock
    
    [program:proxyPool]
    command=python /path/to/ProxyPool/run_proxypool.py  
    redirect_stderr=true
    stdout_logfile=NONE
    
    [program:proxyValidator]
    command=python /path/to/ProxyPool/run_proxyvalidator.py
    redirect_stderr=true  
    stdout_logfile=NONE
    
    [program:proxyServer]
    command=python /path/to/ProxyPool/run_proxyserver.py
    autostart=false
    redirect_stderr=true  
    stdout_logfile=NONE

    因为项目自身已经配置了日志,所以这里就不需要再用supervisord捕获stdout和stderr了。通过supervisord -c supervisord.conf启动supervisord,proxyPool和proxyServer则会随之自动启动,proxyServer需要手动启动,访问http://127.0.0.1:9001即可通过网页来管理这3个进程了:

    Python を使用して非同期プロキシ クローラーおよびプロキシ プール メソッドを実装する

    Supervisod の公式ドキュメントには、現在 (バージョン 3.3.1) は python3 をサポートしていないと書かれていますが、使用中に問題は見つかりませんでした。 Supervisord の複雑な機能を使用していないこともあるかもしれませんが、これは、単純なプロセスステータスの監視と開始および停止ツールと見なされます。

    以上がPython を使用して非同期プロキシ クローラーおよびプロキシ プール メソッドを実装するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

    声明:
    この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。