Apache RocketMQ
基本流程
Remoting RPC示例
rocketmq-remoting模块采用Netty封装了RPC的调用,包括客户端和服务端之间的交互。
不同分布式系统在通信上都会实现RPC模块,比如Kafka、Hadoop等都有各自的RPC实现。
先来查看测试用例RemotingServerTest的使用方法:
- 启动RemotingServer和RemotingClient
- 调用RemotingClient的invokeAsync()或者invokeSync()、invokeOneway()方法
以异步调用为例,RemotingClient的invokeAsync()方法主要有三个参数:
- 服务端地址,RPC调用需要指定服务端的地址,这样客户端才能发送请求,让服务端处理
- 远程指令(RemotingCommand),即客户端发送的请求
- 回调对象(InvokeCallback),即客户端收到服务端返回的响应结果后,如何处理
RPC调用的具体步骤如下:
- 启动客户端和服务端
- 客户端构造远程指令对象
- 客户端通过RemotingClient同步或者异步调用
- 服务端在启动时注册的处理器,会处理客户端发送的请求,即调用处理器的processRequest()方法
- 服务端处理完请求后,返回响应给客户端
- 客户端收到服务端返回的响应结果,会触发回调对象调用operationComplete()方法
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config);
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
request.setRemark("Hi " + ctx.channel().remoteAddress());
return request;
}
}, Executors.newCachedThreadPool());
remotingServer.start();
return remotingServer;
}
public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
final CountDownLatch latch = new CountDownLatch(1);
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
request.setRemark("messi");
remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
public void operationComplete(ResponseFuture responseFuture) {
latch.countDown();
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
}
});
latch.await();
}
RemotingServer的registerProcessor()方法有三个参数:
- 请求编码,比如SEND_MESSAGE表示(生产者)客户端发送消息的请求
- 请求处理器,比如服务端如何处理客户端发送消息的处理器,实现类为:SendMessageProcessor
- 处理线程,每种请求编码都对应一个处理线程池。如果没有指定,则使用默认的线程池
客户端调用服务端有三种方式:同步(Sync)、异步(Async)、一次性(OneWay)。前两种有响应结果,最后一种不产生响应结果。
Netty RPC
NettyRemotingServer
在启动时,会绑定NettyServerHandler。Netty RPC的特点如下:
- 请求和响应都是用RemotingCommand对象来表示
- 服务端(NettyRemotingServer)和客户端(NettyRemotingClient)实现了抽象的NettyRemotingAbstract
- 抽象类根据不同的指令类型调用不同的处理方法,比如处理请求调用processRequestCommand,处理响应调用processResponseCommand
下面举例客户端和服务端执行一次RPC调用链路的过程:
- 客户端发送请求给服务端,通过Netty的Channel发送请求给服务端
- 服务端处理客户端发送的请求,NettyServerHandler接收的消息类型为REQUEST_COMMAND,调用processRequestCommand方法
- 服务端处理完成后,通过Netty的Channel发送响应结果给客户端
- 客户端处理服务端发送的响应,NettyClientHandler接收的消息类型为RESPONSE_COMMAND,调用processResponseCommand方法
NettyRemotingAbstract用 processorTable
变量记录了请求编码、处理器、线程池之间的关系。
- 每个请求编码都对应了一种唯一的处理器,相同请求编码的处理器是相同的
- 由于处理器与线程池组成一对,所以相同请求编码的请求在相同的线程池中执行
不同的请求编码在不同的线程池中运行,以发送消息和消费消息为例:
| 请求编码(request code) | 处理器 | 线程池 |
|---|---|---|
| SEND_MESSAGE | SendMessageProcessor | ExecutorService#1 |
| GET_MESSAGE | PullMessageProcessor | ExecutorService#2 |
以经典的RPC通信模型来看,客户端向服务端发起RPC调用请求。那么 processorTable
主要针对服务端, responseTable
则主要针对客户端。
- 客户端发起RPC调动时,会创建异步的响应对象,并放入将opaque和ResponseFuture的映射关系放入responseTable
- 当客户端收到服务端发送的响应结果后,会将opaque以及ResponseFuture从responseTable中移除
那么opaque是如何在请求和响应之间进行关联的呢?下面代码中的注释说明了opaque在请求和响应之间的设置和获取流程。
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
final RemotingCommand cmd = msg;
if (cmd != null) {
switch (cmd.getType()) {
case REQUEST_COMMAND:
processRequestCommand(ctx, cmd);
break;
case RESPONSE_COMMAND:
processResponseCommand(ctx, cmd);
break;
default:
break;
}
}
}
// 处理请求,比如服务端处理客户端发送的请求,NettyServerHandler会调用到这里
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
// 4. 从请求对象中获取opaque,那么什么时候opaque设置到请求中?
// 这里的cmd实际上是步骤3的request,因为步骤1已经有opaque,所以这里也能取到opaque
final int opaque = cmd.getOpaque();
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
// 5. 将opaque设置到响应对象中
response.setOpaque(opaque);
// 6. 发送响应对象给客户端
ctx.writeAndFlush(response);
}
// 处理响应,比如客户端处理服务端发送的响应,NettyClientHandler会调用到这里
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
// 7. 从响应对象中获取opaque,那么什么时候opaque设置到响应里?答案在步骤5中
// 这里的cmd是步骤5的response,而response的opaque来自于request
final int opaque = cmd.getOpaque();
// 8. 根据opaque从responseTable中获取出对应的ResponseFuture
final ResponseFuture responseFuture = responseTable.get(opaque);
if (responseFuture != null) {
responseFuture.setResponseCommand(cmd);
responseFuture.release();
// 9. 将opaque与ResponseFuture的映射关系从responseTable中移除,与步骤2互相对应
responseTable.remove(opaque);
// 执行客户端在发送RPC调用时定义的回调函数
if (responseFuture.getInvokeCallback() != null) {
executeInvokeCallback(responseFuture);
} else {
responseFuture.putResponse(cmd);
}
}
}
// 客户端发起RPC调用
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request) {
// 1. 从请求中获取opaque
final int opaque = request.getOpaque();
final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
// 2. 创建ResponseFuture,并记录到responseTable
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
// 3. 发起RPC调用
channel.writeAndFlush(request);
}
生产者发送消息
以example/quickstart下的Producer发送消息为例,入口方法走到DefaultMQProducerImpl的sendDefaultImpl()方法。
发送消息过程涉及下面几个步骤:
- tryToFindTopicPublishInfo():根据消息的Topic获取TopicPublishInfo
- selectOneMessageQueue():选择一个MessageQueue
- sendKernelImpl():调用内核的发送方法
- 如果是同步调用,返回SendResult,否则返回空
接下来进入DefaultMQProducerImpl的内核发送方法,主要的参数有:Message、MessageQueue、TopicPublishInfo
- 如果有Hook,构造SendMessageContext,将Message、MessageQueue等都设置为上下文对象的成员变量
- 构造SendMessageRequestHeader
- 从MQClientFactory获取getMQClientAPIImpl()实现类MQClientAPIImpl,调用sendMessage()方法
接下来进入MQClientAPIImpl的sendMessage()方法
- 根据RequestCode.SEND_MESSAGE(请求编码)和SendMessageRequestHeader(请求头)创建RemotingCommand对象
- 设置请求的body为消息内容:request.setBody(msg.getBody())
- 调用remotingClient.invokeAsync()或者invokeSync()方法
- 对于同步调用,因为要等待结果返回,所以会立即调用processSendResponse()
- processSendResponse()方法返回一个SendResult对象
private SendResult sendMessageSync(
final String addr, // Broker的地址
final String brokerName, // Broker的名字
final Message msg, // 消息内容
final long timeoutMillis,
final RemotingCommand request // 请求对象
) {
// RPC调用示例,这里的客户端是生产者,通过MQClientAPIImpl调用
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
return this.processSendResponse(brokerName, msg, response);
}
生产者通过MQClientAPIImpl发起RPC调用,request请求对象的编码是SEND_MESSAGE。这里的地址指的是Broker的地址,而不是NameServer。
虽然生产者连接的是NameServer,但这中间会有选择MessageQueue,再选择Broker的过程,由于这里先关注整体的流程,暂时不去分析具体的细节。
客户端通过 RemotingClient
调用了服务端Broker,接下来看服务端 BrokerController
的处理。
BrokerController启动时会为各种请求类型注册不同的请求处理器,比如SEND_MESSAGE注册了SendMessageProcessor处理器:
public void registerProcessor() {
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
// SendMessageProcessor有两个Hook:发送消息和消费消息的Hook。
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
}
SendMessageProcessor的processRequest()方法会处理生产者客户端发送的SEND_MESSAGE请求。
客户端在发送请求之前构建了 SendMessageContext
和 SendMessageRequestHeader
,这里对应的会首先从RemotingCommand反解析出着两个对象
- 解析请求的body,创建MessageExtBrokerInner对象
- 获取MessageStore,并调用putMessage方法,传入MessageExtBrokerInner对象
- 返回PutMessageResult,并调用handlePutMessageResult方法
- 最后返回的是一个RemotingCommand响应对象,会返回给客户端
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default: // SEND_MESSAGE的处理逻辑...
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
接下来进入DefaultMessageStore的putMessage()方法,这个方法会调用CommitLog的putMessage()方法
- BrokerController和SendMessageProcessor都在broker模块
- MessageStore和CommitLog则在store模块
CommitLog首先获取最近的MappedFile,然后追加消息到映射文件中。
- 追加消息的回调类DefaultAppendMessageCallback是执行数据写入文件的真正方法。
- 追加完成后,有多种的磁盘刷写方式,比如同步和异步
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback);
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
handleDiskFlush(result, putMessageResult, msg);
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
同样,我们省略了具体写入到CommitLog中的细节,以及如何处理磁盘的刷写、HA等细枝末节。实际上,到这里为止,
生产者客户端发起RPC调用,到服务端处理请求,服务端返回响应,客户端接收响应结果,这个过程已经分析完毕了。
Pull Consumer
PULL_MESSAGE对应的处理器是PullMessageProcessor。与生产消息调用MessageStore的putMessage()类似,
消费消息调用MessageStore的getMessage()方法,并返回GetMessageResult。
| 请求编码 | 消息处理器 | 消息存储 | 结果 |
|---|---|---|---|
| SEND_MESSAGE | SendMessageProcessor | putMessage() | PutMessageResult |
| PULL_MESSAGE | PullMessageProcessor | getMessage() | GetMessageResult |
消费者还需要提交偏移量,对应ConsumerOffsetManager的commitOffset()方法。
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) {
final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
requestHeader.getConsumerGroup(), // 消费组
requestHeader.getTopic(), // 主题
requestHeader.getQueueId(), // 队列编号
requestHeader.getQueueOffset(), // 队列的偏移量
requestHeader.getMaxMsgNums(), // 最大的消息数量
messageFilter); // 过滤器
// .......................................................
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getCommitOffset());
}
}
存储层设计到文件操作时,生产消息会写到CommitLog,消费消息则会调用getMessage方法,给定偏移量和大小。
注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。