这里对付kafka版本用新旧来区分。

旧版本(< v2.2)Kafka的参数

kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --create --topic topic_name

新版本(>= v2.2)Kafka的参数

phpkafka创建topic刚接触kafka时对于创立topic敕令一窍不懂 Node.js

kafka-topics.sh --bootstrap-server node1:9092 --create --topic topic_name

个中,2181是zookeeper 的监听端口,9092是kafka的监听端口。

旧版本用--zookeeper参数,主机名(或主机IP)和端口用ZooKeeper的,便是server.properties文件中zookeeper.connect属性的配置值

新版本用--bootstrap-server参数,主机名(或主机IP)和端口用某个节点的即可,即主机名(或主机IP)9092。

查看kafka版本

kafka并没有直接供应查看version的命令。
但也没紧要,我们可以通过进入kafka/libs文件夹,libs下的文件名称中就包含kafka版本信息,蓝色中的2.2.1便是我们安装的kafka的版本。

除了这种办法可以查看kafka版本外,我们还可以通过下面这种办法查看,

进入kafka目录,实行下面这个命令。

find ./libs/ -name \kafka_\ | head -1 | grep -o '\kafka[^\n]'

个中2.2.1便是我们要找的kafka版本。

负责踏实的你,相信你对前面的2.11是什么很好奇。
2.11是Scala版本,2.2.1是Kafka版本。

创建topic命令

知道kafka版本号后,按照前面供应的创建命令,我们对号入座。

./kafka-topics.sh --create --topic topic_name --replication-factor 3 --partitions 3 --bootstrap-server node1:9092,node2:9092,node3:9092

-topic 指定topic-partitions指定分区数

须要把稳的是分区数越多,在一定程度上会提高处理的吞吐量,由于kafka是基于文件进行读写,因此也须要打开更多的文件句柄,也会增加一定的性能开销。

但分区数也不宜过得,如果分区数过多,日志分段也会因此多很多,写的时候对I/O对性能影响很大。

–replication-factor

用来设置主题的副本数。

每个主题可以有多个副本,副本位于集群中不同的broker上,也可以说副本的数量不能超过broker的数量。

查看topic命令

想确认下topic创建是否成功,我们可以查看topic列表,看看刚才创建的topic是否在这个列表中,有的话就表示创建成功了。

./kafka-topics.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list

也可以通过查看topic内容来确认。

./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topic_name

删除topic命令

跟mysql数据库一样,topic可以创建,天经地义可以删除。

./kafka-topics.sh --delete --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topic_name

查看消费组消费进度

./kafka-consumer-groups.sh --describe --bootstrap-server node1:9092,node2:9092,node3:9092 --group consumer_group_name

重置消费组位移

./kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --group consumer_group_name --all-topics --execute --reset-offsets --to-datetime 2022-08-07T16:00:00.000

须要把稳的是时区问题。

由于笔者水平有限,文中疏忽之处在所难免,欢迎批评示正。