[聚合文章] 深入解析中间件之-RocketMQ

消息系统 2017-10-19 18 阅读

Apache RocketMQ

基本流程

Remoting RPC示例

rocketmq-remoting模块采用Netty封装了RPC的调用,包括客户端和服务端之间的交互。

不同分布式系统在通信上都会实现RPC模块,比如Kafka、Hadoop等都有各自的RPC实现。

先来查看测试用例RemotingServerTest的使用方法:

  • 启动RemotingServer和RemotingClient
  • 调用RemotingClient的invokeAsync()或者invokeSync()、invokeOneway()方法

以异步调用为例,RemotingClient的invokeAsync()方法主要有三个参数:

  • 服务端地址,RPC调用需要指定服务端的地址,这样客户端才能发送请求,让服务端处理
  • 远程指令(RemotingCommand),即客户端发送的请求
  • 回调对象(InvokeCallback),即客户端收到服务端返回的响应结果后,如何处理

RPC调用的具体步骤如下:

  • 启动客户端和服务端
  • 客户端构造远程指令对象
  • 客户端通过RemotingClient同步或者异步调用
  • 服务端在启动时注册的处理器,会处理客户端发送的请求,即调用处理器的processRequest()方法
  • 服务端处理完请求后,返回响应给客户端
  • 客户端收到服务端返回的响应结果,会触发回调对象调用operationComplete()方法
public static RemotingServer createRemotingServer() throws InterruptedException {
    NettyServerConfig config = new NettyServerConfig();
    RemotingServer remotingServer = new NettyRemotingServer(config);
    remotingServer.registerProcessor(0, new NettyRequestProcessor() {
        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
            request.setRemark("Hi " + ctx.channel().remoteAddress());
            return request;
        }
    }, Executors.newCachedThreadPool());
    remotingServer.start();
    return remotingServer;
}

public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
    RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {

    final CountDownLatch latch = new CountDownLatch(1);
    RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
    request.setRemark("messi");
    remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
        public void operationComplete(ResponseFuture responseFuture) {
            latch.countDown();
            assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
        }
    });
    latch.await();
}

RemotingServer的registerProcessor()方法有三个参数:

  • 请求编码,比如SEND_MESSAGE表示(生产者)客户端发送消息的请求
  • 请求处理器,比如服务端如何处理客户端发送消息的处理器,实现类为:SendMessageProcessor
  • 处理线程,每种请求编码都对应一个处理线程池。如果没有指定,则使用默认的线程池

客户端调用服务端有三种方式:同步(Sync)、异步(Async)、一次性(OneWay)。前两种有响应结果,最后一种不产生响应结果。

Netty RPC

NettyRemotingServer 在启动时,会绑定NettyServerHandler。Netty RPC的特点如下:

  • 请求和响应都是用RemotingCommand对象来表示
  • 服务端(NettyRemotingServer)和客户端(NettyRemotingClient)实现了抽象的NettyRemotingAbstract
  • 抽象类根据不同的指令类型调用不同的处理方法,比如处理请求调用processRequestCommand,处理响应调用processResponseCommand

下面举例客户端和服务端执行一次RPC调用链路的过程:

  • 客户端发送请求给服务端,通过Netty的Channel发送请求给服务端
  • 服务端处理客户端发送的请求,NettyServerHandler接收的消息类型为REQUEST_COMMAND,调用processRequestCommand方法
  • 服务端处理完成后,通过Netty的Channel发送响应结果给客户端
  • 客户端处理服务端发送的响应,NettyClientHandler接收的消息类型为RESPONSE_COMMAND,调用processResponseCommand方法

NettyRemotingAbstract用 processorTable 变量记录了请求编码、处理器、线程池之间的关系。

  • 每个请求编码都对应了一种唯一的处理器,相同请求编码的处理器是相同的
  • 由于处理器与线程池组成一对,所以相同请求编码的请求在相同的线程池中执行

不同的请求编码在不同的线程池中运行,以发送消息和消费消息为例:

请求编码(request code) 处理器 线程池
SEND_MESSAGE SendMessageProcessor ExecutorService#1
GET_MESSAGE PullMessageProcessor ExecutorService#2

以经典的RPC通信模型来看,客户端向服务端发起RPC调用请求。那么 processorTable 主要针对服务端, responseTable 则主要针对客户端。

  • 客户端发起RPC调动时,会创建异步的响应对象,并放入将opaque和ResponseFuture的映射关系放入responseTable
  • 当客户端收到服务端发送的响应结果后,会将opaque以及ResponseFuture从responseTable中移除

那么opaque是如何在请求和响应之间进行关联的呢?下面代码中的注释说明了opaque在请求和响应之间的设置和获取流程。

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
    new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
 protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
    new ConcurrentHashMap<Integer, ResponseFuture>(256);

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

// 处理请求,比如服务端处理客户端发送的请求,NettyServerHandler会调用到这里
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    // 4. 从请求对象中获取opaque,那么什么时候opaque设置到请求中?
    // 这里的cmd实际上是步骤3的request,因为步骤1已经有opaque,所以这里也能取到opaque
    final int opaque = cmd.getOpaque();
    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
    // 5. 将opaque设置到响应对象中
    response.setOpaque(opaque);
    // 6. 发送响应对象给客户端
    ctx.writeAndFlush(response);
}

// 处理响应,比如客户端处理服务端发送的响应,NettyClientHandler会调用到这里
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    // 7. 从响应对象中获取opaque,那么什么时候opaque设置到响应里?答案在步骤5中
    // 这里的cmd是步骤5的response,而response的opaque来自于request
    final int opaque = cmd.getOpaque();
    // 8. 根据opaque从responseTable中获取出对应的ResponseFuture
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);
        responseFuture.release();
        // 9. 将opaque与ResponseFuture的映射关系从responseTable中移除,与步骤2互相对应
        responseTable.remove(opaque);
        // 执行客户端在发送RPC调用时定义的回调函数
        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
        }
    }
}

// 客户端发起RPC调用
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request) {
    // 1. 从请求中获取opaque
    final int opaque = request.getOpaque();
    final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
    // 2. 创建ResponseFuture,并记录到responseTable
    this.responseTable.put(opaque, responseFuture);
    final SocketAddress addr = channel.remoteAddress();
    // 3. 发起RPC调用
    channel.writeAndFlush(request);
}

生产者发送消息

以example/quickstart下的Producer发送消息为例,入口方法走到DefaultMQProducerImpl的sendDefaultImpl()方法。

发送消息过程涉及下面几个步骤:

  • tryToFindTopicPublishInfo():根据消息的Topic获取TopicPublishInfo
  • selectOneMessageQueue():选择一个MessageQueue
  • sendKernelImpl():调用内核的发送方法
  • 如果是同步调用,返回SendResult,否则返回空

接下来进入DefaultMQProducerImpl的内核发送方法,主要的参数有:Message、MessageQueue、TopicPublishInfo

  • 如果有Hook,构造SendMessageContext,将Message、MessageQueue等都设置为上下文对象的成员变量
  • 构造SendMessageRequestHeader
  • 从MQClientFactory获取getMQClientAPIImpl()实现类MQClientAPIImpl,调用sendMessage()方法

接下来进入MQClientAPIImpl的sendMessage()方法

  • 根据RequestCode.SEND_MESSAGE(请求编码)和SendMessageRequestHeader(请求头)创建RemotingCommand对象
  • 设置请求的body为消息内容:request.setBody(msg.getBody())
  • 调用remotingClient.invokeAsync()或者invokeSync()方法
  • 对于同步调用,因为要等待结果返回,所以会立即调用processSendResponse()
  • processSendResponse()方法返回一个SendResult对象
private SendResult sendMessageSync(
    final String addr, // Broker的地址
    final String brokerName, // Broker的名字
    final Message msg, // 消息内容
    final long timeoutMillis,
    final RemotingCommand request // 请求对象
) {
    // RPC调用示例,这里的客户端是生产者,通过MQClientAPIImpl调用
    RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
    assert response != null;
    return this.processSendResponse(brokerName, msg, response);
}

生产者通过MQClientAPIImpl发起RPC调用,request请求对象的编码是SEND_MESSAGE。这里的地址指的是Broker的地址,而不是NameServer。

虽然生产者连接的是NameServer,但这中间会有选择MessageQueue,再选择Broker的过程,由于这里先关注整体的流程,暂时不去分析具体的细节。

客户端通过 RemotingClient 调用了服务端Broker,接下来看服务端 BrokerController 的处理。

BrokerController启动时会为各种请求类型注册不同的请求处理器,比如SEND_MESSAGE注册了SendMessageProcessor处理器:

public void registerProcessor() {
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    // SendMessageProcessor有两个Hook:发送消息和消费消息的Hook。
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
}

SendMessageProcessor的processRequest()方法会处理生产者客户端发送的SEND_MESSAGE请求。

客户端在发送请求之前构建了 SendMessageContextSendMessageRequestHeader ,这里对应的会首先从RemotingCommand反解析出着两个对象

  • 解析请求的body,创建MessageExtBrokerInner对象
  • 获取MessageStore,并调用putMessage方法,传入MessageExtBrokerInner对象
  • 返回PutMessageResult,并调用handlePutMessageResult方法
  • 最后返回的是一个RemotingCommand响应对象,会返回给客户端
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default: // SEND_MESSAGE的处理逻辑...
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
            RemotingCommand response;
            if (requestHeader.isBatch()) {
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }
            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

接下来进入DefaultMessageStore的putMessage()方法,这个方法会调用CommitLog的putMessage()方法

  • BrokerController和SendMessageProcessor都在broker模块
  • MessageStore和CommitLog则在store模块

CommitLog首先获取最近的MappedFile,然后追加消息到映射文件中。

  • 追加消息的回调类DefaultAppendMessageCallback是执行数据写入文件的真正方法。
  • 追加完成后,有多种的磁盘刷写方式,比如同步和异步
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);
    return putMessageResult;
}

同样,我们省略了具体写入到CommitLog中的细节,以及如何处理磁盘的刷写、HA等细枝末节。实际上,到这里为止,

生产者客户端发起RPC调用,到服务端处理请求,服务端返回响应,客户端接收响应结果,这个过程已经分析完毕了。

Pull Consumer

PULL_MESSAGE对应的处理器是PullMessageProcessor。与生产消息调用MessageStore的putMessage()类似,

消费消息调用MessageStore的getMessage()方法,并返回GetMessageResult。

请求编码 消息处理器 消息存储 结果
SEND_MESSAGE SendMessageProcessor putMessage() PutMessageResult
PULL_MESSAGE PullMessageProcessor getMessage() GetMessageResult

消费者还需要提交偏移量,对应ConsumerOffsetManager的commitOffset()方法。

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) {
    final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(
            requestHeader.getConsumerGroup(),   // 消费组
            requestHeader.getTopic(),           // 主题
            requestHeader.getQueueId(),         // 队列编号
            requestHeader.getQueueOffset(),     // 队列的偏移量
            requestHeader.getMaxMsgNums(),      // 最大的消息数量
            messageFilter);                     // 过滤器
    // .......................................................
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(
            RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), 
            requestHeader.getTopic(), 
            requestHeader.getQueueId(), 
            requestHeader.getCommitOffset());
    }
}

存储层设计到文件操作时,生产消息会写到CommitLog,消费消息则会调用getMessage方法,给定偏移量和大小。

注:本文内容来自互联网,旨在为开发者提供分享、交流的平台。如有涉及文章版权等事宜,请你联系站长进行处理。