ホームページ  >  記事  >  バックエンド開発  >  Golang で Kafka メッセージ キューに基づくリアルタイム キャッシュ テクノロジを確立します。

Golang で Kafka メッセージ キューに基づくリアルタイム キャッシュ テクノロジを確立します。

PHPz
PHPzオリジナル
2023-06-21 11:37:12864ブラウズ

インターネット テクノロジの継続的な開発とアプリケーション シナリオの継続的な拡大により、リアルタイム キャッシュ テクノロジはインターネット企業にとってますます不可欠なスキルになってきています。リアルタイム キャッシュ テクノロジの方法として、メッセージ キューは実際のアプリケーションで開発者の間でますます好まれています。この記事では主に、Golang で Kafka メッセージ キューに基づくリアルタイム キャッシュ技術を確立する方法を紹介します。

Kafka メッセージ キューとは何ですか?

Kafka は、LinkedIn によって開発された分散メッセージング システムで、数千万のメッセージを処理できます。高スループット、低遅延、耐久性、高信頼性という特徴があります。 Kafka にはプロデューサー、コンシューマー、トピックという 3 つの主要なコンポーネントがあり、このうちプロデューサーとコンシューマーが Kafka の中核部分です。

プロデューサは、指定されたトピックにメッセージを送信し、パーティションとキー (Key) を指定することもできます。コンシューマはトピックから対応するメッセージを受け取ります。 Kafka では、プロデューサーとコンシューマーは独立しており、互いに依存関係はなく、同じトピックを共有することによってのみ相互作用します。このアーキテクチャは分散メッセージ配信を実装し、さまざまなビジネス シナリオにおけるメッセージ キューの要件を効果的に解決します。

Golang と Kafka の組み合わせ

Golang は、近年人気が高まっている効率的なプログラミング言語であり、高い同時実行性、高いパフォーマンスなどの特徴を備え、ますます広く使用されています。 Golang ではゴルーチンの数がカーネル スレッドの数と 1 対 1 の関係にあるため、メッセージ キューと組み合わせるという固有の利点があります。これは、Golang が大規模な同時タスクを効率的かつスムーズに処理できることを意味します。 Kafka は、水平方向の拡張を実現するために、カスタマイズ可能なパーティション ルールに従ってさまざまなメッセージを異なるブローカー ノードに配信できます。

Golang でサードパーティの Kafka ライブラリ sarama を使用すると、Kafka との対話を簡単に実装できます。具体的な実装手順は次のとおりです:

1. Golang プロジェクトに sarama ライブラリを導入します:

import "github.com/Shopify/sarama"

2. メッセージ送信者 (プロデューサー) インスタンスを作成します:

config := sarama.NewConfig()
config.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)

このうち NewConfig() は新しい設定ファイルのインスタンスの作成に使用され、Return.Successes は各メッセージの送信が成功した場合に成功情報が返されることを示し、 NewAsyncProducer() はプロデューサーのインスタンスの作成に使用されます。 Kafka クラスター内のブローカーを表し、ノードの IP アドレスとポート番号を表します。

3. メッセージの送信:

msg := &sarama.ProducerMessage{
  Topic: "test-topic",
  Value: sarama.StringEncoder("hello world"),
}
producer.Input() <- msg

このうち、ProducerMessage はメッセージの構造を表し、Topic はメッセージが属するトピックを表し、Value はメッセージの内容を表します。

4. メッセージコンシューマ (Consumer) インスタンスの作成:

config := sarama.NewConfig()
config.Consumer.Return.Errors = true
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)

このうち、NewConfig() は新しい設定ファイルのインスタンスを作成するために使用され、Return.Errors はメッセージが送信されるたびに消費された場合、消費失敗のエラー メッセージを返します。コンシューマ インスタンスの作成には NewConsumer() が使用されます。

5. メッセージの消費:

partitionConsumer, err := consumer.ConsumePartition("test-topic", 0, sarama.OffsetNewest)
for msg := range partitionConsumer.Messages() {
  fmt.Printf("Consumed message: %s
", string(msg.Value))
  partitionConsumer.MarkOffset(msg, "") // 确认消息已被消费
}

このうち、ConsumePartition() は消費のトピック、パーティション、消費場所 (最新のメッセージまたは最も古いメッセージ) を指定するために使用され、Messages() はトピック内で消費されたメッセージから取得するために使用されます。メッセージを消費した後、MarkOffset() メソッドを使用して、メッセージが消費されたことを確認する必要があります。

Kafka リアルタイム キャッシュの実装

Golang では、Kafka メッセージ キューを介してリアルタイム キャッシュを確立すると非常に便利です。プロジェクト内にキャッシュ管理モジュールを作成し、実際のニーズに応じてキャッシュの内容を対応するメッセージ構造に変換し、プロデューサーを通じて Kafka クラスター内の指定されたトピックにメッセージを送信し、コンシューマーがメッセージを消費するのを待ちます。トピックを立てて続行します。

具体的な実装手順は次のとおりです:

1. プロジェクトでキャッシュ構造とキャッシュ変数を定義します:

type Cache struct {
  Key   string
  Value interface{}
}

var cache []Cache

このうち、Key はキャッシュ キー ( Key) 、Value はキャッシュされた値 (Value) を表します。

2. キャッシュを対応するメッセージ構造に変換します:

type Message struct {
  Operation string // 操作类型(Add/Delete/Update)
  Cache     Cache  // 缓存内容
}

func generateMessage(operation string, cache Cache) Message {
  return Message{
    Operation: operation,
    Cache:     cache,
  }
}

このうち、Message はメッセージ構造を表し、Operation はキャッシュ操作の種類を表し、generateMessage() はメッセージを返すために使用されます。実例。

3. プロデューサーを作成し、キャッシュされたコンテンツをメッセージとして指定されたトピックに送信します:

func producer(messages chan *sarama.ProducerMessage) {
  config := sarama.NewConfig()
  config.Producer.Return.Successes = true
  producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
  if err != nil {
    panic(err)
  }

  for {
    select {
    case msg := <-messages:
      producer.Input() <- msg
    }
  }
}

func pushMessage(operation string, cache Cache, messages chan *sarama.ProducerMessage) {
  msg := sarama.ProducerMessage{
    Topic: "cache-topic",
    Value: sarama.StringEncoder(generateMessage(operation, cache)),
  }
  messages <- &msg
}

このうち、Producer() は、プロデューサー インスタンスを作成し、メッセージの受信を待機するために使用されます。パイプラインから送信されます。pushMessage() は、キャッシュされたコンテンツを Message インスタンスに変換し、プロデューサーを使用して指定されたトピックに送信するために使用されます。

4. コンシューマを作成し、指定されたトピックをリッスンし、メッセージが到着したときに対応する操作を実行します:

func consumer() {
  config := sarama.NewConfig()
  config.Consumer.Return.Errors = true
  consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
  if err != nil {
    panic(err)
  }

  partitionConsumer, err := consumer.ConsumePartition("cache-topic", 0, sarama.OffsetNewest)
  if err != nil {
    panic(err)
  }

  for msg := range partitionConsumer.Messages() {
    var message Message
    err := json.Unmarshal(msg.Value, &message)
    if err != nil {
      fmt.Println("Failed to unmarshal message: ", err.Error())
      continue
    }

    switch message.Operation {
    case "Add":
      cache = append(cache, message.Cache)
    case "Delete":
      for i, c := range cache {
        if c.Key == message.Cache.Key {
          cache = append(cache[:i], cache[i+1:]...)
          break
        }
      }
    case "Update":
      for i, c := range cache {
        if c.Key == message.Cache.Key {
          cache[i] = message.Cache
          break
        }
      }
    }
    partitionConsumer.MarkOffset(msg, "") // 确认消息已被消费
  }
}

このうち、consumer() は、コンシューマ インスタンスを作成し、コンシューマ インスタンスをリッスンするために使用されます。 json.Unmarshal() 関数は、メッセージの Value フィールドを解析して Message 構造にし、Operation フィールドに基づいて対応するキャッシュ操作を実行します。メッセージを消費した後、MarkOffset() メソッドを使用して、メッセージが消費されたことを確認する必要があります。

上記の手順により、Golang で Kafka ライブラリ sarama を使用して、Kafka メッセージ キューに基づくリアルタイム キャッシュ テクノロジを確立することに成功しました。実際のアプリケーションでは、実際のニーズに応じてさまざまな Kafka クラスター構成とパーティション ルールを選択し、さまざまなビジネス シナリオに柔軟に対応できます。

以上がGolang で Kafka メッセージ キューに基づくリアルタイム キャッシュ テクノロジを確立します。の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。