Maison >développement back-end >Tutoriel Python >Introduction détaillée à la collecte des barrages de diffusion en direct de la station B basée sur le cadre de coroutine asynchrone asyncio
Cet article partage avec vous un framework basé sur la coroutine asynchrone asyncio Implémenter un simple conçu pour collecter le système de collecte de barrages de diffusion en direct de la station B et joindre le code source. Les amis dans le besoin peuvent s'y référer
">Avant-proposBien que le titre. Il s'agit de l'intégralité du site, mais actuellement, il ne collecte que des barrages d'une journée entière pour les salles de diffusion en direct de niveautop 100.
Le système de collecte de barrage est modifié en fonction de la précédente version de diffusion en direct DanmakujiPython de Bilibili. Pour une analyse de protocole spécifique, veuillez consulter l’article précédent.
Le protocole de barrage en direct est directement basé sur le protocole TCP, il serait donc plus difficile pour la Station B de prendre des contre-mesures contre un comportement comme le mien. Il devrait y avoir des moyens techniques que je ne connais pas pour détecter un comportement malveillant comme le mien. J'ai essayé de connecter 100 pièces en même temps et de connecter une seule pièce 100 fois, et il n'y a aucun problème. >150 sera fermé.Sélection des salles de diffusion en direct
Désormais, le système de collecte de barrage est relativement simple dans la sélection des salles de diffusion en direct, en sélectionnant directement le niveau 100 supérieur. Cette partie sera modifiée à l'avenir, et elle sera modifiée pour aller régulièrement sur http://live.bilibili.com/all pour vérifier les salles de diffusion en direct nouvellement lancées et ajouter dynamiquement des tâches.Tâches asynchrones et stockage de barrage
Le système de collecte utilise toujours le framework de coroutine asynchrone asyncio Pour chaque salle de diffusion en direct, la méthode suivante est utilisée pour l'ajouter au. boucle.danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task1 = asyncio.ensure_future(danmuji.connectServer()) task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())En fait, si vous placez la tâche Heartbeat HeartbeatLoop dans ConnectorServer pour démarrer, le code sera plus élégant. Mais la raison en est que je dois maintenir une liste de tâches, qui sera décrite plus tard. J'ai passé du temps à choisir le stockage du barrage. Le stockage de la base de données est un processus d'E/S synchrone. L'insertion bloquera la tâche de collecte de barrage. Bien qu'il existe une
interface asynchrone telle que aiomysql, la configuration de la base de données est trop compliquée. Mon hypothèse est que ce petit système peut être facilement déployé.
En fin de compte, j'ai choisi d'utiliser le sqlite3 intégré. Cependant, sqlite3 ne peut pas effectuer d'opérations parallèles, donc un thread est ouvert uniquement pour le stockage de la base de données. Dans un autre fil de discussion, 100 * 2 tâches collectent toutes les informations sur le barrage et le nombre de personnes, et les mettent dans la Grâce à la coopération du multi-threading et de l'asynchrone, le trafic réseau n'est pas bloqué. Traitement du scénario d'échec de connexion possibleLe protocole de barrage est directement basé sur TCP, et les bits sont directement liés les uns aux autres. Une fois l'erreur d'analyse se produit, il est facile de lancer une Comme mentionné dans la documentation asyncio,Terminé signifie soit qu'un résultat/exception est disponible, soit que le futur a été annulé.
Si la fonction revient normalement, lève une exception ou est annulée, elle quittera la tâche en cours. Vous pouvez utiliser done() pour déterminer.
Chaque salle de diffusion en direct correspond à deux tâches. La tâche d'analyse est la plus simple à échouer, mais elle n'affectera pas la tâche de battement de cœur, vous devez donc découvrir et terminer la tâche de battement de cœur correspondante.Utilisez un dictionnaire pour enregistrer les deux tâches dans chaque pièce lors de la création de la tâche,
self.tasks[url] = [task1, task2]
for url in self.tasks: item = self.tasks[url] task1 = item[0] task2 = item[1] if task1.done() == True or task2.done() == True: if task1.done() == False: task1.cancel() if task2.done() == False: task2.cancel() danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq) task11 = asyncio.ensure_future(danmuji.connectServer()) task22 = asyncio.ensure_future(danmuji.HeartbeatLoop()) self.tasks[url] = [task11, task22]En fait, je n'ai vu qu'un seul échec de mission. C'était parce que la salle de l'hôte était bloquée, rendant impossible l'entrée dans la salle de diffusion en direct.
Conclusion
Top100 en moyenne 40 millions de données de barrage par jour.
Que pouvez-vous faire avec les barrages collectés ? Je n'y ai pas encore pensé, je pourrais peut-être l'utiliser pour l'analyse du comportement des utilisateurs -_^
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!