Maison >développement back-end >Tutoriel Python >Un pipeline de données pour des millions de films et des millions de liens de streaming
Février 2023 : Je voulais voir toutes les partitions des films, des émissions de télévision et où les diffuser sur une seule page, mais je n'ai pas trouvé d'agrégateur incluant toutes les sources pertinentes pour moi.
Mars 2023 : J'ai donc construit un MVP qui récupérait les scores à la volée et mettait le site en ligne. Cela a fonctionné, mais était lent (10 secondes pour afficher les scores).
Octobre 2023 : Réalisant que stocker des données de mon côté est une nécessité, j'ai découvert windmill.dev. Il éclipse facilement les moteurs d'orchestration similaires - du moins pour mes besoins.
Avance rapide jusqu'à aujourd'hui et après 12 mois de collecte continue de données, je souhaite partager en détail le fonctionnement du pipeline. Vous apprendrez à créer un système complexe qui récupère des données provenant de nombreuses sources différentes, les normalise et les combine dans un format optimisé pour les requêtes.
Voici la vue Exécutions. Chaque point représente une exécution de flux. Un flux peut être n'importe quoi, par exemple un simple script en une étape :
Le bloc au centre contient un script comme celui-ci (simplifié) :
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
La bête suivante est également un flux (rappelez-vous, ce n'est qu'un des points verts) :
(image plus haute résolution : https://i.imgur.com/LGhTUGG.png)
Décomposons celui-ci :
Chacune de ces étapes est plus ou moins complexe et implique l'utilisation de processus asynchrones.
Pour déterminer les titres à choisir ensuite, deux voies sont traitées en parallèle. C’est un autre domaine dans lequel Windmill brille. La parallélisation et l'orchestration fonctionnent parfaitement avec leur architecture.
Les deux voies pour sélectionner l'élément suivant sont :
Tout d'abord, les titres auxquels aucune donnée n'est jointe seront sélectionnés pour chaque source de données. Cela signifie que si le pipeline Metacritic contient un film qui n'a pas encore été récupéré, il sera ensuite sélectionné. Cela garantit que chaque titre a été traité au moins une fois, y compris les nouveaux.
Une fois que chaque titre a des données jointes, le pipeline sélectionne ceux avec les données les moins récentes.
Voici un exemple d'une telle exécution de flux, ici avec une erreur car la limite de débit a été atteinte :
Windmill vous permet de définir facilement des tentatives pour chaque étape du flux. Dans ce cas, la logique est de réessayer trois fois en cas d'erreurs. À moins que la limite de débit n'ait été atteinte (ce qui est généralement un code d'état ou un message d'erreur différent), nous nous arrêtons immédiatement.
Ce qui précède fonctionne, mais présente un problème sérieux : les versions récentes ne sont pas mises à jour suffisamment à temps. Cela peut prendre des semaines, voire des mois, pour que tous les aspects des données soient récupérés avec succès. Par exemple, il peut arriver qu'un film ait une partition IMDb récente, mais que les autres partitions soient obsolètes et que les liens de streaming manquent complètement. Surtout pour les scores et la disponibilité du streaming, je voulais obtenir une bien meilleure précision.
Pour résoudre ce problème, la deuxième voie se concentre sur une stratégie de priorisation différente : Les films/émissions les plus populaires et les plus tendances sont sélectionnés pour une actualisation complète des données dans toutes les sources de données. J'ai déjà montré ce flux, c'est celui que j'ai appelé bête plus tôt.
Les titres qui sont affichés plus souvent sur l'application bénéficient également d'un boost de priorité. Cela signifie que chaque fois qu'un film ou une émission apparaît dans les premiers résultats de recherche ou lorsque leur vue détaillée est ouverte, ils seront probablement actualisés bientôt.
Chaque titre ne peut être actualisé qu'une fois par semaine en utilisant la voie prioritaire pour garantir que nous ne récupérons pas de données qui n'ont probablement pas changé entre-temps.
Vous pourriez vous demander : le scraping est-il légal ? Le fait de récupérer les données est normalement acceptable. Ce que vous faites avec les données doit cependant être soigneusement étudié. Dès que vous tirez profit d'un service qui utilise des données récupérées, vous enfreignez probablement leurs termes et conditions. (voir Le paysage juridique du web scraping et le « scraping » n'est qu'un accès automatisé, et tout le monde le fait )
Le scraping et les lois connexes sont nouveaux et souvent non testés et il existe de nombreuses zones grises juridiques. Je suis déterminé à citer chaque source en conséquence, à respecter les limites de tarifs et à éviter les demandes inutiles afin de minimiser l'impact sur leurs services.
Le fait est que les données ne seront pas utilisées à des fins lucratives. GoodWatch sera gratuit pour tout le monde pour toujours.
Windmill utilise des Workers pour répartir l'exécution du code sur plusieurs processus. Chaque étape d'un flux est envoyée à un travailleur, ce qui le rend indépendant de la logique métier réelle. Seule l'application principale orchestre les tâches, tandis que les travailleurs ne reçoivent que les données d'entrée, le code à exécuter et renvoient le résultat.
C'est une architecture efficace qui évolue bien. Actuellement, 12 ouvriers se partagent le travail. Ils sont tous hébergés sur Hetzner.
Chaque travailleur a une consommation maximale de ressources de 1 vCPU et 2 Go de RAM. Voici un aperçu :
Windmill offre une expérience d'édition de type IDE dans le navigateur avec linting, formatage automatique, un assistant IA et même édition collaborative (la dernière est une fonctionnalité payante). La meilleure chose est ce bouton :
Cela me permet d'itérer et de tester rapidement des scripts avant de les déployer. J'édite et teste généralement les fichiers dans le navigateur et je les envoie vers git lorsque j'ai terminé.
La seule chose qui manque pour un environnement de codage optimal sont les outils de débogage (points d'arrêt et contexte variable). Actuellement, je débogue des scripts dans mon IDE local pour surmonter cette faiblesse.
Moi aussi !
Actuellement, GoodWatch nécessite environ 100 Go de stockage de données persistant :
Chaque jour 6 500 flux transitent par le moteur d'orchestration de Windmill. Cela se traduit par un volume quotidien de :
Ces chiffres sont fondamentalement différents en raison de politiques de limitation de taux différentes.
Une fois par jour, les données sont nettoyées et combinées dans le format de données final. Actuellement, la base de données qui alimente les magasins d'applications Web GoodWatch :
Imaginez que vous ne puissiez distinguer les films que par leur genre, ce qui est extrêmement limitatif, n'est-ce pas ?
C'est pourquoi j'ai lancé le projet ADN. Il permet de classer les films et les émissions selon d'autres attributs tels que Mood, Plot Elements, Types de caractères, Dialog ou Key Props. .
Voici le top 10 de toutes les valeurs ADN pour tous les éléments :
Cela permet deux choses :
Exemples :
Il y aura un article de blog dédié sur l'ADN avec beaucoup plus de détails à l'avenir.
Pour bien comprendre le fonctionnement du pipeline de données, voici un aperçu de ce qui se passe pour chaque source de données :
Pour chaque source de données, il existe un flux ìnit qui prépare une collection MongoDB avec toutes les données requises. Pour IMDb, c'est juste l'imdb_id. Pour Rotten Tomatoes, le titre et release_year sont requis. En effet, l'ID est inconnu et nous devons deviner l'URL correcte en fonction du nom.
Sur la base de la sélection de priorité expliquée ci-dessus, les éléments des collections préparées sont mis à jour avec les données récupérées. Chaque source de données possède sa propre collection qui devient de plus en plus complète au fil du temps.
Il existe un flux pour les films, un pour les émissions de télévision et un autre pour les liens de streaming. Ils collectent toutes les données nécessaires provenant de diverses collections et les stockent dans leurs tables Postgres respectives, qui sont ensuite interrogées par l'application Web.
Voici un extrait du flux et du script de la copie des films :
Certains de ces flux sont longs à s'exécuter, parfois même plus de 6 heures. Cela peut être optimisé en signalant tous les éléments qui ont été mis à jour et en les copiant uniquement au lieu de traiter par lots l'ensemble des données. Un des nombreux éléments TODO sur ma liste ?
La planification est aussi simple que de définir des expressions cron pour chaque flux ou script qui doit être exécuté automatiquement :
Voici un extrait de tous les horaires définis pour GoodWatch :
Au total, il y a une cinquantaine de plannings définis.
Des données de qualité impliquent de grandes responsabilités. Beaucoup de choses peuvent mal tourner. Et c’est ce qui s’est produit.
Les premières versions de mes scripts prenaient du temps pour mettre à jour toutes les entrées d'une collection ou d'un tableau. C'est parce que j'ai inséré chaque élément individuellement. Cela entraîne beaucoup de frais généraux et ralentit considérablement le processus.
Une bien meilleure approche consiste à collecter les données à insérer et à regrouper les requêtes de base de données. Voici un exemple pour MongoDB :
def main(): return tmdb_extract_daily_dump_data() def tmdb_extract_daily_dump_data(): print("Checking TMDB for latest daily dumps") init_mongodb() daily_dump_infos = get_daily_dump_infos() for daily_dump_info in daily_dump_infos: download_zip_and_store_in_db(daily_dump_info) close_mongodb() return [info.to_mongo() for info in daily_dump_infos] [...]
Même avec le traitement par lots, certains scripts consommaient tellement de mémoire que les travailleurs plantaient. La solution consistait à affiner soigneusement la taille du lot pour chaque cas d’utilisation.
Certains lots peuvent être exécutés par étapes de 5 000, d'autres stockent beaucoup plus de données en mémoire et s'exécutent mieux avec 500.
Windmill a une fonctionnalité intéressante pour observer la mémoire pendant l'exécution d'un script :
Windmill est un atout majeur dans la boîte à outils de tout développeur pour automatiser les tâches. Cela a été un boost de productivité inestimable pour moi, me permettant de me concentrer sur la structure des flux et la logique métier tout en externalisant le gros du travail de l'orchestration des tâches, de la gestion des erreurs, des tentatives et de la mise en cache.
La gestion de gros volumes de données reste un défi, et l'optimisation du pipeline est un processus continu - mais je suis vraiment satisfait de la façon dont tout s'est déroulé jusqu'à présent.
Je le pensais. Laissez-moi simplement lier quelques ressources et nous avons terminé :
Saviez-vous que GoodWatch est open-source ? Vous pouvez consulter tous les scripts et définitions de flux dans ce référentiel : https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f
Faites-moi savoir si vous avez des questions.
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!