Heim > Artikel > Backend-Entwicklung > Detaillierte Einführung in das Sammeln von Live-Broadcast-Sperren auf Station B basierend auf dem asynchronen Coroutine-Framework Asyncio
Dieser Artikel stellt Ihnen ein Framework vor, das auf asynchroner asynchroner Coroutine basiert Implementieren Sie eine einfache Entwerfen Sie das Sperrfeuer-Sammelsystem für Live-Übertragungen von Station B und fügen Sie den Quellcode bei. Freunde in Not können darauf verweisen
">VorwortObwohl der Titel Es handelt sich um die gesamte Website, aber derzeit werden nur ganztägige Sperren für Live-Übertragungsräume der StufeTop 100 erfasst.
Das Sperrfeuer-Sammelsystem wurde basierend auf der vorherigen Live-Übertragung DanmakujiPython-Version von Bilibili geändert. Eine spezifische Protokollanalyse finden Sie im vorherigen Artikel.
Das Live-Sperrfeuerprotokoll basiert direkt auf dem TCP-Protokoll, daher wäre es für Station B schwierig, Gegenmaßnahmen gegen Verhalten wie meines zu ergreifen. Es sollte technische Mittel geben, von denen ich nichts weiß, um bösartiges Verhalten wie das meine zu erkennen. Ich habe versucht, 100 Räume gleichzeitig zu verbinden und 100 Mal eine Verbindung zu einem einzelnen Raum herzustellen, und es gab kein Problem. >150 wird geschlossen.Auswahl von Live-Übertragungsräumen
Jetzt ist das Sperrfeuer-Sammelsystem bei der Auswahl von Live-Übertragungsräumen relativ einfach und wählt direkt die Top-100-Ebene aus. Dieser Teil wird in Zukunft geändert und dahingehend geändert, dass Sie regelmäßig zu http://live.bilibili.com/all gehen, um die neu gestarteten Live-Übertragungsräume zu überprüfen und dynamisch Aufgaben hinzuzufügen.Asynchrone Aufgaben und Sperrspeicher
Das Erfassungssystem verwendet weiterhin das asynchrone Coroutine-Framework Asyncio. Für jeden Live-Übertragungsraum wird die folgende Methode verwendet, um ihn hinzuzufügen Schleife.danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task1 = asyncio.ensure_future(danmuji.connectServer()) task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())Tatsächlich sieht der Code eleganter aus, wenn Sie die Heartbeat-Aufgabe HeartbeatLoop zum Starten in den ConnectorServer einfügen. Der Grund dafür ist jedoch, dass ich eine Aufgabenliste führen muss, die später beschrieben wird. Ich habe einige Zeit damit verbracht, den Sperrspeicher auszuwählen. Datenbankspeicherung ist ein synchroner E/A-Prozess, der die Sperrmüllsammelaufgabe blockiert. Obwohl es eine asynchrone
Schnittstelle wie aiomysql gibt, ist die Konfiguration der Datenbank zu mühsam. Ich gehe davon aus, dass dieses kleine System einfach bereitgestellt werden kann.
Am Ende habe ich mich für die Verwendung des integrierten SQLite3 entschieden. Allerdings kann sqlite3 keine parallelen Vorgänge ausführen, daher wird ein Thread nur für die Datenbankspeicherung geöffnet. In einem anderen Thread sammeln 100 * 2 Aufgaben das gesamte Sperrfeuer und die Anzahl der Personeninformationen und stopfen sie in die Durch die Zusammenarbeit von Multithreading und Asynchronität wird der Netzwerkverkehr nicht blockiert. Mögliche Verarbeitung von VerbindungsfehlerszenarienDas Sperrprotokoll basiert direkt auf TCP und die Bits stehen in direktem Zusammenhang miteinander. Sobald ein Analysefehler auftritt, kann er leicht ausgelöst werden Wie in der Asyncio-Dokumentation erwähnt,Fertig bedeutet entweder, dass ein Ergebnis/eine Ausnahme verfügbar ist oder dass die Zukunft abgebrochen wurde.
Wenn die Funktion normal zurückkehrt, eine Ausnahme auslöst oder abgebrochen wird, wird die aktuelle Aufgabe beendet. Zur Bestimmung können Sie done() verwenden.
Jeder Live-Übertragungsraum entspricht zwei Aufgaben. Die Analyseaufgabe ist am einfachsten zu scheitern, hat jedoch keinen Einfluss auf die Heartbeat-Aufgabe, daher müssen Sie die entsprechende Heartbeat-Aufgabe herausfinden und beenden.Verwenden Sie beim Erstellen der Aufgabe ein Wörterbuch, um die beiden Aufgaben in jedem Raum aufzuzeichnen,
self.tasks[url] = [task1, task2]
for url in self.tasks: item = self.tasks[url] task1 = item[0] task2 = item[1] if task1.done() == True or task2.done() == True: if task1.done() == False: task1.cancel() if task2.done() == False: task2.cancel() danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task11 = asyncio.ensure_future(danmuji.connectServer()) task22 = asyncio.ensure_future(danmuji.HeartbeatLoop()) self.tasks[url] = [task11, task22]Eigentlich habe ich nur einen Misserfolg einer Mission gesehen, weil das Zimmer des Moderators blockiert war und es unmöglich war, den Live-Übertragungsraum zu betreten.
Fazit
Top100 durchschnittlich 40 Mio. Sperrdaten pro Tag.
Was kann man mit den gesammelten Sperrfeuern machen? Ich habe noch nicht darüber nachgedacht, vielleicht kann ich es für die Analyse des Benutzerverhaltens verwenden -_^
Das obige ist der detaillierte Inhalt vonDetaillierte Einführung in das Sammeln von Live-Broadcast-Sperren auf Station B basierend auf dem asynchronen Coroutine-Framework Asyncio. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!