찾다

 >  Q&A  >  본문

Java操作Kafka执行不成功

使用kafka-clients操作kafka始终不成功,原因不清楚,下面贴出相关代码及配置,请懂得指点一下,谢谢!

环境及依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>

JDK版本为1.8、Kafka版本为2.12-0.10.2.0,服务器使用CentOS-7构建。

测试代码

public class TestBase {

    protected Logger log = LoggerFactory.getLogger(this.getClass());

    protected String kafka_server = "192.168.60.160:9092" ;

    protected String topic = "zlikun_topic";

}
public class ProducerTest extends TestBase {

    protected Properties props = new Properties();

    @Before
    public void init() {

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ;
    }

    @Test
    public void test() throws InterruptedException {

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 发送消息
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.printf("offset = %d ,partition = %d \n", recordMetadata.offset() ,recordMetadata.partition());
                    } else {
                        log.error("send error !" ,e);
                    }
                }
            });
        }

        TimeUnit.SECONDS.sleep(3);
        producer.close();

    }

}
public class ConsumerTest extends TestBase {

    private Properties props = new Properties();

    @Before
    public void init() {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
        props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ;
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    }

    @Test
    public void test() {

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
//        consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }

    }

}

问题

# 测试topic为手动创建
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

控制台输出信息

[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
[kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
伊谢尔伦伊谢尔伦2802일 전1250

모든 응답(2)나는 대답할 것이다

  • 黄舟

    黄舟2017-04-18 10:57:11

    테스트해 보니 잘 작동합니다 https://github.com/MOBX/kafka...

    Kafka 클러스터 연결이 정상인지 확인하는 것이 좋습니다. TimeoutException을 보고했습니다.
    그렇지 않은 경우 kafka-clients를 0.8.2.0으로 다운그레이드해 보세요.

    회신하다
    0
  • 伊谢尔伦

    伊谢尔伦2017-04-18 10:57:11

    로그를 DEBUG 수준으로 조정하고 로그를 관찰한 결과 호스트 이름을 올바르게 구문 분석하지 못하여 발생한 것으로 나타났습니다.

    으아아아

    이것을 지원하는 블로그 게시물 http://blog.sina.com.cn/s/blo를 찾았습니다. hosts 파일에서도 호스트 이름을 구성했는데 테스트는 정상이었습니다. .
    하지만 이렇게 하면 실제 적용에서는 운영 및 유지 관리에 너무 많은 영향을 미칠 수 있습니다. 다른 더 나은 솔루션이 있는지는 모르겠습니다.

    [2017/04/11 16:16]
    방금 http://www.tuicool.com/articl... 이 문제를 해결한 기사를 찾았습니다!

    회신하다
    0
  • 취소회신하다