Heim  >  Artikel  >  Backend-Entwicklung  >  Wie Golang mit Big Data umgeht

Wie Golang mit Big Data umgeht

(*-*)浩
(*-*)浩Original
2019-12-27 11:09:424747Durchsuche

Wie Golang mit Big Data umgeht

Golang hat sich als sehr geeignet für die gleichzeitige Programmierung erwiesen. Goroutinen sind lesbarer, eleganter und effizienter als die asynchrone Programmierung. In diesem Artikel wird ein Pipeline-Ausführungsmodell vorgeschlagen, das für die Implementierung durch Golang geeignet ist und sich für die Stapelverarbeitung großer Datenmengen (ETL) eignet.

Stellen Sie sich ein solches Anwendungsszenario vor:                                                                                                                                                                                                                 1 Milliarde Benutzerinformationen aus Datenbank B (MySQL) zuordnen, basierend auf der Benutzer-ID jedes Kommentars; Verarbeitung), um jeden Kommentar zu verarbeiten; schreiben Sie die Verarbeitungsergebnisse in die Datenbank C (ElasticSearch).

Aufgrund verschiedener bei der Anwendung aufgetretener Probleme werden diese Anforderungen zusammengefasst:

Anforderung 1: Daten sollten in Stapeln verarbeitet werden, beispielsweise 100 Artikel pro Stapel. Wenn ein Problem auftritt (z. B. ein Datenbankfehler), wird der Vorgang unterbrochen und beim nächsten Start des Programms wird ein Prüfpunkt verwendet, um nach der Unterbrechung fortzufahren.

Anforderung 2: Legen Sie für jeden Prozess eine angemessene Anzahl an Parallelität fest, damit die Datenbank und der NLP-Dienst eine angemessene Auslastung aufweisen (ohne andere Unternehmen zu beeinträchtigen, belegen Sie so viele Ressourcen wie möglich, um die ETL-Leistung zu verbessern). Beispielsweise werden in den Schritten (1) bis (4) die Parallelitätszahlen auf 1, 4, 8 bzw. 2 festgelegt.


Dies ist ein typisches Pipeline-Ausführungsmodell. Stellen Sie sich jede Datencharge (z. B. 100 Artikel) als Produkt am Fließband vor. Die 4 Schritte entsprechen den 4 Verarbeitungsprozessen am Fließband. Nach Abschluss jedes Prozesses wird das Halbzeug übergeben der nächste Prozess. Die Anzahl der Produkte, die in jedem Prozess gleichzeitig verarbeitet werden können, variiert.

Vielleicht denken Sie zunächst darüber nach, 1+4+8+2 Goroutinen zu aktivieren und Kanäle zur Datenübertragung zu verwenden. Ich habe das schon einmal gemacht, und die Schlussfolgerung ist, dass dies Programmierer verrückt machen wird: Der Prozess-Parallelitätskontrollcode ist sehr kompliziert, insbesondere wenn man mit Ausnahmen, über den Erwartungen liegenden Ausführungszeiten, kontrollierbaren Unterbrechungen usw. umgehen muss Fügen Sie eine Reihe von Kanälen hinzu, bis Sie sich nicht einmal mehr an die Verwendung erinnern.

Wiederverwendbares Pipeline-Modul

Um die ETL-Arbeit effizienter abzuschließen, habe ich Pipeline in Module abstrahiert. Ich füge zuerst den Code ein und analysiere dann die Bedeutung. Das Modul kann direkt verwendet werden und die wichtigsten verwendeten Schnittstellen sind: NewPipeline, Async und Wait.

Mithilfe dieser Pipeline-Komponente wird unser ETL-Programm einfach, effizient und zuverlässig sein und Programmierer von der umständlichen gleichzeitigen Prozesssteuerung befreien:

package main
 
import "log"
 
func main() {
    //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。
    checkpoint := loadCheckpoint()
    
    //工序(1)在pipeline外执行,最后一个工序是保存checkpoint
    pipeline := NewPipeline(4, 8, 2, 1) 
    for {
        //(1)
        //加载100条数据,并修改变量checkpoint
        //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。
        data, err := extractReviewsFromA(&checkpoint, 100) 
        if err != nil {
            log.Print(err)
            break
        }
        
        //这里有个Golang著名的坑。
        //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。
        //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。
        curCheckpoint := checkpoint
        
        ok := pipeline.Async(func() error {
            //(2)
            return joinUserFromB(data)
        }, func() error {
            //(3)
            return nlp(data)
        }, func() error {
            //(4)
            return loadDataToC(data)
        }, func() error {
            //(5)保存checkpoint
            log.Print("done:", curCheckpoint)
            return saveCheckpoint(curCheckpoint)
        })
        if !ok { break }
        
        if len(data) < 100 { break } //处理完毕
    }
    err := pipeline.Wait()
    if err != nil { log.Print(err) }
}

Das obige ist der detaillierte Inhalt vonWie Golang mit Big Data umgeht. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn