エンタープライズ レベルのアプリケーション アーキテクチャがますます複雑になるにつれて、メッセージ送信が重要なコンポーネントになっています。ここでカフカが登場します。 Kafka は、メッセージのパブリッシュとサブスクリプションをサポートする効率的で信頼性の高い分散メッセージ キューであり、非常に高いスループットと低い遅延を備えた最新のエンタープライズ レベルのメッセージング システムです。 Kafka の API では、公式クライアントが複数の言語を提供していますが、近年 Golang が広く使われるようになってきたため、この記事では実装言語として Golang を使用して、Golang を使用して Kafka を実装する方法を説明します。
1. 依存関係
開始する前に、必要な依存関係をダウンロードする必要があります:
具体的な使用方法は次のとおりです。
go get github.com/Shopify/sarama
go get github. com/ pkg/errors
2. プロデューサーの作成
Kafka の API を導入する前に、最初にプロデューサー インスタンスを作成する必要があります。プロデューサーのコードは次のとおりです:
package main import ( "fmt" "time" "github.com/pkg/errors" "github.com/Shopify/sarama" ) func main() { config := sarama.NewConfig() config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create producer")) } defer producer.Close() for i := 0; i < 10; i++ { message := &sarama.ProducerMessage{ Topic: "test_topic", Value: sarama.StringEncoder(fmt.Sprintf("test message %d", i)), } partition, offset, err := producer.SendMessage(message) if err != nil { fmt.Println(errors.Wrapf(err, "failed to send message: %s", message)) } else { fmt.Printf("message sent to partition %d at offset %d ", partition, offset) } time.Sleep(500 * time.Millisecond) // 延迟发送 } }
コードは主に次のことを行います:
3. コンシューマの作成
2 番目に、コンシューマ インスタンスを作成する必要があります。コンシューマ コードは次のとおりです。
package main import ( "context" "fmt" "os" "os/signal" "github.com/Shopify/sarama" "github.com/pkg/errors" ) func main() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(errors.Wrap(err, "failed to create consumer")) } defer consumer.Close() signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) partitions, err := consumer.Partitions("test_topic") if err != nil { panic(errors.Wrapf(err, "failed to read partitions for topic: test_topic")) } ctx, cancel := context.WithCancel(context.Background()) for _, partition := range partitions { go func(partition int32) { partitionConsumer, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetOldest) if err != nil { fmt.Printf("failed to create partition consumer for partition %d: %s ", partition, err) return } defer partitionConsumer.Close() for { select { case msg := <-partitionConsumer.Messages(): fmt.Printf("Consumed message from partition %d at offset %d: %s ", msg.Partition, msg.Offset, msg.Value) case <-signals: cancel() return case err := <-partitionConsumer.Errors(): fmt.Printf("Consumed error from partition %d: %s ", partition, err) case <-ctx.Done(): return } } }(partition) } <-signals fmt.Println("Shutting down consumer") }
コードは主に次のことを行います。
4. まとめ
上記では、Golang を使用して Kafka のプロデューサー部分とコンシューマー部分を実装しました。分散システムを実現するための重要なコンポーネントの 1 つとして、Kafka はメッセージを解決できます。システムには同時実行性の高い分散環境では問題がありますが、Kafka には優れたサポート ドキュメントと安定したコミュニティがあるため、実際の開発にストレスなく適用できます。
以上がgolangでkafkaを実装するの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。