A Data Pipeline for a Million Movies and Million Streaming Links
Feb 2023: I wanted to see all scores for movies and TV shows plus where to stream them on one page, but couldn't find an aggregator that included all the sources that were relevant for me.
Mar 2023: So, I built an MVP that grabbed scores on the fly and put the site online. It worked, but was slow (10 seconds to display scores).
Oct 2023: Realizing that storing the data on my side was a necessity, I discovered windmill.dev. It easily eclipses similar orchestration engines - at least for my needs.
Fast forward to today: after 12 months of continuous data munching, I want to share how the pipeline works in detail. You'll learn how to build a complex system that grabs data from many different sources, normalizes it and combines it into an optimized format for querying.
This is the Runs view. Every dot represents a flow run. A flow can be anything, for example a simple one-step script:
The block in the center contains a script like this (simplified):
def main():
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()
    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)
    close_mongodb()
    return [info.to_mongo() for info in daily_dump_infos]

[...]
The following beast is also a flow (remember, this is only one of the green dots):
(higher resolution image: https://i.imgur.com/LGhTUGG.png)
Let's break this one down:
Each of those steps is more or less complex and involves asynchronous processes.
To determine which titles to pick next, there are two lanes that are processed in parallel. This is another area where Windmill shines: parallelization and orchestration work flawlessly with their architecture.
The two lanes to pick the next item are:
First of all, titles that don't have any data attached yet are selected for each data source. That means if the Metacritic pipeline has a movie that hasn't been scraped yet, it will be selected next. This ensures that every title is processed at least once, including new ones.
Once every title has data attached, the pipeline selects those with the least recent data.
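As a rough sketch of how this selection could look (not the actual GoodWatch code; the connection string, collection layout and field names like fetched_at are assumptions for illustration):

from pymongo import ASCENDING, MongoClient

def select_next_titles(collection_name: str, batch_size: int = 50) -> list[dict]:
    # Hypothetical connection and collection names, for illustration only.
    client = MongoClient("mongodb://localhost:27017")
    collection = client["goodwatch"][collection_name]

    # 1) Titles that were never fetched for this data source come first.
    missing = list(collection.find({"fetched_at": None}).limit(batch_size))
    if len(missing) >= batch_size:
        return missing

    # 2) Fill the rest of the batch with the least recently fetched titles.
    stale = list(
        collection.find({"fetched_at": {"$ne": None}})
        .sort("fetched_at", ASCENDING)
        .limit(batch_size - len(missing))
    )
    return missing + stale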
Here is an example of such a flow run, this one with an error because the rate limit was hit:
Windmill allows you to easily define retries for each step in the flow. In this case, the logic is to retry three times on error - unless the rate limit was hit (usually recognizable by a different status code or error message), in which case we stop immediately.
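Windmill configures retries per step in the flow editor, so the following is only a language-level sketch of the same logic - retry up to three times, but give up immediately when a hypothetical RateLimitError signals that the rate limit was hit:

import time

class RateLimitError(Exception):
    """Hypothetical exception raised when the source responds with e.g. HTTP 429."""

def fetch_with_retries(fetch, max_attempts: int = 3, delay_seconds: float = 5.0):
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except RateLimitError:
            # Retrying won't help against a rate limit: stop immediately.
            raise
        except Exception:
            if attempt == max_attempts:
                raise
            # Transient error: wait a bit and try again.
            time.sleep(delay_seconds)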
This first lane works, but has a serious issue: recent releases are not updated quickly enough. It can take weeks or even months until every data aspect has been fetched successfully. For example, a movie might have a recent IMDb score while the other scores are outdated and the streaming links are missing completely. Especially for scores and streaming availability I wanted to achieve much better accuracy.
To solve this problem, the second lane uses a different prioritization strategy: the most popular and trending movies/shows are selected for a complete data refresh across all data sources. This is the flow I showed before - the one I referred to as the beast earlier.
Titles that are shown more often in the app get a priority boost as well: every time a movie or show comes up in the top search results or its details view is opened, it will likely be refreshed soon.
Every title can only be refreshed once per week using the priority lane to ensure that we don't fetch data that likely hasn't changed in the meantime.
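A simplified sketch of that priority selection, assuming hypothetical popularity and refreshed_at fields on each title document (again, not the actual GoodWatch code):

from datetime import datetime, timedelta, timezone
from pymongo import DESCENDING, MongoClient

def select_priority_titles(batch_size: int = 20) -> list[dict]:
    # Hypothetical collection layout; field names are assumptions for illustration.
    client = MongoClient("mongodb://localhost:27017")
    titles = client["goodwatch"]["titles"]

    one_week_ago = datetime.now(timezone.utc) - timedelta(days=7)
    return list(
        titles.find({
            # Skip anything refreshed within the last week (priority-lane cooldown).
            "$or": [
                {"refreshed_at": None},
                {"refreshed_at": {"$lt": one_week_ago}},
            ],
        })
        # Popularity already includes the boost from search results and detail views.
        .sort("popularity", DESCENDING)
        .limit(batch_size)
    )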
You might ask: is scraping legal? The act of grabbing the data is normally fine. What you do with the data needs careful consideration, though. As soon as you make a profit from a service that uses scraped data, you are probably violating their terms and conditions (see The Legal Landscape of Web Scraping and ‘Scraping’ Is Just Automated Access, and Everyone Does It).
Scraping and the laws around it are new and often untested, and there is a lot of legal gray area. I'm determined to cite every source accordingly, respect rate limits and avoid unnecessary requests to minimize the impact on their services.
Fact is, the data will not be used to make a profit. GoodWatch will be free to use for everyone, forever.
Windmill uses workers to distribute code execution across multiple processes. Each step in a flow is sent to a worker, which keeps the workers independent of the actual business logic. Only the main app orchestrates the jobs, whereas workers just receive input data and the code to execute, and return the result.
It's an efficient architecture that scales nicely. Currently, there are 12 workers splitting the work. They're all hosted on Hetzner.
Each worker has a maximum resource consumption of 1 vCPU and 2 GB of RAM. Here is an overview:
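Conceptually - and this is only a toy illustration, not Windmill's actual implementation - the division of labor looks roughly like this:

import queue
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class Job:
    # A job bundles the code to execute with its input data - nothing more.
    func: Callable[[Any], Any]
    payload: Any

def worker_loop(jobs: "queue.Queue[Job | None]", results: "queue.Queue[Any]") -> None:
    # The worker knows nothing about the business logic: it just runs what it receives
    # and reports the result back. In Windmill, workers are separate processes pulling
    # jobs from the orchestrator, not threads sharing an in-memory queue.
    while True:
        job = jobs.get()
        if job is None:  # sentinel from the orchestrator: no more work
            break
        results.put(job.func(job.payload))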
Windmill offers an in-browser IDE-like editor experience with linting, auto-formatting, an AI assistant and even collaborative editing (last one is a paid feature). The best thing is this button though:
It allows me to quickly iterate and test scripts before deploying them. I usually edit and test files in the browser and push them to git when I'm finished.
The only thing missing for an optimal coding environment is debugging tooling (breakpoints and variable context). Currently, I debug scripts in my local IDE to work around this weakness.
Me too!
Currently GoodWatch requires around 100 GB of persistent data storage:
Every day, 6,500 flows run through Windmill's orchestration engine. This results in a daily volume of:
These numbers are fundamentally different because of different rate limit policies.
Once per day, data is cleaned up and combined into the final data format. Currently the database that powers the GoodWatch webapp stores:
Imagine you could only distinguish movies by their genre. Extremely limiting, right?
That's why I started the DNA project. It allows categorizing movies and shows by other attributes like Mood, Plot Elements, Character Types, Dialog or Key Props.
Here are the top 10 DNA values across all items:
It allows two things:
Examples:
There will be a dedicated blog post about the DNA with many more details in the future.
To fully understand how the data pipeline works, here is a breakdown of what happens for each data source:
For each data source, there is an init flow that prepares a MongoDB collection with all required data. For IMDb, that's just the imdb_id. For Rotten Tomatoes, the title and release_year are required, because the ID is unknown and we need to guess the correct URL based on the name.
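A minimal sketch of such an init step for the Rotten Tomatoes collection (the connection string, collection and field names are assumptions, not the actual GoodWatch flow):

from pymongo import MongoClient, UpdateOne

def init_rotten_tomatoes(titles: list[dict]) -> None:
    # Seed the per-source collection with just enough data to guess the URL later.
    client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
    collection = client["goodwatch"]["rotten_tomatoes"]
    collection.create_index("tmdb_id", unique=True)

    operations = [
        UpdateOne(
            {"tmdb_id": title["tmdb_id"]},
            {"$setOnInsert": {
                "title": title["title"],
                "release_year": title["release_year"],
                "fetched_at": None,  # marks the entry as "never scraped"
            }},
            upsert=True,
        )
        for title in titles
    ]
    if operations:
        collection.bulk_write(operations, ordered=False)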
Based on the priority selection explained above, items in the prepared collections are updated with the data that is fetched. Each data source has its own collection, which gets more and more complete over time.
There is a flow for movies, one for TV shows and another one for streaming links. They collect all necessary data from the various collections and store it in their respective Postgres tables, which are then queried by the web application.
Here is an excerpt of the copy movies flow and script:
Some of these flows take a long time to execute, sometimes even longer than 6 hours. This can be optimized by flagging all items that were updated and only copying those instead of batch-processing the whole data set. One of many TODO items on my list.
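A sketch of how that incremental copy could work, assuming an updated_at flag on the source documents and using psycopg2 for the Postgres side (table and column names are made up for illustration):

from datetime import datetime
from pymongo import MongoClient
import psycopg2
from psycopg2.extras import execute_values

def copy_updated_movies(last_run: datetime) -> None:
    # Only copy documents that changed since the previous run instead of the whole data set.
    mongo = MongoClient("mongodb://localhost:27017")  # placeholder connection string
    changed = mongo["goodwatch"]["movies"].find({"updated_at": {"$gt": last_run}})

    rows = [(m["tmdb_id"], m["title"], m.get("release_year")) for m in changed]
    if not rows:
        return

    with psycopg2.connect("dbname=goodwatch") as conn, conn.cursor() as cur:
        # One upsert statement per batch instead of one query per movie.
        execute_values(
            cur,
            """
            INSERT INTO movies (tmdb_id, title, release_year)
            VALUES %s
            ON CONFLICT (tmdb_id) DO UPDATE
            SET title = EXCLUDED.title,
                release_year = EXCLUDED.release_year
            """,
            rows,
        )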
Scheduling is as easy as defining cron expressions for each flow or script that needs to be executed automatically:
Here is an excerpt of all schedules that are defined for GoodWatch:
In total there are around 50 schedules defined.
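For illustration (flow paths and timings are made up, not the actual GoodWatch schedules), such a mapping of flows to cron expressions could look like this:

# Illustrative only - paths and timings are assumptions.
SCHEDULES = {
    "f/tmdb/extract_daily_dump_data": "0 4 * * *",     # once per day at 04:00
    "f/priority/refresh_trending":    "*/30 * * * *",  # every 30 minutes
    "f/postgres/copy_movies":         "0 2 * * *",     # nightly copy at 02:00
}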
With great data comes great responsibility. Lots can go wrong. And it did.
Early versions of my scripts took ages to update all entries in a collection or table because I upserted every item individually. That causes a lot of overhead and slows down the process significantly.
A much better approach is to collect the data to be upserted and batch the database queries.
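A minimal sketch of this pattern for MongoDB with pymongo's bulk_write (collection and key names are assumptions, not the exact GoodWatch script):

from pymongo import MongoClient, UpdateOne

def upsert_movies(movies: list[dict]) -> None:
    client = MongoClient("mongodb://localhost:27017")  # placeholder connection string
    collection = client["goodwatch"]["tmdb_movies"]    # hypothetical collection name

    # Collect one operation per item instead of round-tripping each upsert individually.
    operations = [
        UpdateOne({"tmdb_id": movie["tmdb_id"]}, {"$set": movie}, upsert=True)
        for movie in movies
    ]
    if operations:
        # A single bulk_write sends the whole batch in one go.
        result = collection.bulk_write(operations, ordered=False)
        print(f"upserted={result.upserted_count} modified={result.modified_count}")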
Even with batch processing, some scripts consumed so much memory that the workers crashed. The solution was to carefully fine-tune the batch size for every use case.
Some batches are fine running in steps of 5,000; others hold much more data in memory and run better with 500.
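A simple chunking helper makes that batch size easy to tune per use case; a sketch:

from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def in_batches(items: Iterable[T], batch_size: int) -> Iterator[list[T]]:
    # Yield fixed-size chunks so only one batch is held in memory at a time.
    batch: list[T] = []
    for item in items:
        batch.append(item)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        yield batch

# Memory-heavy documents get a smaller batch size, lightweight ones a larger one, e.g.:
# for chunk in in_batches(collection.find(), batch_size=500):
#     upsert_movies(chunk)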
Windmill has a great feature to observe the memory while a script is running:
Windmill is a great asset in any developer's toolkit for automating tasks. It's been an invaluable productivity booster for me, allowing me to focus on the flow structure and business logic while outsourcing the heavy lifting of task orchestration, error handling, retries and caching.
Handling large volumes of data is still challenging, and optimizing the pipeline is an ongoing process - but I'm really happy with how everything has turned out so far.
Thought so. Just let me link a few resources and we're finished:
Did you know that GoodWatch is open-source? You can take a look at all scripts and flow definitions in this repository: https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f
Let me know if you have any questions.