<返回更多

好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读

2022-08-03    杂文论
加入收藏

一、背景

之前在做电商相关业务的时候,有一个常见的需求场景是:用户下单之后,超过半小时不支付,就取消订单。现在我们在淘宝京东买东西,或者通过美团点外卖,下单之后,如果不在指定时间内支付,订单也会取消。 那么,如何实现这样的超时取消逻辑呢,通过消息队列的延时消息,是一个非常稳定的实现方案。

RocketMQ 就提供了这样的延时消息功能,producer 端在发送消息时,设置延迟级别,从秒级到分钟小时等等。消息在发送之后,会在消息队列的服务器进行存储。等过了设定的延迟时间之后,消息才会被consumer端消费到。

 

如果我们在下单的时候,发送一条设置延时30分钟的消息,这条消息会在30分钟之后被下游系统消费,然后判断订单有没有支付,如果没有支付,则取消订单。那么这样,通过消息队列就完成了一个延迟取消的逻辑了。

二、原理

设置延时

先来看一下如何设置消息的延时 消息体可以通过setDelayTimeLevel方法来设置延时级别

public void produce() {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.setDelayTimeLevel(1)
    SendResult sendResult = producer.send(msg);
}

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

其实是将延迟信息存到 Message 的 property 中(property是一个保存meta信息的hashmap)

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

void putProperty(final String name, final String value) {
    if (null == this.properties) {
        this.properties = new HashMap<String, String>();
    }

    this.properties.put(name, value);
}

之后,broker收到 message之后,会根据 message 中设置的延时级别进行处理 可以看看延时级别的具体情况: 一共分为18个级别(1-18),对应时间从1s到2h

public class MessageStoreConfig {
    
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

}

那么整个系统是怎么做到让consumer在设定的延时之后,开始消费指定消息的呢?

不得不说,RocketMQ 的设计还是挺巧妙的,我们接着往下看。

消息预存

对于broker收到的延时消息,并不是和普通消息一样,进入发送端指定的topic中, 而是进入专门的延时topic中,延时topic有18条队列(queueId 编号0-17),queueId 和 delayLevel 的关系是 queueId + 1 = delayLevel,是一一对应的。所以计算延时消息的待执行时间deliverTimestamp之后,会将消息存入对应延时级别的队列中。

// 如果是延迟消息
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    
    // 重设延迟消息的topic和queueId,topic为指定的RMQ_SYS_SCHEDULE_TOPIC
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    ...
    // 将实际的指定topic和queueId进行存入property,进行备份
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));        
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

之后,会由ScheduleMessageService来进行任务处理。ScheduleMessageService是broker启动时就开始执行的,用来处理延迟队列中的消息,处理的逻辑如下所示。

public class ScheduleMessageService extends ConfigManager {

    // key: delayLevel | value: delayTimeMillis 
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // 创建一个Timer,用于执行定时任务
        this.timer = new Timer("ScheduleMessageTimerThread", true);
            
        // 这里对每个delayLevel的queue都创建一个DeliverDelayedMessageTimerTask,
        // 用来处理对应queue中的消息
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
    
}

ScheduleMessageService启动之后,会根据延时队列的数目创建一一对应的
DeliverDelayedMessageTimerTask,然后周期执行。该类继承自TimerTask,是JDK的工具类,用于执行定时任务,原理可以参考这篇文章 如何实现定时任务- JAVA Timer/TimerTask 源码原理解析

消息转投

可以看到
DeliverDelayedMessageTimerTask实现的 run 方法,主要逻辑都在executeOnTimeup方法中,从对应的延迟队列中取出时间已到的 message,发送到 message 对应原始topic的队列中。只要队列没有发生消费积压,message 就会马上被消费了。(这部分的代码实现比较复杂,感兴趣可以去看对应的源码)

class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    public void executeOnTimeup() {
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

        long failScheduleOffset = offset;

        if (cq != null) {
            
            // 这部分是核心逻辑,实现的是 从延时消息队列中取出 deliverTimestamp - now <= 0 的消息,
            // 将消息从延时queue移到原本指定Topic的queue中,这些消息就马上会被consumer消费。
        } 

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}

总体的原理示意图,如下所示:

 

broker 在接收到延时消息的时候,会将延时消息存入到延时TOPIC的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始TOPIC,这些消息就会被各自的 producer 消费了。

三、拓展-消费重试

平常在使用RocketMQ的时候,一般会依赖consumer的消费重试功能。 而consumer端的消费重试,其实也是通过这个和延时队列差不多的原理来实现的。

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 这里如果返回RECONSUME_LATER,就会重试消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

业务代码中,一般会利用重试功能去做下游逻辑的重试。而RocketMQ的重试并不是固定时间间隔重复进行,二是采取的退避式重试,重试的时间间隔会不断变长。 这个时间间隔,和设置delayLevel的延时类似。

Consumer客户端会通过processConsumeResult方法处理每一条消息的消费结果,如果判断需要进行重试,则会通过sendMessageBack方法将消息发送到broker,重试消息会带上已重试次数的信息。

broker收到消息之后,SendMessageProcessor会对重试消息进行处理,设置topic为RETRY_TOPIC,具体逻辑如下:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements.NETtyRequestProcessor {

    private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        // 给重试消息设置新的topic
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // 根据已经发生重试的次数确定delayLevel
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // 重试次数+1
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // 存储消息
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }   
}

存储消息的时候,CommitLog.putMessage方法内部判断如果设置了delayLevel,就会重设topic为SCHEDULE_TOPIC,然后将消息存储到延时队列中,后续就是和ScheduleMessageService的逻辑相同。

整个消息重试的逻辑示意图如下所示:

 

如图所示

  1. Consumer在消费的时候,都会订阅指定的TOPIC-NORMAL_TOPIC和该ConsumerGroup对应的重试TOPIC-RETRY_GROUP1_TOPIC,同时消费来自这两个topic中的消息。
  2. 当发生消费失败后,Client端会调用sendMessageBack方法将失败消息发送回broker。
  3. broker端的SendMessageProcessor会根据当前的重试次数确定延时级别,将消息存入延时队列-SCHEDULE_TOPIC中。
  4. ScheduleMessageService会将到期的消息重新发送到重试TOPIC-RETRY_GROUP1_TOPIC中,这个时候消息被Consumer消费,就完成了整个重试过程。

可以对比之前的延时消息流程去看,其实重试消息就是将失败的消息当作延时消息进行处理,只不过最后投入的是专门的重试消息队列中。

四、总结

延时消息都是非常日常业务使用中很重要的功能,而RocketMQ通过时间片分级+多队列+定时任务,就实现了这样的功能,设计上是很巧妙的。并且消费重试采用退避式的策略,重试时间的梯度刚好与延时消息策略一致,这样就可以直接利用延时队列去完成消息重试的功能,从策略上来说非常合理(消息消费重复失败,在短时间内重试成功的可能性比较低),并且复用了底层代码,这些是值得去学习和借鉴的。

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>