Maison >développement back-end >Tutoriel Python >Un pipeline de données pour des millions de films et des millions de liens de streaming

Un pipeline de données pour des millions de films et des millions de liens de streaming

Patricia Arquette
Patricia Arquetteoriginal
2024-12-27 15:02:10990parcourir

Février 2023 : Je voulais voir toutes les partitions des films, des émissions de télévision et où les diffuser sur une seule page, mais je n'ai pas trouvé d'agrégateur incluant toutes les sources pertinentes pour moi.

Mars 2023 : J'ai donc construit un MVP qui récupérait les scores à la volée et mettait le site en ligne. Cela a fonctionné, mais était lent (10 secondes pour afficher les scores).

Octobre 2023 : Réalisant que stocker des données de mon côté est une nécessité, j'ai découvert windmill.dev. Il éclipse facilement les moteurs d'orchestration similaires - du moins pour mes besoins.


Avance rapide jusqu'à aujourd'hui et après 12 mois de collecte continue de données, je souhaite partager en détail le fonctionnement du pipeline. Vous apprendrez à créer un système complexe qui récupère des données provenant de nombreuses sources différentes, les normalise et les combine dans un format optimisé pour les requêtes.

Des photos ou ça ne s'est pas produit !

A Data Pipeline for illion movies and million streaming links

Voici la vue Exécutions. Chaque point représente une exécution de flux. Un flux peut être n'importe quoi, par exemple un simple script en une étape :

A Data Pipeline for illion movies and million streaming links

Le bloc au centre contient un script comme celui-ci (simplifié) :

def main():
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()

    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)

    close_mongodb()
    return [info.to_mongo() for info in daily_dump_infos]

[...]

La bête suivante est également un flux (rappelez-vous, ce n'est qu'un des points verts) :

A Data Pipeline for illion movies and million streaming links

(image plus haute résolution : https://i.imgur.com/LGhTUGG.png)

Décomposons celui-ci :

  1. Obtenez le prochain film ou émission de télévision prioritaire (voir la section suivante)
  2. Obtenez des données à jour de TMDB
  3. Scrape IMDb, Metacritic et Rotten Tomatoes pour les scores actuels
  4. Grattez Tropes TV pour... tropes
  5. API Huggingface pour collecter des données ADN (expliquera ci-dessous)
  6. Stocker des vecteurs de haute dimension pour les données ADN
  7. Stockez les données relationnelles pour les films, les émissions et les liens de streaming

Chacune de ces étapes est plus ou moins complexe et implique l'utilisation de processus asynchrones.

Par où commencer ? File d'attente prioritaire

Pour déterminer les titres à choisir ensuite, deux voies sont traitées en parallèle. C’est un autre domaine dans lequel Windmill brille. La parallélisation et l'orchestration fonctionnent parfaitement avec leur architecture.

Les deux voies pour sélectionner l'élément suivant sont :

Voie 1 : flux pour chaque source de données séparément

Tout d'abord, les titres auxquels aucune donnée n'est jointe seront sélectionnés pour chaque source de données. Cela signifie que si le pipeline Metacritic contient un film qui n'a pas encore été récupéré, il sera ensuite sélectionné. Cela garantit que chaque titre a été traité au moins une fois, y compris les nouveaux.

Une fois que chaque titre a des données jointes, le pipeline sélectionne ceux avec les données les moins récentes.

Voici un exemple d'une telle exécution de flux, ici avec une erreur car la limite de débit a été atteinte :

A Data Pipeline for illion movies and million streaming links

Windmill vous permet de définir facilement des tentatives pour chaque étape du flux. Dans ce cas, la logique est de réessayer trois fois en cas d'erreurs. À moins que la limite de débit n'ait été atteinte (ce qui est généralement un code d'état ou un message d'erreur différent), nous nous arrêtons immédiatement.

Voie 2 : flux prioritaire pour chaque film/émission séparément

Ce qui précède fonctionne, mais présente un problème sérieux : les versions récentes ne sont pas mises à jour suffisamment à temps. Cela peut prendre des semaines, voire des mois, pour que tous les aspects des données soient récupérés avec succès. Par exemple, il peut arriver qu'un film ait une partition IMDb récente, mais que les autres partitions soient obsolètes et que les liens de streaming manquent complètement. Surtout pour les scores et la disponibilité du streaming, je voulais obtenir une bien meilleure précision.

Pour résoudre ce problème, la deuxième voie se concentre sur une stratégie de priorisation différente : Les films/émissions les plus populaires et les plus tendances sont sélectionnés pour une actualisation complète des données dans toutes les sources de données. J'ai déjà montré ce flux, c'est celui que j'ai appelé bête plus tôt.

Les titres qui sont affichés plus souvent sur l'application bénéficient également d'un boost de priorité. Cela signifie que chaque fois qu'un film ou une émission apparaît dans les premiers résultats de recherche ou lorsque leur vue détaillée est ouverte, ils seront probablement actualisés bientôt.

Chaque titre ne peut être actualisé qu'une fois par semaine en utilisant la voie prioritaire pour garantir que nous ne récupérons pas de données qui n'ont probablement pas changé entre-temps.

Êtes-vous autorisé à faire cela ? Considérations sur le grattage

Vous pourriez vous demander : le scraping est-il légal ? Le fait de récupérer les données est normalement acceptable. Ce que vous faites avec les données doit cependant être soigneusement étudié. Dès que vous tirez profit d'un service qui utilise des données récupérées, vous enfreignez probablement leurs termes et conditions. (voir Le paysage juridique du web scraping et le « scraping » n'est qu'un accès automatisé, et tout le monde le fait )

Le scraping et les lois connexes sont nouveaux et souvent non testés et il existe de nombreuses zones grises juridiques. Je suis déterminé à citer chaque source en conséquence, à respecter les limites de tarifs et à éviter les demandes inutiles afin de minimiser l'impact sur leurs services.

Le fait est que les données ne seront pas utilisées à des fins lucratives. GoodWatch sera gratuit pour tout le monde pour toujours.

Plus de travail ? Oui, Milord

Windmill utilise des Workers pour répartir l'exécution du code sur plusieurs processus. Chaque étape d'un flux est envoyée à un travailleur, ce qui le rend indépendant de la logique métier réelle. Seule l'application principale orchestre les tâches, tandis que les travailleurs ne reçoivent que les données d'entrée, le code à exécuter et renvoient le résultat.

C'est une architecture efficace qui évolue bien. Actuellement, 12 ouvriers se partagent le travail. Ils sont tous hébergés sur Hetzner.

Chaque travailleur a une consommation maximale de ressources de 1 vCPU et 2 Go de RAM. Voici un aperçu :

A Data Pipeline for illion movies and million streaming links

Éditeur de moulin à vent

Windmill offre une expérience d'édition de type IDE dans le navigateur avec linting, formatage automatique, un assistant IA et même édition collaborative (la dernière est une fonctionnalité payante). La meilleure chose est ce bouton :

A Data Pipeline for illion movies and million streaming links

Cela me permet d'itérer et de tester rapidement des scripts avant de les déployer. J'édite et teste généralement les fichiers dans le navigateur et je les envoie vers git lorsque j'ai terminé.

La seule chose qui manque pour un environnement de codage optimal sont les outils de débogage (points d'arrêt et contexte variable). Actuellement, je débogue des scripts dans mon IDE local pour surmonter cette faiblesse.

Nombres. J'aime les chiffres

Moi aussi !

Actuellement, GoodWatch nécessite environ 100 Go de stockage de données persistant :

  • 15 Go pour les données brutes de prétraitement (MongoDB)
  • 23 Go pour les données relationnelles traitées (Postgres)
  • 67 Go pour les données vectorielles (Postgres)

Chaque jour 6 500 flux transitent par le moteur d'orchestration de Windmill. Cela se traduit par un volume quotidien de :

  • 30.000Pages IMDb
  • 9.000 Pages TV Tropes
  • 5.000 Pages Tomates Pourries
  • 1.500 Invites Huggingface
  • 600 Pages métacritiques

Ces chiffres sont fondamentalement différents en raison de politiques de limitation de taux différentes.

Une fois par jour, les données sont nettoyées et combinées dans le format de données final. Actuellement, la base de données qui alimente les magasins d'applications Web GoodWatch :

  • 10 millionsliens de streaming
  • 1 million films
  • 300k Valeurs ADN
  • 200 000 émissions de télévision
  • 70 000 films/émissions avec de l'ADN

Quel est cet ADN dont vous parlez sans cesse ?

Imaginez que vous ne puissiez distinguer les films que par leur genre, ce qui est extrêmement limitatif, n'est-ce pas ?

C'est pourquoi j'ai lancé le projet ADN. Il permet de classer les films et les émissions selon d'autres attributs tels que Mood, Plot Elements, Types de caractères, Dialog ou Key Props. .

Voici le top 10 de toutes les valeurs ADN pour tous les éléments :

A Data Pipeline for illion movies and million streaming links

Cela permet deux choses :

  1. Filtrer par valeurs ADN (en utilisant des données relationnelles)
  2. Recherche par similarité (à l'aide de données vectorielles)

Exemples :

  • Ambiance mélancolique
  • Histoire similaire à celle de Dune : deuxième partie

Il y aura un article de blog dédié sur l'ADN avec beaucoup plus de détails à l'avenir.

Plongez plus profondément dans le pipeline de données

Pour bien comprendre le fonctionnement du pipeline de données, voici un aperçu de ce qui se passe pour chaque source de données :

1. Une fois par jour, une collection MongoDB est mise à jour avec toutes les données d'entrée requises

Pour chaque source de données, il existe un flux ìnit qui prépare une collection MongoDB avec toutes les données requises. Pour IMDb, c'est juste l'imdb_id. Pour Rotten Tomatoes, le titre et release_year sont requis. En effet, l'ID est inconnu et nous devons deviner l'URL correcte en fonction du nom.

2. Récupérez en continu les données et écrivez-les dans la collection MongoDB

Sur la base de la sélection de priorité expliquée ci-dessus, les éléments des collections préparées sont mis à jour avec les données récupérées. Chaque source de données possède sa propre collection qui devient de plus en plus complète au fil du temps.

3. Une fois par jour, différents flux collectent les données des collections MongoDB et les écrivent dans Postgres

Il existe un flux pour les films, un pour les émissions de télévision et un autre pour les liens de streaming. Ils collectent toutes les données nécessaires provenant de diverses collections et les stockent dans leurs tables Postgres respectives, qui sont ensuite interrogées par l'application Web.

Voici un extrait du flux et du script de la copie des films :

A Data Pipeline for illion movies and million streaming links

Certains de ces flux sont longs à s'exécuter, parfois même plus de 6 heures. Cela peut être optimisé en signalant tous les éléments qui ont été mis à jour et en les copiant uniquement au lieu de traiter par lots l'ensemble des données. Un des nombreux éléments TODO sur ma liste ?

Planification

La planification est aussi simple que de définir des expressions cron pour chaque flux ou script qui doit être exécuté automatiquement :

A Data Pipeline for illion movies and million streaming links

Voici un extrait de tous les horaires définis pour GoodWatch :

A Data Pipeline for illion movies and million streaming links

Au total, il y a une cinquantaine de plannings définis.

Défis

Des données de qualité impliquent de grandes responsabilités. Beaucoup de choses peuvent mal tourner. Et c’est ce qui s’est produit.

Traitement très lent

Les premières versions de mes scripts prenaient du temps pour mettre à jour toutes les entrées d'une collection ou d'un tableau. C'est parce que j'ai inséré chaque élément individuellement. Cela entraîne beaucoup de frais généraux et ralentit considérablement le processus.

Une bien meilleure approche consiste à collecter les données à insérer et à regrouper les requêtes de base de données. Voici un exemple pour MongoDB :

def main():
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()

    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)

    close_mongodb()
    return [info.to_mongo() for info in daily_dump_infos]

[...]

Scripts gourmands en mémoire

Même avec le traitement par lots, certains scripts consommaient tellement de mémoire que les travailleurs plantaient. La solution consistait à affiner soigneusement la taille du lot pour chaque cas d’utilisation.

Certains lots peuvent être exécutés par étapes de 5 000, d'autres stockent beaucoup plus de données en mémoire et s'exécutent mieux avec 500.

Windmill a une fonctionnalité intéressante pour observer la mémoire pendant l'exécution d'un script :

A Data Pipeline for illion movies and million streaming links

Points clés à retenir

Windmill est un atout majeur dans la boîte à outils de tout développeur pour automatiser les tâches. Cela a été un boost de productivité inestimable pour moi, me permettant de me concentrer sur la structure des flux et la logique métier tout en externalisant le gros du travail de l'orchestration des tâches, de la gestion des erreurs, des tentatives et de la mise en cache.

La gestion de gros volumes de données reste un défi, et l'optimisation du pipeline est un processus continu - mais je suis vraiment satisfait de la façon dont tout s'est déroulé jusqu'à présent.

D'accord, d'accord. C'est assez

Je le pensais. Laissez-moi simplement lier quelques ressources et nous avons terminé :

  • Bonne Montre
  • Communauté GoodWatch Discord
  • Moulin à vent
  • Communauté Discord Moulin à vent

Saviez-vous que GoodWatch est open-source ? Vous pouvez consulter tous les scripts et définitions de flux dans ce référentiel : https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f

Faites-moi savoir si vous avez des questions.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn