「既存の DataFrame にハッシュを追加する」というタスクが、数日かかっていたタスクが、ほぼスプリント全体を消費するまでにどのように変化したかを示します。
2022 年の第 2 四半期に、私は REST サービスから市場データを取得して BigQuery テーブルに保存するデータ パイプラインの開発に取り組み始めました。これはパイプラインの高レベルの説明です。興味深い部分は、データがどのようにクエリされたデータであり、それを DataFrame に変換し、AirFlow の GCSToBigQueryOperator を使用して BigQuery テーブルにアップロードする方法です。
最初は、書くのは簡単そうに見えましたが、Airflow の「冪等」原理により、少し難しさが加わりました。この REST サービスから何を取得するかは別のテーブルによって決定され、JOB が冪等であっても、参照として使用されるテーブルは 2 回の実行の間に変更される可能性があります。追加の時間を費やした後、データ エンジニアとの対話により、2022 年の第 3 四半期の終わりまでにパイプラインの準備が整いました。
2024 年の第 1 四半期に進みます。この時点までに、データにアクセスするユーザーが増え、クエリ パターンがパーティションを適切に使用していないことに気づきました。というより、文字列列に基づいてデータにアクセスしたいと考えていましたが、BigQuery では文字列列でパーティション分割することはできません。これにより、大量のデータがスキャンされ、頻繁に 1 日の割り当てに達してしまいました。
これにより、文字列列に基づいてデータを分割する方法を検討するようになりました。当社のデータ エンジニアは、FarmHash と追加のモジュロ演算を使用して、その文字列列を整数に変換することを提案しました。概念実証では、これによりスキャンがほぼ 90% 削減され、クエリのパフォーマンスが 3 ~ 5 倍向上しました。私たちはこれを最終的な解決策として進めることにしました。必要なのは次のことだけです:
Python で FarmHash フィンガープリントを計算するには、pyfarmhash モジュールがあります。モジュールをインストールし、以下のコードを使用してハッシュを計算しました。ローカルではすべて希望どおりに機能しました。
def get_hash(val: str) -> int: return additonal_logic(pyfarmhash.fingerprint64(...)) df[‘hash’] = df[‘Col’].apply(get_hash)
すべてのテストに合格したら、コードを Airflow にプッシュして実行します。この段階では何も問題が起こるとは予想していませんでした。実際、すべてが予定通り、予定時間内に完了したことに満足しています。
私は幸せな心と自信に満ちて、変更をプッシュし、ジョブを開始し、それが完了するまで 10 ~ 15 分待ちました。その間、私は別の仕事に切り替えました。すぐに、Airflow から予期せぬ失敗メールが届きました。ログを見て、pyfarmhash モジュールのインストール中に失敗していたことに驚きました!
問題を理解してもらうために、ジョブの構造を説明する必要があります。ジョブには次の手順があります:
このプロセスでは、データをダウンロードするタスク 1 は別の Python モジュールです。これを実行するには、Airflow の PythonVirtualenvOperator を使用しました。このオペレーターを使用すると、パッケージを要件として指定し、新しく作成された仮想環境にパッケージをインストールできます。パッケージがインストールされると、そのすべての依存関係もインストールされ、準備が整います。
データをダウンロードするモジュールに依存関係として pyfarmhash を追加しました。その他はすべて変更されていません。そして失敗しました!なぜ?
pyfarmhash は、C/C++ で実装されたハッシュ ライブラリです。インストール時に、パッケージをコンパイルするために GCC が必要ですが、それは Airflow ホストには存在しませんでした。 Airflow ホストに GCC を持たないのは理にかなっていましたが、残念ながら、これが私にとって障害でした。
pyfarmhash パッケージの純粋な Python 実装を探しましたが、見つかりませんでした。そこでホイールパッケージを探しましたが、やはりありませんでした。ホイール パッケージを構築してプッシュすることも考えましたが、そうするとホイール パッケージを社内で提供するという長期的な責任が発生することになります。追加の回避策のような手順は避けたかったのです。私はすべてのオプションを検討し、Airflow を維持するチームと話し合いました。彼らは、Docker イメージを作成し、KubernetesPodOperator で実行することを提案しました。外部環境に依存せずに環境を制御し、必要なものをすべて含めることができるため、これは良い選択肢でした。さらに、この解決策には回避策がありませんでした。唯一の短期的な欠点は、実装に時間がかかることでした。
Docker ベースのソリューションを開始する前に、私はこのタスクにすでに約 16 ~ 20 時間を費やしていました。 Docker ベースのソリューションの場合、さらに次のことが必要でした。
Airflow で PythonVirtualEnvOperator を使用するつもりがなくなったので、ワークフローを改善するために完全に削除することにしました。ダウンロードとロジックの削除を開始するためのエントリ ポイントを追加するには、Python パッケージを変更する必要がありました
Docker イメージを準備して最終的なソリューションを完成させるまでにさらに 30 ~ 36 時間かかりました。これには 6 ~ 7 営業日かかり、最初の 2 日を含めると、スプリントの長いタスクになりました。
これを振り返ると、機能しているソリューションを捨て、モジュール構造を変更し、Docker イメージを作成し、タスクに Docker イメージを使用するように 10 以上の AirFlow ジョブを変更して、この現実に対処し、最初のフラストレーションを克服する必要があったのだと思います。これはすべて、「単一の Python モジュールをコンパイルするには "gcc" が必要だったからだ!
」以上がコーディング タスクの見積もり: 何が問題になる可能性がありますか?の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。