Maison  >  Article  >  développement back-end  >  Introduction détaillée à la collecte des barrages de diffusion en direct de la station B basée sur le cadre de coroutine asynchrone asyncio

Introduction détaillée à la collecte des barrages de diffusion en direct de la station B basée sur le cadre de coroutine asynchrone asyncio

高洛峰
高洛峰original
2017-03-28 15:32:021987parcourir

Cet article partage avec vous un framework basé sur la coroutine asynchrone asyncio Implémenter un simple conçu pour collecter le système de collecte de barrages de diffusion en direct de la station B et joindre le code source. Les amis dans le besoin peuvent s'y référer

">

Avant-propos

Bien que le titre. Il s'agit de l'intégralité du site, mais actuellement, il ne collecte que des barrages d'une journée entière pour les salles de diffusion en direct de niveau

top 100.

Le système de collecte de barrage est modifié en fonction de la précédente version de diffusion en direct Danmakuji

Python de Bilibili. Pour une analyse de protocole spécifique, veuillez consulter l’article précédent.

Le protocole de barrage en direct est directement basé sur le protocole TCP, il serait donc plus difficile pour la Station B de prendre des contre-mesures contre un comportement comme le mien. Il devrait y avoir des moyens techniques que je ne connais pas pour détecter un comportement malveillant comme le mien.

J'ai essayé de connecter 100 pièces en même temps et de connecter une seule pièce 100 fois, et il n'y a aucun problème. >150 sera fermé.

Sélection des salles de diffusion en direct

Désormais, le système de collecte de barrage est relativement simple dans la sélection des salles de diffusion en direct, en sélectionnant directement le niveau 100 supérieur.

Cette partie sera modifiée à l'avenir, et elle sera modifiée pour aller régulièrement sur http://live.bilibili.com/all pour vérifier les salles de diffusion en direct nouvellement lancées et ajouter dynamiquement des tâches.

Tâches asynchrones et stockage de barrage

Le système de collecte utilise toujours le framework de coroutine asynchrone asyncio Pour chaque salle de diffusion en direct, la méthode suivante est utilisée pour l'ajouter au. boucle.

danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
En fait, si vous placez la tâche Heartbeat HeartbeatLoop dans ConnectorServer pour démarrer, le code sera plus élégant. Mais la raison en est que je dois maintenir une liste de tâches, qui sera décrite plus tard.

J'ai passé du temps à choisir le stockage du barrage.

Le stockage de la base de données est un processus d'E/S synchrone. L'insertion bloquera la tâche de collecte de barrage. Bien qu'il existe une

interface asynchrone telle que aiomysql, la configuration de la base de données est trop compliquée. Mon hypothèse est que ce petit système peut être facilement déployé.

En fin de compte, j'ai choisi d'utiliser le sqlite3 intégré. Cependant, sqlite3 ne peut pas effectuer d'opérations parallèles, donc un thread est ouvert uniquement pour le stockage de la base de données. Dans un autre fil de discussion, 100 * 2 tâches collectent toutes les informations sur le barrage et le nombre de personnes, et les mettent dans la

file d'attente commentq, numq. Le thread de stockage se réveille toutes les 10 secondes, écrit les données de la file d'attente dans sqlite3 et efface la file d'attente.

Grâce à la coopération du multi-threading et de l'asynchrone, le trafic réseau n'est pas bloqué.

Traitement du scénario d'échec de connexion possible

Le protocole de barrage est directement basé sur TCP, et les bits sont directement liés les uns aux autres. Une fois l'erreur d'analyse se produit, il est facile de lancer une

Exception. (Personnellement, bien que TCP soit une transmission fiable, il est possible que le serveur de la station B lui-même ait des erreurs). Il est donc nécessaire de concevoir un mécanisme de reconnexion automatique.

Comme mentionné dans la documentation asyncio,

Terminé signifie soit qu'un résultat/exception est disponible, soit que le futur a été annulé.

Si la fonction revient normalement, lève une exception ou est annulée, elle quittera la tâche en cours. Vous pouvez utiliser done() pour déterminer.

Chaque salle de diffusion en direct correspond à deux tâches. La tâche d'analyse est la plus simple à échouer, mais elle n'affectera pas la tâche de battement de cœur, vous devez donc découvrir et terminer la tâche de battement de cœur correspondante.

Utilisez un dictionnaire pour enregistrer les deux tâches dans chaque pièce lors de la création de la tâche,

self.tasks[url] = [task1, task2]

Pendant le processus en cours, effectuez une vérification toutes les 10 secondes,

for url in self.tasks:
  item = self.tasks[url]
  task1 = item[0]
  task2 = item[1]
  if task1.done() == True or task2.done() == True:
    if task1.done() == False:
      task1.cancel()
    if task2.done() == False:
      task2.cancel()
    danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
    task11 = asyncio.ensure_future(danmuji.connectServer())
    task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
    self.tasks[url] = [task11, task22]
En fait, je n'ai vu qu'un seul échec de mission. C'était parce que la salle de l'hôte était bloquée, rendant impossible l'entrée dans la salle de diffusion en direct.

Conclusion

  1. Le nombre de personnes à la Station B est calculé en fonction du nombre de liens se connectant au serveur de barrage. En manipulant le nombre de liens, vous pouvez augmenter instantanément le nombre de téléspectateurs. Y a-t-il des opportunités commerciales ?

  2. Au cours des derniers jours d'activité, j'ai constaté que même si la plupart des salles ne diffusent pas en direct, il peut quand même y avoir >5 personnes, y compris tôt le matin. Je ne peux que deviner qu'il y a des gens comme moi qui collectent des barrages 24 heures sur 24.

  3. Top100 en moyenne 40 millions de données de barrage par jour.

  4. Que pouvez-vous faire avec les barrages collectés ? Je n'y ai pas encore pensé, je pourrais peut-être l'utiliser pour l'analyse du comportement des utilisateurs -_^

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn