앞서 우리는 스파크 스트리밍의 간단한 데모에 대해 이야기했고, 성공적인 kafka 실행의 예도 있었습니다. 여기서는 또한 일반적으로 사용되는 용도 중 하나인 두 가지를 결합해 보겠습니다.
1. 관련 컴포넌트 버전
이전 버전과 다소 다르기 때문에 먼저 버전을 확인해야 합니다. 또한, scala는 아직 사용되지 않으며, java8, Spark 2.0.0, 카프카 0.10을 사용합니다.
2. Maven 패키지 소개
인터넷에서 몇 가지 조합의 예를 찾았는데 현재 버전과 달라서 전혀 성공하지 못해서 좀 조사해서 가져온 패키지를 나열했습니다.
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.0.0</version> </dependency>
인터넷에서 찾을 수 있는 kafka 버전 번호가 없는 최신 패키지는 1.6.3입니다. 시도해 보았으나 더 이상 Spark2에서 성공적으로 실행되지 않아서 kafka0.10에 해당하는 버전을 찾았습니다. Spark2.0의 스칼라 버전은 이미 2.11이므로 스칼라 버전을 나타내는 2.11이 포함되어야 합니다.
3.SparkSteamingKafka 클래스
가져오는 패키지 경로는 org.apache.spark.streaming.kafka010.xxx이므로 여기에도 가져오기가 포함된다는 점에 유의하세요. 기타 사항은 댓글을 직접 참고하시기 바랍니다.
import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import scala.Tuple2; public class SparkSteamingKafka { public static void main(String[] args) throws InterruptedException { String brokers = "master2:6667"; String topics = "topic1"; SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count"); JavaSparkContext sc = new JavaSparkContext(conf); sc.setLogLevel("WARN"); JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1)); Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); //kafka相关参数,必要!缺了会报错 Map<String, Object> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", brokers) ; kafkaParams.put("bootstrap.servers", brokers); kafkaParams.put("group.id", "group1"); kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //Topic分区 Map<TopicPartition, Long> offsets = new HashMap<>(); offsets.put(new TopicPartition("topic1", 0), 2L); //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定 JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets) ); //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象 JavaPairDStream<String, Integer> counts = lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator()) .mapToPair(x -> new Tuple2<String, Integer>(x, 1)) .reduceByKey((x, y) -> x + y); counts.print(); // 可以打印所有信息,看下ConsumerRecord的结构 // lines.foreachRDD(rdd -> { // rdd.foreach(x -> { // System.out.println(x); // }); // }); ssc.start(); ssc.awaitTermination(); ssc.close(); } }
4. 테스트 실행
여기에서는 kafka에 대해 이전 기사에서 작성한 생산자 클래스를 사용하고 데이터를 kafka 서버에 넣습니다. 이것은 master2 노드에 배포된 kafka이고 로컬 테스트는 Spark2를 실행합니다.
UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic); producerThread.start();
3에서 SparkSteamingKafka 클래스를 다시 실행하면 성공했음을 확인할 수 있습니다.
위 내용은 Java8(스파크 2.0 및 Kafka 0.10)에서 Kafka 프로그래밍과 결합된 Spark 스트리밍의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!