Heim  >  Artikel  >  Backend-Entwicklung  >  So implementieren Sie die Website-Statusprüfung mit Python Asyncio

So implementieren Sie die Website-Statusprüfung mit Python Asyncio

PHPz
PHPznach vorne
2023-04-21 14:10:18802Durchsuche

Wir können Asyncio verwenden, um den HTTP-Status einer Website abzufragen, indem wir einen Stream öffnen und HTTP-Anfragen und -Antworten schreiben und lesen.

Dann können wir Asyncio verwenden, um den Status mehrerer Websites gleichzeitig abzufragen und die Ergebnisse sogar dynamisch zu melden.

1. So überprüfen Sie den HTTP-Status mit Asyncio

Das Asyncio-Modul bietet Unterstützung für das Öffnen von Socket-Verbindungen und das Lesen und Schreiben von Daten über Streams. Mit dieser Funktion können wir den Status einer Webseite überprüfen.

Dies kann vier Schritte umfassen, diese sind:

  • Öffnen Sie eine Verbindung

  • # 🎜 🎜#Anfrage schreiben

  • Antwort lesen

  • Verbindung schließen

    #🎜🎜 ## 🎜🎜#

    2. HTTP-Verbindung öffnen
Sie können die Funktion asyncio.open_connection() verwenden, um eine Verbindung in asyncio zu öffnen. Neben vielen Parametern benötigt die Funktion einen String-Hostnamen und eine ganzzahlige Portnummer.

Dies ist eine Coroutine, die unbedingt warten muss und einen StreamReader und einen StreamWriter zum Lesen und Schreiben über den Socket zurückgibt.

Hiermit kann eine HTTP-Verbindung auf Port 80 geöffnet werden.

...
# open a socket connection
reader, writer = await asyncio.open_connection('www.google.com', 80)

Wir können auch eine SSL-Verbindung mit dem Parameter ssl=True öffnen. Damit kann eine HTTPS-Verbindung auf Port 443 geöffnet werden.

...
# open a socket connection
reader, writer = await asyncio.open_connection('www.google.com', 443)

3. HTTP-Anfrage schreiben

Nach dem Öffnen können wir Abfragen an StreamWriter schreiben, um HTTP-Anfragen zu stellen. HTTP-Anfragen der Version 1.1 erfolgen beispielsweise im Klartext. Wir können den Dateipfad „/“ anfordern, der so aussehen kann:

GET / HTTP/1.1
Host: www.google.com

Es ist wichtig, dass am Ende jeder Zeile ein Wagenrücklauf und ein Zeilenvorschub (rn) stehen müssen, und eine Leerzeile am Ende.

Als Python-String könnte dies so aussehen:

'GET / HTTP/1.1\r\n'
'Host: www.google.com\r\n'
'\r\n'

Dieser String muss in Bytes codiert werden, bevor er in den StreamWriter geschrieben wird. Dies kann erreicht werden, indem die Methode encode() für die Zeichenfolge selbst verwendet wird. Die Standardkodierung „utf-8“ kann ausreichend sein.

...
# encode string as bytes
byte_data = string.encode()

Die Bytes können dann über die write()-Methode des StreamWriters in den Socket geschrieben werden.

...
# write query to socket
writer.write(byte_data)

Nachdem Sie eine Anfrage geschrieben haben, warten Sie am besten, bis die Datenbytes gesendet wurden und der Socket bereit ist. Dies kann durch die Methode drain() erreicht werden. Dies ist eine Coroutine, die warten muss.

...
# wait for the socket to be ready.
await writer.drain()

4. Lesen Sie die HTTP-Antwort.

Nachdem wir die HTTP-Anfrage gestellt haben, können wir die Antwort lesen. Dies kann über den StreamReader des Sockets erreicht werden. Die Antwort kann mit der Methode read() gelesen werden, die einen großen Byteblock liest, oder mit der Methode readline(), die eine Zeile mit Bytes liest.

Wir bevorzugen möglicherweise die Methode readline(), da wir das textbasierte HTTP-Protokoll verwenden, das HTML-Daten Zeile für Zeile sendet. Die Methode readline() ist eine Coroutine und muss warten.

...
# read one line of response
line_bytes = await reader.readline()

HTTP 1.1-Antwort besteht aus zwei Teilen: einem durch eine Leerzeile getrennten Header, gefolgt von einem Text, der durch eine Leerzeile abgeschlossen wird. Der Header enthält Informationen darüber, ob die Anfrage erfolgreich war und welcher Dateityp gesendet wird, und der Body enthält den Inhalt der Datei, beispielsweise eine HTML-Webseite.

Die erste Zeile des HTTP-Headers enthält den HTTP-Status der angeforderten Seite auf dem Server. Jede Zeile muss von Bytes in String dekodiert werden.

Dies kann durch die Verwendung der decode()-Methode für Bytedaten erreicht werden. Auch hier ist die Standardkodierung „utf_8“.

...
# decode bytes into a string
line_data = line_bytes.decode()

5. HTTP-Verbindung schließen

Wir können die Socket-Verbindung schließen, indem wir den StreamWriter schließen. Dies kann durch den Aufruf der Methode close() erreicht werden.

...
# close the connection
writer.close()

Dies blockiert nicht und schließt den Socket möglicherweise nicht sofort. Nachdem wir nun wissen, wie man HTTP-Anfragen stellt und Antworten mit Asyncio liest, schauen wir uns einige Beispiele für die Überprüfung des Status einer Webseite an.

6. Beispiel für die sequentielle Überprüfung des HTTP-Status

Wir können ein Beispiel entwickeln, um den HTTP-Status mehrerer Websites mithilfe von Asyncio zu überprüfen.

In diesem Beispiel entwickeln wir zunächst eine Coroutine, um den Status einer bestimmten URL zu überprüfen. Wir werden diese Coroutine dann einmal für jede der Top-10-Websites aufrufen.

Zuerst können wir eine Coroutine definieren, die eine URL-Zeichenfolge akzeptiert und den HTTP-Status zurückgibt.

# get the HTTP/S status of a webpage
async def get_status(url):
	# ...

Die URL muss in ihre Bestandteile zerlegt werden. Wir benötigen den Hostnamen und den Dateipfad, wenn wir eine HTTP-Anfrage stellen. Wir müssen auch das URL-Schema (HTTP oder HTTPS) kennen, um festzustellen, ob SSL erforderlich ist.

Dies kann mit der Funktion urllib.parse.urlsplit() erreicht werden, die eine URL-Zeichenfolge akzeptiert und ein benanntes Tupel aller URL-Elemente zurückgibt.

...
# split the url into components
url_parsed = urlsplit(url)

Dann können wir eine HTTP-Verbindung basierend auf dem URL-Schema öffnen und den URL-Hostnamen verwenden.

...
# open the connection
if url_parsed.scheme == 'https':
    reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
else:
    reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)

Als nächstes können wir eine HTTP-GET-Anfrage mit dem Hostnamen und dem Dateipfad erstellen und einen StreamWriter verwenden, um die codierten Bytes in den Socket zu schreiben.

...
# send GET request
query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
# write query to socket
writer.write(query.encode())
# wait for the bytes to be written to the socket
await writer.drain()

Als nächstes können wir die HTTP-Antwort lesen. Wir benötigen nur die erste Zeile der Antwort, die den HTTP-Status enthält.

...
# read the single line response
response = await reader.readline()

Die Verbindung kann dann geschlossen werden.

...
# close the connection
writer.close()

Schließlich können wir die vom Server gelesenen Bytes dekodieren, nachgestellte Leerzeichen entfernen und den HTTP-Status zurückgeben.

...
# decode and strip white space
status = response.decode().strip()
# return the response
return status

Wenn man sie miteinander kombiniert, ist die vollständige get_status()-Coroutine unten aufgeführt. Es gibt keine Fehlerbehandlung, wie z. B. nicht erreichbarer Host oder langsame Antwort. Diese Ergänzungen werden den Lesern eine schöne Erweiterung bieten.

# get the HTTP/S status of a webpage
async def get_status(url):
    # split the url into components
    url_parsed = urlsplit(url)
    # open the connection
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    # send GET request
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    # write query to socket
    writer.write(query.encode())
    # wait for the bytes to be written to the socket
    await writer.drain()
    # read the single line response
    response = await reader.readline()
    # close the connection
    writer.close()
    # decode and strip white space
    status = response.decode().strip()
    # return the response
    return status

Als nächstes können wir die Coroutine get_status() für mehrere Webseiten oder Websites aufrufen, die wir überprüfen möchten. In diesem Fall erstellen wir eine Liste der 10 besten Webseiten der Welt.

...
# list of top 10 websites to check
sites = ['https://www.google.com/',
    'https://www.youtube.com/',
    'https://www.facebook.com/',
    'https://twitter.com/',
    'https://www.instagram.com/',
    'https://www.baidu.com/',
    'https://www.wikipedia.org/',
    'https://yandex.ru/',
    'https://yahoo.com/',
    'https://www.whatsapp.com/'
    ]

然后我们可以使用我们的 get_status() 协程依次查询每个。在这种情况下,我们将在一个循环中按顺序这样做,并依次报告每个状态。

...
# check the status of all websites
for url in sites:
    # get the status for the url
    status = await get_status(url)
    # report the url and its status
    print(f'{url:30}:\t{status}')

在使用 asyncio 时,我们可以做得比顺序更好,但这提供了一个很好的起点,我们可以在以后进行改进。将它们结合在一起,main() 协程查询前 10 个网站的状态。

# main coroutine
async def main():
    # list of top 10 websites to check
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.facebook.com/',
        'https://twitter.com/',
        'https://www.instagram.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.whatsapp.com/'
        ]
    # check the status of all websites
    for url in sites:
        # get the status for the url
        status = await get_status(url)
        # report the url and its status
        print(f'{url:30}:\t{status}')

最后,我们可以创建 main() 协程并将其用作 asyncio 程序的入口点。

...
# run the asyncio program
asyncio.run(main())

将它们结合在一起,下面列出了完整的示例。

# SuperFastPython.com
# check the status of many webpages
import asyncio
from urllib.parse import urlsplit
 
# get the HTTP/S status of a webpage
async def get_status(url):
    # split the url into components
    url_parsed = urlsplit(url)
    # open the connection
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    # send GET request
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    # write query to socket
    writer.write(query.encode())
    # wait for the bytes to be written to the socket
    await writer.drain()
    # read the single line response
    response = await reader.readline()
    # close the connection
    writer.close()
    # decode and strip white space
    status = response.decode().strip()
    # return the response
    return status
 
# main coroutine
async def main():
    # list of top 10 websites to check
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.facebook.com/',
        'https://twitter.com/',
        'https://www.instagram.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.whatsapp.com/'
        ]
    # check the status of all websites
    for url in sites:
        # get the status for the url
        status = await get_status(url)
        # report the url and its status
        print(f'{url:30}:\t{status}')
 
# run the asyncio program
asyncio.run(main())

运行示例首先创建 main() 协程并将其用作程序的入口点。main() 协程运行,定义前 10 个网站的列表。然后顺序遍历网站列表。 main()协程挂起调用get_status()协程查询一个网站的状态。

get_status() 协程运行、解析 URL 并打开连接。它构造一个 HTTP GET 查询并将其写入主机。读取、解码并返回响应。main() 协程恢复并报告 URL 的 HTTP 状态。

对列表中的每个 URL 重复此操作。该程序大约需要 5.6 秒才能完成,或者平均每个 URL 大约需要半秒。这突出了我们如何使用 asyncio 来查询网页的 HTTP 状态。

尽管如此,它并没有充分利用 asyncio 来并发执行任务。

https://www.google.com/       :    HTTP/1.1 200 OK
https://www.youtube.com/      :    HTTP/1.1 200 OK
https://www.facebook.com/     :    HTTP/1.1 302 Found
https://twitter.com/          :    HTTP/1.1 200 OK
https://www.instagram.com/    :    HTTP/1.1 200 OK
https://www.baidu.com/        :    HTTP/1.1 200 OK
https://www.wikipedia.org/    :    HTTP/1.1 200 OK
https://yandex.ru/            :    HTTP/1.1 302 Moved temporarily
https://yahoo.com/            :    HTTP/1.1 301 Moved Permanently
https://www.whatsapp.com/     :    HTTP/1.1 302 Found

7. 并发查看网站状态示例

asyncio 的一个好处是我们可以同时执行许多协程。我们可以使用 asyncio.gather() 函数在 asyncio 中并发查询网站的状态。

此函数采用一个或多个协程,暂停执行提供的协程,并将每个协程的结果作为可迭代对象返回。然后我们可以遍历 URL 列表和可迭代的协程返回值并报告结果。

这可能是比上述方法更简单的方法。首先,我们可以创建一个协程列表。

...
# create all coroutine requests
coros = [get_status(url) for url in sites]

接下来,我们可以执行协程并使用 asyncio.gather() 获取可迭代的结果。

请注意,我们不能直接提供协程列表,而是必须将列表解压缩为单独的表达式,这些表达式作为位置参数提供给函数。

...
# execute all coroutines and wait
results = await asyncio.gather(*coros)

这将同时执行所有协程并检索它们的结果。然后我们可以遍历 URL 列表和返回状态并依次报告每个。

...
# process all results
for url, status in zip(sites, results):
    # report status
    print(f'{url:30}:\t{status}')

将它们结合在一起,下面列出了完整的示例。

# SuperFastPython.com
# check the status of many webpages
import asyncio
from urllib.parse import urlsplit
 
# get the HTTP/S status of a webpage
async def get_status(url):
    # split the url into components
    url_parsed = urlsplit(url)
    # open the connection
    if url_parsed.scheme == 'https':
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(url_parsed.hostname, 80)
    # send GET request
    query = f'GET {url_parsed.path} HTTP/1.1\r\nHost: {url_parsed.hostname}\r\n\r\n'
    # write query to socket
    writer.write(query.encode())
    # wait for the bytes to be written to the socket
    await writer.drain()
    # read the single line response
    response = await reader.readline()
    # close the connection
    writer.close()
    # decode and strip white space
    status = response.decode().strip()
    # return the response
    return status
 
# main coroutine
async def main():
    # list of top 10 websites to check
    sites = ['https://www.google.com/',
        'https://www.youtube.com/',
        'https://www.facebook.com/',
        'https://twitter.com/',
        'https://www.instagram.com/',
        'https://www.baidu.com/',
        'https://www.wikipedia.org/',
        'https://yandex.ru/',
        'https://yahoo.com/',
        'https://www.whatsapp.com/'
        ]
    # create all coroutine requests
    coros = [get_status(url) for url in sites]
    # execute all coroutines and wait
    results = await asyncio.gather(*coros)
    # process all results
    for url, status in zip(sites, results):
        # report status
        print(f'{url:30}:\t{status}')
 
# run the asyncio program
asyncio.run(main())

运行该示例会像以前一样执行 main() 协程。在这种情况下,协程列表是在列表理解中创建的。

然后调用 asyncio.gather() 函数,传递协程并挂起 main() 协程,直到它们全部完成。协程执行,同时查询每个网站并返回它们的状态。

main() 协程恢复并接收可迭代的状态值。然后使用 zip() 内置函数遍历此可迭代对象和 URL 列表,并报告状态。

这突出了一种更简单的方法来同时执行协程并在所有任务完成后报告结果。它也比上面的顺序版本更快,在我的系统上完成大约 1.4 秒。

https://www.google.com/       :    HTTP/1.1 200 OK
https://www.youtube.com/      :    HTTP/1.1 200 OK
https://www.facebook.com/     :    HTTP/1.1 302 Found
https://twitter.com/          :    HTTP/1.1 200 OK
https://www.instagram.com/    :    HTTP/1.1 200 OK
https://www.baidu.com/        :    HTTP/1.1 200 OK
https://www.wikipedia.org/    :    HTTP/1.1 200 OK
https://yandex.ru/            :    HTTP/1.1 302 Moved temporarily
https://yahoo.com/            :    HTTP/1.1 301 Moved Permanently
https://www.whatsapp.com/     :    HTTP/1.1 302 Found

Das obige ist der detaillierte Inhalt vonSo implementieren Sie die Website-Statusprüfung mit Python Asyncio. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen