検索
ホームページバックエンド開発Python チュートリアル無数の映画と数百万のストリーミング リンクのデータ パイプライン

2023 年 2 月: 映画、テレビ番組のすべてのスコアとそれらのストリーミング先を 1 ページで確認したいと考えていましたが、自分に関連するすべてのソースを含むアグリゲーターが見つかりませんでした。

2023 年 3 月: そこで、その場でスコアを取得する MVP を構築し、サイトをオンラインにしました。動作しましたが、遅かったです (スコアの表示に 10 秒かかりました)。

2023 年 10 月: データを自分側に保存することが必要であることに気づき、windmill.dev を発見しました。少なくとも私のニーズに関しては、同様のオーケストレーション エンジンを簡単に上回ります。


今日は、12 か月間継続的にデータを読み込んだ後、パイプラインがどのように動作するかを詳しく共有したいと思います。さまざまなソースからデータを取得し、データを正規化し、クエリ用に最適化された形式に結合する複雑なシステムを構築する方法を学びます。

写真か起こらなかったか!

A Data Pipeline for illion movies and million streaming links

これは実行ビューです。すべてのドットはフローの実行を表します。フローは何でも構いません。たとえば、単純なワンステップ スクリプトです。

A Data Pipeline for illion movies and million streaming links

中央のブロックには次のようなスクリプトが含まれています (簡略化):

def main():
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()

    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)

    close_mongodb()
    return [info.to_mongo() for info in daily_dump_infos]

[...]

次の beast もフローです (これは緑色の点の 1 つにすぎないことに注意してください):

A Data Pipeline for illion movies and million streaming links

(高解像度画像: https://i.imgur.com/LGhTUGG.png)

これを詳しく見てみましょう:

  1. 次に優先される映画またはテレビ番組を入手します (次のセクションを参照)
  2. TMDB から最新データを取得します
  3. IMDbMetacriticRotten Tomatoes をスクレイピングして現在のスコアを取得します
  4. テレビの比喩を集めて...比喩を探します
  5. Huggingface API DNA データを収集します (以下で説明します)
  6. DNA データの高次元 ベクトルを保存
  7. 映画、番組、ストリーミング リンクのリレーショナル データを保存

これらの各手順は多かれ少なかれ複雑で、非同期プロセスの使用が含まれます。

どこから始めますか?優先キュー

次にどのタイトルを選択するかを決定するために、並行して処理される 2 つのレーンがあります。ここも風車が輝くエリアです。並列化とオーケストレーションは、そのアーキテクチャで完璧に機能します。

次のアイテムを選択するための 2 つのレーンは次のとおりです:

レーン 1: 各データ ソースの個別のフロー

まず、データが添付されていないタイトルがデータ ソースごとに選択されます。つまり、Metacritic パイプラインにまだスクレイピングされていないムービーがある場合、それが次に選択されます。これにより、新しいタイトルを含め、すべてのタイトルが少なくとも 1 回処理されるようになります。

すべてのタイトルにデータが添付されると、パイプラインは最も新しいデータを持つタイトルを選択します

これはそのようなフロー実行の例です。ここではレート制限に達したためエラーが発生しています:

A Data Pipeline for illion movies and million streaming links

Windmill を使用すると、フロー内の各ステップの再試行を簡単に定義できます。この場合、エラーが発生した場合に 3 回再試行するというロジックになります。レート制限に達しない限り (通常は別のステータス コードまたはエラー メッセージ)、直ちに停止します。

レーン 2: 各映画/番組ごとの優先フロー

上記は機能しますが、深刻な問題があります。最近のリリースは適切なタイミングで更新されていません。すべてのデータ側面が正常に取得されるまでには、数週間、場合によっては数か月かかる場合があります。たとえば、映画に最近の IMDb スコアが含まれているにもかかわらず、他のスコアが古く、ストリーミング リンクが完全に欠落している場合があります。特にスコアとストリーミングの可用性については、さらに高い精度を達成したいと考えていました。

この問題を解決するために、2 番目のレーンは別の優先順位付け戦略に焦点を当てています。最も人気のあるトレンドの映画/番組が選択され、すべてのデータ ソースにわたって完全なデータ更新が行われます。 このフローは以前に示しました。それは私が以前にと呼んだものです。

アプリでより頻繁に表示されるタイトルの優先順位も上がります。つまり、映画や番組が検索結果の上位に表示されるたび、または詳細ビューが開かれるたびに、すぐに更新される可能性があります。

すべてのタイトルは、優先レーンを使用して 週に 1 回のみ更新できます。これにより、その間に変更されていない可能性が高いデータが取得されないようにすることができます。

こんなことしてもいいですか?スクレイピングに関する考慮事項

「スクレイピングは合法ですか?」と疑問に思うかもしれません。データを取得する行為は通常は問題ありません。ただし、データをどう扱うかは慎重に検討する必要があります。 スクレイピングされたデータを使用するサービスから利益を得た時点で、その利用規約に違反している可能性があります。 (Web スクレイピングの法的状況と「スクレイピング」は単なる自動化されたアクセスであり、誰もがそれを行っているを参照) )

スクレイピングと関連法は新しいものであり、テストされていないことが多く、法的なグレーゾーンがたくさんあります。私は、サービスへの影響を最小限に抑えるために、すべての情報源を適宜引用し、レート制限を尊重し、不必要なリクエストを避けることを決意しています。

実際のところ、データは利益を得るために使用されるものではありません。 GoodWatch は誰でも永久に無料で使用できます。

さらに仕事をしますか?はい、ミロードさん

Windmill はワーカーを使用して、コードの実行を複数のプロセスに分散します。 フローの各ステップはワーカーに送信されるため、実際のビジネス ロジックから独立します。 メイン アプリのみがジョブを調整し、ワーカーは入力データ、実行するコード、結果を受け取るだけです。

これは、適切に拡張できる効率的なアーキテクチャです。現在は12名で分担して業務を行っています。これらはすべて Hetzner でホストされています。

各ワーカーの最大リソース消費量は 1 vCPU と 2 GB の RAM です。概要は次のとおりです:

A Data Pipeline for illion movies and million streaming links

風車編集者

Windmill は、リンティング自動フォーマットAI アシスタント、さらには 共同編集 を備えたブラウザ内 IDE のようなエディター エクスペリエンスを提供します。 🎜> (最後の機能は有料機能です)。ただし、最も優れているのはこのボタンです:

A Data Pipeline for illion movies and million streaming links

スクリプトをデプロイする前に、スクリプトを迅速に繰り返してテストすることができます。私は通常、ブラウザーでファイルを編集してテストし、完了したら git にプッシュします。

最適なコーディング環境に欠けている唯一のものは、デバッグ ツール (ブレークポイントと変数コンテキスト) です。現在、この弱点を克服するためにローカル IDE でスクリプトをデバッグしています。

数字。ナンバーズが好きです

私もです!

現在、GoodWatch には約 100 GB の永続データ ストレージが必要です:

  • 生の前処理データ (MongoDB) 用に 15 GB
  • 処理されたリレーショナル データ (Postgres) 用に 23 GB
  • 67 GB ベクトル データ (Postgres)

毎日 6,500 のフロー が Windmill のオーケストレーション エンジンを介して実行されます。これにより、1 日あたりの量は次のようになります:

  • 30,000 IMDb ページ
  • 9,000 テレビの比喩ページ
  • 5,000 ロッテン トマト ページ
  • 1,500 ハグフェイスプロンプト
  • 600 メタクリティック ページ

レート制限ポリシーが異なるため、これらの数値は根本的に異なります。

1 日に 1 回、データがクリーンアップされ、最終的なデータ形式に結合されます。現在、GoodWatch Web アプリ ストアを強化するデータベース:

  • 1,000 万 ストリーミング リンク
  • 100 万本 映画
  • 300,000 DNA 値
  • 200,000 テレビ番組
  • DNA が含まれる 70,000 映画/番組

あなたがいつも話しているその DNA とは何ですか?

映画をジャンルでしか区別できないと想像してみてください。非常に制限されていますよね?

それが私が DNA プロジェクトを始めた理由です。 ムードプロット要素キャラクタータイプダイアログ、または キープロップなどの他の属性によって映画や番組を分類できます。 .

全アイテムの DNA 値のトップ 10 は次のとおりです:

A Data Pipeline for illion movies and million streaming links

これにより次の 2 つのことが可能になります:

  1. DNA 値によるフィルター (リレーショナル データを使用)
  2. 類似度による検索(ベクトルデータを使用)

例:

  • メランコリックな気分
  • デューンと同様のストーリー: パート 2

将来、DNA についてさらに多くの詳細を記載した専用のブログ投稿が作成される予定です。

データパイプラインをさらに深く掘り下げる

データ パイプラインがどのように機能するかを完全に理解するために、各データ ソースで何が起こるかを以下に詳しく説明します。

1. 1 日に 1 回、MongoDB コレクションがすべての必要な入力データで更新されます。

データ ソースごとに、必要なすべてのデータを含む MongoDB コレクションを準備する ìnit フローがあります。 IMDb の場合、それは単なる imdb_id です。 Rotten Tomatoes の場合、タイトルとリリース年は必須です。これは、ID が不明であり、名前に基づいて正しい URL を推測する必要があるためです。

2. 継続的にデータをフェッチし、MongoDB コレクションに書き込みます。

上で説明した優先順位の選択に基づいて、準備されたコレクション内の項目が、フェッチされたデータで更新されます。各データ ソースには独自のコレクションがあり、時間の経過とともにますます完成していきます。

3. 1 日に 1 回、さまざまなフローが MongoDB コレクションからデータを収集し、Postgres に書き込みます。

映画用のフロー、テレビ番組用のフロー、ストリーミング リンク用のフローがあります。これらは、さまざまなコレクションから必要なデータをすべて収集し、それぞれの Postgres テーブルに保存し、Web アプリケーションによってクエリされます。

これはコピームービーのフローとスクリプトの抜粋です:

A Data Pipeline for illion movies and million streaming links

これらのフローの中には、実行に長い時間がかかるもの、場合によっては 6 時間以上かかるものもあります。これは、データ セット全体をバッチ処理するのではなく、更新されたすべての項目にフラグを付け、それらのみをコピーすることで最適化できます。私のリストにある多くの TODO 項目のうちの 1 つ ?

スケジュール設定

スケジューリングは、自動的に実行する必要がある各フローまたはスクリプトの cron 式を定義するのと同じくらい簡単です。

A Data Pipeline for illion movies and million streaming links

ここでは、GoodWatch に定義されているすべてのスケジュールの抜粋を示します:

A Data Pipeline for illion movies and million streaming links

合計で約 50 のスケジュールが定義されています。

課題

優れたデータには大きな責任が伴います。多くのことがうまくいかない可能性があります。そしてそれは実現しました。

処理が非常に遅い

私のスクリプトの初期バージョンでは、コレクションまたはテーブル内のすべてのエントリを更新するのに時間がかかりました。それは、すべての項目を個別に更新挿入したためです。これにより、多くのオーバーヘッドが発生し、プロセスが大幅に遅くなります。

より良いアプローチは、更新/挿入するデータを収集し、データベース クエリをバッチ処理することです。以下は MongoDB の例です:

def main():
    return tmdb_extract_daily_dump_data()

def tmdb_extract_daily_dump_data():
    print("Checking TMDB for latest daily dumps")
    init_mongodb()

    daily_dump_infos = get_daily_dump_infos()
    for daily_dump_info in daily_dump_infos:
        download_zip_and_store_in_db(daily_dump_info)

    close_mongodb()
    return [info.to_mongo() for info in daily_dump_infos]

[...]

メモリを大量に消費するスクリプト

バッチ処理であっても、一部のスクリプトはワーカーがクラッシュするほど大量のメモリを消費しました。解決策は、ユースケースごとにバッチ サイズを慎重に微調整することでした。

一部のバッチは 5000 ステップで実行できますが、他のバッチはメモリにはるかに多くのデータを保存し、500 ステップでより適切に実行されます。

Windmill には、スクリプトの実行中にメモリを観察する優れた機能があります。

A Data Pipeline for illion movies and million streaming links

重要なポイント

Windmill は、タスクを自動化するための開発者のツールキットに含まれる優れた資産です。これは私にとって非常に貴重な生産性の向上であり、タスク オーケストレーション、エラー処理、再試行、キャッシュといった重労働をアウトソーシングしながら、フロー構造とビジネス ロジックに集中できるようになりました。

大量のデータの処理は依然として困難であり、パイプラインの最適化は進行中のプロセスです。しかし、これまでのところすべてがうまくいっていることに本当に満足しています。

分かった、分かった。それで十分です

そう思いました。いくつかのリソースをリンクさせてください。これで完了です:

  • グッドウォッチ
  • GoodWatch Discord コミュニティ
  • 風車
  • Windmill Discord コミュニティ

GoodWatch はオープンソースであることをご存知ですか?このリポジトリですべてのスクリプトとフロー定義を確認できます: https://github.com/alp82/goodwatch-monorepo/tree/main/goodwatch-flows/windmill/f

ご質問がございましたら、お知らせください。

以上が無数の映画と数百万のストリーミング リンクのデータ パイプラインの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
Python Switchステートメントとは何ですか?Python Switchステートメントとは何ですか?Apr 30, 2025 pm 02:08 PM

この記事では、バージョン3.10で導入されたPythonの新しい「マッチ」ステートメントについて説明します。これは、他の言語のスイッチステートメントに相当するものです。コードの読みやすさを向上させ、従来のif-elif-elよりもパフォーマンスの利点を提供します

Pythonの例外グループとは何ですか?Pythonの例外グループとは何ですか?Apr 30, 2025 pm 02:07 PM

Python 3.11の例外グループは、複数の例外を同時に処理することで、同時シナリオと複雑な操作でエラー管理を改善します。

Pythonの関数注釈とは何ですか?Pythonの関数注釈とは何ですか?Apr 30, 2025 pm 02:06 PM

Pythonの関数注釈は、タイプチェック、ドキュメント、およびIDEサポートの関数にメタデータを追加します。それらはコードの読みやすさ、メンテナンスを強化し、API開発、データサイエンス、ライブラリの作成において重要です。

Pythonのユニットテストとは何ですか?Pythonのユニットテストとは何ですか?Apr 30, 2025 pm 02:05 PM

この記事では、Pythonの単体テスト、その利点、およびそれらを効果的に書く方法について説明します。テスト用のUnittestやPytestなどのツールを強調しています。

Pythonのアクセス仕様とは何ですか?Pythonのアクセス仕様とは何ですか?Apr 30, 2025 pm 02:03 PM

記事では、Pythonのアクセス仕様について説明します。Pythonは、厳格な執行ではなく、クラスメンバーの可視性を示すために命名規則を使用します。

Pythonの__init __()とは何ですか?また、セルフはどのように役割を果たしますか?Pythonの__init __()とは何ですか?また、セルフはどのように役割を果たしますか?Apr 30, 2025 pm 02:02 PM

記事では、Pythonの\ _ \ _ init \ _ \ _()メソッドと、オブジェクト属性の初期化における自己の役割について説明します。 \ _ \ _ init \ _ \ _()に対するその他のクラス方法と継承の影響についてもカバーされています。

Pythonの@ClassMethod、@StaticMethod、およびインスタンスメソッドの違いは何ですか?Pythonの@ClassMethod、@StaticMethod、およびインスタンスメソッドの違いは何ですか?Apr 30, 2025 pm 02:01 PM

この記事では、@ClassMethod、@StaticMethod、およびPythonのインスタンスメソッドの違いについて説明し、そのプロパティ、ユースケース、および利点を詳述します。必要な機能とDAに基づいて適切な方法タイプを選択する方法を説明します

Pythonアレイに要素をどのように追加しますか?Pythonアレイに要素をどのように追加しますか?Apr 30, 2025 am 12:19 AM

inpython、youappendelementStoalistusingtheappend()method.1)useappend()forsingleelements:my_list.append(4).2)useextend()or = formultipleElements:my_list.extend(another_list)ormy_list = [4,5,6] .3)forspecificpositions:my_list.insert(1,5).beaware

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強力な PHP 統合開発環境

mPDF

mPDF

mPDF は、UTF-8 でエンコードされた HTML から PDF ファイルを生成できる PHP ライブラリです。オリジナルの作者である Ian Back は、Web サイトから「オンザフライ」で PDF ファイルを出力し、さまざまな言語を処理するために mPDF を作成しました。 HTML2FPDF などのオリジナルのスクリプトよりも遅く、Unicode フォントを使用すると生成されるファイルが大きくなりますが、CSS スタイルなどをサポートし、多くの機能強化が施されています。 RTL (アラビア語とヘブライ語) や CJK (中国語、日本語、韓国語) を含むほぼすべての言語をサポートします。ネストされたブロックレベル要素 (P、DIV など) をサポートします。

SecLists

SecLists

SecLists は、セキュリティ テスターの究極の相棒です。これは、セキュリティ評価中に頻繁に使用されるさまざまな種類のリストを 1 か所にまとめたものです。 SecLists は、セキュリティ テスターが必要とする可能性のあるすべてのリストを便利に提供することで、セキュリティ テストをより効率的かつ生産的にするのに役立ちます。リストの種類には、ユーザー名、パスワード、URL、ファジング ペイロード、機密データ パターン、Web シェルなどが含まれます。テスターはこのリポジトリを新しいテスト マシンにプルするだけで、必要なあらゆる種類のリストにアクセスできるようになります。

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

Eclipse を SAP NetWeaver アプリケーション サーバーと統合します。

MinGW - Minimalist GNU for Windows

MinGW - Minimalist GNU for Windows

このプロジェクトは osdn.net/projects/mingw に移行中です。引き続きそこでフォローしていただけます。 MinGW: GNU Compiler Collection (GCC) のネイティブ Windows ポートであり、ネイティブ Windows アプリケーションを構築するための自由に配布可能なインポート ライブラリとヘッダー ファイルであり、C99 機能をサポートする MSVC ランタイムの拡張機能が含まれています。すべての MinGW ソフトウェアは 64 ビット Windows プラットフォームで実行できます。