suchen

Heim  >  Fragen und Antworten  >  Hauptteil

Python stößt beim Schreiben von Dateien mit mehreren Prozessen auf Codierungsprobleme, nicht jedoch bei der Verwendung mehrerer Threads

Wenn mehrere Prozesse zum Crawlen von Daten und zum Schreiben in eine Datei verwendet werden, wird beim Ausführen kein Fehler gemeldet, aber die Datei ist beim Öffnen verstümmelt.

Beim Umschreiben mit Multithreading gibt es kein solches Problem, alles ist normal.
Das Folgende ist der Code zum Schreiben von Daten in eine Datei:

def Get_urls(start_page,end_page):
    print ' run task {} ({})'.format(start_page,os.getpid())
 
    url_text = codecs.open('url.txt','a','utf-8')
        for i in range(start_page,end_page+1):
            pageurl=baseurl1+str(i)+baseurl2+searchword
            response = requests.get(pageurl, headers=header)
            soup = BeautifulSoup(response.content, 'html.parser')
            a_list=soup.find_all('a')
            for a in a_list:
                if a.text!=''and 'wssd_content.jsp?bookid'in a['href']:
                    text=a.text.strip()
                    url=baseurl+str(a['href'])
                    url_text.write(text+'\t'+url+'\n')
        url_text.close()

Prozesspool für mehrere Prozesse

def Multiple_processes_test():
    t1 = time.time()
    print 'parent process {} '.format(os.getpid())
    page_ranges_list = [(1,3),(4,6),(7,9)]
    pool = multiprocessing.Pool(processes=3)
    for page_range in page_ranges_list:
        pool.apply_async(func=Get_urls,args=(page_range[0],page_range[1]))
    pool.close()
    pool.join()
    t2 = time.time()
    print '时间:',t2-t1
我想大声告诉你我想大声告诉你2748 Tage vor1136

Antworte allen(3)Ich werde antworten

  • 巴扎黑

    巴扎黑2017-06-15 09:23:36

    图片上已经说了,文件以错误的编码形式载入了,说明你多进程写入的时候,编码不是utf-8

    Antwort
    0
  • 世界只因有你

    世界只因有你2017-06-15 09:23:36

    文件第一行添加:

    #coding: utf-8

    Antwort
    0
  • 我想大声告诉你

    我想大声告诉你2017-06-15 09:23:36

    打开同一个文件,相当危险,出错机率相当大,
    多线程不出错,极有可能是GIL,
    多进程没有锁,因此容易出错了。

    url_text = codecs.open('url.txt','a','utf-8')
    

    建议改为生产者消费都模式!

    比如这样

    # -*- coding: utf-8 -* -
    import time
    import os
    import codecs
    import multiprocessing
    import requests
    from bs4 import BeautifulSoup
    
    baseurl = ''
    baseurl1 = ''
    baseurl2 = ''
    pageurl = ''
    searchword = ''
    header = {}
    
    def fake(url, **kwargs):
        class Response(object):
            pass
        o = Response()
        o.content = '<a href="/{}/wssd_content.jsp?bookid">foo</a>'.format(url)
        return o
    
    requests.get = fake
    
    
    def Get_urls(start_page, end_page, queue):
        print('run task {} ({})'.format(start_page, os.getpid()))
        try:
            for i in range(start_page, end_page + 1):
                pageurl = baseurl1 + str(i) + baseurl2 + searchword
                response = requests.get(pageurl, headers=header)
                soup = BeautifulSoup(response.content, 'html.parser')
                a_list = soup.find_all('a')
                for a in a_list:
                    if a.text != ''and 'wssd_content.jsp?bookid'in a['href']:
                        text = a.text.strip()
                        url = baseurl + str(a['href'])
                        queue.put(text + '\t' + url + '\n')
        except Exception as e:
            import traceback
            traceback.print_exc()
    
    
    def write_file(queue):
        print("start write file")
        url_text = codecs.open('url.txt', 'a', 'utf-8')
        while True:
            line = queue.get()
            if line is None:
                break
            print("write {}".format(line))
            url_text.write(line)
        url_text.close()
    
    
    def Multiple_processes_test():
        t1 = time.time()
        manager = multiprocessing.Manager()
        queue = manager.Queue()
        print 'parent process {} '.format(os.getpid())
        page_ranges_list = [(1, 3), (4, 6), (7, 9)]
        consumer = multiprocessing.Process(target=write_file, args=(queue,))
        consumer.start()
        pool = multiprocessing.Pool(processes=3)
        results = []
        for page_range in page_ranges_list:
            result = pool.apply_async(func=Get_urls,
                             args=(page_range[0],
                                   page_range[1],
                                   queue
                                ))
            results.append(result)
        pool.close()
        pool.join()
        queue.put(None)
        consumer.join()
        t2 = time.time()
        print '时间:', t2 - t1
    
    
    if __name__ == '__main__':
        Multiple_processes_test()
    

    结果

    foo /4/wssd_content.jsp?bookid
    foo /5/wssd_content.jsp?bookid
    foo /6/wssd_content.jsp?bookid
    foo /1/wssd_content.jsp?bookid
    foo /2/wssd_content.jsp?bookid
    foo /3/wssd_content.jsp?bookid
    foo /7/wssd_content.jsp?bookid
    foo /8/wssd_content.jsp?bookid
    foo /9/wssd_content.jsp?bookid

    Antwort
    0
  • StornierenAntwort