search

Home  >  Q&A  >  body text

Java操作Kafka执行不成功

使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢!

环境及依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

JDK版本为1.8、Kafka版本为2.12-0.10.2.0,服务器使用CentOS-7构建。

测试代码

public class TestBase {

    protected Logger log = LoggerFactory.getLogger(this.getClass());

    protected String kafka_server = "192.168.60.160:9092" ;

    protected String topic = "zlikun_topic";

}
public class ProducerTest extends TestBase {

    protected Properties props = new Properties();

    @Before
    public void init() {

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ;
    }

    @Test
    public void test() throws InterruptedException {

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.printf("offset = %d ,partition = %d \n", recordMetadata.offset() ,recordMetadata.partition());
                    } else {
                        log.error("send error !" ,e);
                    }
                }
            });
        }

        TimeUnit.SECONDS.sleep(3);
        producer.close();

    }

}
public class ConsumerTest extends TestBase {

    private Properties props = new Properties();

    @Before
    public void init() {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ;
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Test
    public void test() {

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
//        consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }

    }

}

问题

# 测试topic为手动创建
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

控制台输出信息

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
伊谢尔伦伊谢尔伦2802 days ago1246

reply all(2)I'll reply

  • 黄舟

    黄舟2017-04-18 10:57:11

    Tested it, it works fine https://github.com/MOBX/kafka...

    It is recommended to check whether the kafka cluster connection is normal. What you reported is TimeoutException;
    If not, try downgrading kafka-clients to 0.8.2.0

    reply
    0
  • 伊谢尔伦

    伊谢尔伦2017-04-18 10:57:11

    I adjusted the log to DEBUGlevel, and after observing the log, I found that it was caused by the inability to correctly parse the host name.

    2017-04-11 13:49:46.046 [main] DEBUG org.apache.kafka.clients.NetworkClient - Error connecting to node 0 at m160:9092:
    java.io.IOException: Can't resolve address: m160:9092
        at org.apache.kafka.common.network.Selector.connect(Selector.java:182)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
        at org.apache.kafka.clients.NetworkClient.access0(NetworkClient.java:57)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:768)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:684)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:347)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:203)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:138)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:216)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:193)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:275)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at com.zlikun.mq.ConsumerTest.test(ConsumerTest.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.junit.runners.model.FrameworkMethod.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.accessrrreee0(ParentRunner.java:58)
        at org.junit.runners.ParentRunner.evaluate(ParentRunner.java:268)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    Caused by: java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:107)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649)
        at org.apache.kafka.common.network.Selector.connect(Selector.java:179)
        ... 36 more

    I found a blog post on the Internet http://blog.sina.com.cn/s/blo... which also supports this. I also configured the host name in the hosts file and the test was normal.
    However, it seems unreasonable to do this. In actual applications, it would affect the operation and maintenance too much. I don’t know if there are other better solutions.

    [2017/04/11 16:16]
    Just found an article from the Internet http://www.tuicool.com/articl..., which solved this problem!

    reply
    0
  • Cancelreply