Maison  >  Questions et réponses  >  le corps du texte

Java操作Kafka执行不成功

使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢!

环境及依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

JDK版本为1.8、Kafka版本为2.12-0.10.2.0,服务器使用CentOS-7构建。

测试代码

public class TestBase {

    protected Logger log = LoggerFactory.getLogger(this.getClass());

    protected String kafka_server = "192.168.60.160:9092" ;

    protected String topic = "zlikun_topic";

}
public class ProducerTest extends TestBase {

    protected Properties props = new Properties();

    @Before
    public void init() {

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ;
    }

    @Test
    public void test() throws InterruptedException {

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.printf("offset = %d ,partition = %d \n", recordMetadata.offset() ,recordMetadata.partition());
                    } else {
                        log.error("send error !" ,e);
                    }
                }
            });
        }

        TimeUnit.SECONDS.sleep(3);
        producer.close();

    }

}
public class ConsumerTest extends TestBase {

    private Properties props = new Properties();

    @Before
    public void init() {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ;
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Test
    public void test() {

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
//        consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }

    }

}

问题

# 测试topic为手动创建
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

控制台输出信息

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
伊谢尔伦伊谢尔伦2764 Il y a quelques jours1205

répondre à tous(2)je répondrai

  • 黄舟

    黄舟2017-04-18 10:57:11

    Je l'ai testé, ça marche bien https://github.com/MOBX/kafka...

    Il est recommandé de vérifier si la connexion au cluster kafka est normale. Vous avez signalé une TimeoutException
    Sinon, essayez de rétrograder les clients kafka vers 0.8.2.0

    .

    répondre
    0
  • 伊谢尔伦

    伊谢尔伦2017-04-18 10:57:11

    J'ai ajusté le journal au niveau DEBUG et j'ai observé le journal et découvert qu'il était dû à l'incapacité d'analyser correctement le nom d'hôte.

    2017-04-11 13:49:46.046 [main] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at m160:9092:
    java.io.IOException: Can't resolve address: m160:9092
        at org.apache.kafka.common.network.Selector.connect(Selector.java:182)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
        at org.apache.kafka.clients.NetworkClient.access0(NetworkClient.java:57)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at com.zlikun.mq.ConsumerTest.test(ConsumerTest.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.accessrrreee0(ParentRunner.java:58)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    Caused by: java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:107)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649)
        at org.apache.kafka.common.network.Selector.connect(Selector.java:179)
        ... 36 more

    J'ai trouvé un article de blog en ligne http://blog.sina.com.cn/s/blo... qui prend également en charge cela. J'ai également configuré le nom d'hôte dans le fichier hosts et le test était normal. .
    Cependant, cela semble déraisonnable. Dans les applications réelles, cela affecterait trop le fonctionnement et la maintenance. Je ne sais pas s'il existe d'autres solutions meilleures.

    [2017/04/11 16:16]
    Je viens de trouver un article http://www.tuicool.com/articl... qui a résolu ce problème !

    répondre
    0
  • Annulerrépondre