在Centos 上安装Kafka集群
安装准备:版本
Kafka版本:kafka_2.11-0.9.0.0
Zookeeper版本:zookeeper-3.4.7
Zookeeper 集群:bjrenrui0001 bjrenrui0002 bjrenrui0003
Zookeeper集群的搭建参见:在CentOS上安装ZooKeeper集群
物理环境
安装三台物理机:
192.168.100.200 bjrenrui0001(运行3个Broker)
192.168.100.201 bjrenrui0002(运行2个Broker)
192.168.100.202 bjrenrui0003(运行2个Broker)
该集群的创建主要分为三步,单节点单Broker,单节点多Broker,多节点多Broker
单节点单Broker
本节以bjrenrui0001上创建一个Broker为例
下载kafka:
下载路径:http://kafka.apache.org/downloads.html
cd /mq/
wget http://mirrors.hust.edu.cn/apache/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
copyfiles.sh kafka_2.11-0.9.0.0.tgz bjyfnbserver /mq/
tar zxvf kafka_2.11-0.9.0.0.tgz -C /mq/
ln -s /mq/kafka_2.11-0.9.0.0 /mq/kafka
mkdir /mq/kafka/logs
配置
修改config/server.properties
vi /mq/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://:9092
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181GG
zookeeper.connection.timeout.ms=6000
启动Kafka服务:
cd /mq/kafka;sh bin/kafka-server-start.sh -daemon config/server.properties
或
sh /mq/kafka/bin/kafka-server-start.sh -daemon /mq/kafka/config/server.properties
netstat -ntlp|grep -E '2181|9092'
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::9092 :::* LISTEN 26903/java
tcp6 0 0 :::2181 :::* LISTEN 24532/java
创建Topic:
sh /mq/kafka/bin/kafka-topics.sh --create --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --replication-factor 1 --partitions 1 --topic test
查看Topic:
sh /mq/kafka/bin/kafka-topics.sh --list --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
producer发送消息:
$ sh /mq/kafka/bin/kafka-console-producer.sh --broker-list bjrenrui0001:9092 --topic test
first
message
consumer接收消息:
$ sh bin/kafka-console-consumer.sh --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic test --from-beginning
first
message
如果要最新的数据,可以不带--from-beginning参数即可。
单节点多个Broker
将上个章节中的文件夹再复制两份分别为kafka_2,kafka_3
cp -r /mq/kafka_2.11-0.9.0.0 /mq/kafka_2.11-0.9.0.0_2
cp -r /mq/kafka_2.11-0.9.0.0 /mq/kafka_2.11-0.9.0.0_3
ln -s /mq/kafka_2.11-0.9.0.0_2 /mq/kafka_2
ln -s /mq/kafka_2.11-0.9.0.0_3 /mq/kafka_3
分别修改kafka_2/config/server.properties以及kafka_3/config/server.properties 文件中的broker.id,以及port属性,确保唯一性
vi /mq/kafka_2/config/server.properties
broker.id=2
listeners=PLAINTEXT://:9093
port=9093
host.name=bjrenrui0001
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_2/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_3/config/server.properties
broker.id=3
listeners=PLAINTEXT://:9094
port=9094
host.name=bjrenrui0001
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_3/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
启动
启动另外两个Broker:
sh /mq/kafka_2/bin/kafka-server-start.sh -daemon /mq/kafka_2/config/server.properties
sh /mq/kafka_3/bin/kafka-server-start.sh -daemon /mq/kafka_3/config/server.properties
检查端口:
[dreamjobs@bjrenrui0001 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::2181 :::* LISTEN 24532/java
tcp6 0 0 :::9092 :::* LISTEN 26903/java
tcp6 0 0 :::9093 :::* LISTEN 28672/java
tcp6 0 0 :::9094 :::* LISTEN 28734/java
创建一个replication factor为3的topic:
sh /mq/kafka/bin/kafka-topics.sh --create --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看Topic的状态:
$ sh /mq/kafka/bin/kafka-topics.sh --describe -zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
从上面的内容可以看出,该topic包含1个part,replicationfactor为3,且Node3 是leador
解释如下:
"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
再来看一下之前创建的test topic, 从下图可以看出没有进行replication
$ sh /mq/kafka/bin/kafka-topics.sh --describe --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
多个节点的多个Broker
在bjrenrui0002、bjrenrui0003上分别把下载的文件解压缩到kafka_4,kafka_5,kafka_6两个文件夹中,再将bjrenrui0001上的server.properties配置文件拷贝到这三个文件夹中
vi /mq/kafka_4/config/server.properties
broker.id=4
listeners=PLAINTEXT://:9095
port=9095
host.name=bjrenrui0002
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_4/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_5/config/server.properties
broker.id=5
listeners=PLAINTEXT://:9096
port=9096
host.name=bjrenrui0002
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_5/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_6/config/server.properties
broker.id=6
listeners=PLAINTEXT://:9097
port=9097
host.name=bjrenrui0003
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_6/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
vi /mq/kafka_7/config/server.properties
broker.id=7
listeners=PLAINTEXT://:9098
port=9098
host.name=bjrenrui0003
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/mq/kafka_7/logs/kafka-logs
num.partitions=10
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181
zookeeper.connection.timeout.ms=6000
启动服务
sh /mq/kafka/bin/kafka-server-start.sh -daemon /mq/kafka/config/server.properties
sh /mq/kafka_2/bin/kafka-server-start.sh -daemon /mq/kafka_2/config/server.properties
sh /mq/kafka_3/bin/kafka-server-start.sh -daemon /mq/kafka_3/config/server.properties
sh /mq/kafka_4/bin/kafka-server-start.sh -daemon /mq/kafka_4/config/server.properties
sh /mq/kafka_5/bin/kafka-server-start.sh -daemon /mq/kafka_5/config/server.properties
sh /mq/kafka_6/bin/kafka-server-start.sh -daemon /mq/kafka_6/config/server.properties
sh /mq/kafka_7/bin/kafka-server-start.sh -daemon /mq/kafka_7/config/server.properties
检查:
$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
停服务:
sh /mq/kafka/bin/kafka-server-stop.sh
如果使用脚本停broker服务,会把单节点上的多broker服务都停掉,慎重!!!
ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}' | xargs kill -SIGTERM
到目前为止,三台物理机上的7个Broker已经启动完毕:
[dreamjobs@bjrenrui0001 bin]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::2181 :::* LISTEN 24532/java
tcp6 0 0 :::9092 :::* LISTEN 33212/java
tcp6 0 0 :::9093 :::* LISTEN 32997/java
tcp6 0 0 :::9094 :::* LISTEN 33064/java
[dreamjobs@bjrenrui0002 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 :::2181 :::* LISTEN 6899/java
tcp6 0 0 :::9095 :::* LISTEN 33251/java
tcp6 0 0 :::9096 :::* LISTEN 33279/java
[dreamjobs@bjrenrui0003 config]$ netstat -ntlp|grep -E '2181|909[2-9]'|sort -k3
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 14562/java
tcp 0 0 0.0.0.0:9097 0.0.0.0:* LISTEN 23246/java
tcp 0 0 0.0.0.0:9098 0.0.0.0:* LISTEN 23270/java
producer发送消息:
$ sh /mq/kafka/bin/kafka-console-producer.sh --broker-list bjrenrui0001:9092 --topic my-replicated-topic
consumer接收消息:
$ sh /mq/kafka_4/bin/kafka-console-consumer.sh --zookeeper bjrenrui0001:2181,bjrenrui0002:2181,bjrenrui0003:2181 --topic my-replicated-topic --from-beginning
성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress
사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover
사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool
무료로 이미지를 벗다

Clothoff.io
AI 옷 제거제

AI Hentai Generator
AI Hentai를 무료로 생성하십시오.

인기 기사
R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
어 ass 신 크리드 그림자 : 조개 수수께끼 솔루션
2 몇 주 전ByDDD
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
3 몇 주 전By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25 : Myrise에서 모든 것을 잠금 해제하는 방법
4 몇 주 전By尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

SublimeText3 Linux 새 버전
SublimeText3 Linux 최신 버전

DVWA
DVWA(Damn Vulnerable Web App)는 매우 취약한 PHP/MySQL 웹 애플리케이션입니다. 주요 목표는 보안 전문가가 법적 환경에서 자신의 기술과 도구를 테스트하고, 웹 개발자가 웹 응용 프로그램 보안 프로세스를 더 잘 이해할 수 있도록 돕고, 교사/학생이 교실 환경 웹 응용 프로그램에서 가르치고 배울 수 있도록 돕는 것입니다. 보안. DVWA의 목표는 다양한 난이도의 간단하고 간단한 인터페이스를 통해 가장 일반적인 웹 취약점 중 일부를 연습하는 것입니다. 이 소프트웨어는

ZendStudio 13.5.1 맥
강력한 PHP 통합 개발 환경

SecList
SecLists는 최고의 보안 테스터의 동반자입니다. 보안 평가 시 자주 사용되는 다양한 유형의 목록을 한 곳에 모아 놓은 것입니다. SecLists는 보안 테스터에게 필요할 수 있는 모든 목록을 편리하게 제공하여 보안 테스트를 더욱 효율적이고 생산적으로 만드는 데 도움이 됩니다. 목록 유형에는 사용자 이름, 비밀번호, URL, 퍼징 페이로드, 민감한 데이터 패턴, 웹 셸 등이 포함됩니다. 테스터는 이 저장소를 새로운 테스트 시스템으로 간단히 가져올 수 있으며 필요한 모든 유형의 목록에 액세스할 수 있습니다.

SublimeText3 중국어 버전
중국어 버전, 사용하기 매우 쉽습니다.
