RocketMQ源码阅读-Broker消息接收


Broker接收 Producer发送的消息。

Broker在RocketMQ中也是一个独立的Model,rocketmq-broker。

Broker的核心类为SendMessageProcessor。
image.png

1. 从单元测试入手

同样从单元测试入手,看Broker接收消息的流程。
SendMessageProcessor的单元测试类为org.apache.rocketmq.broker.processor.SendMessageProcessorTest。
image.png
包含上面这些方法,其中init()方法是启动类,其他是测试流程的方法。
先看init()方法中Broker的启动流程。

2. Broker启动流程

单元测试中的inti()方法为:

@Before
public void init() {
   
    brokerController.setMessageStore(messageStore);
    when(messageStore.now()).thenReturn(System.currentTimeMillis());
    Channel mockChannel = mock(Channel.class);
    when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
    when(handlerContext.channel()).thenReturn(mockChannel);
    when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
    sendMessageProcessor = new SendMessageProcessor(brokerController);
}

init方法新建一个SendMessageProcessor,并一个传入brokerController,指定一个messageStore:

  • BrokerController: Broker的管理器
  • MessageStore: 消息存储的接口

继续看接收消息的流程SendMessageProcessor#sendMessage。

3. Broker接收消息

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                      RemotingCommand request) throws RemotingCommandException {
   
    SendMessageContext mqtraceContext;
    switch (request.getCode()) {
   
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.consumerSendMsgBack(ctx, request);
        default:
            // 解析请求
            SendMessageRequestHeader requestHeader = parseRequestHeader(request);
            if (requestHeader == null) {
   
                return null;
            }

            // 发送请求Context。在hook场景下使用
            mqtraceContext = buildMsgContext(ctx, requestHeader);
            // hook:处理发送消息前逻辑
            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

            RemotingCommand response;
            if (requestHeader.isBatch()) {
   
                // 处理批量发消息逻辑
                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
            } else {
   
                // 处理发送消息逻辑
                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
            }

            this.executeSendMessageHookAfter(response, mqtraceContext);
            return response;
    }
}

可以看到,此方法负责解析RPC请求,最终是调用SendMessageProcessor#sendMessage方法:

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
   

    // 初始化响应
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

    response.setOpaque(request.getOpaque());

    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));

    log.debug("receive SendMessage request command, {}", request);

    // 如果未开始接收消息,抛出系统异常
    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
   
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    // 消息配置(Topic配置)校验
    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
   
        return response;
    }

    final byte[] body = request.getBody();

    // 如果队列小于0,从可用队列随机选择
    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
   
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    // 创建MessageExtBrokerInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
   
        return response;
    }

    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    PutMessageResult putMessageResult = null;
    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // 校验是否不允许发送事务消息
    if (traFlag != null && Boolean.parseBoolean(traFlag)) {
   
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
   
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return response;
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
    } else {
   
        // 添加消息
        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    }
	// 处理result
    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

}

可以看到,此方法进行了topic的校验,并创建创建MessageExtBrokerInner,随后添加消息的流程主要是调用了MessageStore#putMessage方法:
MessageStore是接口,其默认实现为DefaultMessageStore:
image.png
DefaultMessageStore#putMessage方法:

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
   
    if (this.shutdown) {
   
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

	// 从节点不允许写入
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
   
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
   
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }
	// store是否允许写入
    if (!this.runningFlags.isWriteable()) {
   
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
   
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }

        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
   
        this.printTimes.set(0);
    }
	// 消息过长
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
   
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }
	// 消息附加属性过长
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
   
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
   
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
	// 添加消息到commitLog
    PutMessageResult result = this.commitLog.putMessage(msg);

    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 500) {
   
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
   
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}

可以看到,首先是检查Broker是否可以写入,从节点不能写入,然后消息经过一系列的格式与大小校验,最终通过CommitLog.putMessage进行存储。

4. Broker接收消息时序图

SendMessageProcessor_processRequest.png

5. 小结

Producer作为客户端发送消息后,Broker作为服务端需要接收消息并存储消息。
本篇分析了broker接收消息的流程:

  • SendMessageProcessor#processRequest是RPC执行接收消息的方法,此方法主要负责解析RPC请求
  • processRequest调用SendMessageProcessor#sendMessage方法,进行了topic的校验,并创建创建MessageExtBrokerInner,随后添加消息的流程主要是调用了DefaultMessageStore#putMessage方法
  • DefaultMessageStore#putMessage方法检查Broker是否可以写入,从节点不能写入,然后消息经过一系列的格式与大小校验,最终通过commitLog.putMessage进行存储

消息存储是通过CommitLong#putMessage进行的,这个流程在下一篇《RocketMQ源码阅读-Broker消息存储》学习。

相关推荐

  1. RocketMQ消息过滤机制详解

    2024-01-18 03:22:03       48 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-18 03:22:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-18 03:22:03       101 阅读
  3. 在Django里面运行非项目文件

    2024-01-18 03:22:03       82 阅读
  4. Python语言-面向对象

    2024-01-18 03:22:03       91 阅读

热门阅读

  1. LeetCode 36. 有效的数独

    2024-01-18 03:22:03       52 阅读
  2. pcie设备驱动无法工作排查

    2024-01-18 03:22:03       53 阅读
  3. 排序补充(C语言版)

    2024-01-18 03:22:03       38 阅读
  4. C++在结构(Struct)中使用队列(Queue)

    2024-01-18 03:22:03       55 阅读
  5. Composer安装

    2024-01-18 03:22:03       52 阅读
  6. 前端开发常用的地址

    2024-01-18 03:22:03       60 阅读
  7. 低端单片机彩色屏幕的内存占用疑惑

    2024-01-18 03:22:03       42 阅读
  8. 2024-01-16 创业日记-关于用户需求分析-思考

    2024-01-18 03:22:03       40 阅读
  9. opencv通过轮廓点生成闭合图像

    2024-01-18 03:22:03       56 阅读