Home  >  Article  >  Java  >  Spark-streaming combined with kafka programming under java8 (spark 2.0 & kafka 0.10

Spark-streaming combined with kafka programming under java8 (spark 2.0 & kafka 0.10

无忌哥哥
无忌哥哥Original
2018-07-23 09:29:163075browse

Earlier we talked about a simple demo of spark-streaming, and there were also examples of successful kafka runs. Here we will combine the two, which is also one of the commonly used uses.

1. Relevant component versions
First confirm the version, because it is somewhat different from the previous version, so it is necessary to record it. In addition, scala is still not used, using java8, spark 2.0.0 ,kafka 0.10.

2. Introduction of maven packages
I found some examples of combination on the Internet, but they were different from my current version, so I couldn't succeed at all, so I did some research and listed the imported packages.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.0.0</version>
</dependency>

The latest package without kafka version number that can be found on the Internet is 1.6.3. I tried it, but it can no longer run successfully under spark2, so I found the version corresponding to kafka0.10. Pay attention to spark2 The scala version of .0 is already 2.11, so the inclusion must be followed by 2.11, indicating the scala version.

3.SparkSteamingKafka class
It should be noted that the imported package path is org.apache.spark.streaming.kafka010.xxx, so the import is also included here. For other information, please refer to the comments directly.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相关参数,必要!缺了会报错
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Topic分区
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  可以打印所有信息,看下ConsumerRecord的结构
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

4. Run the test
Here we use the producer class written in the previous article on kafka, and put the data to the kafka server. This is kafka deployed on the master2 node, and the local test runs spark2.

UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();

Run the SparkSteamingKafka class in 3 again and you can see that it has been successful.

Spark-streaming combined with kafka programming under java8 (spark 2.0 & kafka 0.10

Spark-streaming combined with kafka programming under java8 (spark 2.0 & kafka 0.10

The above is the detailed content of Spark-streaming combined with kafka programming under java8 (spark 2.0 & kafka 0.10. For more information, please follow other related articles on the PHP Chinese website!

Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn