ホームページ  >  記事  >  バックエンド開発  >  asyncio 非同期コルーチン フレームワークに基づくステーション B ライブ ブロードキャストの集中砲火の収集の詳細な紹介

asyncio 非同期コルーチン フレームワークに基づくステーション B ライブ ブロードキャストの集中砲火の収集の詳細な紹介

高洛峰
高洛峰オリジナル
2017-03-28 15:32:022076ブラウズ

この記事で共有する内容は、ライブ コレクションを実装するための asyncio 非同期コルーチンフレームワークに基づいています。 B局の集中砲火を放送 システムはシンプルな設計で、ソースコードが添付されているので、困っている友達が参照できます

」>

はじめに

タイトルはサイト全体ですが、現在はトップのレベルのみです生放送ルームカーテンコレクションの丸一日100弾。

弾幕収集システムは、ステーション B の生放送弾幕の以前の Python バージョンに基づいて修正されています。特定のプロトコル分析については、前の記事を参照してください。

ライブ弾幕プロトコルは TCP プロトコルを直接ベースにしているため、ステーション B が私のような動作に対して対策を講じることは困難です。私のような悪意のある行為を検出するには、私が知らない技術的な手段があるはずです。

100部屋を同時に接続する実験と、1つの部屋を100回接続する実験をしてみましたが、問題ありませんでした。 >150は締め切りとなります。

ライブブロードキャストルームの選択

現在、弾幕収集システムはライブブロードキャストルームの選択において比較的単純であり、上位100レベルを直接選択します。

この部分は将来修正され、定期的に http://live.bilibili.com/all にアクセスして、新たに開始されたライブ ブロードキャスト ルームを確認し、タスクを動的に追加するように変更される予定です。

非同期タスクと弾幕ストレージ

収集システムは引き続き asyncio 非同期コルーチン フレームワークを使用し、ライブ ブロードキャスト ルームごとに次のメソッドを使用してループに追加します。

danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())

実際、ハートビート タスク HeartbeatLoop を ConnectorServer に入れて開始すると、コードはよりエレガントに見えます。しかし、その理由は、後述するタスク リストを維持する必要があるためです。

弾幕ストレージの選択に少し時間を費やしました。

データベース ストレージは同期 IO プロセスであり、コメントを収集するタスクをブロックします。 aiomysqlのような非同期インターフェースはありますが、データベースの設定が面倒すぎるため、この小規模なシステムは簡単に導入できると考えています。

最終的には、組み込みの sqlite3 を使用することにしました。ただし、sqlite3 は並列処理を実行できないため、データベース ストレージ専用のスレッドが開かれます。別スレッドでは、100 * 2 タスクですべての弾幕と人数情報を収集し、queue commentq、numq に詰め込みます。ストレージ スレッドは 10 秒ごとに起動し、キュー内のデータを sqlite3 に書き込み、キューをクリアします。

マルチスレッドと非同期の連携により、ネットワークトラフィックがブロックされません。

考えられる接続失敗シナリオの処理

弾幕プロトコルは TCP に直接基づいており、ビットは相互に直接関連しています。解析エラーが発生すると、簡単に Exception がスローされます (個人的には、TCP は信頼できるものですが)。送信、B ウェブサイトサーバー自体にエラーがある可能性もあります)。したがって、自動再接続機構を設計する必要があります。

asyncio のドキュメントには、

Done は結果/例外が利用可能であること、または現在のタスクがキャンセルされたことを意味すると記載されています。 Done() を使用して判断できます。

各ライブ ブロードキャスト ルームは 2 つのタスクに対応しています。分析タスクは最も失敗しやすいですが、ハートビート タスクには影響しないため、対応するハートビート タスクを見つけて終了する必要があります。

タスクを作成するときに各部屋の 2 つのタスクを記録するために辞書を使用します

実行プロセス中、10 秒ごとにチェックが行われます

for url in self.tasks:
  item = self.tasks[url]
  task1 = item[0]
  task2 = item[1]
  if task1.done() == True or task2.done() == True:
    if task1.done() == False:
      task1.cancel()
    if task2.done() == False:
      task2.cancel()
    danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
    task11 = asyncio.ensure_future(danmuji.connectServer())
    task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
    self.tasks[url] = [task11, task22]

実際、私はタスクの失敗シナリオを一度だけ見たことがあります。 , なぜなら、ホストの部屋が封鎖されているため、生放送の部屋に入ることはできません。

self.tasks[url] = [task1, task2]結論

サイトBの人数は、弾幕サーバーに接続するリンクの数に基づいて計算されます。リンク数を操作することで、一気に閲覧者数を増やすことができるのですが、何かビジネスチャンスはあるのでしょうか?

過去数日間の運用で、ほとんどの部屋でライブブロードキャストが行われていない場合でも、早朝を含めて 5 人を超える人がいることがあることがわかりました。私のように24時間弾幕を集めている人がいるのは推測するしかありません。
  1. トップ 100 では、1 日あたり平均 4,000 万の集中砲火データがあります。

  2. 集めた弾幕で何ができますか?まだ考えていませんが、ユーザーの行動分析に使用できるかもしれません -_^

以上がasyncio 非同期コルーチン フレームワークに基づくステーション B ライブ ブロードキャストの集中砲火の収集の詳細な紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。