Maison  >  Questions et réponses  >  le corps du texte

python - 如何使用pykafka consumer进行数据处理并保存?

使用本地kafka bin/kafka-console-producer.sh --broker-list kafkaIP:port --topic topicName
创建命令行生产数据,然后打开python

from pykafka import KafkaClient
client = KafkaClient(hosts="192.168.x.x:9092")
topic = client.topics['wr_test']
consumer = topic.get_balanced_consumer(consumer_group='test-consumer-group',auto_commit_enable=True,zookeeper_connect='192.168.x.x:2121')

然后自己编写了简单的一套处理函数,从外部引用。将数据处理后存入elasticsearch 或者 数据库
比如
for msg in consumer:

if msg is not None:
    外部引入的处理函数(msg.value)
    

在python命令行
for msg in consumer:

print msg.offset, msg.value

这时候使用生产者敲入一些数据,在消费端就会就会立即打印出来
但是写成py文件之后,每次运行只会处理最近的生产的一次内容,在生产者中再进行输入一些内容,py文件就不会再进行数据处理了。
所以向问下如何编写能运行后能一直对消费者数据进行处理的函数?要注意哪些地方?

另外,get_balanced_consumer的方法,是连接zookeeper消费
使用topic.get_simple_consumer是直接消费kafka,使用这种方式就提示No handler for...的错误

还有一个疑问,就是实际生产环境日志产生量很快,应该如何编写一个多线程处理方法?

怪我咯怪我咯2741 Il y a quelques jours1260

répondre à tous(1)je répondrai

  • ringa_lee

    ringa_lee2017-04-18 10:36:31


    J'ai vu une solution alternative sur le blog de quelqu'un d'autre
    http://www.cnblogs.com/castle...
    Lisez msg.value du consommateur dans une liste, puis lisez les données de la liste pour le traitement des données. Une fois le processus terminé, affichez les données obtenues dans la liste. Utilisez également try: ... except :... continue

    répondre
    0
  • Annulerrépondre