<返回更多

SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践

2023-03-09  今日头条  摆脱格子衫
加入收藏


Spring Boot 是一个基于 Spring 框架的快速开发框架,而 RabbitMQ 和 RocketMQ 则是常用的消息队列中间件。下面是它们常用的一些用法和场景。

订单处理

电商等系统中,下单后需要进行一系列的处理,包括库存扣减、支付通知等。可以将订单相关的消息发送到消息队列中,由消费者异步地进行处理,从而提高系统的处理能力和可靠性。

使用 RabbitMQ 实现订单处理的示例代码:

// 发送订单消息 rabbitTemplate.convertAndSend("order-exchange", "order-routing-key", order); // 消费订单消息 @RabbitListener(queues = "order-queue") public void handleOrderMessage(Order order) { // 处理订单相关的业务逻辑 }

使用 RocketMQ 实现订单处理的示例代码:

// 发送订单消息 DefaultMQProducer producer = new DefaultMQProducer("order-producer-group"); producer.setNamesrvAddr("localhost:9876"); Message message = new Message("order-topic", "order-tag", order.toString().getBytes()); producer.send(message); // 消费订单消息 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("order-topic", "order-tag"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // 处理订单相关的业务逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); 日志处理

在分布式系统中,各个节点产生的日志需要进行集中处理和存储,以便后续的分析和监控。可以使用消息队列将日志消息发送到中心化的日志系统中。

使用 RabbitMQ 实现日志处理的示例代码:

// 发送日志消息 rabbitTemplate.convertAndSend("log-exchange", "log-routing-key", log); // 消费日志消息 @RabbitListener(queues = "log-queue") public void handleLogMessage(Log log) { // 处理日志相关的业务逻辑 }

使用 RocketMQ 实现日志处理的示例代码:

// 发送日志消息 DefaultMQProducer producer = new DefaultMQProducer("log-producer-group"); producer.setNamesrvAddr("localhost:9876"); Message message = new Message("log-topic", "log-tag", log.toString().getBytes()); producer.send(message); // 消费日志消息 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("log-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("log-topic", "log-tag"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // 处理日志相关的业务逻辑 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); 分布式事务

在分布式系统中,不同的服务之间需要进行事务管理,以保证数据的一致性和可靠性。可以使用消息队列来实现分布式事务的消息确认和回滚。

使用 RabbitMQ 实现分布式事务的示例代码:

// 发送订单消息和库存消息 rabbitTemplate.invoke(new RabbitCallback<Void>() { public Void doInRabbit(Channel channel) throws Exception { channel.txSelect(); channel.basicPublish("order-exchange", "order-routing-key", null, order.toString().getBytes()); channel.basicPublish("stock-exchange", "stock-routing-key", null, stock.toString().getBytes()); channel.txCommit(); return null; } }); // 消费订单消息和库存消息 @RabbitListener(queues = "order-queue") @Transactional public void handleOrderMessage(Order order) { // 处理订单相关的业务逻辑 } @RabbitListener(queues = "stock-queue") @Transactional public void handleStockMessage(Stock stock) { // 处理库存相关的业务逻辑 }

使用 RocketMQ 实现分布式事务的示例代码:

// 发送订单消息和库存消息 TransactionMQProducer producer = new TransactionMQProducer("transaction-producer-group"); producer.setNamesrvAddr("localhost:9876"); producer.setTransactionListener(new TransactionListener() { public LocalTransactionState executeLocalTransaction(Message message, Object arg) { // 执行本地事务 return LocalTransactionState.COMMIT_MESSAGE; } public LocalTransactionState checkLocalTransaction(MessageExt message) { // 检查本地事务状态 return LocalTransactionState.COMMIT_MESSAGE; } }); Message message1 = new Message("order-topic", "order-tag", order.toString().getBytes()); Message message2 = new Message("stock-topic", "stock-tag", stock.toString().getBytes()); TransactionSendResult result = producer.sendMessageInTransaction(Arrays.asList(message1, message2), null); // 消费订单消息和库存消息 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction-consumer-group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("order-topic", "order-tag"); consumer.subscribe("stock-topic", "stock-tag"); consumer.registerMessageListener(new MessageListenerOrderly() { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> messages, ConsumeOrderlyContext context) { for (MessageExt message : messages) { // 处理订单或库存相关的业务逻辑 } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); 消息队列比较

RabbitMQ 和 RocketMQ 都是常用的消息队列中间件,它们在特点和使用上有些区别。

RabbitMQ 采用 AMQP(高级消息队列协议),支持多种编程语言和多种操作系统。RabbitMQ 的消息传输可靠性较高,但性能较低。 RocketMQ 采用自定义的协议,适合 JAVA 应用,支持高吞吐量和高可用性。RocketMQ 的消息传输可靠性较低,但性能较高。
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>