首頁  >  文章  >  Java  >  java8下spark-streaming結合kafka程式設計(spark 2.0 & kafka 0.10

java8下spark-streaming結合kafka程式設計(spark 2.0 & kafka 0.10

无忌哥哥
无忌哥哥原創
2018-07-23 09:29:163114瀏覽

前面有說spark-streaming的簡單demo,也有說到kafka成功跑通的例子,這裡就結合二者,也是常用的使用之一。

1.相關元件版本 
先確認版本,因為跟之前的版本有些不一樣,所以才有必要記錄下,另外仍然沒有使用scala,使用java8,spark 2.0.0 ,kafka 0.10。

2.引入maven包 
網路上找了一些結合的例子,但是跟我目前版本不一樣,所以根本就成功不了,所以探究了下,列出引入包。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.0.0</version>
</dependency>

網上能找到的沒有kafka版本號的包最新是1.6.3,我試過,已經無法在spark2下成功運行了,所以找到的是對應kafka0.10的版本,注意spark2 .0的scala版本已經是2.11,所以包括之前必須後面跟著2.11,表示scala版本。

3.SparkSteamingKafka類別 
要注意的是引入的套件路徑是org.apache.spark.streaming.kafka010.xxx,所以這裡把import也放進來了。其他直接看註釋。

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相关参数,必要!缺了会报错
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Topic分区
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  可以打印所有信息,看下ConsumerRecord的结构
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

4.運行測試 
這裡使用上一篇kafka初探裡寫的producer類,put資料到kafka服務端,我這是master2節點上部署的kafka,本地測試運行spark2。

UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();

再運行3裡的SparkSteamingKafka類,可以看到已經成功。 

java8下spark-streaming結合kafka程式設計(spark 2.0 & kafka 0.10

java8下spark-streaming結合kafka程式設計(spark 2.0 & kafka 0.10

#

以上是java8下spark-streaming結合kafka程式設計(spark 2.0 & kafka 0.10的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn