用kafka的high level api会造成消费者线程的堵塞
我是在消费的时候将消息先批量缓存到buffer里,然后集中处理
可是会出现一个问题,当生产的消息不足够buffer时,消费者就阻塞了,不会执行后面进行buffer的处理
我想设定一个时间,比如堵塞5s后,自动将没有满的buffer消息进行处理,这个要怎么解决
例如:
buffer允许100条消息
buffer积累到100条进行一次处理
可是生产者只生产了50条消息,这50条消息接收过来的时候不够100条的buffer
这样buffer就阻塞了
如何能在阻塞一段时间后,能够把50条不满的消息处理了?
kafka版本:0.8.1
while (it.hasNext()) {
//获取消息
//消息消息,放入buffer(当缓存的消息达不满buffer时,是不进行处理的,到这里就堵塞了)
//处理消息
}