ホームページ  >  記事  >  Java  >  Java8 での Kafka プログラミングと組み合わせた Spark ストリーミング (Spark 2.0 および Kafka 0.10)

Java8 での Kafka プログラミングと組み合わせた Spark ストリーミング (Spark 2.0 および Kafka 0.10)

无忌哥哥
无忌哥哥オリジナル
2018-07-23 09:29:163116ブラウズ

先ほど、Spark ストリーミングの簡単なデモについて説明しましたが、成功した Kafka 実行の例もありました。これも一般的に使用される用途の 1 つです。

1. 関連するコンポーネントのバージョン
以前のバージョンと若干異なるため、最初にバージョンを確認する必要があります。また、scala はまだ使用されておらず、java8、spark 2.0.0、およびkafka0.10を使用しています。

2. Mavenパッケージの導入
ネットで組み合わせ例をいくつか見つけたのですが、今のバージョンと違っていて全然うまくいかなかったので、調べてインポートしたパッケージを列挙してみました。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.0.0</version>
</dependency>

インターネット上で見つかるkafkaのバージョン番号のない最新のパッケージは1.6.3ですが、spark2では正常に動作しなくなったので、kafka0.10に相当するバージョンを見つけました。 Spark2.0 の scala バージョンはすでに 2.11 であるため、インクルードの後に​​ 2.11 を付けて、scala バージョンを示す必要があります。

3.SparkSteamingKafka クラス
インポートされたパッケージのパスは org.apache.spark.streaming.kafka010.xxx であるため、インポートもここに含まれることに注意してください。その他の情報については、コメントを直接参照してください。

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相关参数,必要!缺了会报错
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Topic分区
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  可以打印所有信息,看下ConsumerRecord的结构
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

4. テストを実行します
ここでは、kafka に関する前の記事で書いたプロデューサー クラスを使用し、データを kafka サーバーに配置します。これは、master2 ノードにデプロイされた kafka であり、ローカル テストは、spark2 を実行します。

UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();

3 で SparkSteamingKafka クラスを再度実行すると、成功したことがわかります。

Java8 での Kafka プログラミングと組み合わせた Spark ストリーミング (Spark 2.0 および Kafka 0.10)

Java8 での Kafka プログラミングと組み合わせた Spark ストリーミング (Spark 2.0 および Kafka 0.10)

以上がJava8 での Kafka プログラミングと組み合わせた Spark ストリーミング (Spark 2.0 および Kafka 0.10)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。