Golang被證明非常適合併發編程,goroutine比非同步編程更易讀、優雅、高效。本文提出一個適合由Golang實現的Pipeline執行模型,適合大量處理大量資料(ETL)的情境。
想像這樣的應用情境: (建議學習:go)
#從資料庫A(Cassandra)載入使用者評論中(數量龐大,例如10億條);根據每個評論的使用者ID、從資料庫B(MySQL)關聯使用者資料;呼叫NLP服務(自然語言處理),處理每個評論;將處理結果寫入資料庫C(ElasticSearch)。
由於應用程式遇到的各種問題,歸納出這些需求:
需求一:應分批處理數據,例如規定每批100條。出現問題時(例如任一資料庫故障)則中斷,下次程式啟動時使用checkpoint從中斷處復原。
需求二:每個流程設定合理的並發數、讓資料庫和NLP服務有合理的負載(不影響其它業務的基礎上,盡可能佔用更多資源以提高ETL效能)。例如,步驟(1)-(4)分別設定並發數1、4、8、2。
這就是一個典型的Pipeline(管線)執行模型。把每一批資料(例如100條)看作管線上的產品,4個步驟對應管線上4個處理工序,每個工序處理完畢後就把半成品交給下一個工序。每個工序可以同時處理的產品數各不相同。
你可能會先想到啟用1 4 8 2個goroutine,使用channel來傳遞資料。我也曾經這麼幹,結論就是這麼幹會讓程式設計師瘋掉:流程並發控製程式碼非常複雜,特別是你得處理異常、執行時間超出預期、可控中斷等問題,你不得不加入一堆channel,直到你自己都不記得有什麼用。
可重用的Pipeline模組
#為了更有效率完成ETL工作,我將Pipeline抽象化成模組。我先把程式碼貼出來,再解析意義。模組可以直接使用,主要使用的介面是:NewPipeline、Async、Wait。
使用這個Pipeline元件,我們的ETL程式將會簡單、有效率、可靠,讓程式設計師從繁瑣的並發流程控制中解放出來:
package main import "log" func main() { //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外执行,最后一个工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加载100条数据,并修改变量checkpoint //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //这里有个Golang著名的坑。 //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。 //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //处理完毕 } err := pipeline.Wait() if err != nil { log.Print(err) } }
以上是golang 如何處理大數據的詳細內容。更多資訊請關注PHP中文網其他相關文章!