首頁  >  文章  >  後端開發  >  Python利用多執行緒爬取網頁資訊的功能

Python利用多執行緒爬取網頁資訊的功能

巴扎黑
巴扎黑原創
2017-08-09 11:03:411632瀏覽

這篇文章主要介紹了Python實現多線程抓取網頁功能,結合具體實例形式詳細分析了Python多線程編程的相關操作技巧與注意事項,並附帶demo實例給出了多線程抓取網頁的實作方法,需要的朋友可以參考下

本文實例講述了Python實作多執行緒抓取網頁功能。分享給大家供大家參考,具體如下:

最近,一直在做網路爬蟲相關的東西。 看了一下開源C++寫的larbin爬蟲,仔細閱讀了裡面的設計想法和一些關鍵技術的實作。

1、larbin的URL去重複使用的很有效率的bloom filter演算法;
2、DNS處理,使用的adns非同步的開源元件;
3、對於url佇列的處理,則是用部分快取到內存,部分寫入檔案的策略。
4、larbin對文件的相關操作做了很多工作
5、在larbin裡有連接池,透過創建套接字,向目標站點發送HTTP協議中GET方法,獲取內容,再解析header之類別的東西
6、大量描述字,透過poll方法進行I/O復用,很有效率
7、larbin可配置性很強
8、作者所使用的大量資料結構都是自己從最底層寫起的,基本上沒用STL之類的東西
......

還有很多,以後有時間在好好寫篇文章,總結下。

這兩天,用python寫了個多線程下載頁面的程序,對於I/O密集的應用而言,多線程顯然是個很好的解決方案。剛剛寫過的線程池,也剛好可以利用了。其實實用python爬取頁面非常簡單,有個urllib2的模組,使用起來很方便,基本上兩三行程式碼就可以搞定。雖然使用第三方模組,可以很方便的解決問題,但是對個人的技術累積而言沒有什麼好處,因為關鍵的演算法都是別人實現的,而不是你自己實現的,很多細節的東西,你根本就無法了解。 我們做技術的,不能一味的只是用別人寫好的模組或api,要自己動手實現,才能讓自己學習得更多。

我決定從socket寫起,也是去封裝GET協議,解析header,而且還可以把DNS的解析過程單獨處理,例如DNS緩存一下,所以這樣自己寫的話,可控性更強,更有利於擴展。對於timeout的處理,我用的全局的5秒鐘的超時處理,對於重定位(301or302)的處理是,最多重定位3次,因為之前測試過程中,發現很多站點的重定位又定位到自己,這樣就無限循環了,所以就設定了上限。具體原理,比較簡單,直接看程式碼就好了。

自己寫完之後,與urllib2進行了下效能對比,自己寫的效率還是比較高的,而且urllib2的錯誤率稍高一些,不知道為什麼。網路上有人說urllib2在多執行緒背景下有些小問題,具體我也不是特別清楚。

先貼程式碼:

fetchPage.py  使用Http協定的Get方法,進行頁面下載,並儲存為文件


'''
Created on 2012-3-13
Get Page using GET method
Default using HTTP Protocol , http port 80
@author: xiaojay
'''
import socket
import statistics
import datetime
import threading
socket.setdefaulttimeout(statistics.timeout)
class Error404(Exception):
  '''Can not find the page.'''
  pass
class ErrorOther(Exception):
  '''Some other exception'''
  def __init__(self,code):
    #print 'Code :',code
    pass
class ErrorTryTooManyTimes(Exception):
  '''try too many times'''
  pass
def downPage(hostname ,filename , trytimes=0):
  try :
    #To avoid too many tries .Try times can not be more than max_try_times
    if trytimes >= statistics.max_try_times :
      raise ErrorTryTooManyTimes
  except ErrorTryTooManyTimes :
    return statistics.RESULTTRYTOOMANY,hostname+filename
  try:
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #DNS cache
    if statistics.DNSCache.has_key(hostname):
      addr = statistics.DNSCache[hostname]
    else:
      addr = socket.gethostbyname(hostname)
      statistics.DNSCache[hostname] = addr
    #connect to http server ,default port 80
    s.connect((addr,80))
    msg = 'GET '+filename+' HTTP/1.0\r\n'
    msg += 'Host: '+hostname+'\r\n'
    msg += 'User-Agent:xiaojay\r\n\r\n'
    code = ''
    f = None
    s.sendall(msg)
    first = True
    while True:
      msg = s.recv(40960)
      if not len(msg):
        if f!=None:
          f.flush()
          f.close()
        break
      # Head information must be in the first recv buffer
      if first:
        first = False
        headpos = msg.index("\r\n\r\n")
        code,other = dealwithHead(msg[:headpos])
        if code=='200':
          #statistics.fetched_url += 1
          f = open('pages/'+str(abs(hash(hostname+filename))),'w')
          f.writelines(msg[headpos+4:])
        elif code=='301' or code=='302':
          #if code is 301 or 302 , try down again using redirect location
          if other.startswith("http") :
            hname, fname = parse(other)
            downPage(hname,fname,trytimes+1)#try again
          else :
            downPage(hostname,other,trytimes+1)
        elif code=='404':
          raise Error404
        else :
          raise ErrorOther(code)
      else:
        if f!=None :f.writelines(msg)
    s.shutdown(socket.SHUT_RDWR)
    s.close()
    return statistics.RESULTFETCHED,hostname+filename
  except Error404 :
    return statistics.RESULTCANNOTFIND,hostname+filename
  except ErrorOther:
    return statistics.RESULTOTHER,hostname+filename
  except socket.timeout:
    return statistics.RESULTTIMEOUT,hostname+filename
  except Exception, e:
    return statistics.RESULTOTHER,hostname+filename
def dealwithHead(head):
  '''deal with HTTP HEAD'''
  lines = head.splitlines()
  fstline = lines[0]
  code =fstline.split()[1]
  if code == '404' : return (code,None)
  if code == '200' : return (code,None)
  if code == '301' or code == '302' :
    for line in lines[1:]:
      p = line.index(':')
      key = line[:p]
      if key=='Location' :
        return (code,line[p+2:])
  return (code,None)
def parse(url):
  '''Parse a url to hostname+filename'''
  try:
    u = url.strip().strip('\n').strip('\r').strip('\t')
    if u.startswith('http://') :
      u = u[7:]
    elif u.startswith('https://'):
      u = u[8:]
    if u.find(':80')>0 :
      p = u.index(':80')
      p2 = p + 3
    else:
      if u.find('/')>0:
        p = u.index('/')
        p2 = p
      else:
        p = len(u)
        p2 = -1
    hostname = u[:p]
    if p2>0 :
      filename = u[p2:]
    else : filename = '/'
    return hostname, filename
  except Exception ,e:
    print "Parse wrong : " , url
    print e
def PrintDNSCache():
  '''print DNS dict'''
  n = 1
  for hostname in statistics.DNSCache.keys():
    print n,'\t',hostname, '\t',statistics.DNSCache[hostname]
    n+=1
def dealwithResult(res,url):
  '''Deal with the result of downPage'''
  statistics.total_url+=1
  if res==statistics.RESULTFETCHED :
    statistics.fetched_url+=1
    print statistics.total_url , '\t fetched :', url
  if res==statistics.RESULTCANNOTFIND :
    statistics.failed_url+=1
    print "Error 404 at : ", url
  if res==statistics.RESULTOTHER :
    statistics.other_url +=1
    print "Error Undefined at : ", url
  if res==statistics.RESULTTIMEOUT :
    statistics.timeout_url +=1
    print "Timeout ",url
  if res==statistics.RESULTTRYTOOMANY:
    statistics.trytoomany_url+=1
    print e ,"Try too many times at", url
if __name__=='__main__':
  print 'Get Page using GET method'

#下面,我將利用上一篇的線程池作為輔助,實現多線程下的並行爬取,並用上面自己寫的下載頁面的方法和urllib2進行一下效能比較。


'''
Created on 2012-3-16
@author: xiaojay
'''
import fetchPage
import threadpool
import datetime
import statistics
import urllib2
'''one thread'''
def usingOneThread(limit):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  for u in urlset:
    if limit <= 0 : break
    limit-=1
    hostname , filename = parse(u)
    res= fetchPage.downPage(hostname,filename,0)
    fetchPage.dealwithResult(res)
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print &#39;Total fetched :&#39;, statistics.fetched_url
&#39;&#39;&#39;threadpoll and GET method&#39;&#39;&#39;
def callbackfunc(request,result):
  fetchPage.dealwithResult(result[0],result[1])
def usingThreadpool(limit,num_thread):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  main = threadpool.ThreadPool(num_thread)
  for url in urlset :
    try :
      hostname , filename = fetchPage.parse(url)
      req = threadpool.WorkRequest(fetchPage.downPage,args=[hostname,filename],kwds={},callback=callbackfunc)
      main.putRequest(req)
    except Exception:
      print Exception.message
  while True:
    try:
      main.poll()
      if statistics.total_url >= limit : break
    except threadpool.NoResultsPending:
      print "no pending results"
      break
    except Exception ,e:
      print e
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print &#39;Total url :&#39;,statistics.total_url
  print &#39;Total fetched :&#39;, statistics.fetched_url
  print &#39;Lost url :&#39;, statistics.total_url - statistics.fetched_url
  print &#39;Error 404 :&#39; ,statistics.failed_url
  print &#39;Error timeout :&#39;,statistics.timeout_url
  print &#39;Error Try too many times &#39; ,statistics.trytoomany_url
  print &#39;Error Other faults &#39;,statistics.other_url
  main.stop()
&#39;&#39;&#39;threadpool and urllib2 &#39;&#39;&#39;
def downPageUsingUrlib2(url):
  try:
    req = urllib2.Request(url)
    fd = urllib2.urlopen(req)
    f = open("pages3/"+str(abs(hash(url))),&#39;w&#39;)
    f.write(fd.read())
    f.flush()
    f.close()
    return url ,&#39;success&#39;
  except Exception:
    return url , None
def writeFile(request,result):
  statistics.total_url += 1
  if result[1]!=None :
    statistics.fetched_url += 1
    print statistics.total_url,&#39;\tfetched :&#39;, result[0],
  else:
    statistics.failed_url += 1
    print statistics.total_url,&#39;\tLost :&#39;,result[0],
def usingThreadpoolUrllib2(limit,num_thread):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  main = threadpool.ThreadPool(num_thread)
  for url in urlset :
    try :
      req = threadpool.WorkRequest(downPageUsingUrlib2,args=[url],kwds={},callback=writeFile)
      main.putRequest(req)
    except Exception ,e:
      print e
  while True:
    try:
      main.poll()
      if statistics.total_url >= limit : break
    except threadpool.NoResultsPending:
      print "no pending results"
      break
    except Exception ,e:
      print e
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print &#39;Total url :&#39;,statistics.total_url
  print &#39;Total fetched :&#39;, statistics.fetched_url
  print &#39;Lost url :&#39;, statistics.total_url - statistics.fetched_url
  main.stop()
if __name__ ==&#39;__main__&#39;:
  &#39;&#39;&#39;too slow&#39;&#39;&#39;
  #usingOneThread(100)
  &#39;&#39;&#39;use Get method&#39;&#39;&#39;
  #usingThreadpool(3000,50)
  &#39;&#39;&#39;use urllib2&#39;&#39;&#39;
  usingThreadpoolUrllib2(3000,50)

實驗分析:

實驗數據:larbin抓取下來的3000條url,經過Mercator隊列模型(我用c++實現的,以後有機會發個blog)處理後的url集合,具有隨機和代表性。使用50個執行緒的執行緒池。
實驗環境:ubuntu10.04,網路較好,python2.6
儲存:小文件,每個頁面,一個文件進行儲存
PS:由於學校上網是按流量收費的,做網路爬蟲,灰常費流量啊! ! !過幾天,可能會做個大規模url下載的實驗,用幾十萬的url試試。

實驗結果:

使用urllib2 ,usingThreadpoolUrllib2(3000,50)

Start at :    2012-03-16 22:18:20.956054
End at :    2012-03-16 22:22:15.203018
Total Cost :    0:03:54.246964
detchd url : 559

下載頁面的實體儲存大小:84088kb

#使用自己的getPageUsingGet ,usingThreadpool(3000,50)

Start at :    2012-03-16 22:#Start at :    2012-03-16 22:#Start at :    2012-03-16 22:#Start at :    2012-03-16 22:#Start at :   23:40.206730

End at :    2012-03-16 22:26:26.843563

Total Cost :    0:02:46.6368333##Tod url 518
Error 404 : 94
Error timeout : 312
Error Try too many times  0
Error Other faults  112

#下載頁的實體儲存大小:87168kb#

小結: 自己寫的下載頁面程序,效率還是很不錯的,而且遺失的頁面也較少。但其實自己考慮一下,還是有很多地方可以優化的,例如檔案太分散,過多的小檔案創建和釋放定會產生不小的效能開銷,而且程式裡用的是hash命名,也會產生很多的計算,如果有好的策略,其實這些開銷都是可以省略的。另外DNS,也可以不使用python自帶的DNS解析,因為預設的DNS解析都是同步的操作,而DNS解析一般比較耗時,可以採取多執行緒的非同步的方式進行,再加以適當的DNS快取很大程度上可以提高效率。不僅如此,在實際的頁面抓取過程中,會有大量的url ,不可能一次性把它們存入內存,而應該按照一定的策略或是算法進行合理的分配。 總之,採集頁面要做的東西以及可以優化的東西,還有很多很多。

以上是Python利用多執行緒爬取網頁資訊的功能的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn