Heim  >  Artikel  >  Java  >  Spark-Streaming kombiniert mit Kafka-Programmierung unter Java8 (Spark 2.0 & Kafka 0.10).

Spark-Streaming kombiniert mit Kafka-Programmierung unter Java8 (Spark 2.0 & Kafka 0.10).

无忌哥哥
无忌哥哥Original
2018-07-23 09:29:163076Durchsuche

Vorhin haben wir über eine einfache Demo von Spark-Streaming gesprochen, und es gab auch Beispiele für erfolgreiche Kafka-Läufe. Hier werden wir die beiden kombinieren, was auch eine der häufig verwendeten Anwendungen ist.

1. Relevante Komponentenversionen
Bestätigen Sie zunächst die Version, da sie sich etwas von der vorherigen Version unterscheidet und daher noch nicht verwendet werden muss Java8 und Spark 2.0.0 werden verwendet, Kafka 0.10.

2. Maven-Pakete vorstellen
Ich habe im Internet einige Beispiele für Kombinationen gefunden, aber sie unterschieden sich von meiner aktuellen Version, sodass ich überhaupt keinen Erfolg hatte, also habe ich etwas recherchiert und die aufgelistet importierte Pakete.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.0.0</version>
</dependency>

Das neueste Paket ohne Kafka-Versionsnummer, das im Internet zu finden ist, ist 1.6.3. Ich habe es versucht, aber es kann unter Spark2 nicht mehr erfolgreich ausgeführt werden, daher habe ich die Version gefunden, die Kafka0.10 entspricht . Achten Sie auf spark2. Die Scala-Version von .0 ist bereits 2.11, daher muss auf die Einbindung 2.11 folgen, was die Scala-Version angibt.

3.SparkSteamingKafka-Klasse
Es ist zu beachten, dass der importierte Paketpfad org.apache.spark.streaming.kafka010.xxx ist, sodass der Import auch hier enthalten ist. Weitere Informationen entnehmen Sie bitte direkt den Kommentaren.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相关参数,必要!缺了会报错
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Topic分区
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  可以打印所有信息,看下ConsumerRecord的结构
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

4. Führen Sie den Test aus
Hier verwenden wir die im vorherigen Artikel über Kafka geschriebene Klasse und stellen die Daten auf dem Kafka-Server auf dem Master2-Knoten bereit Testläufe spark2.

UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();

Führen Sie die SparkSteamingKafka-Klasse noch einmal in 3 aus und Sie können sehen, dass sie erfolgreich war.

Spark-Streaming kombiniert mit Kafka-Programmierung unter Java8 (Spark 2.0 & Kafka 0.10).

Spark-Streaming kombiniert mit Kafka-Programmierung unter Java8 (Spark 2.0 & Kafka 0.10).

Das obige ist der detaillierte Inhalt vonSpark-Streaming kombiniert mit Kafka-Programmierung unter Java8 (Spark 2.0 & Kafka 0.10).. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn