>백엔드 개발 >Golang >go-zero와 Kafka+Avro의 실천: 고성능 대화형 데이터 처리 시스템 구축

go-zero와 Kafka+Avro의 실천: 고성능 대화형 데이터 처리 시스템 구축

王林
王林원래의
2023-06-23 09:04:35898검색

최근 몇 년 동안 빅 데이터와 활발한 오픈 소스 커뮤니티가 등장하면서 점점 더 많은 기업이 증가하는 데이터 요구 사항을 충족하기 위해 고성능 대화형 데이터 처리 시스템을 찾기 시작했습니다. 이러한 기술 업그레이드의 물결 속에서 go-zero와 Kafka+Avro는 점점 더 많은 기업에서 주목을 받고 채택되고 있습니다.

go-zero는 Golang 언어를 기반으로 개발된 마이크로서비스 프레임워크로, 고성능, 사용 용이성, 손쉬운 확장 및 손쉬운 유지 관리를 지원하기 위해 설계되었습니다. 이렇게 빠른 성장을 이룬 것은 Golang 자체의 뛰어난 성능과 높은 개발 효율성은 물론 Go-Zero 팀의 지속적인 반복과 최적화 덕분입니다.

Kafka는 Apache가 개발한 분산 스트림 처리 시스템으로, 현재 빅데이터 생태계에서 가장 인기 있는 메시지 대기열 중 하나입니다. Avro는 Apache에서 개발한 데이터 직렬화 도구로, 데이터 스트림을 바이너리 형식으로 변환하여 데이터 압축 및 전송 효율성을 향상시킬 수 있습니다.

이 글에서는 go-zero와 Kafka+Avro를 결합하여 고성능 대화형 데이터 처리 시스템을 구축하는 방법을 소개하겠습니다. 구체적인 실제 프로세스는 다음과 같습니다.

  1. Kafka 클라이언트 통합

먼저 Go-Zero 서비스에 Kafka 클라이언트를 통합해야 합니다. go-zero는 Kafka와 쉽게 상호작용할 수 있는 Kafka 패키지를 제공합니다.

Kafka 패키지를 프로젝트에 도입하고 구성 파일에서 Kafka 매개 변수를 구성하면 Kafka와의 연결 및 데이터 상호 작용이 달성됩니다. 다음은 Kafka 구성 예입니다.

[kafka]
addrs = ["localhost:9092"]
version = "2.0.0"
maxMessageBytes = 10000000

특정 비즈니스 로직에서는 Kafka에서 제공하는 생산자 및 소비자 API를 사용하여 데이터를 보내고 받을 수 있습니다. 다음은 Kafka 생산자의 예입니다.

var (
    topic = "test"
)

func (s *Service) Produce(msg []byte) error {
    p, err := kafka.NewProducer(s.cfg.Kafka)
    if err != nil {
        return err
    }
    defer p.Close()

    return p.Send(context.TODO(), &kafka.Message{
        Key:   []byte(topic),
        Value: msg,
    })
}

위의 예에서는 "test"라는 Kafka 주제를 생성했으며 Produce 메서드가 호출되면 데이터가 해당 주제로 전송됩니다.

  1. 통합 Avro 직렬화

다음으로 직렬화 및 역직렬화를 위해 데이터를 Avro 형식으로 변환해야 합니다. go-zero는 Avro 패키지를 제공하고 코드 생성을 지원합니다. 스키마 파일을 정의하면 해당 Go 코드를 생성하여 Avro 데이터를 인코딩하고 디코딩할 수 있습니다.

다음은 Avro Schema 구성 예시입니다.

{
    "namespace": "com.example",
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "age",
            "type": "int"
        }
    ]
}

다음 명령어를 실행하면 해당 Go 파일이 자동으로 생성됩니다.

$ go run github.com/gogo/protobuf/protoc-gen-gogofaster --proto_path=./ example.proto --gogofaster_out

생성된 Go 파일에서 Avro 필드 유형과 해당 Go 데이터 유형 매핑 관계는 데이터의 직렬화 및 역직렬화를 실현합니다.

  1. 대화형 데이터 처리 시스템 구축

Kafka와 Avro를 통합한 후 고성능 대화형 데이터 처리 시스템 구축을 시작할 수 있습니다. Kafka를 데이터 저장 센터로 사용하고 그 안에 여러 파티션을 구축하여 데이터의 분산 저장 및 처리를 달성할 수 있습니다.

각 파티션에 대해 소비자 그룹을 생성하여 데이터의 병렬 처리 및 로드 밸런싱을 달성할 수 있습니다. 동시에 우리는 go-zero에서 제공하는 코루틴 풀과 동기화 채널을 사용하여 데이터 처리의 동시성 성능을 최적화할 수 있습니다.

다음은 대화형 데이터 처리 시스템의 예입니다.

// 创建消费组
group, err := kafka.NewGroup(s.cfg.Kafka, "test", kafka.WithGroupID("test-group"))
if err != nil {
    return nil, err
}
// 创建消费者
consumer, err := group.NewConsumer(context.Background(), []string{"test"})
if err != nil {
    return nil, err
}
// 启动并发协程
for i := 0; i < s.cfg.WorkerNum; i++ {
    go func() {
        for {
            select {
                // 从同步通道中获取新消息
                case msg := <-msgs:
                    if err := s.processMsg(msg); err != nil {
                        log.Errorf("failed to process message(%v): %v", msg.Value, err)
                    }
                }
        }
    }()
}
// 消费数据
for {
    m, err := consumer.FetchMessage(context.Background())
    if err != nil {
        log.Errorf("failed to fetch message: %v", err)
        continue
    }
    // 将新消息发送到同步通道中
    msgs <- m
}

위의 예에서는 소비자 그룹 "test-group"을 생성하고 해당 소비자를 생성했습니다. 처리 중에 데이터의 병렬 처리를 달성하기 위해 먼저 여러 개의 동시 코루틴을 시작합니다. 새 메시지가 수신되면 이를 동기 채널로 보내고 비동기 처리를 위해 코루틴 풀을 활용합니다.

위 구성을 통해 go-zero, Kafka, Avro를 성공적으로 통합하여 고성능 대화형 데이터 처리 시스템을 구현했습니다. 이러한 시스템을 이용하면 대용량 데이터를 쉽게 처리할 수 있고, 데이터 처리 및 분석의 효율성을 높일 수 있습니다.

위 내용은 go-zero와 Kafka+Avro의 실천: 고성능 대화형 데이터 처리 시스템 구축의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.