>백엔드 개발 >PHP 튜토리얼 >생산자와 소비자의 PHP 구현에 대한 자세한 설명(Kafka 애플리케이션)

생산자와 소비자의 PHP 구현에 대한 자세한 설명(Kafka 애플리케이션)

藏色散人
藏色散人앞으로
2021-03-24 17:18:024908검색

머리말PHP에서 Kafka를 사용하려면 RdKafka 확장이 필요하며, RdKafka는 librdkafka에 의존하므로 두 가지 모두 설치해야 합니다. 구체적인 설치 방법은 Baidu에서 찾을 수 있습니다.

생산자(테스트)소비자를 생성하는 데 필요한 단계:

생산자 구성 매개변수
  • 생산자 인스턴스 생성
  • 주제 인스턴스 생성(생산자에 따라 다름)
  • 주제 메시지 생성
  • 푸시 메시지
  • 구체적인 코드는 다음과 같습니다.
        $conf = new \RdKafka\Conf();
        // 绑定服务节点
        $conf->set('metadata.broker.list', '127.0.0.1:32772');

        // 创建生产者
        $kafka = new \RdKafka\Producer($conf);

        // 创建主题实例
        $topic = $kafka->newTopic('p1r1');
        // 生产主题数据,此时消息在缓冲区中,并没有真正被推送
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');
        // 阻塞时间(毫秒), 0为非阻塞
        $kafka->poll(0); 

        // 推送消息,如果不调用此函数,消息不会被发送且会丢失
        $result = $kafka->flush(5000);

        if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
            throw new \RuntimeException('Was unable to flush, messages might be lost!');
        }

Consumer소비자를 생성하려면 여러 단계가 필요합니다.

소비자 구성 매개변수
  • 구성 매개변수를 적용하여 소비자 인스턴스 생성
  • 해당 주제 구독
  • 데이터 가져오기
  • 변위 제출
  • 구체적인 코드는 다음과 같습니다:
        $conf = new \RdKafka\Conf();
        // 绑定消费者组
        $conf->set('group.id', 'ceshi');
        // 绑定服务节点,多个用,分隔
        $conf->set('metadata.broker.list', '127.0.0.1:32787');
        // 设置自动提交为false
        $conf->set('enable.auto.commit', 'false');
        // 设置当前消费者拉取数据时的偏移量, 可选参数:
        // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。
        // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。
        $conf->set('auto.offset.reset', 'earliest');

        // 创建消费者实例
        $consumer = new \RdKafka\KafkaConsumer($conf);
        // 消费者订阅主题,数组形式
        $consumer->subscribe(['topic1','topic2']);
        while (true) {
            // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)
            $message = $consumer->consume(5000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    // 业务逻辑
                    var_dump($message);

                    // 提交位移
                    $consumer->commit($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
        // 关闭消费者(一般用在脚本中,不需要关闭)
        $conumser->close();

지정된 파티션의 데이터만 소비:

    // 对消费者指定分区,注意此方式不能与subscribe一同使用
    $consumer->assign([
        new RdKafka\TopicPartition("topic", 0),
        new RdKafka\TopicPartition("topic", 1),
    ]);

위 내용은 생산자와 소비자의 PHP 구현에 대한 자세한 설명(Kafka 애플리케이션)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 learnku.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제