[聚合文章] Kafka常用命令

消息系统 2882-01-01 17 阅读

本文基于Kafka 0.10.0

基本操作

列出所有topic

kafka-topics.sh --zookeeper localhost:2181 --list

创建topic

kafka-topics.sh --zookeeper localhost:2181 \                            --create \                            --topic earth \                            --partitions 1 \                            --replication-factor 1

生产数据

向earth发送一条消息

echo "The first record" | kafka-console-producer.sh \                    --broker-list localhost:9092 \                    --topic earth

向earth发送一条包含key的消息

echo '00000,{"name":"Steve", "title":"Captain America"}' | kafka-console-producer.sh \              --broker-list localhost:9092 \              --topic earth \              --property parse.key=true \              --property key.separator=,

消费数据

kafka-console-consumer.sh --zookeeper localhost:2181 \                                      --topic earth \                                      --from-beginning

将消息的key也输出

kafka-console-consumer.sh --zookeeper localhost:2181 \                                      --topic earth \                                      --from-beginning                                      --property print.key=true \                                      --property key.separator=,

Topic的offset统计

kafka-run-class.sh kafka.tools.GetOffsetShell \                                 --broker-list localhost:9092 \                                 --topic earth \                                 --time -1

最后的参数-1表示显示获取当前offset最大值,-2表示offset的最小值

如果遇到数据倾斜的情况,可以通过kafka-simple-consumer-shell.sh查看具体某个partition数据内容,例如

kafka-simple-consumer-shell.sh --broker-list localhost:9092 \                                       --topic earth \                                       --partition 1 \                                       --print-offsets \                                       --offset 18 \                                       --clientId test \                                       --property print.key=true

高级Consumers和Groups

创建一个consumer.properties配置文件,指定group.id

echo "group.id=Avengers" > consumer.properties

然后再发送一条数据

echo "The second record" | kafka-console-producer.sh \                    --broker-list localhost:9092 \                    --topic earth

通过consumer验证一下当前topic的数据,

kafka-console-consumer.sh --zookeeper localhost:2181 \                                      --topic earth \                                      --from-beginning \                                      --consumer.config consumer.properties

得到的结果是

The first recordThe second record

这是看一下zookeeper中存储的内容

[zk: localhost:2181(CONNECTED) 0] get /consumers/Avengers/offsets/earth/02cZxid = 0x8200012d1dctime = Fri May 05 17:10:02 CST 2017mZxid = 0x8200012d1dmtime = Fri May 05 17:10:02 CST 2017pZxid = 0x8200012d1dcversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 1numChildren = 0

第一行的2表示的就是我们配置的这个group消费的最后一个offset,如果再次运行

kafka-console-consumer.sh --zookeeper localhost:2181 \                                      --topic earth \                                      --consumer.config consumer.properties

没有任何结果输出

这时需要通过UpdateOffsetsInZK重置offset,在刚才的配置中加入

echo "zookeeper.connect=localhost:2181" >> consumer.properties

然后运行

kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest consumer.properties earth

显示如下结果

updating partition 0 with new offset: 0updated the offset for 1 partitions

这样运行刚才的命令

kafka-console-consumer.sh --zookeeper localhost:2181 \                                      --topic earth \                                      --consumer.config consumer.properties

会重新从第一个offset开始读,即显示

The first recordThe second record

但是如果运行下面的命令,即加上--from-beginning

kafka-console-consumer.sh --zookeeper localhost:2181 \                                      --topic earth \                                      --from-beginning \                                      --consumer.config consumer.properties

就会提示

Found previous offset information for this group Avengers. Please use --delete-consumer-offsets to delete previous offsets metadata

必须要加上--delete-consumer-offsets才可以,像这样

kafka-console-consumer.sh --zookeeper localhost:2181 \                                      --topic earth \                                     --delete-consumer-offsets \                                      --from-beginning \                                      --consumer.config consumer.properties

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。