


A detailed introduction to collecting Station B (Bilibili) live broadcast barrages with the asyncio asynchronous coroutine framework
This article presents a simple system, built on the asyncio asynchronous coroutine framework, for collecting live broadcast barrages (danmaku) from Station B, with the source code attached. Readers who need it can use it as a reference.
Preface
Although the title suggests the entire site, the system currently only collects all-day barrages for the top 100 live broadcast rooms by level.
The barrage collection system is adapted from my earlier Python version of the Bilibili live broadcast Danmakuji. For the protocol analysis, please see the previous article.
The live barrage protocol sits directly on top of TCP, so it would be difficult for Station B to take countermeasures against behavior like mine, although there may well be technical means I don't know about for detecting this kind of abuse.
I have tried connecting to 100 rooms at the same time and connecting to a single room 100 times, and both work without problems. With more than 150 connections, the connections get closed.
Selection of live broadcast rooms
For now the barrage collection system selects live broadcast rooms in a very simple way: it just takes the top 100 by level.
This part will change in the future. The system will periodically check http://live.bilibili.com/all for newly started live broadcast rooms and add tasks dynamically.
Asynchronous tasks and barrage storage
The collection system still uses the asyncio asynchronous coroutine framework. For each live broadcast room, two tasks are added to the event loop as follows.
danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
task1 = asyncio.ensure_future(danmuji.connectServer())
task2 = asyncio.ensure_future(danmuji.HeartbeatLoop())
In fact, the code would look more elegant if the heartbeat task HeartbeatLoop were started from inside connectServer. I start the two tasks separately because I need to maintain a task list, which is described later.
I spent some time choosing how to store the barrages.
Database writes are synchronous IO, so an insert would block the barrage collection tasks. Asynchronous interfaces such as aiomysql exist, but configuring a database server is too much trouble, and my goal is for this small system to be easy to deploy.
In the end I chose the built-in sqlite3. However, sqlite3 cannot perform parallel operations, so a dedicated thread handles database storage. In another thread, the 100 * 2 tasks collect all barrage and viewer-count information and put it into the queues commentq and numq. The storage thread wakes up every 10 seconds, writes the queued data into sqlite3, and clears the queues.
With multi-threading and asynchronous IO working together, network traffic is not blocked.
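The storage thread described above could look roughly like the following. This is only a minimal sketch under stated assumptions, not the article's actual code: the table layouts, the tuple shapes placed on the queues, and the names drain, storage_worker and stop_event are all hypothetical, and thread-safe queue.Queue objects are assumed in place of whatever locking the original uses.

```python
import queue
import sqlite3
import threading


def drain(q):
    """Remove and return every item currently waiting in the queue."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


def storage_worker(db_path, commentq, numq, stop_event, interval=10.0):
    """Flush both queues to SQLite every `interval` seconds until stopped."""
    # The connection is created inside the thread that uses it, because
    # sqlite3 connections are bound to their creating thread by default.
    conn = sqlite3.connect(db_path)
    # Hypothetical schema: the real column layout is not given in the article.
    conn.execute("CREATE TABLE IF NOT EXISTS comment (room TEXT, user TEXT, text TEXT)")
    conn.execute("CREATE TABLE IF NOT EXISTS viewers (room TEXT, num INTEGER)")
    stopping = False
    while not stopping:
        # wait() returns True once stop_event is set; flush one last time then.
        stopping = stop_event.wait(interval)
        comments, nums = drain(commentq), drain(numq)
        with conn:  # one transaction per batch, committed on success
            conn.executemany("INSERT INTO comment VALUES (?, ?, ?)", comments)
            conn.executemany("INSERT INTO viewers VALUES (?, ?)", nums)
    conn.close()
```

The collection tasks only call put() on the queues, which does not block for an unbounded queue.Queue, so the event loop thread never waits on disk IO. The worker would be started with something like threading.Thread(target=storage_worker, args=("danmu.db", commentq, numq, stop_event)).start(), where the file name is again just an example.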
Possible connection failure scenario processing
The barrage protocol sits directly on TCP, and the bytes of a message depend closely on one another. A single parsing error can easily throw an Exception (personally, I think that although TCP is a reliable transport, the Station B server itself can still make mistakes). An automatic reconnection mechanism is therefore necessary.
The asyncio documentation says:
Done means either that a result / exception are available, or that the future was canceled.
Whether the coroutine returns normally, raises an exception, or is cancelled, the current task ends. You can use done() to detect this.
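The three ways a task can become done can be checked with a small experiment. The coroutine names below are made up for illustration, and the sketch uses asyncio.run, which needs Python 3.7 or later.

```python
import asyncio


async def returns_normally():
    return "ok"


async def raises():
    raise ValueError("parse error")


async def runs_forever():
    while True:
        await asyncio.sleep(3600)


async def demo():
    coros = (returns_normally(), raises(), runs_forever())
    tasks = [asyncio.ensure_future(c) for c in coros]
    await asyncio.sleep(0.01)  # let the first two tasks finish
    before = [t.done() for t in tasks]
    tasks[2].cancel()
    # return_exceptions=True stops gather from re-raising the stored errors.
    await asyncio.gather(*tasks, return_exceptions=True)
    after = [t.done() for t in tasks]
    return before, after, tasks
```

Running asyncio.run(demo()) shows that the task that returned and the task that raised are both done after the short sleep, while the endless one only becomes done once it has been cancelled. To tell the cases apart afterwards, cancelled(), exception() and result() can be called on a finished task.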
Each live broadcast room has two tasks. The parsing task is the one most likely to fail, but its failure does not affect the heartbeat task, so the corresponding heartbeat task has to be found and ended as well.
A dictionary records the two tasks of each room when they are created:
self.tasks[url] = [task1, task2]
While the system is running, a check is made every 10 seconds:
for url in self.tasks:
    item = self.tasks[url]
    task1 = item[0]
    task2 = item[1]
    if task1.done() or task2.done():
        if not task1.done():
            task1.cancel()
        if not task2.done():
            task2.cancel()
        danmuji = bilibiliClient(url, self.lock, self.commentq, self.numq)
        task11 = asyncio.ensure_future(danmuji.connectServer())
        task22 = asyncio.ensure_future(danmuji.HeartbeatLoop())
        self.tasks[url] = [task11, task22]
In practice, I have only seen one task failure scenario: the host's room had been blocked, which made it impossible to enter the live broadcast room.
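To try the restart logic without connecting to Station B, the check can be exercised against a stand-in client. The sketch below is an assumption-laden illustration: FakeClient, start_room and supervise are hypothetical names, the fake parser fails on purpose, and the number of check rounds is bounded so that the example terminates.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for bilibiliClient; the parser fails on purpose."""

    def __init__(self, url):
        self.url = url

    async def connectServer(self):
        await asyncio.sleep(0)  # fails almost immediately
        raise ConnectionError("simulated parse failure for " + self.url)

    async def HeartbeatLoop(self):
        while True:
            await asyncio.sleep(30)


def start_room(url):
    """Create the parsing task and the heartbeat task for one room."""
    client = FakeClient(url)
    return [asyncio.ensure_future(client.connectServer()),
            asyncio.ensure_future(client.HeartbeatLoop())]


async def supervise(urls, rounds, interval=10.0):
    """Check every room `rounds` times; restart pairs with a finished task."""
    tasks = {url: start_room(url) for url in urls}
    restarts = {url: 0 for url in urls}
    for _ in range(rounds):
        await asyncio.sleep(interval)
        for url, pair in list(tasks.items()):
            if any(t.done() for t in pair):
                for t in pair:
                    t.cancel()  # no-op for a task that is already done
                # Retrieve results so exceptions are not reported as unretrieved.
                await asyncio.gather(*pair, return_exceptions=True)
                tasks[url] = start_room(url)
                restarts[url] += 1
    for pair in tasks.values():  # clean shutdown of the remaining tasks
        for t in pair:
            t.cancel()
        await asyncio.gather(*pair, return_exceptions=True)
    return restarts
```

Calling asyncio.run(supervise(["room/1", "room/2"], rounds=3, interval=0.05)) restarts each room once per round, because the fake parsing task always fails before the next check. Awaiting the cancelled tasks before replacing them is a design choice of this sketch; it makes sure the old heartbeat has really stopped before a new one starts.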
Conclusion
Station B calculates the viewer count from the number of connections to the barrage server. By manipulating the number of connections, you can instantly inflate the viewer count. Is there a business opportunity here?
Over the past few days of operation, I found that even when most rooms are not broadcasting, they still show more than 5 viewers, even in the early morning. I can only guess that there are other people like me collecting barrages 24 hours a day.
The top 100 rooms average 40M of barrage data per day.
What can be done with the collected barrages? I haven't thought about it yet; maybe I can use them for user behavior analysis -_^