Heim >Java >javaLernprogramm >Java-verteilte Kafka-Nachrichtenwarteschlangeninstanzanalyse

Java-verteilte Kafka-Nachrichtenwarteschlangeninstanzanalyse

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBnach vorne
2023-04-19 16:10:151278Durchsuche

Einführung

Apache Kafka ist ein verteiltes Publish-Subscribe-Messaging-System. Die Definition von Kafka auf der offiziellen Kafka-Website lautet: ein verteiltes Publish-Subscribe-Messaging-System. Es wurde ursprünglich von LinkedIn entwickelt, das 2010 der Apache Foundation zur Verfügung gestellt wurde und zu einem Top-Open-Source-Projekt wurde. Kafka ist ein schneller, skalierbarer und inhärent verteilter, partitionierter und replizierbarer Commit-Log-Dienst.

Hinweis: Kafka folgt nicht der JMS-Spezifikation (), sondern bietet nur Veröffentlichungs- und Abonnement-Kommunikationsmethoden.

Kafka-Kernbezogene Namen

  1. Broker: Kafka-Knoten, ein Kafka-Knoten ist ein Broker, mehrere Broker können einen Kafka-Cluster bilden

  2. Topic: Ein Nachrichtentyp, das Verzeichnis, in dem die Nachricht gespeichert ist, ist das Thema B. Seitenaufrufprotokolle, Klickprotokolle usw., können in Form von Themen vorhanden sein. Der Kafka-Cluster kann für die gleichzeitige Verteilung mehrerer Themen verantwortlich sein. Massage: das grundlegendste Bereitstellungsobjekt in Kafka.

  3. Partition: Die physische Gruppierung von Themen. Ein Thema kann in mehrere Partitionen unterteilt werden, und jede Partition ist eine geordnete Warteschlange. Die Partitionierung ist in Kafka implementiert und ein Broker repräsentiert eine Region.

  4. Segment: Die Partition besteht physisch aus mehreren Segmenten. Jedes Segment speichert Nachrichteninformationen Verbraucht als Thread

  5. Verbrauchergruppe: Verbrauchergruppe. Eine Verbrauchergruppe enthält mehrere Verbraucher :

  6. Warteschlange ist eine Datenstruktur, die dem First-In-First-Out-Prinzip folgt Cluster-Umgebung
  7. Umgebungstest ausführen
  8. Die Installation der JDK-Umgebung und des Zookeepers wird hier nicht detailliert beschrieben.
  9. Warum ist Kafka von Zookeeper abhängig: Kafka speichert MQ-Informationen auf Zookeeper. Um die Erweiterung des gesamten Clusters zu vereinfachen, wird die Ereignisbenachrichtigung von Zookeeper verwendet, um sich gegenseitig zu erkennen.

    kafka-Cluster-Installationsschritte:
1. Laden Sie das komprimierte Paket von kafka herunter

tar -zxvf kafka_2.11-1.0.0.tgz

3. Ändern Sie die Konfigurationsdatei von kafka .properties
  • Änderungsinhalt der Konfigurationsdatei:

  • zookeeper-Verbindungsadresse: zookeeper.connect=192.168.1.19:2181

  • Die Abhör-IP wird in die lokale IPlisteners geändert =PLAINTEXT://192.168.1.19:9092

  • kafkas Broker-ID, die ID jedes Brokers ist unterschiedlich broker.id=0

4. Starten Sie kafka nacheinander

Java-verteilte Kafka-Nachrichtenwarteschlangeninstanzanalyse./kafka-server-start.sh -daemon config/server.properties

kafka verwendet

kafka-Dateispeicher

Thema ist ein logisches Konzept und die Partition ist physisch. Basierend auf dem Konzept Oben entspricht jede Partition einer Protokolldatei, und die Protokolldatei speichert die vom Produzenten generierten Daten. Die vom Produzenten generierten Daten werden kontinuierlich an das Ende der Protokolldatei angehängt. Um zu verhindern, dass die Protokolldatei zu groß wird und die Datenpositionierung ineffizient wird, verwendet Kafka einen Sharding- und Indizierungsmechanismus, um jede Partition in mehrere Segmente zu unterteilen . Jedes Segment enthält: „.index“-Dateien, „.log“-Dateien und .timeindex-Dateien. Diese Dateien befinden sich in einem Ordner und die Benennungsregel für den Ordner lautet: Themenname + Partitionsnummer.

Zum Beispiel: Führen Sie den Befehl aus, um ein neues Thema zu erstellen, das in drei Bereiche unterteilt und in drei Brokern gespeichert ist:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication -faktor 1 - -partitions 3 --topic kaico

    Eine Partition ist in mehrere Segmente unterteilt
  • zookeeper.connect=192.168.1.19:2181

  • 监听的ip,修改为本机的iplisteners=PLAINTEXT://192.168.1.19:9092

  • kafka的brokerid,每台broker的id都不一样broker.id=0

4、依次启动kafka

./kafka-server-start.sh -daemon config/server.properties

kafka使用

kafka文件存储

topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生成的数据。Producer生成的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment,每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。

例如:执行命令新建一个主题,分三个区存放放在三个broker中:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kaico

.log-Protokolldatei

Java-verteilte Kafka-Nachrichtenwarteschlangeninstanzanalyse

.index-Offset-IndexdateiJava-verteilte Kafka-Nachrichtenwarteschlangeninstanzanalyse

  • .timeindex Zeitstempel-Indexdatei

  • Andere Dateien (partition.metadata, Leader-Epoch-Checkpoint)

  • Springboot integriert Kafka

    Maven-Abhängigkeit
  •  <dependencies>
            <!-- springBoot集成kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <!-- SpringBoot整合Web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
  • yml. Konfiguration

    # kafka
    spring:
      kafka:
        # kafka服务器地址(可以多个)
    #    bootstrap-servers: 192.168.212.164:9092,192.168.212.167:9092,192.168.212.168:9092
        bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
        consumer:
          # 指定一个默认的组名
          group-id: kafkaGroup1
          # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
          # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
          auto-offset-reset: earliest
          # key/value的反序列化
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          # key/value的序列化
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 批量抓取
          batch-size: 65536
          # 缓存容量
          buffer-memory: 524288
          # 服务器地址
          bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094

    Produzent
  • @RestController
    public class KafkaController {
    	/**
    	 * 注入kafkaTemplate
    	 */
    	@Autowired
    	private KafkaTemplate<String, String> kafkaTemplate;
    	/**
    	 * 发送消息的方法
    	 *
    	 * @param key
    	 *            推送数据的key
    	 * @param data
    	 *            推送数据的data
    	 */
    	private void send(String key, String data) {
    		// topic 名称 key   data 消息数据
    		kafkaTemplate.send("kaico", key, data);
    	}
    	// test 主题 1 my_test 3
    	@RequestMapping("/kafka")
    	public String testKafka() {
    		int iMax = 6;
    		for (int i = 1; i < iMax; i++) {
    			send("key" + i, "data" + i);
    		}
    		return "success";
    	}
    }
  • Verbraucher

    @Component
    public class TopicKaicoConsumer {
        /**
         * 消费者使用日志打印消息
         */
        @KafkaListener(topics = "kaico") //监听的主题
        public void receive(ConsumerRecord<?, ?> consumer) {
            System.out.println("topic名称:" + consumer.topic() + ",key:" +
                    consumer.key() + "," +
                    "分区位置:" + consumer.partition()
                    + ", 下标" + consumer.offset());
            //输出key对应的value的值
            System.out.println(consumer.value());
        }
    }

Das obige ist der detaillierte Inhalt vonJava-verteilte Kafka-Nachrichtenwarteschlangeninstanzanalyse. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Dieser Artikel ist reproduziert unter:yisu.com. Bei Verstößen wenden Sie sich bitte an admin@php.cn löschen