kafka安装及Kafka-PHP扩展的使用,kafkakafka-php扩展
话说用了就要有点产出,要不然过段时间又忘了,所以在这里就记录一下试用Kafka的安装过程和php扩展的试用。
实话说,如果用于队列的话,跟PHP比较配的,还是Redis。用的顺手,呵呵,只是Redis不能有多个consumer。但Kafka官方对PHP不支持,PHP扩展是爱好者或使用者写的。下面就开始讲Kafka的安装吧。我以CentOS6.4为例,64位。
一. 首先确认下jdk有没有安装
使用命令
[root@localhost ~]# java -<span>version java version </span><span>"</span><span>1.8.0_73</span><span>"</span><span> Java(TM) SE Runtime Environment (build </span><span>1.8</span>.0_73-<span>b02) Java HotSpot(TM) </span><span>64</span>-Bit Server VM (build <span>25.73</span>-b02, mixed mode)
如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。
然后打开/etc/profile文件
[root@localhost ~]# vim /etc/profile
把下面这段代码写到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.<span>8</span><span>.0_73 export CLASSPATH</span>=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/<span>dt.jar export PATH</span>=$JAVA_HOME/bin:$PATH
最后
[root@localhost ~]# source /etc/profile
这时jdk就生效了,可以使用 java -version验证下。
二. 接下来安装Kafka
1. 下载Kafka
到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下载完解压到你喜欢的目录
我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 运行默认的Kafka
启动Zookeeper server
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/zookeeper-server-start.<span>sh</span> config/zookeeper.properties &
启动Kafka server
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-server-start.<span>sh</span> config/server.properties &
运行生产者producer
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-producer.<span>sh</span> --broker-list localhost:<span>9092</span> --topic test
运行消费者consumer
[root@localhost kafka_2.<span>9.1</span>-<span>0.8</span>.<span>2.2</span>]# <span>sh</span> bin/kafka-console-consumer.<span>sh</span> --zookeeper localhost:<span>2181</span> --topic test --from-beginning
这样,在producer那边输入内容,consumer马上就能接收到。
4. 当有跨机的producer或consumer连接时
需要配置config/server.properties的host.name,要不然跨机的连不上。
三. Kafka-PHP扩展
使用了一圈,就https://github.com/nmred/kafka-php可以用。
我是使用composer安装的,以下是示例:
producer.php
<?<span>php </span><span>require</span> 'vendor/autoload.php'<span>; </span><span>while</span> (1<span>) { </span><span>$part</span> = <span>mt_rand</span>(0, 1<span>); </span><span>$produce</span> = \Kafka\Produce::getInstance('kafka0:2181', 3000<span>); </span><span>//</span><span> get available partitions</span> <span>$partitions</span> = <span>$produce</span>->getAvailablePartitions('topic_name'<span>); </span><span>var_dump</span>(<span>$partitions</span><span>); </span><span>//</span><span> send message</span> <span>$produce</span>->setRequireAck(-1<span>); </span><span>$produce</span>->setMessages('topic_name', 0, <span>array</span>(<span>date</span>('Y-m-d H:i:s'<span>)); </span><span>sleep</span>(3<span>); }</span>
consumer.php
<span>require</span> 'vendor/autoload.php'<span>; </span><span>$consumer</span> = \Kafka\Consumer::getInstance('kafka0:2181'<span>); </span><span>$group</span> = 'topic_name'<span>; </span><span>$consumer</span>->setGroup(<span>$group</span><span>); </span><span>$consumer</span>->setFromOffset(<span>true</span><span>); </span><span>$consumer</span>->setTopic('topic_name', 0<span>); </span><span>$consumer</span>->setMaxBytes(102400<span>); </span><span>$result</span> = <span>$consumer</span>-><span>fetch(); </span><span>print_r</span>(<span>$result</span><span>); </span><span>foreach</span> (<span>$result</span> <span>as</span> <span>$topicName</span> => <span>$partition</span><span>) { </span><span>foreach</span> (<span>$partition</span> <span>as</span> <span>$partId</span> => <span>$messageSet</span><span>) { </span><span>var_dump</span>(<span>$partition</span>-><span>getHighOffset()); </span><span>foreach</span> (<span>$messageSet</span> <span>as</span> <span>$message</span><span>) { </span><span>var_dump</span>((<span>string</span>)<span>$message</span><span>); } </span><span>var_dump</span>(<span>$partition</span>-><span>getMessageOffset()); } }</span>

说明本项目为springboot+kafak的整合项目,故其用了springboot中对kafak的消费注解@KafkaListener首先,application.properties中配置用逗号隔开的多个topic。方法:利用Spring的SpEl表达式,将topics配置为:@KafkaListener(topics=“#{’${topics}’.split(’,’)}”)运行程序,console打印的效果如下

随着互联网和科技的发展,数字化投资已成为人们越来越关注的话题。很多投资者不断探索和研究投资策略,希望能够获得更高的投资回报率。股票交易中,实时的股票分析对决策非常重要,其中使用Kafka实时消息队列和PHP技术实现更是一种高效且实用的手段。一、Kafka介绍Kafka是由LinkedIn公司开发的一个高吞吐量的分布式发布、订阅消息系统。Kafka的主要特点是

spring-kafka是基于java版的kafkaclient与spring的集成,提供了KafkaTemplate,封装了各种方法,方便操作,它封装了apache的kafka-client,不需要再导入client依赖org.springframework.kafkaspring-kafkaYML配置kafka:#bootstrap-servers:server1:9092,server2:9093#kafka开发地址,#生产者配置producer:#Kafka提供的序列化和反序列化类key

如何选择合适的Kafka可视化工具?五款工具对比分析引言:Kafka是一种高性能、高吞吐量的分布式消息队列系统,被广泛应用于大数据领域。随着Kafka的流行,越来越多的企业和开发者需要一个可视化工具来方便地监控和管理Kafka集群。本文将介绍五款常用的Kafka可视化工具,并对比它们的特点和功能,帮助读者选择适合自己需求的工具。一、KafkaManager

近年来,随着大数据的兴起和活跃的开源社区,越来越多的企业开始寻找高性能的交互式数据处理系统来满足日益增长的数据需求。在这场技术升级的浪潮中,go-zero和Kafka+Avro被越来越多的企业所关注和采用。go-zero是一款基于Golang语言开发的微服务框架,具有高性能、易用、易扩展、易维护等特点,旨在帮助企业快速构建高效的微服务应用系统。它的快速成长得

Kafka 是一个优秀的分布式消息中间件,许多系统中都会使用到 Kafka 来做消息通信。对分布式消息系统的了解和使用几乎成为一个后台开发人员必备的技能。

随着互联网的不断发展,对于消息系统的需求也越来越高。在构建高并发、高可靠性的消息系统中,go-zero和Kafka是两个非常好的选择。go-zero是一个基于Go语言的微服务框架,通过简单易用、高性能、可扩展等特点,在很多领域被广泛应用。Kafka是一个开源的分布式流媒体平台,具有高可靠性、高吞吐量、易拓展等特点,在处理大规模数据流和实时数据管道方面得到广泛

1.spring-kafkaorg.springframework.kafkaspring-kafka1.3.5.RELEASE2.配置文件相关信息kafka.bootstrap-servers=localhost:9092kafka.consumer.group.id=20230321#可以并发消费的线程数(通常与partition数量一致)kafka.consumer.concurrency=10kafka.consumer.enable.auto.commit=falsekafka.boo


핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사

뜨거운 도구

SublimeText3 Linux 새 버전
SublimeText3 Linux 최신 버전

에디트플러스 중국어 크랙 버전
작은 크기, 구문 강조, 코드 프롬프트 기능을 지원하지 않음

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.

메모장++7.3.1
사용하기 쉬운 무료 코드 편집기

Dreamweaver Mac版
시각적 웹 개발 도구
