ホームページ  >  記事  >  バックエンド開発  >  golang がビッグデータを処理する方法

golang がビッグデータを処理する方法

(*-*)浩
(*-*)浩オリジナル
2019-12-27 11:09:424752ブラウズ

golang がビッグデータを処理する方法

Golang は同時プログラミングに非常に適していることが証明されており、Goroutine は非同期プログラミングよりも読みやすく、エレガントで効率的です。この記事では、大量のデータのバッチ処理(ETL)に適した、Golangによる実装に適したパイプライン実行モデルを提案します。

次のようなアプリケーション シナリオを想像しました。 (推奨学習: Go )

## データベース A (Cassandra) からユーザー レビューをロードします (大量の、たとえば、たとえば 10 億)、各コメントのユーザー ID に基づいてデータベース B (MySQL) からユーザー情報を関連付け、NLP サービス (自然言語処理) を呼び出して各コメントを処理し、処理結果をデータベース C (ElasticSearch) に書き込みます。

アプリケーションではさまざまな問題が発生するため、これらの要件を要約します。

要件 1: データはバッチで処理する必要があります。たとえば、バッチごとに 100 項目を指定します。問題 (データベース障害など) が発生すると中断され、次回プログラムを開始するときに中断から再開するためにチェックポイントが使用されます。
要件 2: データベースと NLP サービスに適切な負荷がかかるように、プロセスごとに適切な同時実行数を設定します (他のビジネスに影響を与えず、ETL パフォーマンスを向上させるためにできるだけ多くのリソースを占有します)。たとえば、手順 (1) ~ (4) では、同時実行数をそれぞれ 1、4、8、および 2 に設定します。

これは典型的なパイプライン実行モデルです。データの各バッチ (たとえば、100 個のアイテム) を、組立ライン上の製品と考えてください。4 つのステップは、組立ライン上の 4 つの処理手順に対応しています。各工程が完了すると、半製品が工場に引き渡されます。次の工程。各工程で同時に処理できる製品数は異なります。

まず、1 4 8 2 ゴルーチンを有効にし、チャネルを使用してデータを転送することを考えるかもしれません。私は以前にこれを行ったことがありますが、結論としては、これを行うとプログラマがおかしくなるということです。プロセスの同時実行制御コードは非常に複雑で、特に例外、予想を超える実行時間、制御可能な中断などを処理する必要がある場合には、次のことを行う必要があります。用途さえ覚えられなくなるまで、たくさんのチャンネルを追加してください。

再利用可能なパイプライン モジュール

ETL 作業をより効率的に完了するために、パイプラインをモジュールに抽象化しました。まずコードを貼り付けてから、意味を分析します。このモジュールは直接使用でき、主に使用されるインターフェイスは NewPipeline、Async、Wait です。

このパイプライン コンポーネントを使用すると、ETL プログラムはシンプル、効率的、信頼性の高いものになり、プログラマは面倒な同時プロセス制御から解放されます:

package main
 
import "log"
 
func main() {
    //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。
    checkpoint := loadCheckpoint()
    
    //工序(1)在pipeline外执行,最后一个工序是保存checkpoint
    pipeline := NewPipeline(4, 8, 2, 1) 
    for {
        //(1)
        //加载100条数据,并修改变量checkpoint
        //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。
        data, err := extractReviewsFromA(&checkpoint, 100) 
        if err != nil {
            log.Print(err)
            break
        }
        
        //这里有个Golang著名的坑。
        //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。
        //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。
        curCheckpoint := checkpoint
        
        ok := pipeline.Async(func() error {
            //(2)
            return joinUserFromB(data)
        }, func() error {
            //(3)
            return nlp(data)
        }, func() error {
            //(4)
            return loadDataToC(data)
        }, func() error {
            //(5)保存checkpoint
            log.Print("done:", curCheckpoint)
            return saveCheckpoint(curCheckpoint)
        })
        if !ok { break }
        
        if len(data) < 100 { break } //处理完毕
    }
    err := pipeline.Wait()
    if err != nil { log.Print(err) }
}

以上がgolang がビッグデータを処理する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。