Home > Article > Backend Development > Detailed explanation of PHP implementation of producers and consumers (Kafka application)
This article introduces PHP to producers and consumers, I hope to help friends in need!
Using Kafka in PHP requires the RdKafka extension, and RdKafka depends on librdkafka, so we need to install both of them. The specific installation method is from Baidu. This article No further explanation.
The steps required to create a consumer:
The specific code is as follows:
$conf = new \RdKafka\Conf(); // 绑定服务节点 $conf->set('metadata.broker.list', '127.0.0.1:32772'); // 创建生产者 $kafka = new \RdKafka\Producer($conf); // 创建主题实例 $topic = $kafka->newTopic('p1r1'); // 生产主题数据,此时消息在缓冲区中,并没有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message'); // 阻塞时间(毫秒), 0为非阻塞 $kafka->poll(0); // 推送消息,如果不调用此函数,消息不会被发送且会丢失 $result = $kafka->flush(5000); if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); }
Creating a consumer requires several steps:
The specific code is as follows:
$conf = new \RdKafka\Conf(); // 绑定消费者组 $conf->set('group.id', 'ceshi'); // 绑定服务节点,多个用,分隔 $conf->set('metadata.broker.list', '127.0.0.1:32787'); // 设置自动提交为false $conf->set('enable.auto.commit', 'false'); // 设置当前消费者拉取数据时的偏移量, 可选参数: // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。 // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。 $conf->set('auto.offset.reset', 'earliest'); // 创建消费者实例 $consumer = new \RdKafka\KafkaConsumer($conf); // 消费者订阅主题,数组形式 $consumer->subscribe(['topic1','topic2']); while (true) { // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环) $message = $consumer->consume(5000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: // 业务逻辑 var_dump($message); // 提交位移 $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } // 关闭消费者(一般用在脚本中,不需要关闭) $conumser->close();
Consumption only Data in the specified partition:
// 对消费者指定分区,注意此方式不能与subscribe一同使用 $consumer->assign([ new RdKafka\TopicPartition("topic", 0), new RdKafka\TopicPartition("topic", 1), ]);
The above is the detailed content of Detailed explanation of PHP implementation of producers and consumers (Kafka application). For more information, please follow other related articles on the PHP Chinese website!