>Java >java지도 시간 >Java8(스파크 2.0 및 Kafka 0.10)에서 Kafka 프로그래밍과 결합된 Spark 스트리밍

Java8(스파크 2.0 및 Kafka 0.10)에서 Kafka 프로그래밍과 결합된 Spark 스트리밍

无忌哥哥
无忌哥哥원래의
2018-07-23 09:29:163175검색

앞서 우리는 스파크 스트리밍의 간단한 데모에 대해 이야기했고, 성공적인 kafka 실행의 예도 있었습니다. 여기서는 또한 일반적으로 사용되는 용도 중 하나인 두 가지를 결합해 보겠습니다.

1. 관련 컴포넌트 버전
이전 버전과 다소 다르기 때문에 먼저 버전을 확인해야 합니다. 또한, scala는 아직 사용되지 않으며, java8, Spark 2.0.0, 카프카 0.10을 사용합니다.

2. Maven 패키지 소개
인터넷에서 몇 가지 조합의 예를 찾았는데 현재 버전과 달라서 전혀 성공하지 못해서 좀 조사해서 가져온 패키지를 나열했습니다.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.0.0</version>
</dependency>

인터넷에서 찾을 수 있는 kafka 버전 번호가 없는 최신 패키지는 1.6.3입니다. 시도해 보았으나 더 이상 Spark2에서 성공적으로 실행되지 않아서 kafka0.10에 해당하는 버전을 찾았습니다. Spark2.0의 스칼라 버전은 이미 2.11이므로 스칼라 버전을 나타내는 2.11이 포함되어야 합니다.

3.SparkSteamingKafka 클래스
가져오는 패키지 경로는 org.apache.spark.streaming.kafka010.xxx이므로 여기에도 가져오기가 포함된다는 점에 유의하세요. 기타 사항은 댓글을 직접 참고하시기 바랍니다.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

public class SparkSteamingKafka {
    public static void main(String[] args) throws InterruptedException {
        String brokers = "master2:6667";
        String topics = "topic1";
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相关参数,必要!缺了会报错
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Topic分区
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topic1", 0), 2L); 
        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams, offsets)
            );
        //这里就跟之前的demo一样了,只是需要注意这边的lines里的参数本身是个ConsumerRecord对象
        JavaPairDStream<String, Integer> counts = 
                lines.flatMap(x -> Arrays.asList(x.value().toString().split(" ")).iterator())
                .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
                .reduceByKey((x, y) -> x + y);  
        counts.print();
//  可以打印所有信息,看下ConsumerRecord的结构
//      lines.foreachRDD(rdd -> {
//          rdd.foreach(x -> {
//            System.out.println(x);
//          });
//        });
        ssc.start();
        ssc.awaitTermination();
        ssc.close();
    }
}

4. 테스트 실행
여기에서는 kafka에 대해 이전 기사에서 작성한 생산자 클래스를 사용하고 데이터를 kafka 서버에 넣습니다. 이것은 master2 노드에 배포된 kafka이고 로컬 테스트는 Spark2를 실행합니다.

UserKafkaProducer producerThread = new UserKafkaProducer(KafkaProperties.topic);
producerThread.start();

3에서 SparkSteamingKafka 클래스를 다시 실행하면 성공했음을 확인할 수 있습니다.

Java8(스파크 2.0 및 Kafka 0.10)에서 Kafka 프로그래밍과 결합된 Spark 스트리밍

Java8(스파크 2.0 및 Kafka 0.10)에서 Kafka 프로그래밍과 결합된 Spark 스트리밍

위 내용은 Java8(스파크 2.0 및 Kafka 0.10)에서 Kafka 프로그래밍과 결합된 Spark 스트리밍의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.