Maison  >  Article  >  Java  >  Spark-streaming combiné à la programmation kafka sous java8 (spark 2.0 & kafka 0.10

Spark-streaming combiné à la programmation kafka sous java8 (spark 2.0 & kafka 0.10

无忌哥哥
无忌哥哥original
2018-07-23 09:29:163076parcourir

Plus tôt, il y a eu une simple démo de spark-streaming, et il y a également eu des exemples d'exécutions kafka réussies. Ici, nous combinerons les deux, ce qui est également l'une des utilisations couramment utilisées.

1. Versions des composants concernés
Confirmez d'abord la version, car elle est quelque peu différente de la version précédente, il est donc nécessaire de l'enregistrer. De plus, scala n'est toujours pas utilisé, et. java8 et spark 2.0.0 sont utilisés, kafka 0.10.

2. Présenter les packages maven
J'ai trouvé quelques exemples de combinaison sur Internet, mais ils étaient différents de ma version actuelle, donc je n'ai pas réussi du tout, j'ai donc fait quelques recherches et répertorié les paquets importés.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.0.0</version>
</dependency>

Le dernier paquet sans numéro de version de kafka que l'on peut trouver sur Internet est la 1.6.3. Je l'ai essayé, mais il ne peut plus fonctionner avec succès sous spark2, j'ai donc trouvé la version correspondant à kafka0. .10. Notez que la version scala de spark2.0 est déjà 2.11, donc l'inclusion doit être suivie de 2.11, indiquant la version scala.

3.Classe SparkSteamingKafka
Il convient de noter que le chemin du package importé est org.apache.spark.streaming.kafka010.xxx, donc l'importation est également incluse ici. Pour d’autres informations, veuillez vous référer directement aux commentaires.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相关参数,必要!缺了会报错
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Topic分区
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  可以打印所有信息,看下ConsumerRecord的结构
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

4. Exécutez le test
Ici, nous utilisons la classe producteur écrite dans l'article précédent sur kafka et mettons les données sur le serveur kafka déployé sur le nœud master2. le test exécute spark2 .

UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();

Exécutez à nouveau la classe SparkSteamingKafka en 3 et vous pouvez voir qu'elle a réussi.

Spark-streaming combiné à la programmation kafka sous java8 (spark 2.0 & kafka 0.10

Spark-streaming combiné à la programmation kafka sous java8 (spark 2.0 & kafka 0.10

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn