Heim > Artikel > Backend-Entwicklung > So implementieren Sie die Website-Statusprüfung mit Python Asyncio
Wir können Asyncio verwenden, um den HTTP-Status einer Website abzufragen, indem wir einen Stream öffnen und HTTP-Anfragen und -Antworten schreiben und lesen.
Dann können wir Asyncio verwenden, um den Status mehrerer Websites gleichzeitig abzufragen und die Ergebnisse sogar dynamisch zu melden.
Das Asyncio-Modul bietet Unterstützung für das Öffnen von Socket-Verbindungen und das Lesen und Schreiben von Daten über Streams. Mit dieser Funktion können wir den Status einer Webseite überprüfen.
Dies kann vier Schritte umfassen, diese sind:
Öffnen Sie eine Verbindung
#🎜🎜 ## 🎜🎜#
2. HTTP-Verbindung öffnen... # open a socket connection reader, writer = await asyncio.open_connection('www.google.com', 80)Wir können auch eine SSL-Verbindung mit dem Parameter ssl=True öffnen. Damit kann eine HTTPS-Verbindung auf Port 443 geöffnet werden.
... # open a socket connection reader, writer = await asyncio.open_connection('www.google.com', 443)3. HTTP-Anfrage schreiben Nach dem Öffnen können wir Abfragen an StreamWriter schreiben, um HTTP-Anfragen zu stellen. HTTP-Anfragen der Version 1.1 erfolgen beispielsweise im Klartext. Wir können den Dateipfad „/“ anfordern, der so aussehen kann:
GET / HTTP/1.1 Host: www.google.com
'GET / HTTP/1.1\r\n' 'Host: www.google.com\r\n' '\r\n'Dieser String muss in Bytes codiert werden, bevor er in den StreamWriter geschrieben wird. Dies kann erreicht werden, indem die Methode encode() für die Zeichenfolge selbst verwendet wird. Die Standardkodierung „utf-8“ kann ausreichend sein.
... # encode string as bytes byte_data = string.encode()Die Bytes können dann über die write()-Methode des StreamWriters in den Socket geschrieben werden.
... # write query to socket writer.write(byte_data)Nachdem Sie eine Anfrage geschrieben haben, warten Sie am besten, bis die Datenbytes gesendet wurden und der Socket bereit ist. Dies kann durch die Methode drain() erreicht werden. Dies ist eine Coroutine, die warten muss.
... # wait for the socket to be ready. await writer.drain()4. Lesen Sie die HTTP-Antwort. Nachdem wir die HTTP-Anfrage gestellt haben, können wir die Antwort lesen. Dies kann über den StreamReader des Sockets erreicht werden. Die Antwort kann mit der Methode read() gelesen werden, die einen großen Byteblock liest, oder mit der Methode readline(), die eine Zeile mit Bytes liest. Wir bevorzugen möglicherweise die Methode readline(), da wir das textbasierte HTTP-Protokoll verwenden, das HTML-Daten Zeile für Zeile sendet. Die Methode readline() ist eine Coroutine und muss warten.
... # read one line of response line_bytes = await reader.readline()HTTP 1.1-Antwort besteht aus zwei Teilen: einem durch eine Leerzeile getrennten Header, gefolgt von einem Text, der durch eine Leerzeile abgeschlossen wird. Der Header enthält Informationen darüber, ob die Anfrage erfolgreich war und welcher Dateityp gesendet wird, und der Body enthält den Inhalt der Datei, beispielsweise eine HTML-Webseite. Die erste Zeile des HTTP-Headers enthält den HTTP-Status der angeforderten Seite auf dem Server. Jede Zeile muss von Bytes in String dekodiert werden. Dies kann durch die Verwendung der decode()-Methode für Bytedaten erreicht werden. Auch hier ist die Standardkodierung „utf_8“.
... # decode bytes into a string line_data = line_bytes.decode()5. HTTP-Verbindung schließen Wir können die Socket-Verbindung schließen, indem wir den StreamWriter schließen. Dies kann durch den Aufruf der Methode close() erreicht werden.
... # close the connection writer.close()
# get the HTTP/S status of a webpage async def get_status(url): # ...Die URL muss in ihre Bestandteile zerlegt werden. Wir benötigen den Hostnamen und den Dateipfad, wenn wir eine HTTP-Anfrage stellen. Wir müssen auch das URL-Schema (HTTP oder HTTPS) kennen, um festzustellen, ob SSL erforderlich ist. Dies kann mit der Funktion urllib.parse.urlsplit() erreicht werden, die eine URL-Zeichenfolge akzeptiert und ein benanntes Tupel aller URL-Elemente zurückgibt.
... # split the url into components url_parsed = urlsplit(url)Dann können wir eine HTTP-Verbindung basierend auf dem URL-Schema öffnen und den URL-Hostnamen verwenden.
... # open the connection if url_parsed.scheme == 'https': reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True) else: reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)Als nächstes können wir eine HTTP-GET-Anfrage mit dem Hostnamen und dem Dateipfad erstellen und einen StreamWriter verwenden, um die codierten Bytes in den Socket zu schreiben.
... # send GET request query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n' # write query to socket writer.write(query.encode()) # wait for the bytes to be written to the socket await writer.drain()Als nächstes können wir die HTTP-Antwort lesen. Wir benötigen nur die erste Zeile der Antwort, die den HTTP-Status enthält.
... # read the single line response response = await reader.readline()Die Verbindung kann dann geschlossen werden.
... # close the connection writer.close()Schließlich können wir die vom Server gelesenen Bytes dekodieren, nachgestellte Leerzeichen entfernen und den HTTP-Status zurückgeben.
... # decode and strip white space status = response.decode().strip() # return the response return statusWenn man sie miteinander kombiniert, ist die vollständige get_status()-Coroutine unten aufgeführt. Es gibt keine Fehlerbehandlung, wie z. B. nicht erreichbarer Host oder langsame Antwort. Diese Ergänzungen werden den Lesern eine schöne Erweiterung bieten.
# get the HTTP/S status of a webpage async def get_status(url): # split the url into components url_parsed = urlsplit(url) # open the connection if url_parsed.scheme == 'https': reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True) else: reader, writer = await asyncio.open_connection(url_parsed.hostname, 80) # send GET request query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n' # write query to socket writer.write(query.encode()) # wait for the bytes to be written to the socket await writer.drain() # read the single line response response = await reader.readline() # close the connection writer.close() # decode and strip white space status = response.decode().strip() # return the response return statusAls nächstes können wir die Coroutine get_status() für mehrere Webseiten oder Websites aufrufen, die wir überprüfen möchten. In diesem Fall erstellen wir eine Liste der 10 besten Webseiten der Welt.
... # list of top 10 websites to check sites = ['https://www.google.com/', 'https://www.youtube.com/', 'https://www.facebook.com/', 'https://twitter.com/', 'https://www.instagram.com/', 'https://www.baidu.com/', 'https://www.wikipedia.org/', 'https://yandex.ru/', 'https://yahoo.com/', 'https://www.whatsapp.com/' ]
然后我们可以使用我们的 get_status() 协程依次查询每个。在这种情况下,我们将在一个循环中按顺序这样做,并依次报告每个状态。
... # check the status of all websites for url in sites: # get the status for the url status = await get_status(url) # report the url and its status print(f'{url:30}:\t{status}')
在使用 asyncio 时,我们可以做得比顺序更好,但这提供了一个很好的起点,我们可以在以后进行改进。将它们结合在一起,main() 协程查询前 10 个网站的状态。
# main coroutine async def main(): # list of top 10 websites to check sites = ['https://www.google.com/', 'https://www.youtube.com/', 'https://www.facebook.com/', 'https://twitter.com/', 'https://www.instagram.com/', 'https://www.baidu.com/', 'https://www.wikipedia.org/', 'https://yandex.ru/', 'https://yahoo.com/', 'https://www.whatsapp.com/' ] # check the status of all websites for url in sites: # get the status for the url status = await get_status(url) # report the url and its status print(f'{url:30}:\t{status}')
最后,我们可以创建 main() 协程并将其用作 asyncio 程序的入口点。
... # run the asyncio program asyncio.run(main())
将它们结合在一起,下面列出了完整的示例。
# SuperFastPython.com # check the status of many webpages import asyncio from urllib.parse import urlsplit # get the HTTP/S status of a webpage async def get_status(url): # split the url into components url_parsed = urlsplit(url) # open the connection if url_parsed.scheme == 'https': reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True) else: reader, writer = await asyncio.open_connection(url_parsed.hostname, 80) # send GET request query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n' # write query to socket writer.write(query.encode()) # wait for the bytes to be written to the socket await writer.drain() # read the single line response response = await reader.readline() # close the connection writer.close() # decode and strip white space status = response.decode().strip() # return the response return status # main coroutine async def main(): # list of top 10 websites to check sites = ['https://www.google.com/', 'https://www.youtube.com/', 'https://www.facebook.com/', 'https://twitter.com/', 'https://www.instagram.com/', 'https://www.baidu.com/', 'https://www.wikipedia.org/', 'https://yandex.ru/', 'https://yahoo.com/', 'https://www.whatsapp.com/' ] # check the status of all websites for url in sites: # get the status for the url status = await get_status(url) # report the url and its status print(f'{url:30}:\t{status}') # run the asyncio program asyncio.run(main())
运行示例首先创建 main() 协程并将其用作程序的入口点。main() 协程运行,定义前 10 个网站的列表。然后顺序遍历网站列表。 main()协程挂起调用get_status()协程查询一个网站的状态。
get_status() 协程运行、解析 URL 并打开连接。它构造一个 HTTP GET 查询并将其写入主机。读取、解码并返回响应。main() 协程恢复并报告 URL 的 HTTP 状态。
对列表中的每个 URL 重复此操作。该程序大约需要 5.6 秒才能完成,或者平均每个 URL 大约需要半秒。这突出了我们如何使用 asyncio 来查询网页的 HTTP 状态。
尽管如此,它并没有充分利用 asyncio 来并发执行任务。
https://www.google.com/ : HTTP/1.1 200 OK
https://www.youtube.com/ : HTTP/1.1 200 OK
https://www.facebook.com/ : HTTP/1.1 302 Found
https://twitter.com/ : HTTP/1.1 200 OK
https://www.instagram.com/ : HTTP/1.1 200 OK
https://www.baidu.com/ : HTTP/1.1 200 OK
https://www.wikipedia.org/ : HTTP/1.1 200 OK
https://yandex.ru/ : HTTP/1.1 302 Moved temporarily
https://yahoo.com/ : HTTP/1.1 301 Moved Permanently
https://www.whatsapp.com/ : HTTP/1.1 302 Found
asyncio 的一个好处是我们可以同时执行许多协程。我们可以使用 asyncio.gather() 函数在 asyncio 中并发查询网站的状态。
此函数采用一个或多个协程,暂停执行提供的协程,并将每个协程的结果作为可迭代对象返回。然后我们可以遍历 URL 列表和可迭代的协程返回值并报告结果。
这可能是比上述方法更简单的方法。首先,我们可以创建一个协程列表。
... # create all coroutine requests coros = [get_status(url) for url in sites]
接下来,我们可以执行协程并使用 asyncio.gather() 获取可迭代的结果。
请注意,我们不能直接提供协程列表,而是必须将列表解压缩为单独的表达式,这些表达式作为位置参数提供给函数。
... # execute all coroutines and wait results = await asyncio.gather(*coros)
这将同时执行所有协程并检索它们的结果。然后我们可以遍历 URL 列表和返回状态并依次报告每个。
... # process all results for url, status in zip(sites, results): # report status print(f'{url:30}:\t{status}')
将它们结合在一起,下面列出了完整的示例。
# SuperFastPython.com # check the status of many webpages import asyncio from urllib.parse import urlsplit # get the HTTP/S status of a webpage async def get_status(url): # split the url into components url_parsed = urlsplit(url) # open the connection if url_parsed.scheme == 'https': reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True) else: reader, writer = await asyncio.open_connection(url_parsed.hostname, 80) # send GET request query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n' # write query to socket writer.write(query.encode()) # wait for the bytes to be written to the socket await writer.drain() # read the single line response response = await reader.readline() # close the connection writer.close() # decode and strip white space status = response.decode().strip() # return the response return status # main coroutine async def main(): # list of top 10 websites to check sites = ['https://www.google.com/', 'https://www.youtube.com/', 'https://www.facebook.com/', 'https://twitter.com/', 'https://www.instagram.com/', 'https://www.baidu.com/', 'https://www.wikipedia.org/', 'https://yandex.ru/', 'https://yahoo.com/', 'https://www.whatsapp.com/' ] # create all coroutine requests coros = [get_status(url) for url in sites] # execute all coroutines and wait results = await asyncio.gather(*coros) # process all results for url, status in zip(sites, results): # report status print(f'{url:30}:\t{status}') # run the asyncio program asyncio.run(main())
运行该示例会像以前一样执行 main() 协程。在这种情况下,协程列表是在列表理解中创建的。
然后调用 asyncio.gather() 函数,传递协程并挂起 main() 协程,直到它们全部完成。协程执行,同时查询每个网站并返回它们的状态。
main() 协程恢复并接收可迭代的状态值。然后使用 zip() 内置函数遍历此可迭代对象和 URL 列表,并报告状态。
这突出了一种更简单的方法来同时执行协程并在所有任务完成后报告结果。它也比上面的顺序版本更快,在我的系统上完成大约 1.4 秒。
https://www.google.com/ : HTTP/1.1 200 OK
https://www.youtube.com/ : HTTP/1.1 200 OK
https://www.facebook.com/ : HTTP/1.1 302 Found
https://twitter.com/ : HTTP/1.1 200 OK
https://www.instagram.com/ : HTTP/1.1 200 OK
https://www.baidu.com/ : HTTP/1.1 200 OK
https://www.wikipedia.org/ : HTTP/1.1 200 OK
https://yandex.ru/ : HTTP/1.1 302 Moved temporarily
https://yahoo.com/ : HTTP/1.1 301 Moved Permanently
https://www.whatsapp.com/ : HTTP/1.1 302 Found
Das obige ist der detaillierte Inhalt vonSo implementieren Sie die Website-Statusprüfung mit Python Asyncio. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!