先ほど、Spark ストリーミングの簡単なデモについて説明しましたが、成功した Kafka 実行の例もありました。これも一般的に使用される用途の 1 つです。
1. 関連するコンポーネントのバージョン
以前のバージョンと若干異なるため、最初にバージョンを確認する必要があります。また、scala はまだ使用されておらず、java8、spark 2.0.0、およびkafka0.10を使用しています。
2. Mavenパッケージの導入
ネットで組み合わせ例をいくつか見つけたのですが、今のバージョンと違っていて全然うまくいかなかったので、調べてインポートしたパッケージを列挙してみました。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.0.0</version> </dependency>
インターネット上で見つかるkafkaのバージョン番号のない最新のパッケージは1.6.3ですが、spark2では正常に動作しなくなったので、kafka0.10に相当するバージョンを見つけました。 Spark2.0 の scala バージョンはすでに 2.11 であるため、インクルードの後に 2.11 を付けて、scala バージョンを示す必要があります。
3.SparkSteamingKafka クラス
インポートされたパッケージのパスは org.apache.spark.streaming.kafka010.xxx であるため、インポートもここに含まれることに注意してください。その他の情報については、コメントを直接参照してください。
import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import scala.Tuple2; public class SparkSteamingKafka { public static void main(String[] args) throws InterruptedException { String brokers = "master2:6667"; String topics = "topic1"; SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count"); JavaSparkContext sc = new JavaSparkContext(conf); sc.setLogLevel("WARN"); JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1)); Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); //kafka相关参数,必要!缺了会报错 Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", brokers) ; kafkaParams.put("bootstrap.servers", brokers); kafkaParams.put("group.id", "group1"); kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //Topic分区 Map<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(new TopicPartition("topic1", 0), 2L); //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定 JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets) ); //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象 JavaPairDStream<String, Integer> counts = lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator()) .mapToPair(x -> new Tuple2<String, Integer>(x, 1)) .reduceByKey((x, y) -> x + y); counts.print(); // 可以打印所有信息,看下ConsumerRecord的结构 // lines.foreachRDD(rdd -> { // rdd.foreach(x -> { // System.out.println(x); // }); // }); ssc.start(); ssc.awaitTermination(); ssc.close(); } }
4. テストを実行します
ここでは、kafka に関する前の記事で書いたプロデューサー クラスを使用し、データを kafka サーバーに配置します。これは、master2 ノードにデプロイされた kafka であり、ローカル テストは、spark2 を実行します。
UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic); producerThread.start();
3 で SparkSteamingKafka クラスを再度実行すると、成功したことがわかります。
以上がJava8 での Kafka プログラミングと組み合わせた Spark ストリーミング (Spark 2.0 および Kafka 0.10)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。