本文基于Kafka 0.10.0
基本操作
列出所有topic
kafka-topics.sh --zookeeper localhost:2181 --list
创建topic
kafka-topics.sh --zookeeper localhost:2181 \ --create \ --topic earth \ --partitions 1 \ --replication-factor 1
生产数据
向earth发送一条消息
echo "The first record" | kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic earth
向earth发送一条包含key的消息
echo '00000,{"name":"Steve", "title":"Captain America"}' | kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic earth \ --property parse.key=true \ --property key.separator=,
消费数据
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic earth \ --from-beginning
将消息的key也输出
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic earth \ --from-beginning --property print.key=true \ --property key.separator=,
Topic的offset统计
kafka-run-class.sh kafka.tools.GetOffsetShell \ --broker-list localhost:9092 \ --topic earth \ --time -1
最后的参数-1表示显示获取当前offset最大值,-2表示offset的最小值
如果遇到数据倾斜的情况,可以通过kafka-simple-consumer-shell.sh
查看具体某个partition数据内容,例如
kafka-simple-consumer-shell.sh --broker-list localhost:9092 \ --topic earth \ --partition 1 \ --print-offsets \ --offset 18 \ --clientId test \ --property print.key=true
高级Consumers和Groups
创建一个consumer.properties配置文件,指定group.id
echo "group.id=Avengers" > consumer.properties
然后再发送一条数据
echo "The second record" | kafka-console-producer.sh \ --broker-list localhost:9092 \ --topic earth
通过consumer验证一下当前topic的数据,
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic earth \ --from-beginning \ --consumer.config consumer.properties
得到的结果是
The first recordThe second record
这是看一下zookeeper中存储的内容
[zk: localhost:2181(CONNECTED) 0] get /consumers/Avengers/offsets/earth/02cZxid = 0x8200012d1dctime = Fri May 05 17:10:02 CST 2017mZxid = 0x8200012d1dmtime = Fri May 05 17:10:02 CST 2017pZxid = 0x8200012d1dcversion = 0dataVersion = 0aclVersion = 0ephemeralOwner = 0x0dataLength = 1numChildren = 0
第一行的2表示的就是我们配置的这个group消费的最后一个offset,如果再次运行
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic earth \ --consumer.config consumer.properties
没有任何结果输出
这时需要通过UpdateOffsetsInZK重置offset,在刚才的配置中加入
echo "zookeeper.connect=localhost:2181" >> consumer.properties
然后运行
kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest consumer.properties earth
显示如下结果
updating partition 0 with new offset: 0updated the offset for 1 partitions
这样运行刚才的命令
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic earth \ --consumer.config consumer.properties
会重新从第一个offset开始读,即显示
The first recordThe second record
但是如果运行下面的命令,即加上--from-beginning
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic earth \ --from-beginning \ --consumer.config consumer.properties
就会提示
Found previous offset information for this group Avengers. Please use --delete-consumer-offsets to delete previous offsets metadata
必须要加上--delete-consumer-offsets
才可以,像这样
kafka-console-consumer.sh --zookeeper localhost:2181 \ --topic earth \ --delete-consumer-offsets \ --from-beginning \ --consumer.config consumer.properties
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。