RocketMQ5.0延时消息时间轮算法

前言

RocketMQ 相较于其它消息队列产品的一个特性是支持延时消息,也就是说消息发送到 Broker 不会立马投递给消费者,要等待一个指定的延迟时间再投递,适用场景例如:下单后多长时间没付款系统自动关闭订单。
RocketMQ 4.x 版本的延时消息存在一定的局限性,实现原理是:Broker 内置了名称为SCHEDULE_TOPIC_XXXX的Topic,包含 18 个对应延时级别的队列,每个延时级别的时间是固定的。Broker 收到消息后,如果发现是延时消息就会改写消息的 Topic 和 queueID,再通过专门的线程定时扫描这些队列,把延迟时间已到的消息重新投递到真正的 Topic 里面。
这种实现方案相对简单,但也存在局限性。消息的延迟时间不能随意设置,只能按照给定的延迟级别来设置;最长的延迟级别是两小时,如果用户需要更久的延时,就只能自行改造方案。
RocketMQ 5.0 意识到了这个问题,终于延时消息迎来升级,废弃了之前延迟级别的设计,消费者可以设置 24 小时内的任意时间,这个限制其实还可以增加,不过堆积太多的消息会带来其它问题。

设计难点

延时消息的设计是存在一些难点的,下面逐个分析。

1、任意延迟时间如何扫描?
延迟消息肯定需要一个数据结构来存储,为了不影响消息发送的吞吐量,还必须保证写入的高性能。然后还需要有线程定时的扫描这个数据结构,把延迟时间已到的消息投递给消费者。
RocketMQ 4.x 为了保证高性能写,还是把延时消息正常写 CommitLog,只不过换个 Topic。不同延迟时长的消息写入不同队列,这样就能保证每个队列前面的消息肯定比后面的消息先投递,线程扫描的时候顺序扫,只要扫到第一个延迟时间还没到的消息,后面的消息就可以跳过了,避免全局扫描。

2、消息清理问题
RocketMQ 本身可以看作是一个缓冲区,是用来做削峰填谷的,消息不可能一直保留,所以要有定时清理机制。CommitLog 默认会保存三天,如果支持太久的延迟时间,万一 CommitLog 被清理了,延迟时间投递的时候就无法读取出原始消息内容了。

3、大量消息同时到期
大量消息同时到期也是延迟消息比较头疼的问题,毕竟是单线程扫描投递,如果大量消息同时到期,短时间内投递的消息太多,就会导致消息延迟,不过这个问题可以业务上加个随机时间解决。

时间轮算法

RocketMQ 5.0 引入了时间轮算法 (TimingWheel) 来支持任意时间的延时消息。
image.png
先看一下算法的思想,如图所示,圆环就是一个时间轮,它共有 8 个刻度,假设每个刻度代表一秒钟。延时任务会根据延迟时间添加到时间轮对应的刻度上。Data1、Data2 延迟时间都是一秒,所以被添加到刻度1上;Data4 延迟时间 14 秒,饶了一圈后被添加到刻度6上。同时,会有一个指向当前时间刻度的指针,沿着时间轮顺时针旋转,指针每秒前进一个刻度,并把当前刻度上所有的延迟时间已到的延时任务全部执行一遍。
基于时间轮算法,就可以实现任意时间的延时任务的调度执行,如果你觉得“秒”的精度不够,可以把刻度再拆分的更精细化一些,定时任务跑的频率更高一些即可。

设计实现

RocketMQ 是怎么实现时间轮算法的呢?
RocketMQ 用 TimerWheel 类来封装时间轮,它实际对应磁盘上一个固定大小的文件,默认文件路径是${StoreHome}/timerwheel,默认大小约 37 MB。
TimerWheel 由若干个刻度组成,一个刻度就代表一个时间单位,RocketMQ 用Slot类来描述刻度,默认一个 Slot 代表一秒。TimerWheel 默认有 14 天内以秒为单位的所有 Slot,即 1209600 个 Slot。
image.png
每个 Slot 占用固定的 32 字节,格式如下:

字段 长度(字节) 说明
timeMs 8 延迟时间
firstPos 8 第一个消息的位置
lastPos 8 最后一个消息的位置
num 4 消息数量
magic 4 魔数(废弃)

每个 Slot 都有对应的延迟时间,相同延迟时间的消息才会被添加进来,多个延时消息会串联成链表,Slot 用两个指针分别指向了链表的首尾地址。现在的问题是,延时消息存到哪里呢?

为了保证消息写入的性能,延时消息会顺序写入到timerlog文件里,它有点类似 CommitLog,区别是 timerlog 不会存储完整的消息,因为太浪费空间了。延时消息会被转换成固定的 52 字节的 TimerLog 写入。
写入的 TimerLog 格式如下:

字段 长度(字节) 说明
size 4 log 长度,固定值 52
prev pos 8 前一条 log 的位置
magic 4 魔数
curr write time 8 写入时间
delayed time 4 延迟时间
offsetPy 8 实际消息在 CommitLog 偏移量
sizePy 4 实际消息大小
hash code of real topic 4 真实 Topic 哈希码
reserved value 8 保留位 未使用

当有延时消息要被添加到 TimerWheel,首先要根据消息的延迟时间定位到 Slot,然后转换成 52 字节的 TimerLog,顺序写入 timerlog 文件,同时更新 Slot。
image.png
如图所示,现在要在 1号 Slot 上再添加一条延时消息1-4,需要先把1-4写入 timerlog,1-4的 prevPos 指针会指向1-3串联成链表,再把 Slot -> lastPos 指向1-4
image.png

延时消息存储起来了,接下就是线程定时扫描时间轮上的 Slot,判断消息如果到期就投递到原始 Topic 里面让消费者开始消费,如果消息没到期就重新投递进延时队列,进入下一期的时间轮。

不管是延时消息写入 timerlog、还是从 timerlog 取出消息重新投递,这些工作都是通过起单独的线程定时执行的,这里列举下相关的线程 Service:

  • TimerEnqueueGetService:从rmq_sys_wheel_timer队列取出消息,构建TimerRequest放入enqueuePutQueue
  • TimerEnqueuePutService:从enqueuePutQueue取出TimerRequest,写入timerlog
  • TimerDequeueGetService:定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue
  • TimerDequeueGutMessageService:从dequeueGetQueue取出TimerRequest,从CommitLog查询完整消息,放入dequeuePutQueue
  • TimerDequeuePutMessageService:从dequeuePutQueue取出TimerRequest,判断消息是否到期,到期直接投递到目标Topic,没到期进入下一期时间轮

结合流程图再回顾一下整体流程,首先是延时消息发到 Broker 被写入 CommitLog:
image.png
然后是 TimerMessageStore 启动线程把延时消息写入 timerlog、再定时扫描到期消息重新投递的过程。
image.png

源码

客户端 SDK 发送延时消息前,会把延迟时间放在 Message -> system_properties 属性里面,Broker 收到消息准备 putMessage 前,会触发PutMessageHook预留的钩子函数,其中一个叫handleScheduleMessage的钩子就是专门处理延时消息的。
最终会调用HookUtils#handleScheduleMessage方法,如果判断 Broker 启用了时间轮算法,且接收到的是延时消息,就会对消息进行转换。

public static PutMessageResult handleScheduleMessage(BrokerController brokerController,
                                                     final MessageExtBrokerInner msg) {
   
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
   
        if (!isRolledTimerMessage(msg)) {
   
            if (checkIfTimerMessage(msg)) {
   
                if (!brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
   
                    return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_NOT_ENABLE, null);
                }
                // 是延时消息 且启用时间轮算法 消息转换
                PutMessageResult tranformRes = transformTimerMessage(brokerController, msg);
                if (null != tranformRes) {
   
                    return tranformRes;
                }
            }
        }
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
   
            transformDelayLevelMessage(brokerController, msg);
        }
    }
    return null;
}

转换的过程就是解析出延迟时间,然后把延迟时间和真实 Topic、QueueID 写入 properties,最后改写 Topic:

private static PutMessageResult transformTimerMessage(BrokerController brokerController, MessageExtBrokerInner msg) {
   
    //do transform
    int delayLevel = msg.getDelayTimeLevel();
    long deliverMs;
    try {
   
        // 从properties取出延迟时间
        if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC) != null) {
   
            deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_SEC)) * 1000;
        } else if (msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS) != null) {
   
            deliverMs = System.currentTimeMillis() + Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELAY_MS));
        } else {
   
            deliverMs = Long.parseLong(msg.getProperty(MessageConst.PROPERTY_TIMER_DELIVER_MS));
        }
    } catch (Exception e) {
   
        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
    }
    if (deliverMs > System.currentTimeMillis()) {
   
        if (delayLevel <= 0 && deliverMs - System.currentTimeMillis() > brokerController.getMessageStoreConfig().getTimerMaxDelaySec() * 1000) {
   
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
        }
        // 处理精度
        int timerPrecisionMs = brokerController.getMessageStoreConfig().getTimerPrecisionMs();
        if (deliverMs % timerPrecisionMs == 0) {
   
            deliverMs -= timerPrecisionMs;
        } else {
   
            deliverMs = deliverMs / timerPrecisionMs * timerPrecisionMs;
        }

        if (brokerController.getTimerMessageStore().isReject(deliverMs)) {
   
            return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_FLOW_CONTROL, null);
        }
        // 改写Topic,把真实Topic写入properties
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TIMER_OUT_MS, deliverMs + "");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        msg.setTopic(TimerMessageStore.TIMER_TOPIC);
        msg.setQueueId(0);
    } else if (null != msg.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY)) {
   
        return new PutMessageResult(PutMessageStatus.WHEEL_TIMER_MSG_ILLEGAL, null);
    }
    return null;
}

后续就是把消息写入 CommitLog,因为改写了 Topic,所以消费者现在是没办法消费延时消息的。
接着就是 TimerEnqueueGetService 线程消费rmq_sys_wheel_timer队列,取出延时消息,构建 TimerRequest 放到 enqueuePutQueue。

public boolean enqueue(int queueId) {
   
	......
    ConsumeQueue cq = (ConsumeQueue) this.messageStore.getConsumeQueue(TIMER_TOPIC, queueId);
    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(offset);
    for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
   
        // 取出原始消息
    	MessageExt msgExt = getMessageByCommitOffset(offsetPy, sizePy);
        // 延迟时间
        long delayedTime = Long.parseLong(msgExt.getProperty(TIMER_OUT_MS));
        // use CQ offset, not offset in Message
        msgExt.setQueueOffset(offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
        TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, System.currentTimeMillis(), MAGIC_DEFAULT, msgExt);
        while (true) {
   
            // 延时消息封装成TimerRequest入队
            if (enqueuePutQueue.offer(timerRequest, 3, TimeUnit.SECONDS)) {
   
                break;
            }
            if (!isRunningEnqueue()) {
   
                return false;
            }
        }
    }
	......
}

再是 TimerEnqueuePutService 线程从 enqueuePutQueue 取出 先前构建好的TimerRequest,写入 timerlog。

public void run() {
   
    ......
	while (!this.isStopped() || enqueuePutQueue.size() != 0) {
   
        TimerRequest firstReq = enqueuePutQueue.poll(10, TimeUnit.MILLISECONDS);
        // 延迟时间小于当前时间,入队dequeuePutQueue
        if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
   
            dequeuePutQueue.put(req);
        } else {
   
            // 写入timerlog
            boolean doEnqueueRes = doEnqueue(req.getOffsetPy(), req.getSizePy(), req.getDelayTime(), req.getMsg());
            req.idempotentRelease(doEnqueueRes || storeConfig.isTimerSkipUnknownError());
        }
    }
	......
}

首先需要定位到 Slot,再顺序写 timerlog,最后更新 Slot 信息。

public boolean doEnqueue(long offsetPy, int sizePy, long delayedTime, MessageExt messageExt) {
   
    LOGGER.debug("Do enqueue [{}] [{}]", new Timestamp(delayedTime), messageExt);
    long tmpWriteTimeMs = currWriteTimeMs;
    // 超过一轮时间周期,到期后需要重新进入下一期时间轮等待
    boolean needRoll = delayedTime - tmpWriteTimeMs >= timerRollWindowSlots * precisionMs;
    int magic = MAGIC_DEFAULT;
    if (needRoll) {
   
        magic = magic | MAGIC_ROLL;
        if (delayedTime - tmpWriteTimeMs - timerRollWindowSlots * precisionMs < timerRollWindowSlots / 3 * precisionMs) {
   
            //give enough time to next roll
            delayedTime = tmpWriteTimeMs + (timerRollWindowSlots / 2) * precisionMs;
        } else {
   
            delayedTime = tmpWriteTimeMs + timerRollWindowSlots * precisionMs;
        }
    }
    boolean isDelete = messageExt.getProperty(TIMER_DELETE_UNIQKEY) != null;
    if (isDelete) {
   
        magic = magic | MAGIC_DELETE;
    }
    String realTopic = messageExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
    // 定位Slot,顺序写timerLog
    Slot slot = timerWheel.getSlot(delayedTime);
    ByteBuffer tmpBuffer = timerLogBuffer;
    tmpBuffer.clear();
    tmpBuffer.putInt(TimerLog.UNIT_SIZE); //size
    tmpBuffer.putLong(slot.lastPos); //prev pos
    tmpBuffer.putInt(magic); //magic
    tmpBuffer.putLong(tmpWriteTimeMs); //currWriteTime
    tmpBuffer.putInt((int) (delayedTime - tmpWriteTimeMs)); //delayTime
    tmpBuffer.putLong(offsetPy); //offset
    tmpBuffer.putInt(sizePy); //size
    tmpBuffer.putInt(hashTopicForMetrics(realTopic)); //hashcode of real topic
    tmpBuffer.putLong(0); //reserved value, just set to 0 now
    long ret = timerLog.append(tmpBuffer.array(), 0, TimerLog.UNIT_SIZE);
    if (-1 != ret) {
   
        // 更新timerWheel对应的Slot
        timerWheel.putSlot(delayedTime, slot.firstPos == -1 ? ret : slot.firstPos, ret,
            isDelete ? slot.num - 1 : slot.num + 1, slot.magic);
        addMetric(messageExt, isDelete ? -1 : 1);
    }
    return -1 != ret;
}

不管是 timerlog 还是 timerWheel 文件,都是需要频繁写的,为了提高性能,RocketMQ 均使用 mmap 技术写,然后定时 flush 到磁盘。

然后是 TimerDequeueGetService 线程,定时扫描时间轮,取出到期的TimerRequest,放入dequeueGetQueue

public int dequeue() throws Exception {
   
    // 定位到Slot
    Slot slot = timerWheel.getSlot(currReadTimeMs);
    if (-1 == slot.timeMs) {
   // Slot是空的
        moveReadTime();
        return 0;
    }
    long currOffsetPy = slot.lastPos;
    while (currOffsetPy != -1) {
   
        // 定位timerlog
        timeSbr = timerLog.getWholeBuffer(currOffsetPy);
        int position = (int) (currOffsetPy % timerLogFileSize);
        timeSbr.getByteBuffer().position(position);
        timeSbr.getByteBuffer().getInt(); //size
        prevPos = timeSbr.getByteBuffer().getLong();
        int magic = timeSbr.getByteBuffer().getInt();
        long enqueueTime = timeSbr.getByteBuffer().getLong();
        long delayedTime = timeSbr.getByteBuffer().getInt() + enqueueTime;
        long offsetPy = timeSbr.getByteBuffer().getLong();
        int sizePy = timeSbr.getByteBuffer().getInt();
        // timerlog再转换成TimerRequest
        TimerRequest timerRequest = new TimerRequest(offsetPy, sizePy, delayedTime, enqueueTime, magic);
        timerRequest.setDeleteList(deleteUniqKeys);
        for (List<TimerRequest> normalList : splitIntoLists(normalMsgStack)) {
   
            for (TimerRequest tr : normalList) {
   
                tr.setLatch(normalLatch);
            }
            // TimerRequest入队dequeueGetQueue
            dequeueGetQueue.put(normalList);
        }
    }
	......
}

再是 TimerDequeueGetMessageService 线程,从 dequeueGetQueue 取出 TimerRequest,从 CommitLog 查询完整消息,放入 dequeuePutQueue。

public void run() {
   
    while (!this.isStopped()) {
   
        List<TimerRequest> trs = dequeueGetQueue.poll(100 * precisionMs / 1000, TimeUnit.MILLISECONDS);
        for (int i = 0; i < trs.size(); ) {
   
            // CommitLog查询完整消息
            MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
            String uniqkey = MessageClientIDSetter.getUniqID(msgExt);
            if (null != uniqkey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(uniqkey)) {
   
                doRes = true;
                tr.idempotentRelease();
                perfs.getCounter("dequeue_delete").flow(1);
            } else {
   
                tr.setMsg(msgExt);
                while (!isStopped() && !doRes) {
   
                    // 入队
                    doRes = dequeuePutQueue.offer(tr, 3, TimeUnit.SECONDS);
                }
            }
        }
    }
}

最后是 TimerDequeuePutMessageService 线程,从 dequeuePutQueue 取出 TimerRequest,判断消息是否到期,到期直接投递到真实的 Topic,没到期进入下一期时间轮。

public void run() {
   
    while (!this.isStopped() || dequeuePutQueue.size() != 0) {
   
        TimerRequest tr = dequeuePutQueue.poll(10, TimeUnit.MILLISECONDS);
        // 消息转换 如果到期则复原Topic queueID
        MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
        doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
        while (!doRes && !isStopped()) {
   
            if (!isRunningDequeue()) {
   
                dequeueStatusChangeFlag = true;
                tmpDequeueChangeFlag = true;
                break;
            }
            // 重写写入CommitLog
            doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
            Thread.sleep(500 * precisionMs / 1000);
        }
    }
    .......
}

时间轮对应的类是 TimerWheel,它对应磁盘上的一个文件,由若干个 Slot 组成,因为要随机读写,所以 RocketMQ 使用 RandomAccessFile 来读写文件。

public class TimerWheel {
   
    // 总的Slot数量
    public final int slotsTotal;
    // 时间精度
    public final int precisionMs;
    // 文件名
    private String fileName;
    private final RandomAccessFile randomAccessFile;
    private final FileChannel fileChannel;
    // mmap
    private final MappedByteBuffer mappedByteBuffer;
    private final ByteBuffer byteBuffer;
    private final ThreadLocal<ByteBuffer> localBuffer = new ThreadLocal<ByteBuffer>() {
   
        @Override
        protected ByteBuffer initialValue() {
   
            return byteBuffer.duplicate();
        }
    };
    // 时间轮文件大小
    private final int wheelLength;
}

Slot 类的属性:

public class Slot {
   
    public static final short SIZE = 32;
    public final long timeMs; // 延迟时间
    public final long firstPos; // 第一个消息的位置
    public final long lastPos; // 最后一个消息的位置
    public final int num; // 消息数量
    public final int magic; //no use now, just keep it
}

最后是 TimerLog,它底层对应一组文件,单个文件限制在 100MB 大小,如果写满了就创建新的文件继续写。因为是顺序写的,所以效率很高。

public class TimerLog {
   
    private static InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    public final static int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
    private final static int MIN_BLANK_LEN = 4 + 8 + 4;
    public final static int UNIT_SIZE = 4  //size
            + 8 //prev pos
            + 4 //magic value
            + 8 //curr write time, for trace
            + 4 //delayed time, for check
            + 8 //offsetPy
            + 4 //sizePy
            + 4 //hash code of real topic
            + 8; //reserved value, just in case of
    public final static int UNIT_PRE_SIZE_FOR_MSG = 28;
    public final static int UNIT_PRE_SIZE_FOR_METRIC = 40;
    private final MappedFileQueue mappedFileQueue;
    private final int fileSize;
}

尾巴

RocketMQ 5.0 解除了 4.x 版本延时消息延迟级别的时间限制,现在生产者可以设置任意延迟时间了,功能上更加强大,实现上也更加复杂。RocketMQ 引入了新的时间轮算法,简单理解就是把时间按照精度划分成 N 个Slot,消息会按照延迟时间加入到对应的 Slot,然后线程定时扫描时间轮,把 Slot 对应的到期消息重新投递即可。新的算法不仅实现更复杂,RocketMQ 还需要额外写 timerwheel 和 timerlog 文件,这两个文件也是要持久化定期刷盘的。

相关推荐

  1. 时间算法

    2024-01-04 11:12:02       32 阅读
  2. 时间算法

    2024-01-04 11:12:02       11 阅读
  3. for循环时间计算

    2024-01-04 11:12:02       35 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-04 11:12:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-04 11:12:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-04 11:12:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-04 11:12:02       18 阅读

热门阅读

  1. 数据挖掘--决策树

    2024-01-04 11:12:02       37 阅读
  2. 总结HarmonyOS的技术特点

    2024-01-04 11:12:02       31 阅读
  3. HarmonyOS开发环境配置

    2024-01-04 11:12:02       39 阅读
  4. HarmonyOS简介

    2024-01-04 11:12:02       34 阅读