首頁  >  文章  >  後端開發  >  golang 如何處理大數據

golang 如何處理大數據

(*-*)浩
(*-*)浩原創
2019-12-27 11:09:424724瀏覽

golang 如何處理大數據

Golang被證明非常適合併發編程,goroutine比非同步編程更易讀、優雅、高效。本文提出一個適合由Golang實現的Pipeline執行模型,適合大量處理大量資料(ETL)的情境。

想像這樣的應用情境:                     (建議學習:go

#從資料庫A(Cassandra)載入使用者評論中(數量龐大,例如10億條);根據每個評論的使用者ID、從資料庫B(MySQL)關聯使用者資料;呼叫NLP服務(自然語言處理),處理每個評論;將處理結果寫入資料庫C(ElasticSearch)。

由於應用程式遇到的各種問題,歸納出這些需求:
需求一:應分批處理數據,例如規定每批100條。出現問題時(例如任一資料庫故障)則中斷,下次程式啟動時使用checkpoint從中斷處復原。
需求二:每個流程設定合理的並發數、讓資料庫和NLP服務有合理的負載(不影響其它業務的基礎上,盡可能佔用更多資源以提高ETL效能)。例如,步驟(1)-(4)分別設定並發數1、4、8、2。

這就是一個典型的Pipeline(管線)執行模型。把每一批資料(例如100條)看作管線上的產品,4個步驟對應管線上4個處理工序,每個工序處理完畢後就把半成品交給下一個工序。每個工序可以同時處理的產品數各不相同。

你可能會先想到啟用1 4 8 2個goroutine,使用channel來傳遞資料。我也曾經這麼幹,結論就是這麼幹會讓程式設計師瘋掉:流程並發控製程式碼非常複雜,特別是你得處理異常、執行時間超出預期、可控中斷等問題,你不得不加入一堆channel,直到你自己都不記得有什麼用。

重用的Pipeline模組

#為了更有效率完成ETL工作,我將Pipeline抽象化成模組。我先把程式碼貼出來,再解析意義。模組可以直接使用,主要使用的介面是:NewPipeline、Async、Wait。

使用這個Pipeline元件,我們的ETL程式將會簡單、有效率、可靠,讓程式設計師從繁瑣的並發流程控制中解放出來:

package main
 
import "log"
 
func main() {
    //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。
    checkpoint := loadCheckpoint()
    
    //工序(1)在pipeline外执行,最后一个工序是保存checkpoint
    pipeline := NewPipeline(4, 8, 2, 1) 
    for {
        //(1)
        //加载100条数据,并修改变量checkpoint
        //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。
        data, err := extractReviewsFromA(&checkpoint, 100) 
        if err != nil {
            log.Print(err)
            break
        }
        
        //这里有个Golang著名的坑。
        //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。
        //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。
        curCheckpoint := checkpoint
        
        ok := pipeline.Async(func() error {
            //(2)
            return joinUserFromB(data)
        }, func() error {
            //(3)
            return nlp(data)
        }, func() error {
            //(4)
            return loadDataToC(data)
        }, func() error {
            //(5)保存checkpoint
            log.Print("done:", curCheckpoint)
            return saveCheckpoint(curCheckpoint)
        })
        if !ok { break }
        
        if len(data) < 100 { break } //处理完毕
    }
    err := pipeline.Wait()
    if err != nil { log.Print(err) }
}

以上是golang 如何處理大數據的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn