Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

消息队列实现概要——深度解读分区Topic的实现 #27

Open
aCoder2013 opened this issue Jul 27, 2018 · 4 comments
Open

消息队列实现概要——深度解读分区Topic的实现 #27

aCoder2013 opened this issue Jul 27, 2018 · 4 comments

Comments

@aCoder2013
Copy link
Owner

aCoder2013 commented Jul 27, 2018

前段时间也写过几篇关于消息队列的博客, 分布式消息队列实现概要 这篇博客大致讲了一下实现一个分布式消息队列所需要考虑到的种种因素,本文就详细讲一下如何实现Partitioned topic,即分区消息队列

前言

说道message queue相信大家都不陌生,对于业务方来说很简单就是几个简单的API, 通常也不用关心其内部实现,但如果想要用好消息队列、出了问题能够cover的住,那就必须要能够了解其实现原理,知其然知其所以然。
首先Message queue有一个topic的概念,可以将其理解为日志,发消息的过程说白了其实就是打日志,而消息队列就是存储日志记录的持久层,类似java中的log4j、logback等日志系统,打了日志之后我们就会有一些分析需求,比如说用户过来一个请求,我们将参数、耗时、响应内容等打到日志中,然后会在本机部署一个agent用于抓取日志发送到jstorm集群中用于分析等。那这里我们的应用系统就对应消息队列中的生产者producer,jstorm集群就对应消费者consumer。
所以说白了其实消息队列和log4j这些组件都是日志系统,那么回到message queue的模型中去,如果我们的topic都是单分区的,那么也就是说所有的producer都是往这一个"文件"中打日志,那么这样的话性能必然会有很大的问题,类似日志系统,我们希望每台机器都能够有自己的日志文件,那么自然而然的我们就需要将单分区的topic扩展到多分区的topic。

生产者

生产者这边的实现相对来说比较简单,对于应用层来说我们提供一个统一的topic,比如说hello-world, 那么创建topic的时候,我们对内表示的时候会加一层映射hello-world-> [hello-world-1,hello-world-2...],也就是说producer实际上是往对应的子topic发送消息的,但具体往哪个发送就需要一个路由策略,一般不要求顺序的话就直接轮训发送,如果需要顺序的话就用hash即可,这个信息直接存储到zookeeper即可,说到zookeeper这里简单提一下,zk并不能给客户单提供全局一致的视图,就是说对于两个不同的客户端,zk并不能保证他们能够在任一时间都能够读到完全相同的数据,这可能是由于网络延迟等原因造成,但如果客户端需要的话可以主动调用sync()先强制同步一把数据。
启动流程:

  1. 和broker建立连接
  2. 获取所有的topic列表
  3. 根据指定的路由策略发送消息

消费者

消费者这边的话会比较麻烦一些,根据消息队列要提供的消费语义有不同的实现方案,如果要实现顺序性的话相对会复杂一些。
流程:

  1. 消费者和broker建立连接
  2. 获取topic的元数据,例如topic分区数、offset等
  3. 根据不同的语义这里就需要连接到不同的分区
    实现的区别主要是由于第三点造成的,下面我们看一下主流的几款mq是如何实现的

RocketMQ

RocketMQ的consumer使用一般是均衡消费的方式,比如有一个topic: hello-world,我们创建的时候分配了16个分区,而我们总共有4个consumer:A/B/C/D,那么就会每个consumer分配四个分区,比如consumer A就会分配: hello-world-1、hello-world-2、hello-world-3、hello-world-4这四个分区。
也就是说总共需要这么几个要素: topic的总分区数、consumer的总数。topic的总分区数比较简单,启动的时候直接从zk读取即可,关键是第二个consumer的总数,我们知道应用是会宕机的、扩容、缩容、网络问题等,都会影响consumer的总数,RocketMQ是采用的方式是从name server中获取,每个节点启动的时候会定时向name server发送心跳,如果一定时间内没收到心跳包就可以判定这个节点挂掉了。那么这个问题解决了,consumer启动的时候可以从name server 发送请求,获取consumer的列表。
RocketMQ的分配分区是在客户端执行的,也就是说consumer获取到全量的consumer列表后,根据一定的策略分配分区,然后开始消费。然后我们知道consumer会扩容、缩容等原因不停的在变化,那么这个时候就需要重新负载均衡,那么客户端如果知道consumer变化了呢,有两种方式:

  1. broker主动通知
  2. 客户端定时刷新

那么这里就会有几个问题,由于网络延迟、每个consumer启动的时间不一样,那么consumer获取到变化的时机就会不一样,也就是说数据会不一致,导致在某段时间内 某个分区被多个客户端同时消费。

Kafka

kafka最初是基于zookeeper实现的,简单来说,每个客户端都会连接zookeeper,然后建立对应的watch事件,如果某一个consumer挂掉或者新增了consumer的话,zk下对应的路径就会有变化,然后产生watch事件并通知给客户端,客户端然后执行rebalance事件,看上去貌似方案还阔以,但是会有几个问题:

  1. 脑裂问题: consumer进行rebalance的时机都是根据zk的watch事件决定的,但是由于上文说所说的问题,zk并不能保证同时全局一致的视图,不同consumer看到的数据就不会不一致,导致执行rebalance的时机不一致。
  2. 由于网络延迟,客户端不可能同时接受到watch事件,因此执行rebalance的时机会有一个小窗口
  3. 羊群效应: 一个consumer节点有变化,所有监听这个路径的consumer都会得到通知,但客户端时机可能并不关心这个事件,这样就会导致占用大量的带宽,导致其他操作延迟。

由于上述原因kafka后面进行了几次改版,核心思想就是将收集consumer全量列表的过程放到了server端,然后将rebalance的过程放到了客户端,之前有一版是两个过程都在server端,但是这样就会有个问题:由于rebalance的过程放到了服务器端,那么如果想要指定自定义的分配策略就不灵活了,比如说想要根据机架、机房等分配,因此后面将这个过程放到了客户端。
流程:

  1. Joining the Group
  2. Synchronizing Group State

consumer启动后,会首先向broker发送请求查询当前consumer group的GroupCoordinator,然后就会进入
Joining the Group阶段,向GroupCoordinator发送JoinGroupRequest,GroupCoordinator会从中选一个成为leader,然后向所有的节点发送响应,表明加入成功,但是只有leader的响应中才会包含所有的consumer元数据。
然后会进入Synchronizing Group State阶段, 每个consumer会向GroupCoordinator发送SyncGroup请求,其中leader的SyncGroup请求包含了分配的结果,等leader收到了分配的结果后就会发送响应将结果同步给所有的consumer。当然这里会有很多的corner case,比如说如果GroupCoordinator挂掉了咋办?consumer leader挂掉了又咋办?这里就不详细叙述了。

Pulsar

pulsar的话相对比较清晰,pulsar支持三种消费语义,详细可以参考之前的博客,其中一个shared模式,或者叫RoundRobin模式,所有的consumer可以连接到同一个topic,然后消息会以RoundRobin的形式,轮训发送给每一个consumer,一个消息只会下发给一个consumer。如果一个consumer挂掉了,所有发给它但是还没有ack的消息,都会重新调度发给其他consumer。
看起来实现还是比较清晰的,但是不支持顺序性,因此需要看一下自己的业务场景是否可以满足。

image

总结

这篇博客总结了一下如何实现分区消息队列,并分析了一下主流的几款mq的实现方式,没不存在一种最完美的方案,按照自己的业务场景,比如说是否要求顺序性、性能、是否需要自定义rebalance策略等,决定自己的消息队列的实现方式。

Flag Counter

@jiazhai
Copy link

jiazhai commented Aug 20, 2018

👍。找code,意外发现这里。 :)
关于顺序性:
Pulsar这个是一个Partiton。 如果对一个Partition要保证顺序性消费,就只能使用Exclusive/Failover的订阅模式,不能用share模式。
Kafka的一个Partition应该只能是类似Exclusive/Failover的消费模式;一个Partiton,只能有一个active的Consumer来消费。

@aCoder2013
Copy link
Owner Author

@jiazhai Pulsar的话写的比较简单, 改天我再完善一下,Exclusive/Failover模式应该是只能单个consumer消费所有partition的消息吧,kafka的话类似shared模式,但是多了个partition分配的过程,特定的分区同一时刻只会分配给一个consumer,但pulsar的share模式类似是RoundRobin,所以才不支持顺序性。Exclusive/Failover模式不知道理解的对不对,晚点我再确认下

@gaoyf
Copy link

gaoyf commented Jul 17, 2020

那么这里就会有几个问题,由于网络延迟、每个consumer启动的时间不一样,那么consumer获取到变化的时机就会不一样,也就是说数据会不一致,导致在某段时间内 某个分区被多个客户端同时消费

hi,你好,偶尔查到你的wiki,写的很棒。不过上面这段话不敢苟同,我之前有个问题正好分析测试了下,记录到了这里
image

也就是说某个分区被多个客户端同时消费基本上是不可能的,因为一旦rebalance,subVersion必然会更新,你有空可以做个测试验证一下。

@aCoder2013
Copy link
Owner Author

@gaoyf 赞! 这篇博客写的比较久了,我晚点验证下

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants