Home  >  Article  >  Backend Development  >  Detailed introduction to collecting station B live broadcast barrages based on asyncio asynchronous coroutine framework

Detailed introduction to collecting station B live broadcast barrages based on asyncio asynchronous coroutine framework

高洛峰
高洛峰Original
2017-03-28 15:32:021986browse

This article shares with you an asynchronous coroutine based on asyncioframework Implement a simple design to collect live broadcast barrage collection system of Station B, and attach the source code. Friends in need can refer to the following

">

Preface

Although the title It is the entire site, but currently it only collects all-day barrages for level top 100 live broadcast rooms.

The barrage collection system is modified based on the previous Bilibili live broadcast Danmakuji Python version. For specific protocol analysis, please see the previous article.

The live barrage protocol is directly based on the TCP protocol, so it would be difficult for Station B to take countermeasures against behavior like mine. There should be technical means that I don't know about to detect malicious behavior like mine.

I have tried connecting 100 rooms at the same time and connecting a single room 100 times, and there is no problem. >150 will be closed.

Selection of live broadcast rooms

Now the barrage collection system is relatively simple in selecting live broadcast rooms, and directly selects the top 100 level.

This part will be modified in the future, and it will be changed to regularly go to http://live.bilibili.com/all to check the newly launched live broadcast rooms, and dynamically add tasks.

Asynchronous tasks and barrage storage

The collection system still uses the asyncio asynchronous coroutine framework. For each live broadcast room, the following method is used to add it to the loop.

danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())

In fact, if you put the heartbeat task HeartbeatLoop into connectorServer to start, the code will look more elegant. But the reason for this is that I need to maintain a task list, which will be described later.

I spent some time choosing the barrage storage.

Database storage is a synchronous IO process. Insert will block the barrage collection task. Although there is an asynchronous interface like aiomysql, configuring the database is too troublesome. My idea is that this small system can be easily deployed.

In the end I chose to use the built-in sqlite3. However, sqlite3 cannot perform parallel operations, so a thread is opened for database storage alone. In another thread, 100 * 2 tasks collect all barrage and number of people information, and stuff them into queue commentq, numq. The storage thread wakes up every 10 seconds, writes the data in the queue into sqlite3, and clears the queue.

With the cooperation of multi-threading and asynchronous, network traffic is not blocked.

Possible connection failure scenario processing

The barrage protocol is directly based on TCP, and the bits are directly related to each other. Once the parsing error occurs, it is easy to throw Exception (Personally, although TCP is a reliable transmission, it is possible for the B station server itself to have errors). Therefore, it is necessary to design an automatic reconnection mechanism.

Mentioned in the asyncio documentation,

Done means either that a result / exception are available, or that the future was canceled.

Function Returns normally, throws an exception or is canceled, it will exit the current task. You can use done() to determine.

Each live broadcast room corresponds to two tasks. The parsing task is the easiest to fail, but it will not affect the heartbeat task, so the corresponding heartbeat task must be found and ended.
Use a dictionary to record the two tasks in each room when creating the task,

self.tasks[url] = [task1, task2]

in During the operation, a check is made every 10 seconds.

for url in self.tasks:
  item = self.tasks[url]
  task1 = item[0]
  task2 = item[1]
  if task1.done() == True or task2.done() == True:
    if task1.done() == False:
      task1.cancel()
    if task2.done() == False:
      task2.cancel()
    danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
    task11 = asyncio.ensure_future(danmuji.connectServer())
    task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
    self.tasks[url] = [task11, task22]

Actually, I have only seen one task failure scenario. It was because the host's room was blocked, making it impossible to enter the live broadcast room.

Conclusion

  1. The number of people at Station B is calculated based on the number of links connecting to the barrage server. By manipulating the number of links, you can instantly increase the number of viewers. Is there any business opportunity?

  2. In the past few days of operation, I found that even if most rooms are not live broadcasting, there can still be >5 people, including early morning. I can only guess that there are people like me collecting barrages 24 hours a day.

  3. top100 average 40M barrage data per day.

  4. What can you do with the collected barrages? I haven’t thought about it yet, maybe I can use it for user behavior analysis -_^

The above is the detailed content of Detailed introduction to collecting station B live broadcast barrages based on asyncio asynchronous coroutine framework. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn