<返回更多

如何保证MQ消息不丢失?

2022-06-04    蚂蚁背大象的日常
加入收藏

1 背景

金融系统中MQ消息的消息丢失是不允许的,消息的丢失会导致支付状态订单状态出现混乱。接下来聊一下如何保证MQ消息不丢失,以笔者公司使用的RocketMQ为例。

2 RokectMQ消息什么情况下会丢失?

MQ的消息生成到消费主要经历三个阶段:MQ消息生产、RocketMQ Broker存储消息、消费者消息对应的消息。如下图:

 

如何保证MQ消息不丢失?

 

 

从上图可以知道消息丢失主要会发生在下面几个地方:

3 如何解决RokectMQ消息丢失

解决消息丢失从消息丢失的地方入手。

3.1 消息生产防止消息丢失

RocketMQ消息生产方式有三种:同步发送消息、异步发送消息、One-Way发送消息。不同的发送方式使用不同的场景:

同步发送消息: 重要的通知(订单状态的更新)、短信系统。

异步发送消息: 通常用于响应时间敏感的业务场景。

One-way: 主要用于对可靠性要求不高的场景,在金融的场景下不适用。一般是用于日志收集。

根据上面三种发送方式的特点, one-way 消息发送模式本身就是对消息的丢失无法保证。所以如果你的系统对消息丢失零容忍不能使用 one-way 的方式发送。同步发送消息和异步发送消息 都可以判断消息的发送状态判断消息是否已经发送到Broker。这里是选择同步发送还是异步发送消息看业务的需要,同步发送比较关心发送后返回的结果对时间的要求不是那么敏感。异步发送对消息返回时间敏感。

 

如何保证MQ消息不丢失?

 

 

SendResult定义说明(来自RocketMQ官方)

3.2 RocketMQ Broker防丢失消息

首先了解一下Broker集群部署模式(官方方案)。

单Master模式

这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。

多Master模式

一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下

多Master多Slave模式-异步复制

每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下

多Master多Slave模式-同步双写

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

如果是想不存在消息丢失的情况,那么在多Master的情况下要配置消息同步刷盘,而在 多Master多Slave模式-同步双写 的情况下配置同步刷盘。

3.3 消费端处理消息

消息消费示例代码如下:

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
         
        // Specify name server addresses.
        consumer.setNamesrvAddr("localhost:9876");
        
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                //处理消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

在处理消息的时候,如果消息处理失败返回的状态不应该是
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。如果消息处理失败返回的是
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 消息就不能再次被消息。在Broker看来就是已经消费完成。

4 总结

MQ消息的丢失主要发生在发送、存储、消费消息的三个阶段,所以需要防止消息丢失也要从这三个方面着手。

我是蚂蚁背大象,文章对你有帮助点赞关注我,文章有不正确的地方请您斧正留言评论~谢谢!

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>