Kafka技术内幕附录
第11章:附录
11.1 Kafka基本操作
11.1.1 创建、修改、删除、查看主题
我们可以手动创建主题或者让Kafka自动创建主题,手动创建主题必须指定分区数和副本因子。如果服务端开启了自动创建主题,新数据写入一个不存在的主题,服务端会自动创建这个主题。自动模式下主题的配置信息在server.properties文件中,比如分区数默认只有一个。因为分区是Kafka的最小并行单位,所以我们一般会根据集群规模设置合理的分区数,来达到客户端和服务端的负载均衡。副本因子( replication-factor
)是分区的副本数量,每条消息会复制到多个节点上,一般设置为3个副本。假设副本数为 N ,则最多允许 N - 1个节点宕机。 下面的实验在本机安装Kafka,假设ZK的端口为2181,Kafka的端口为9092。
# 创建主题 $ bin/kafka-topics.sh --zookeeper localhost:2181 --create \ --topic test --partitions 1 --replication-factor 3 # 修改主题的分区数 $ bin/kafka-topics.sh --zookeeper localhost:2181 --alter \ --topic test --partitions 2 # 列出所有的主题 $ bin/kafka-topics.sh --list --zookeeper localhost:2181 test # 查看某个主题的详细信息 $ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 主副本: 0 Replicas: 0 Isr: 0
在0.8.2版本之后,Kafka提供了删除主题的功能,但是默认并不会直接将Topic数据物理删除。如果要启用物理删除(即删除主题后,日志文件也会一同删除),需要在server.properties中设置 delete.topic.enable=true
。
$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true. $ bin/kafka-topics.sh --list --zookeeper localhost:2181 test - marked for deletion
管理员创建好主题后,主题会被生产者和消费者使用。注意下面的实验中,新版本的生产者和消费者都是使用Broker地址连接Kafka集群,旧版本的消费者则使用ZK地址连接Kafka集群。
11.1.2 生产者和消费者
在终端控制台模拟生产消息和消费消息,每个控制台的消费者都会被分配唯一的消费组:
# 生产者 $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 1 2 3 4 5 # 旧消费者(控制台) $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test 1 2 3 4 5 # 新消费者(控制台) $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --new-consumer --topic test --from-beginning
执行查看消费组列表的操作,可以列出当前活动的消费组,默认控制台的消费组是 console-consumer
加上一个随机数。上面由于分别启动了两个版本的消费者,所以对应了两个消费组。当然,也可以在控制台通过其他参数来指定消费组。
# 查看使用旧消费者的消费组列表 $ bin/kafka-consumer-groups.sh --list --zookeeper localhost:2181 console-consumer-36296 # 查询使用新消费者的消费组列表 $ bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092 console-consumer-89231
查看消费组对某个主题的消费状态,需要指定主题和消费组,这会打印出主题的所有分区、日志的大小、所属的消费者等。
采用新消费者方式的 Owner
为 none
:
$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 \ --topic test --group console-consumer-36296 Group Topic Pid Offset logSize Lag Owner console-36296 test 0 2 2 0 dp0652-f94edaea-0 console-36296 test 1 1 1 0 dp0652-f94edaea-0 console-36296 test 2 2 2 0 dp0652-f94edaea-0 $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 \ --topic test --group console-consumer-89231 Group Topic Pid Offset logSize Lag Owner console-89231 test 0 2 2 0 none console-89231 test 1 1 1 0 none console-89231 test 2 2 2 0 none
11.1.3 扩展集群
要向已有的Kafka集群添加新节点,我们只需要保证 broker.id
编号是唯一的,即可启动Kafka服务。但是新节点不会自动地分配到分区,除非在新加节点之后,新创建了主题。因此,通常我们希望在新添加节点后,能够将旧节点上的分区迁移一部分到新节点上,从而达到负载均衡的目的。迁移分区,实际上是将新节点作为分区的备份副本,当新节点完全复制了一个分区的所有数据,并且加入分区的ISR集合后,旧节点已有的一个副本就会被删除。在整个迁移过程中,分区的副本数保持不变,只不过分区的所属节点从旧节点迁移到了新节点。Kafka提供了分区重新分配( partition reassignment tool
)的工具来在不同节点之间移动分区,但该工具并不会自动学习Kafka集群的数据分布来移动分区达到数据的均匀分布,管理员需要手动指定哪些主题或分区需要移动。使用该工具需要执行下面的3个步骤。
-
--generate
:给定主题和需要移动到的目标节点,生成候选的分区分配计划。 -
--execute
:根据上一步的分区分配计划或者手动定义的计划执行数据迁移的任务。 -
--verify
:验证上一步执行任务涉及的所有分区的分配状态是否已经完成。
下面的示例会将 foo1
和 foo2
主题的所有分区全部移动到新的节点5、6上,最后这两个主题的所有分区都只在5、6节点上。第一步生成计划时,会列举出当前主题所有分区目前所在的节点,如果执行失败,管理员还可以进行回滚操作。
# [1] 生成分区分配计划,指定需要移动的主题和需要移动到的目标节点 $ cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1} $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json \ --broker-list "5,6" --generate Current partition replica assignment {"version":1, "partitions":[ {"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] } # [2] 执行分区重新分配的任务 $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file expand-cluster-reassignment.json --execute # [3] 验证分区重新分配的进度 $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully
除了给定主题,由工具生成所有分区的执行计划,我们也可以直接指定主题需要迁移的分区(当然在 execute
阶段,工具还是会列出指定主题分区当前所在的节点):
$ cat custom-reassignment.json {"version":1,"partitions":[ {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]} ]} $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[ {"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[ {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] }
除此之外,迁移工具还适用于给分区增加副本数。增加副本数是复制(而不是移动)已有的分区到其他节点,不管使用手动还是自动生成的分配计划,都要包含分区之前所在的节点。下面的示例中, foo
主题的分区0只有一个副本是存在节点5上,增加到3个副本后,存在的节点有5、6、7这3个节点。
$ cat increase-replication-factor.json {"version":1, "partitions":[ {"topic":"foo","partition":0,"replicas":[5,6,7]}] } $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \ --reassignment-json-file increase-replication-factor.json --execute Current partition replica assignment {"version":1, "partitions":[{ "topic":"foo","partition":0,"replicas":[5]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[ {"topic":"foo","partition":0,"replicas":[5,6,7]}] } # 副本数为一个时的主题信息 $ bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe Topic:foo PartitionCount:1 ReplicationFactor:1 Configs: Topic: foo Partition: 0 主副本: 5 Replicas: 5 Isr: 5 # 增加副本数后的主题信息 $ bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe Topic:foo PartitionCount:1 ReplicationFactor:3 Configs: Topic: foo Partition: 0 主副本: 5 Replicas: 5,6,7 Isr: 5,6,7
注意:修改主题的分区数可以直接采用修改主题的方式,但是修改分区的副本数涉及数据的复制,需要用到上面的分区迁移工具。
11.2 安全机制( Security
)
Kafka的安全机制主要分为下面两个部分:
- 身份认证(
Authentication
):对客户端与服务器的连接进行身份认证。Kafka目前支持SSL
、SASL/Kerberos
、SASL/PLAIN
三种认证机制。 - 权限控制(
Authorization
):对消息级别的访问控制列表(ACL)权限控制。
下面以 SASL/PLAIN
的身份认证为例,服务端需要先修改下面三个配置文件,然后启动服务端:
$ vi config/server.properties listeners=SASL_PLAINTEXT://localhost:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=PLAIN sasl.enabled.mechanisms=PLAIN $ vi config/jaas.conf KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" }; KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin"; }; $ vi bin/kafka-run-class.sh KAFKA_SASL_OPTS="-Djava.security.auth.login.config=../config/jaas.conf" KAFKA_OPTS="$KAFKA_SASL_OPTS $KAFKA_OPTS"
客户端也需要添加两个配置项,下面以控制台的生产者和消费者为例,说明客户端的身份认证:
$ vi config/producer.properties security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN $ vi config/consumer.properties security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN $ bin/kafka-console-producer.sh --broker-list localhost:9092 \ --topic test-security --producer.config config/producer.properties hello $ bin/kafka-console-consumer.sh --new-consumer \ --bootstrap-server localhost:9092 --topic test-security \ --from-beginning --consumer.config config/consumer.properties hello
如果使用代码,还需要设置 java.security.auth.login.config
为系统的环境变量配置。下面是生产者使用身份认证的示例:
public class KafkaProducerDemo { public static void main(String[] args) { // 设置客户端登陆的身份认证机制,指定配置文件 System.setProperty("java.security.auth.login.config", "/Users/zhengqh/.../resources/kafka_client_jaas.conf"); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("security.protocol", "SASL_PLAINTEXT"); // 安全协议类型 props.put("sasl.mechanism", "PLAIN"); // 安全机制 KafkaProducer<Integer, String> producer = new KafkaProducer<>(props); ProducerRecord<Integer, String> record1 = new ProducerRecord<Integer, String>("test-security", 1, "one"); producer.send(record1, new Callback() { public void onCompletion(RecordMetadata recordMetadata,Exception e){ System.out.println(recordMetadata); } }); producer.flush(); producer.close(); } }
上面我们只分析了 SASL_PLAINTEXT
安全协议的例子,Kafka支持的其他安全协议以及权限认证可以参考官方的文档。另外,服务端与ZooKeeper以及服务端之间也都有安全机制和身份认证机制,这里就不再深入分析。
11.3 Kafka配置
Kafka官方文档中针对服务端(代理节点)、主题、生产者、消费者都有完整的配置说明,下面列举了比较重要的一些配置项。
11.3.1 服务端的配置项
服务端的配置项参见表1。
表1 服务端配置信息
配置项 | 说明 |
---|---|
broker.id |
Kafka服务器的编号,同一个集群不同节点的编号应该唯一 |
zookeeper.connect |
连接ZooKeeper的地址,不同Kafka集群如果连接到同一个ZooKeeper,应该使用不同的chroot路径 |
auto.create.topics.enable |
自动创建主题,默认为 true |
auto.leader.rebalance.enable |
开启主副本自动平衡,当节点宕机后,会影响这个节点上的主副本转移到其他节点,宕机的节点重启后只能作为备份副本,如果开启平衡,则会将主副本转移到原节点 |
delete.topic.enable |
自动删除主题,默认为 false ,通过 delete 命令删除主题,并不会物理删除,只有开启该选项才会真正删除主题的日志文件 |
log.dirs |
日志文件的目录,可以指定多个目录。默认是/tmp/kafka-logs |
log.flush.interval.messages |
在消息集刷写到磁盘之前需要收集的消息数量,默认值为 Long.MAX |
log.flush.scheduler.interval.ms |
日志刷新线程过久,检查一次是否有日志文件需要刷写到磁盘,默认值为 Long.MAX 。 |
log.retention.bytes |
日志文件超过最大大小时删除旧数据,默认值为 -1 ,即永不会删除 |
log.retention.hours |
日志文件保留的时间,默认为168小时,即7天 |
log.segment.bytes |
单个日志文件片段的最大值,默认为1 GB,日志超过1 GB后会刷写到磁盘 |
message.max.bytes |
服务端接收的消息最大值,默认为1 MB,即一批消息最大不能超过1 MB |
min.insync.replicas |
当生产者的应答策略设置为 all 时,写操作的数量必须满足该值才算成功。默认值为 1 ,表示只要写到一个节点就算成功 |
offsets.commit.required.acks |
消费者提交偏移量和生产者写消息的行为类似,用应答来表示写操作是否成功,默认值为 -1 |
offsets.commit.timeout.ms |
类似于生产者的请求超时时间,写请求会被延迟,默认5秒 |
offsets.topic.num.partitions |
消费者提交偏移量内部主题的分区数量,默认为50个 |
offsets.topic.replication.factor |
消费者提交偏移量内部主题的副本数量,默认为3个 |
replica.fetch.min.bytes |
每个拉取请求最少要拉取的字节数量,默认为1byte。 |
replica.fetch.wait.max.ms |
每个拉取请求的最大等待时间,默认为500毫秒 |
replica.lag.time.max.ms |
备份副本在指定时间内都没有发送拉取请求,或者在这个时间内仍然没有赶上主副本,它将会被从ISR中移除,默认10秒 |
request.timeout.ms |
客户端从发送请求到接收响应的超时时间,默认30秒 |
zookeeper.session.timeout.ms |
ZooKeeper会话的超时时间,默认6秒 |
default.replication.factor |
自动创建的主题的副本数,默认为1个 |
log.cleaner.delete.retention.ms |
被删除的记录保存的时间,默认为1天 |
log.cleaner.enable |
是否开启日志清理线程,当清理策略为 compact 时,建议开启 |
log.index.interval.bytes |
添加1条索引到日志文件的间隔,默认为4096条 |
log.index.size.max.bytes |
索引文件的最大大小,默认为10 MB |
num.partitions |
每个主题的分区数量,默认为1个 |
replica.fetch.max.bytes |
拉取请求中每个分区的消息最大值,默认为1 MB |
replica.fetch.response.max.bytes |
整个拉取请求的消息最大值,默认为10 MB |
主题级别的一些配置和服务端级别的设置类似,比如 flush.messages
类似 log.flush.interval.messages
,表示刷写到磁盘的消息数量; flush.ms
类似 log.flush.scheduler.interval.ms
,表示刷写到磁盘的时间间隔; max.message.bytes
类似 message.max.bytes
,表示服务端接收的单条消息大小。
11.3.2 生产者的配置项
生产者配置信息参见表2。
表2 生产者配置信息
配置项 | 说明 |
---|---|
bootstrap.servers |
生产者客户端连接Kafka集群的地址和端口,多个节点用逗号分隔 |
acks |
生产者请求要求主副本收到的应答数量满足后,写请求才算成功。 0 表示记录添加到网络缓冲区后就认为已经发送,生产者不会等待服务端的任何应答; 1 表示主副本会将记录到本地日志文件,但不会等待任何备份副本的应答; -1 或 all 表示主副本必须等待ISR中所有副本都返回应答给它 |
retries |
发送时出现短暂的错误或者收到错误码,客户端会重新发送记录。如果 max.in.flight.requests.per.connection 没有设置为 1 ,在异常重试时,服务端收到的记录可能是乱序的 |
buffer.memory |
生产者发送记录给服务端在客户端的缓冲区,默认为32 MB |
batch.size |
当多条记录发送到同一个分区,生产者会尝试将一批记录分成更少的请求,来提高客户端和服务端的性能,默认每一个Batch的大小为16 KB。如果一条记录就超过了16 KB,则这条记录不会和其他记录组成Batch。Batch太小会减小吞吐量,Batch太大会占用太多的内存 |
max.request.size |
一个请求的最大值,实际上也是记录的最大值。注意服务端关于记录的最大值(Broker的 message.max.bytes ,或者Topic的 max.message.bytes )可能和它不同(实际上默认值都是1 MB)。这个配置项会限制生产者一个请求中Batch的记录数,防止发送过大的请求 |
partitioner.class |
消息的分区语义,对消息进行路由到指定的分区,实现分区接口 |
request.timeout.ms |
客户端等待一个请求的响应的最长时间,超时后客户端会重新发送或失败 |
timeout.ms |
服务端等待备份的应答来达到生产者设置的 ack 的最长时间,超时后不满足失败 |
11.3.3 新消费者的配置项
新消费者的配置信息参见表3。
表3 新消费者的配置信息
配置项 | 说明 |
---|---|
fetch.min.bytes |
拉取请求要求服务端返回的数据最小值,如果服务端的数据量还不够,客户端的请求会一直等待,直到服务端收集到足够的数据才会返回响应给客户端。默认值为1个字节,表示服务端处理的拉取请求数据量只要达到1个字节就立即收到响应,或者因为在等待数据的到达一直没有满足最小值时而超时后,拉取请求也会结束。将该值设置大一点,可以牺牲一些延迟来获取服务端更高的吞吐量 |
fetch.max.bytes |
服务端对一个拉取请求返回数据的最大值,默认值为50 MB |
fetch.max.wait.ms |
在没有收集到满足 fetch.min.bytes 大小的数据之前,服务端对拉取请求的响应会阻塞直到超时,默认500毫秒 |
group.id |
消费者所述的唯一消费组名称,在使用基于Kafka的偏移量管理策略,或者使用消费组管理协议的订阅方法时,必须指定消费组名称 |
heartbeat.interval.ms |
使用消费组管理协议时消费者和协调者的心跳间隔,心跳用来确保消费者的会话保持活动的状态,以及当有新消费者加入或消费者离开时可以更容易地进行平衡,该选项必须比 session.timeout.ms 小,通常设置为不大于它的1/3。默认值为3秒,我们可以将心跳值设置得更低,来更好地控制平衡:需要平衡时,心跳间隔越短就能越快地感知到 |
max.partition.fetch.bytes |
服务端返回的数据中每个分区的最大值,默认值为1 MB |
session.timeout.ms |
使用消费组管理协议检测到消费者失败的最大时间,消费者定时地向Broker发送心跳表示处于存活状态。服务端的Broker会记录消费者的心跳时间,如果在指定的会话时间内都没有收到消费者的心跳,Broker会将其从消费组中移除并启动一次平衡 |
auto.offset.reset |
Kafka中没有分区的初始偏移量,消费者任何定位分区位置。 earliest 表示重置到最旧的位置; latest 表示重置到最新的位置,默认值为 latest |
enable.auto.commit |
消费者的偏移量是否会在后台定时地提交,默认值为 true |
auto.commit.interval.ms |
消费者自动提交偏移量的时间间隔,默认值为5秒 |
max.poll.interval.ms |
使用消费组管理协议时,在调用 poll() 之间的最大延迟,它设置了消费者在下一次拉取更多记录之前允许的最长停顿时间。如果超时后消费者仍然没有调用 poll() ,那么消费者就会被认为失败了,就会启动消费组的平衡,默认值为5秒 |
max.poll.records |
在一次 poll() 调用中允许返回的最大记录数,默认值为500条 |
partition.assignment.strategy |
使用消费者管理协议时,消费者实例之间用来进行分区分配的策略,默认值为 RangeAssignor |
11.4 Kafka其他操作实验
11.4.1 ZooKeeper连接配置
Kafka的ZooKeeper配置和命令行的ZooKeeper地址不一致导致连接不上ZooKeeper,下面是server.properties的ZooKeeper连接配置,指定了Kafka在ZooKeeper中的根节点是 /kafka
:
broker.id=0 #listeners=PLAINTEXT://:9092 zookeeper.connect=localhost:2181/kafka log.dirs=/tmp/kafka-logs
如果命令行中连接的ZooKeeper地址没有加上 /kafka
,创建主题会报错可用的节点为0,加上 /kafka
后可以成功创建主题:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 --topic test Error while executing topic command : RF: 1 larger than available brokers: 0 ERROR AdminOperationException: RF: 1 larger than available brokers: 0 at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403) at kafka.admin.TopicCommand$.createTopic(TopicCommand.scala:110) at kafka.admin.TopicCommand$.main(TopicCommand.scala:61) at kafka.admin.TopicCommand.main(TopicCommand.scala) $ bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka \ --replication-factor 1 --partitions 1 --topic test Created topic "test". $ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka test
生产者连接的是Kafka代理节点的地址,和ZooKeeper没有关系。而旧消费者连接的是ZooKeeper,所以也要加上 /kafka
才能读取到消息:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test this is a message this is another message $ bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka \ --topic test --from-beginning this is a message this is another message
上面的实验通过在Kafka服务端的配置文件中设置ZooKeeper根节点,可以在一个ZooKeeper中区分多个Kafka集群。下面的实验就利用了该功能。
11.4.2 MirrorMaker
演示消费者线程数量
单机模拟多个Kafka集群,每个集群各自只有一台服务器。不同Kafka集群的 zookeeper.connect
配置项分别是: localhost:2181/kafka
和 localhost:2181/kafka_dc
(这两个集群叫作kafka集群、kafka_dc集群)。查看ZooKeeper的节点,因为是不同的Kafka集群,所以代理节点的编号可以一样(当然由于在本机模拟多个集群,端口号不能一样):
[zk: localhost:2181(CONNECTED) 0] ls / [kafka_dc, zookeeper, kafka] [zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids [0] [zk: localhost:2181(CONNECTED) 4] ls /kafka_dc/brokers/ids [0]
在Kafka集群创建分区数只有一个的主题 test
,然后启动 MirrorMaker
,设置消费者线程数量为3:
$ bin/kafka-mirror-maker.sh --num.streams 3 \ --consumer.config config/consumer_source.properties \ --producer.config config/producer_dest.properties --whitelist test
ZooKeeper中消费者的数量也有3个,但是因为分区只有一个,消费者 Owner
也只有一个:
[zk: localhost:2181] ls /kafka/consumers/mm/ids [mm_zqhmac-dd52d0ea, mm_zqhmac-60c27086, mm_zqhmac-d0eece39] [zk: localhost:2181] get /kafka/consumers/mm/owners/test/0 mm_zqhmac-60c27086-0 [zk: localhost:2181] get /kafka/consumers/mm/ids/mm_zqhmac-60c27086 {"version":1,"subscription":{"test":1},"pattern":"white_list"}
因为消费者数量比分区的数量要多,所以有些消费者会分配不到分区。在执行 MirrorMaker
程序时,控制台会提示有两个消费者线程没有分配到分区。
WARN No broker partitions consumed by consumer thread mm_zqhmac-d0eece39-0 for topic test (kafka.consumer.RangeAssignor) WARN No broker partitions consumed by consumer thread mm_zqhmac-dd52d0ea-0 for topic test (kafka.consumer.RangeAssignor)
通过控制台的消费者检查 Mirror
(kafka_dc)目标集群是否有数据写入,可以看到虽然我们没有在kafka_dc集群创建 test
主题,但是通过镜像工具,源集群的数据会复制到目标集群。
$ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka_dc test $ bin/kafka-console-consumer.sh --zookeeper localhost:2181/kafka_dc \ --topic test --from-beginning this is third message this is fouth message
检查消费组所有消费者的消费情况,也只有一个消费者:
$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \ --group mm --zookeeper localhost:2181/kafka --topic test Group Topic Pid Offset logSize Lag Owner mm test 0 4 4 0 mm_zqhmac-60c27086-0
11.4.3 生产者和消费者性能测试
Kafka提供了一些工具类,包括生产者和消费者的性能测试,端到端的延迟。下面的实验是在一个小型的Kafka集群上,并且测试主题 test-rep-3
有3个副本、6个分区:
$ zookeeper=192.168.6.55:2181,192.168.6.56:2181,192.168.6.57:2181/kafka010 $ kafka=192.168.6.52:9092,192.168.6.52:9093,192.168.6.53:9094,192.168.6.53:9095 $ bin/kafka-topics.sh --zookeeper $zookeeper --create \ --topic test-rep-3 --partitions 6 --replication-factor 3 $ bin/kafka-topics.sh --describe --zookeeper $zookeeper --topic test-rep-3 Topic:test-rep-3 PartitionCount:6 ReplicationFactor:3 Configs: Topic: test-rep-3 Partition: 0 主副本: 3 Replicas: 3,2,0 Isr: 3,2,0 Topic: test-rep-3 Partition: 1 主副本: 0 Replicas: 0,3,1 Isr: 0,3,1 Topic: test-rep-3 Partition: 2 主副本: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: test-rep-3 Partition: 3 主副本: 2 Replicas: 2,1,3 Isr: 2,1,3 Topic: test-rep-3 Partition: 4 主副本: 3 Replicas: 3,0,1 Isr: 3,0,1 Topic: test-rep-3 Partition: 5 主副本: 0 Replicas: 0,1,2 Isr: 0,1,2
接着对生产者和消费者进行性能测试(笔者的测试环境还有其他服务,所以测试结果并不是很理想,如果要对Kafka进行压测,最好模拟线上的机器配置):
#####生产者性能测试##### $ bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance \ --topic test-rep-3 --num-records 50000000 --record-size 100 \ --throughput -1 --producer-props acks=1 buffer.memory=67108864 \ batch.size=8196 bootstrap.servers=$kafka ## 第一次在集群内测试 50000000 records sent, 749906.261717 records/sec (71.52 MB/sec), 50.73 ms avg latency, 1356.00 ms max latency, 2 ms 50th, 266 ms 95th, 603 ms 99th, 1327 ms 99.9th. ## 第二次在集群内测试 50000000 records sent, 84956.858907 records/sec (8.10 MB/sec), 5781.48 ms avg latency, 17968.00 ms max latency, 9872 ms 50th, 16705 ms 95th, 17492 ms 99th, 17909 ms 99.9th. ## 第三次在集群外测试 50000000 records sent, 42554.459069 records/sec (4.06 MB/sec), 11455.58 ms avg latency, 51425.00 ms max latency, 82 ms 50th, 29290 ms 95th, 30192 ms 99th, 36732 ms 99.9th. #####消费者性能测试##### $ bin/kafka-consumer-perf-test.sh --zookeeper $zookeeper \ --messages 50000000 --topic test-rep-3 --threads 1 ## 第一次在集群内测试 start, end, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec 17:00:32:149, 17:00:56:811, 4767.4932, 193.3133, 49990789, 2027037.1016 ## 第二次在集群内测试 17:39:11:883, 17:44:03:117, 4768.3716, 16.3730, 50000000, 171683.2513 # 消费者性能测试(多线程) $ bin/kafka-consumer-perf-test.sh --zookeeper $zookeeper \ --messages 50000000 --topic test-rep-3 --threads 6
在生产者的测试过程中,有些分区由于网络或者其他原因会对ISR进行调整,日志如下:
INFO Partition [test-rep-3,1] on broker 0: Shrinking ISR from 0,1,3 to 0,1 INFO Partition [test-rep-3,5] on broker 0: Expanding ISR from 0,1 to 0,1,2 INFO Partition [test-rep-3,1] on broker 0: Expanding ISR from 0,1 to 0,1,3
这时如果查看主题信息,会发现主题中每个分区的ISR和最开始创建的时候不同。不过等生产者测试运行完毕,再过一段时间,就会恢复到刚开始的ISR,这是因为默认开启了主副本自动迁移:
$ bin/kafka-topics.sh --describe --zookeeper $zookeeper --topic test-rep-3 Topic:test-rep-3 PartitionCount:6 ReplicationFactor:3 Configs: Topic:test-rep-3 Partition: 0 主副本: 3 Replicas: 3,2,0 Isr: 3,2 Topic:test-rep-3 Partition: 1 主副本: 0 Replicas: 0,3,1 Isr: 0,1,3 Topic:test-rep-3 Partition: 2 主副本: 1 Replicas: 1,0,2 Isr: 1,0 Topic:test-rep-3 Partition: 3 主副本: 2 Replicas: 2,1,3 Isr: 2,3 Topic:test-rep-3 Partition: 4 主副本: 3 Replicas: 3,0,1 Isr: 3 Topic:test-rep-3 Partition: 5 主副本: 0 Replicas: 0,1,2 Isr: 0,1,2
11.5 第三方工具
11.5.1 Confluent Platform
Confluent的各个组件和默认端口如下:
Component | Default Port |
---|---|
Zookeeper | 2181 |
Apache Kafka brokers (plain text) | 9092 |
Schema Registry REST API | 8081 |
REST Proxy | 8082 |
Kafka Connect REST API | 8083 |
Confluent Control Center | 9021 |
安装包主要有三个目录:
confluent-3.3.0/bin/ # Driver scripts for starting/stopping services confluent-3.3.0/etc/ # Configuration files confluent-3.3.0/share/java/ # Jars
启动各个组件:
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties & ./bin/kafka-server-start ./etc/kafka/server.properties & ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &
1. 控制中心(Controll Center)
Confluent商业产品的一个重要功能是控制中心(Controll Center)。在启动控制中心之前呢,需要修改下面三个文件的配置信息:
- Kafka服务端的配置文件:etc/kafka/server.properties
- Kafka Connect集群的配置文件:etc/kafka/connect-distributed.properties
- 控制中心中心的配置文件:etc/confluent-control-center/control-center.properties
sed 's/#metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter/metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter/g' && \ sed 's/#confluent.metrics.reporter.bootstrap.servers=localhost:9092/confluent.metrics.reporter.bootstrap.servers=localhost:9092/g' && \ sed 's/#confluent.metrics.reporter.zookeeper.connect=localhost:2181/confluent.metrics.reporter.zookeeper.connect=localhost:2181/g' && \ sed 's/#confluent.metrics.reporter.topic.replicas=1/confluent.metrics.reporter.topic.replicas=1/g' \ etc/kafka/server.properties cat <<EOF >> etc/kafka/connect-distributed.properties # Interceptor setup consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor EOF cat <<EOF >> etc/confluent-control-center/control-center.properties # Quickstart partition and replication values confluent.controlcenter.internal.topics.partitions=1 confluent.controlcenter.internal.topics.replication=1 confluent.controlcenter.command.topic.replication=1 confluent.monitoring.interceptor.topic.partitions=1 confluent.monitoring.interceptor.topic.replication=1 confluent.metrics.topic.partition=1 confluent.metrics.topic.replication=1 EOF
接着启动confluent-control-center和分布式的Kafka连接器集群:
bin/control-center-start etc/confluent-control-center/control-center.properties & bin/connect-distributed etc/kafka/connect-distributed.properties &
然后执行一些性能测试,比如执行生产者和消费者的性能测试脚本:
bin/kafka-topics --zookeeper localhost:2181 --create \ --topic test-1 --partitions 1 --replication-factor 1 bin/kafka-run-class org.apache.kafka.tools.ProducerPerformance \ --topic test-1 --num-records 50000000 --record-size 100 \ --throughput -1 --producer-props acks=1 buffer.memory=67108864 \ batch.size=8196 bootstrap.servers=localhost:9092 bin/kafka-consumer-perf-test --zookeeper localhost:2181 \ --messages 50000000 --topic test-1 --threads 1
打开浏览器: http://192.168.6.53:9021 ,观察到页面实时显示集群的相关度量曲线图:
2. 连接器(Kafka Connect)
自带的kafka-connect-elasticsearch插件的相关文件:
[qihuang.zheng@dp0653 confluent-3.2.1]$ ll etc/kafka-connect-elasticsearch/ -rw-r--r-- 1 qihuang.zheng users 803 9月 28 16:11 quickstart-elasticsearch.properties [qihuang.zheng@dp0653 confluent-3.2.1]$ ll share/java/kafka-connect-elasticsearch/ -rw-r--r-- 1 qihuang.zheng users 263965 9月 28 16:12 commons-codec-1.9.jar -rw-r--r-- 1 qihuang.zheng users 434678 9月 28 16:12 commons-lang3-3.4.jar -rw-r--r-- 1 qihuang.zheng users 61829 9月 28 16:12 commons-logging-1.2.jar -rw-r--r-- 1 qihuang.zheng users 212164 9月 28 16:12 gson-2.4.jar -rw-r--r-- 1 qihuang.zheng users 2256213 9月 28 16:12 guava-18.0.jar -rw-r--r-- 1 qihuang.zheng users 177013 9月 28 16:12 httpasyncclient-4.1.1.jar -rw-r--r-- 1 qihuang.zheng users 732765 9月 28 16:12 httpclient-4.5.1.jar -rw-r--r-- 1 qihuang.zheng users 326724 9月 28 16:12 httpcore-4.4.4.jar -rw-r--r-- 1 qihuang.zheng users 356091 9月 28 16:12 httpcore-nio-4.4.4.jar -rw-r--r-- 1 qihuang.zheng users 18398 9月 28 16:12 jest-2.0.0.jar -rw-r--r-- 1 qihuang.zheng users 216228 9月 28 16:12 jest-common-2.0.0.jar -rw-r--r-- 1 qihuang.zheng users 44524 9月 28 16:12 kafka-connect-elasticsearch-3.2.1.jar -rw-r--r-- 1 qihuang.zheng users 41071 9月 28 16:12 slf4j-api-1.7.21.jar -rw-r--r-- 1 qihuang.zheng users 10680 9月 28 16:12 slf4j-simple-1.7.5.jar
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。