Golang은 동시 프로그래밍에 매우 적합한 것으로 입증되었습니다. 고루틴은 비동기 프로그래밍보다 읽기 쉽고 우아하며 효율적입니다. 본 논문에서는 대용량 데이터(ETL)의 일괄 처리에 적합한 Golang 구현에 적합한 파이프라인 실행 모델을 제안합니다.
다음과 같은 적용 시나리오를 상상해 보세요. > 🎜#이 응용 프로그램 시나리오를 상상해보십시오. 사용자 댓글 로드(예: 1 10억) 각 댓글의 사용자 ID에 따라 데이터베이스 B(MySQL)의 사용자 정보를 연결하고 NLP 서비스(자연어 처리)를 호출하여 각 댓글을 처리하고 데이터베이스 C(ElasticSearch)를 작성합니다.
애플리케이션에서 발생하는 다양한 문제로 인해 다음 요구 사항을 요약합니다.요구 사항 1: 데이터를 일괄 처리해야 합니다(예: 일괄 처리당 100개 항목). 문제가 발생하면(예: 데이터베이스 오류) 중단되고 다음에 프로그램이 시작될 때 중단에서 다시 시작하기 위해 체크포인트가 사용됩니다.
요구사항 2: 데이터베이스 및 NLP 서비스가 합리적인 로드를 갖도록 각 프로세스에 대해 합리적인 수의 동시성을 설정합니다(다른 비즈니스에 영향을 주지 않고 가능한 한 많은 리소스를 점유하여 ETL 성능 향상). 예를 들어 (1)-(4)단계에서는 동시성 수를 각각 1, 4, 8, 2로 설정합니다.
이것은 일반적인 파이프라인 실행 모델입니다. 각 데이터 배치(예: 100개 품목)를 조립 라인의 제품으로 생각하십시오. 4단계는 조립 라인의 4가지 처리 절차에 해당합니다. 각 프로세스가 완료된 후 반제품이 고객에게 전달됩니다. 다음 과정. 각 공정에서 동시에 처리할 수 있는 제품의 수는 다릅니다.
재사용 가능한 파이프라인 모듈
ETL 작업을 보다 효율적으로 완료하기 위해 파이프라인을 모듈로 추상화했습니다. . 먼저 코드를 붙여넣은 후 의미를 분석해 보겠습니다. 모듈을 직접 사용할 수 있으며 사용되는 주요 인터페이스는 NewPipeline, Async 및 Wait입니다.
이 파이프라인 구성 요소를 사용하면 ETL 프로그램이 간단하고 효율적이며 안정적이므로 프로그래머가 번거로운 동시 프로세스 제어에서 벗어날 수 있습니다.package main import "log" func main() { //恢复上次执行的checkpoint,如果是第一次执行就获取一个初始值。 checkpoint := loadCheckpoint() //工序(1)在pipeline外执行,最后一个工序是保存checkpoint pipeline := NewPipeline(4, 8, 2, 1) for { //(1) //加载100条数据,并修改变量checkpoint //data是数组,每个元素是一条评论,之后的联表、NLP都直接修改data里的每条记录。 data, err := extractReviewsFromA(&checkpoint, 100) if err != nil { log.Print(err) break } //这里有个Golang著名的坑。 //“checkpoint”是循环体外的变量,它在内存中只有一个实例并在循环中不断被修改,所以不能在异步中使用它。 //这里创建一个副本curCheckpoint,储存本次循环的checkpoint。 curCheckpoint := checkpoint ok := pipeline.Async(func() error { //(2) return joinUserFromB(data) }, func() error { //(3) return nlp(data) }, func() error { //(4) return loadDataToC(data) }, func() error { //(5)保存checkpoint log.Print("done:", curCheckpoint) return saveCheckpoint(curCheckpoint) }) if !ok { break } if len(data) < 100 { break } //处理完毕 } err := pipeline.Wait() if err != nil { log.Print(err) } }
위 내용은 golang이 빅데이터를 처리하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!