インターネット テクノロジの継続的な開発とアプリケーション シナリオの継続的な拡大により、リアルタイム キャッシュ テクノロジはインターネット企業にとってますます不可欠なスキルになってきています。リアルタイム キャッシュ テクノロジの方法として、メッセージ キューは実際のアプリケーションで開発者の間でますます好まれています。この記事では主に、Golang で Kafka メッセージ キューに基づくリアルタイム キャッシュ技術を確立する方法を紹介します。
Kafka メッセージ キューとは何ですか?
Kafka は、LinkedIn によって開発された分散メッセージング システムで、数千万のメッセージを処理できます。高スループット、低遅延、耐久性、高信頼性という特徴があります。 Kafka にはプロデューサー、コンシューマー、トピックという 3 つの主要なコンポーネントがあり、このうちプロデューサーとコンシューマーが Kafka の中核部分です。
プロデューサは、指定されたトピックにメッセージを送信し、パーティションとキー (Key) を指定することもできます。コンシューマはトピックから対応するメッセージを受け取ります。 Kafka では、プロデューサーとコンシューマーは独立しており、互いに依存関係はなく、同じトピックを共有することによってのみ相互作用します。このアーキテクチャは分散メッセージ配信を実装し、さまざまなビジネス シナリオにおけるメッセージ キューの要件を効果的に解決します。
Golang と Kafka の組み合わせ
Golang は、近年人気が高まっている効率的なプログラミング言語であり、高い同時実行性、高いパフォーマンスなどの特徴を備え、ますます広く使用されています。 Golang ではゴルーチンの数がカーネル スレッドの数と 1 対 1 の関係にあるため、メッセージ キューと組み合わせるという固有の利点があります。これは、Golang が大規模な同時タスクを効率的かつスムーズに処理できることを意味します。 Kafka は、水平方向の拡張を実現するために、カスタマイズ可能なパーティション ルールに従ってさまざまなメッセージを異なるブローカー ノードに配信できます。
Golang でサードパーティの Kafka ライブラリ sarama を使用すると、Kafka との対話を簡単に実装できます。具体的な実装手順は次のとおりです:
1. Golang プロジェクトに sarama ライブラリを導入します:
import "github.com/Shopify/sarama"
2. メッセージ送信者 (プロデューサー) インスタンスを作成します:
config := sarama.NewConfig() config.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
このうち NewConfig() は新しい設定ファイルのインスタンスの作成に使用され、Return.Successes は各メッセージの送信が成功した場合に成功情報が返されることを示し、 NewAsyncProducer() はプロデューサーのインスタンスの作成に使用されます。 Kafka クラスター内のブローカーを表し、ノードの IP アドレスとポート番号を表します。
3. メッセージの送信:
msg := &sarama.ProducerMessage{ Topic: "test-topic", Value: sarama.StringEncoder("hello world"), } producer.Input() <- msg
このうち、ProducerMessage はメッセージの構造を表し、Topic はメッセージが属するトピックを表し、Value はメッセージの内容を表します。
4. メッセージコンシューマ (Consumer) インスタンスの作成:
config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
このうち、NewConfig() は新しい設定ファイルのインスタンスを作成するために使用され、Return.Errors はメッセージが送信されるたびに消費された場合、消費失敗のエラー メッセージを返します。コンシューマ インスタンスの作成には NewConsumer() が使用されます。
5. メッセージの消費:
partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest) for msg := range partitionConsumer.Messages() { fmt.Printf("Consumed message: %s ", string(msg.Value)) partitionConsumer.MarkOffset(msg, "") // 确认消息已被消费 }
このうち、ConsumePartition() は消費のトピック、パーティション、消費場所 (最新のメッセージまたは最も古いメッセージ) を指定するために使用され、Messages() はトピック内で消費されたメッセージから取得するために使用されます。メッセージを消費した後、MarkOffset() メソッドを使用して、メッセージが消費されたことを確認する必要があります。
Kafka リアルタイム キャッシュの実装
Golang では、Kafka メッセージ キューを介してリアルタイム キャッシュを確立すると非常に便利です。プロジェクト内にキャッシュ管理モジュールを作成し、実際のニーズに応じてキャッシュの内容を対応するメッセージ構造に変換し、プロデューサーを通じて Kafka クラスター内の指定されたトピックにメッセージを送信し、コンシューマーがメッセージを消費するのを待ちます。トピックを立てて続行します。
具体的な実装手順は次のとおりです:
1. プロジェクトでキャッシュ構造とキャッシュ変数を定義します:
type Cache struct { Key string Value interface{} } var cache []Cache
このうち、Key はキャッシュ キー ( Key) 、Value はキャッシュされた値 (Value) を表します。
2. キャッシュを対応するメッセージ構造に変換します:
type Message struct { Operation string // 操作类型(Add/Delete/Update) Cache Cache // 缓存内容 } func generateMessage(operation string, cache Cache) Message { return Message{ Operation: operation, Cache: cache, } }
このうち、Message はメッセージ構造を表し、Operation はキャッシュ操作の種類を表し、generateMessage() はメッセージを返すために使用されます。実例。
3. プロデューサーを作成し、キャッシュされたコンテンツをメッセージとして指定されたトピックに送信します:
func producer(messages chan *sarama.ProducerMessage) { config := sarama.NewConfig() config.Producer.Return.Successes = true producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config) if err != nil { panic(err) } for { select { case msg := <-messages: producer.Input() <- msg } } } func pushMessage(operation string, cache Cache, messages chan *sarama.ProducerMessage) { msg := sarama.ProducerMessage{ Topic: "cache-topic", Value: sarama.StringEncoder(generateMessage(operation, cache)), } messages <- &msg }
このうち、Producer() は、プロデューサー インスタンスを作成し、メッセージの受信を待機するために使用されます。パイプラインから送信されます。pushMessage() は、キャッシュされたコンテンツを Message インスタンスに変換し、プロデューサーを使用して指定されたトピックに送信するために使用されます。
4. コンシューマを作成し、指定されたトピックをリッスンし、メッセージが到着したときに対応する操作を実行します:
func consumer() { config := sarama.NewConfig() config.Consumer.Return.Errors = true consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) if err != nil { panic(err) } partitionConsumer, err := consumer.ConsumePartition("cache-topic", 0, sarama.OffsetNewest) if err != nil { panic(err) } for msg := range partitionConsumer.Messages() { var message Message err := json.Unmarshal(msg.Value, &message) if err != nil { fmt.Println("Failed to unmarshal message: ", err.Error()) continue } switch message.Operation { case "Add": cache = append(cache, message.Cache) case "Delete": for i, c := range cache { if c.Key == message.Cache.Key { cache = append(cache[:i], cache[i+1:]...) break } } case "Update": for i, c := range cache { if c.Key == message.Cache.Key { cache[i] = message.Cache break } } } partitionConsumer.MarkOffset(msg, "") // 确认消息已被消费 } }
このうち、consumer() は、コンシューマ インスタンスを作成し、コンシューマ インスタンスをリッスンするために使用されます。 json.Unmarshal() 関数は、メッセージの Value フィールドを解析して Message 構造にし、Operation フィールドに基づいて対応するキャッシュ操作を実行します。メッセージを消費した後、MarkOffset() メソッドを使用して、メッセージが消費されたことを確認する必要があります。
上記の手順により、Golang で Kafka ライブラリ sarama を使用して、Kafka メッセージ キューに基づくリアルタイム キャッシュ テクノロジを確立することに成功しました。実際のアプリケーションでは、実際のニーズに応じてさまざまな Kafka クラスター構成とパーティション ルールを選択し、さまざまなビジネス シナリオに柔軟に対応できます。
以上がGolang で Kafka メッセージ キューに基づくリアルタイム キャッシュ テクノロジを確立します。の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。