ホームページ  >  記事  >  Java  >  Java 分散 Kafka メッセージ キュー インスタンスの分析

Java 分散 Kafka メッセージ キュー インスタンスの分析

WBOY
WBOY転載
2023-04-19 16:10:151153ブラウズ

はじめに

Apache Kafka は、分散型パブリッシュ/サブスクライブ メッセージング システムです。kafka 公式 Web サイトでの kafka の定義は、「分散型パブリッシュ/サブスクライブ メッセージング システム」です。これは元々 LinkedIn によって開発され、2010 年に Apache Foundation に寄付され、トップのオープンソース プロジェクトになりました。 Kafka は、高速かつスケーラブルで、本質的に分散され、パーティション化され、複製可能なコミット ログ サービスです。

注: Kafka は JMS 仕様 () に従っておらず、パブリッシュおよびサブスクライブ通信メソッドのみを提供します。

Kafka コア関連の名前

  1. ブローカー: Kafka ノード、Kafka ノードはブローカーであり、複数のブローカーで Kafka クラスターを形成できます

  2. トピック: メッセージのタイプ。メッセージが保存されるディレクトリがトピックです。たとえば、ページ ビュー ログ、クリック ログなどがトピックの形式で存在できます。Kafka クラスターは配布を担当できます。

  3. マッサージ: Kafka の最も基本的な配信オブジェクト。

  4. パーティション: トピックの物理的なグループ。トピックは複数のパーティションに分割でき、各パーティションは順序付けられたキューになります。パーティショニングは Kafka に実装されており、ブローカーはリージョンを表します。

  5. セグメント: パーティションは物理的に複数のセグメントで構成されており、各セグメントにはメッセージ情報が格納されます。

  6. プロデューサー: プロデューサー。メッセージを作成して送信します。トピックに移動

  7. ##コンシューマ: コンシューマ、トピックをサブスクライブしてメッセージを消費、コンシューマがスレッドとして消費
  8. ##コンシューマ グループ: コンシューマ グループ、コンシューマグループには複数のコンシューマが含まれています
  9. オフセット: オフセット、メッセージ パーティション内のメッセージのインデックス位置として理解されます
  10. トピックとキューの違い:
キューは先入れ先出しの原則に従うデータ構造です

#kafka クラスターのインストール

それぞれに jdk1.8 環境をインストールします。サーバー

  • Zookeeper クラスター環境のインストール

  • ##kafka クラスター環境のインストール

  • ##環境テストの実行
  • #jdk 環境と Zookeeper のインストールについては、ここでは詳しく説明しません。
Kafka が Zookeeper に依存する理由: Kafka は mq 情報を Zookeeper に保存します。クラスター全体の拡張を容易にするために、Zookeeper のイベント通知を使用して相互に感知します。

Java 分散 Kafka メッセージ キュー インスタンスの分析kafka クラスターのインストール手順:

1. Kafka 圧縮パッケージをダウンロードします

2. インストール パッケージ

tar -zxvf kafka_2 を解凍します。 11 -1.0.0.tgz

3. kafka の設定ファイルを変更 config/server.properties

設定ファイルの変更内容:

Zookeeper 接続アドレス:

zookeeper.connect=192.168.1.19:2181

  • リスニング IP がローカル iplisteners=PLAINTEXT:// 192.168 に変更されます。 .1.19:9092

  • #kafka のブローカー ID、各ブローカーの ID は異なりますbroker.id=0

  • 4. Kafka を順番に開始します

    ./kafka-server-start.sh -daemon config/server.properties

kafka の使用法

kafkaファイル ストレージトピックは論理概念、パーティションは物理概念です。各パーティションはログ ファイルに対応し、ログ ファイルにはプロデューサーによって生成されたデータが保存されます。プロデューサーによって生成されたデータは、ログ ファイルの末尾に継続的に追加されます。ログ ファイルが大きくなりすぎてデータ配置の非効率が引き起こされるのを防ぐために、Kafka はシャーディングおよびインデックス作成メカニズムを採用して各パーティションを複数のセグメントに分割します。各セグメントには、「.index」ファイル、「.log」ファイル、および .timeindex ファイルが含まれます。これらのファイルはフォルダー内にあり、フォルダーの命名規則は次のとおりです: トピック名 パーティション シリアル番号。

例: コマンドを実行して新しいトピックを作成します。トピックは 3 つの領域に分割され、3 つのブローカーに保存されます:

./kafka-topics.sh --create - -zookeeper localhost: 2181 --replication-factor 1 --partitions 3 --topic kaico

Java 分散 Kafka メッセージ キュー インスタンスの分析 #a パーティション 複数のセグメントに分割

Java 分散 Kafka メッセージ キュー インスタンスの分析

#.log ログ ファイル
  • #.index offset インデックス ファイル
  • .timeindex タイムスタンプ インデックス ファイル
  • その他のファイル (partition.metadata、leader-epoch-checkpoint)
  • Springboot 統合 kafka
  • Maven 依存関係

     <dependencies>
            <!-- springBoot集成kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <!-- SpringBoot整合Web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>

    yml 設定
  • # kafka
    spring:
      kafka:
        # kafka服务器地址(可以多个)
    #    bootstrap-servers: 192.168.212.164:9092,192.168.212.167:9092,192.168.212.168:9092
        bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
        consumer:
          # 指定一个默认的组名
          group-id: kafkaGroup1
          # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
          # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
          auto-offset-reset: earliest
          # key/value的反序列化
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          # key/value的序列化
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 批量抓取
          batch-size: 65536
          # 缓存容量
          buffer-memory: 524288
          # 服务器地址
          bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
プロデューサー

@RestController
public class KafkaController {
	/**
	 * 注入kafkaTemplate
	 */
	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;
	/**
	 * 发送消息的方法
	 *
	 * @param key
	 *            推送数据的key
	 * @param data
	 *            推送数据的data
	 */
	private void send(String key, String data) {
		// topic 名称 key   data 消息数据
		kafkaTemplate.send("kaico", key, data);
	}
	// test 主题 1 my_test 3
	@RequestMapping("/kafka")
	public String testKafka() {
		int iMax = 6;
		for (int i = 1; i < iMax; i++) {
			send("key" + i, "data" + i);
		}
		return "success";
	}
}

コンシューマ

@Component
public class TopicKaicoConsumer {
    /**
     * 消费者使用日志打印消息
     */
    @KafkaListener(topics = "kaico") //监听的主题
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名称:" + consumer.topic() + ",key:" +
                consumer.key() + "," +
                "分区位置:" + consumer.partition()
                + ", 下标" + consumer.offset());
        //输出key对应的value的值
        System.out.println(consumer.value());
    }
}

以上がJava 分散 Kafka メッセージ キュー インスタンスの分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。