>백엔드 개발 >파이썬 튜토리얼 >Python은 멀티스레딩을 사용하여 웹페이지 정보를 크롤링합니다.

Python은 멀티스레딩을 사용하여 웹페이지 정보를 크롤링합니다.

巴扎黑
巴扎黑원래의
2017-08-09 11:03:411714검색

이 글에서는 Python의 멀티스레드 웹 페이지 크롤링 기능을 주로 소개하며, 구체적인 예제를 바탕으로 Python 멀티스레드 프로그래밍의 관련 운영 기술과 주의 사항을 자세히 분석합니다. 또한 구현 방법을 제공하는 데모 예제도 함께 제공됩니다. 멀티 스레드 웹 페이지 크롤링 , 필요한 친구는 참고할 수 있습니다

이 기사에서는 Python의 멀티 스레드 웹 페이지 크롤링 기능 구현 예를 설명합니다. 참고하실 수 있도록 모든 사람과 공유하세요. 자세한 내용은 다음과 같습니다.

최근에 웹 크롤러와 관련된 일을 하고 있습니다. 오픈소스 C++로 작성된 라빈 크롤러를 살펴보고, 디자인 아이디어와 몇 가지 핵심 기술의 구현을 주의 깊게 읽었습니다.

1. Larbin의 URL 재사용 해제는 매우 효율적인 블룸 필터 알고리즘입니다.
2. DNS 처리는 adns 비동기 오픈 소스 구성 요소를 사용합니다.
3. URL 대기열 처리의 경우 일부는 파일에 저장됩니다. 수입 전략.
4. Larbin은 파일 관련 작업을 많이 수행했습니다. 5. larbin에는 연결 풀이 있습니다. 소켓을 생성하여 대상 사이트에 HTTP 프로토콜의 GET 메서드를 보내고 콘텐츠를 가져옵니다. 그런 다음 헤더 등을 구문 분석합니다.
6. 폴링 방법을 통해 I/O 다중화에 많은 수의 설명자가 사용됩니다. 이는 매우 효율적입니다. 7. Larbin은 매우 구성 가능합니다. 작가님이 밑바닥부터 직접 작성하셔서 기본적으로 쓸모가 없는 것들이 많아요.

지난 이틀 동안 저는 Python으로 멀티스레드 페이지 다운로드 프로그램을 작성했습니다. I/O 집약적인 애플리케이션의 경우 멀티스레딩이 확실히 좋은 솔루션입니다. 방금 작성한 스레드 풀도 사용할 수 있습니다. 실제로 Python을 사용하여 페이지를 크롤링하는 것은 매우 간단합니다. 사용하기 매우 편리하고 기본적으로 두세 줄의 코드로 수행할 수 있는 urllib2 모듈이 있습니다. 타사 모듈을 사용하면 문제를 매우 편리하게 해결할 수 있지만 핵심 알고리즘은 자신이 아닌 다른 사람이 구현하므로 세부 사항을 많이 알 수 없기 때문에 개인 기술 축적에는 도움이 되지 않습니다. 기술 전문가로서 우리는 다른 사람이 작성한 모듈이나 API를 사용할 수 없으며 더 많은 것을 배울 수 있도록 직접 구현해야 합니다.

저는 GET 프로토콜을 캡슐화하고 헤더를 구문 분석하는 소켓에서 시작하기로 결정했습니다. DNS 캐싱과 같은 DNS 구문 분석 프로세스도 별도로 처리할 수 있으므로 직접 작성하면 제어가 더 용이해지고 더 좋아질 것입니다. 확장에 도움이 됩니다. 타임아웃 처리의 경우 전역 5초 타임아웃 처리를 사용합니다. 재배치(301 또는 302) 처리의 경우 최대 재배치는 3회입니다. 이전 테스트 과정에서 많은 사이트의 재배치가 자신에게 리디렉션되는 것을 발견했기 때문입니다. 무한 루프이므로 상한이 설정됩니다. 구체적인 원리는 비교적 간단합니다. 코드를 살펴보세요.

작성을 마친 후 urllib2와 성능을 비교해보니 제가 직접 작성한 글의 효율성이 상대적으로 높았고, urllib2의 오류율이 조금 더 높았던 이유는 모르겠습니다. 인터넷상의 어떤 사람들은 urllib2가 멀티스레드 환경에서 약간의 사소한 문제가 있다고 말하지만, 나는 그 세부 사항에 대해 특별히 명확하지 않습니다.

먼저 코드 게시:

fetchPage.py Http 프로토콜의 Get 메서드를 사용하여 페이지를 다운로드하고 파일로 저장합니다.

'''
Created on 2012-3-13
Get Page using GET method
Default using HTTP Protocol , http port 80
@author: xiaojay
'''
import socket
import statistics
import datetime
import threading
socket.setdefaulttimeout(statistics.timeout)
class Error404(Exception):
  '''Can not find the page.'''
  pass
class ErrorOther(Exception):
  '''Some other exception'''
  def __init__(self,code):
    #print 'Code :',code
    pass
class ErrorTryTooManyTimes(Exception):
  '''try too many times'''
  pass
def downPage(hostname ,filename , trytimes=0):
  try :
    #To avoid too many tries .Try times can not be more than max_try_times
    if trytimes >= statistics.max_try_times :
      raise ErrorTryTooManyTimes
  except ErrorTryTooManyTimes :
    return statistics.RESULTTRYTOOMANY,hostname+filename
  try:
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    #DNS cache
    if statistics.DNSCache.has_key(hostname):
      addr = statistics.DNSCache[hostname]
    else:
      addr = socket.gethostbyname(hostname)
      statistics.DNSCache[hostname] = addr
    #connect to http server ,default port 80
    s.connect((addr,80))
    msg = 'GET '+filename+' HTTP/1.0\r\n'
    msg += 'Host: '+hostname+'\r\n'
    msg += 'User-Agent:xiaojay\r\n\r\n'
    code = ''
    f = None
    s.sendall(msg)
    first = True
    while True:
      msg = s.recv(40960)
      if not len(msg):
        if f!=None:
          f.flush()
          f.close()
        break
      # Head information must be in the first recv buffer
      if first:
        first = False
        headpos = msg.index("\r\n\r\n")
        code,other = dealwithHead(msg[:headpos])
        if code=='200':
          #statistics.fetched_url += 1
          f = open('pages/'+str(abs(hash(hostname+filename))),'w')
          f.writelines(msg[headpos+4:])
        elif code=='301' or code=='302':
          #if code is 301 or 302 , try down again using redirect location
          if other.startswith("http") :
            hname, fname = parse(other)
            downPage(hname,fname,trytimes+1)#try again
          else :
            downPage(hostname,other,trytimes+1)
        elif code=='404':
          raise Error404
        else :
          raise ErrorOther(code)
      else:
        if f!=None :f.writelines(msg)
    s.shutdown(socket.SHUT_RDWR)
    s.close()
    return statistics.RESULTFETCHED,hostname+filename
  except Error404 :
    return statistics.RESULTCANNOTFIND,hostname+filename
  except ErrorOther:
    return statistics.RESULTOTHER,hostname+filename
  except socket.timeout:
    return statistics.RESULTTIMEOUT,hostname+filename
  except Exception, e:
    return statistics.RESULTOTHER,hostname+filename
def dealwithHead(head):
  '''deal with HTTP HEAD'''
  lines = head.splitlines()
  fstline = lines[0]
  code =fstline.split()[1]
  if code == '404' : return (code,None)
  if code == '200' : return (code,None)
  if code == '301' or code == '302' :
    for line in lines[1:]:
      p = line.index(':')
      key = line[:p]
      if key=='Location' :
        return (code,line[p+2:])
  return (code,None)
def parse(url):
  '''Parse a url to hostname+filename'''
  try:
    u = url.strip().strip('\n').strip('\r').strip('\t')
    if u.startswith('http://') :
      u = u[7:]
    elif u.startswith('https://'):
      u = u[8:]
    if u.find(':80')>0 :
      p = u.index(':80')
      p2 = p + 3
    else:
      if u.find('/')>0:
        p = u.index('/')
        p2 = p
      else:
        p = len(u)
        p2 = -1
    hostname = u[:p]
    if p2>0 :
      filename = u[p2:]
    else : filename = '/'
    return hostname, filename
  except Exception ,e:
    print "Parse wrong : " , url
    print e
def PrintDNSCache():
  '''print DNS dict'''
  n = 1
  for hostname in statistics.DNSCache.keys():
    print n,'\t',hostname, '\t',statistics.DNSCache[hostname]
    n+=1
def dealwithResult(res,url):
  '''Deal with the result of downPage'''
  statistics.total_url+=1
  if res==statistics.RESULTFETCHED :
    statistics.fetched_url+=1
    print statistics.total_url , '\t fetched :', url
  if res==statistics.RESULTCANNOTFIND :
    statistics.failed_url+=1
    print "Error 404 at : ", url
  if res==statistics.RESULTOTHER :
    statistics.other_url +=1
    print "Error Undefined at : ", url
  if res==statistics.RESULTTIMEOUT :
    statistics.timeout_url +=1
    print "Timeout ",url
  if res==statistics.RESULTTRYTOOMANY:
    statistics.trytoomany_url+=1
    print e ,"Try too many times at", url
if __name__=='__main__':
  print 'Get Page using GET method'
다음으로 스레드를 사용하겠습니다. 이전 글의 pool 보조적으로 멀티 쓰레드 하에서 병렬 크롤링을 구현하고, 위에서 작성한 페이지를 다운로드하는 방법을 이용하여 urllib2와 성능을 비교해보자.


'''
Created on 2012-3-16
@author: xiaojay
'''
import fetchPage
import threadpool
import datetime
import statistics
import urllib2
'''one thread'''
def usingOneThread(limit):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  for u in urlset:
    if limit <= 0 : break
    limit-=1
    hostname , filename = parse(u)
    res= fetchPage.downPage(hostname,filename,0)
    fetchPage.dealwithResult(res)
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print &#39;Total fetched :&#39;, statistics.fetched_url
&#39;&#39;&#39;threadpoll and GET method&#39;&#39;&#39;
def callbackfunc(request,result):
  fetchPage.dealwithResult(result[0],result[1])
def usingThreadpool(limit,num_thread):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  main = threadpool.ThreadPool(num_thread)
  for url in urlset :
    try :
      hostname , filename = fetchPage.parse(url)
      req = threadpool.WorkRequest(fetchPage.downPage,args=[hostname,filename],kwds={},callback=callbackfunc)
      main.putRequest(req)
    except Exception:
      print Exception.message
  while True:
    try:
      main.poll()
      if statistics.total_url >= limit : break
    except threadpool.NoResultsPending:
      print "no pending results"
      break
    except Exception ,e:
      print e
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print &#39;Total url :&#39;,statistics.total_url
  print &#39;Total fetched :&#39;, statistics.fetched_url
  print &#39;Lost url :&#39;, statistics.total_url - statistics.fetched_url
  print &#39;Error 404 :&#39; ,statistics.failed_url
  print &#39;Error timeout :&#39;,statistics.timeout_url
  print &#39;Error Try too many times &#39; ,statistics.trytoomany_url
  print &#39;Error Other faults &#39;,statistics.other_url
  main.stop()
&#39;&#39;&#39;threadpool and urllib2 &#39;&#39;&#39;
def downPageUsingUrlib2(url):
  try:
    req = urllib2.Request(url)
    fd = urllib2.urlopen(req)
    f = open("pages3/"+str(abs(hash(url))),&#39;w&#39;)
    f.write(fd.read())
    f.flush()
    f.close()
    return url ,&#39;success&#39;
  except Exception:
    return url , None
def writeFile(request,result):
  statistics.total_url += 1
  if result[1]!=None :
    statistics.fetched_url += 1
    print statistics.total_url,&#39;\tfetched :&#39;, result[0],
  else:
    statistics.failed_url += 1
    print statistics.total_url,&#39;\tLost :&#39;,result[0],
def usingThreadpoolUrllib2(limit,num_thread):
  urlset = open("input.txt","r")
  start = datetime.datetime.now()
  main = threadpool.ThreadPool(num_thread)
  for url in urlset :
    try :
      req = threadpool.WorkRequest(downPageUsingUrlib2,args=[url],kwds={},callback=writeFile)
      main.putRequest(req)
    except Exception ,e:
      print e
  while True:
    try:
      main.poll()
      if statistics.total_url >= limit : break
    except threadpool.NoResultsPending:
      print "no pending results"
      break
    except Exception ,e:
      print e
  end = datetime.datetime.now()
  print "Start at :\t" , start
  print "End at :\t" , end
  print "Total Cost :\t" , end - start
  print &#39;Total url :&#39;,statistics.total_url
  print &#39;Total fetched :&#39;, statistics.fetched_url
  print &#39;Lost url :&#39;, statistics.total_url - statistics.fetched_url
  main.stop()
if __name__ ==&#39;__main__&#39;:
  &#39;&#39;&#39;too slow&#39;&#39;&#39;
  #usingOneThread(100)
  &#39;&#39;&#39;use Get method&#39;&#39;&#39;
  #usingThreadpool(3000,50)
  &#39;&#39;&#39;use urllib2&#39;&#39;&#39;
  usingThreadpoolUrllib2(3000,50)

실험 분석:


실험 데이터: larbin으로 캡처한 3000개의 URL은 Mercator 대기열 모델로 처리되었습니다. (C++로 구현했으며, 확보되면 블로그에 게시하겠습니다.) 미래의 기회) 무작위적이고 대표적인 URL 모음입니다. 50개 스레드로 구성된 스레드 풀을 사용합니다. 실험 환경:

ubuntu10.04, 좋은 네트워크, python2.6

저장: 작은 파일, 각 페이지, 하나의 파일 저장PS: 학교의 인터넷 접속 비용은 트래픽에 따라 부과되므로 웹 크롤러를 수행하는 데는 비용이 많이 듭니다. ! ! ! 며칠 안에 대규모 URL 다운로드 실험을 실시하고 수십만 개의 URL을 사용해 시도할 수도 있습니다.
실험 결과:

urllib2

사용, ThreadpoolUrllib2(3000,50)시작 시간 : 2012-03-16 22:18:20.956054

끝 시간 : 2012-03-1 6 22:22:15.203018

총 비용: 0:03:54.246964총 URL: 3001총 가져온 URL: 2442

잃어버린 URL: 559


다운로드 페이지의 물리적 저장 크기: 84088kb

나만의 getPage 사용UsingGet, usingThreadpool(3000,50)
시작 시간 : 2012-03-16 22:23:40.206730
종료 : 2012-03-16 22:26:26.843563

총 비용 : 0:02:46.636833

총 URL : 3002

총 가져온 : 2484

Lost URL : 518

오류 404 : 94

오류 시간 초과 : 312
Error 너무 여러 번 시도 0
Error 기타 오류 112

다운로드 페이지의 물리적 저장 크기: 87168kb

요약: 제가 직접 작성한 다운로드 페이지 프로그램은 매우 효율적이며 손실된 페이지가 적습니다. 하지만 사실 생각해보면 아직 최적화할 수 있는 곳이 많이 있습니다. 예를 들어 파일이 너무 분산되어 있기 때문에 작은 파일을 너무 많이 생성하고 릴리스하면 확실히 성능 오버헤드가 많이 발생하고 프로그램이 실행되지 않습니다. 해시 이름 지정을 사용하므로 계산 측면에서도 많은 문제가 발생합니다. 좋은 전략이 있으면 실제로 이러한 비용을 생략할 수 있습니다. DNS 외에도 Python과 함께 제공되는 DNS 확인을 사용할 필요가 없습니다. 기본 DNS 확인은 동기 작업이고 DNS 확인은 일반적으로 시간이 많이 걸리기 때문에 멀티 스레드 비동기 방식으로 수행할 수 있습니다. 방식을 적절한 DNS 캐싱과 결합하면 효율성이 크게 향상될 수 있습니다. 뿐만 아니라, 실제 페이지 크롤링 과정에서 URL의 양이 많아 한번에 메모리에 저장하는 것은 불가능하며, 대신 특정 전략이나 알고리즘에 따라 합리적으로 할당해야 합니다. 간단히 말해서, 아직 해야 할 일과 컬렉션 페이지에 최적화할 수 있는 일이 많이 있습니다.

위 내용은 Python은 멀티스레딩을 사용하여 웹페이지 정보를 크롤링합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.