准备工作纯JAVA项目依赖
创建一个maven-java项目在pom.xml中添加以下依赖:
com.RabbitMQ:amqp-client:5.15.0org.projectlombok:lombok:1.18.24junit:junit:4.13.2org.Apache.logging.log4j:log4j-api:2.18.0org.apache.logging.log4j:log4j-core:2.18.0org.apache.logging.log4j:log4j-slf4j-impl:2.18.0org.slf4j:slf4j-api:1.7.36cn.hutool:hutool-core:5.8.5
日志配置文件
创建log4j2.xml文件,用于输出日志:
连接工具类
package wj.mq.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnUtils {public static Connection newConnection(){ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.56.61");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");Connection connection = null;try {connection = factory.newConnection();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);return connection;
最后的项目结构如下:
SpringBoot项目依赖
配置文件server.port=8888spring.Application.name=helloworld# 配置mqspring.rabbitmq.host=192.168.56.61spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=admin# 配置日志logging.level.root=INFOlogging.file.name=./logs/log.log
相关注解
@RabbitListener(queues = {"HelloSpring"}) : 可以注解在类上,也可以注解在方法上,消费消息。
@RabbitHandler() : 注解在方法上,与@RabbitListener共同使用。
简单模式
java项目生产者Publisher代码package wj.rabbitmq.demo01;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;import java.nio.charset.StandardCharsets;@Slf4jpublic class Publisher {public static void main(String[] args) throws Exception {String queueName = "HelloQueue";//获取一个连接try(Connection connection = ConnUtils.newConnection()){//获取信道Channel channel = connection.createChannel();//声明一个队列名称为channel.queueDeclare(queueName, true, false,false, null);//发送消息:Exchange空默认使用/交换机channel.basicPublish("",queueName,null,"HelloWorld".getbytes(StandardCharsets.UTF_8));log.info("送信息完成");
查看UI
发送完成以后,查看RabbitMQ的UI管理界面中的Queue选项卡:
点HelloQueue这个队列名称,进入详细界面:
查看Bindings,可以看到目前是默认交换机(即AMQP Default)绑定到当前这个队列:
消费者Consumer代码package wj.rabbitmq.demo01;import com.rabbitmq.client.*;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;@Slf4jpublic class Consumer {public static void main(String[] args) throws Exception {String queueName = "HelloQueue";try (Connection con = ConnUtils.newConnection()) {Channel channel = con.createChannel();channel.queueDeclare(queueName, true, false,false, null);//接收到数据以后的回调函数Delivercallback callback = (consumerTag, message) -> {byte[] body = message.getBody();String str = new String(body);log.info("接收到信息:{}", str);//取消处理信息的回调CancelCallback cancelCallback = consumerTag -> {//ignore//消费监听channel.basicConsume(queueName, false, callback, cancelCallback);
截图:
运行后效果,即输出从队列中读取的数据:
09:40:22.886 [pool-2-thread-4] 接收到信息:HelloWorld
SpringBoot项目创建Springboot项目并添加以下依赖
配置类的主要功能是配置Queue,Exchange等。以下我这儿仅需要一个Queue所以仅配置了一个Queue。
package wj.mq.config;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import lombok.extern.slf4j.Slf4j;@Slf4j@Configurationpublic class RabbitMQConfig {* 在使用队列之前,必须要先声明这个队列,如果不在这儿声明,就必须要在@RabbitListener时添加queuesToDeclare如下:@RabbitListener(queues = {"HelloSpring"},queuesToDeclare = @Queue("HelloSpring"))* @return@Beanpublic Queue helloSpringQueue() {Map args = new HashMap<>();args.put("name", "HelloSpring");Queue queue = new Queue("HelloSpring",true, false, false,args);log.info("队列创建成功:{}",queue.getName());return queue;
启动类
与普通的启动的类类似,只是为了使用调度功能,添加了@EnableScheduling注解。
package wj.mq;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import lombok.extern.slf4j.Slf4j;@Slf4j@Controller@EnableScheduling@SpringBootApplicationpublic class HelloWorldApplication {public static void main(String[] args) {SpringApplication.run(HelloWorldApplication.class, args);log.info("Started..");@ResponseBody@RequestMapping(value = {"/",""})public String index() {return "index";
添加EnableScheduling注解,后面用到定时任务,让生产者定时发布消息:
生产者
convertAndSend可以发布任意的对象,默认会被转换成指定的格式
package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Componentpublic class HelloWorldSender {@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {String msg = ""+times;rabbitTemplate.convertAndSend("HelloSpring",msg);log.info("Send {} ok",msg);times++;
添加@Component注解为Spring Bean组件。
注入rabbitTemplate。
通过convertAndSend(QueueName,Object)发送数据给批定的队列。
消费者package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {@RabbitHandlerpublic void onMessage(String message) {log.info("消费者1,接收到信息: {}",message);
在类上添加注解@RabbitListener(queues=..)
在接收信息的方法上添加@RabbitHandler
消费者2(另一种使用注解的方法)
package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;* 第二种接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {* 并设置为手工确认@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(Message message,Channel channel) throws Exception {log.info("消费者2,接收到信息: {}",new String(message.getBody()));long id = message.getMessageProperties().getDeliveryTag();//手工确认channel.basicAck(id, false);
直接将注解写在方法上,并设置为手工确认。
注意接收的参数为Message对象和Channel对象。
使用channel.ack进行手工确认。
生产者-发布一个对象声明JavaBean必须实现序列化接口package wj.mq.domain;import java.io.Serializable;import lombok.Getter;import lombok.Setter;@Getter@Setterpublic class Person implements Serializable {private static final long serialVersionUID = 1L;private String name;private Integer age;生产者
注意以下通过convertAndSend直接发布了一个对象。
package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import wj.mq.domain.Person;@Componentpublic class HelloWorldSender {@Autowiredprivate Queue helloSpringQueue;@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {Person person = new Person();//声明对象并直接发布person.setName("Jack");person.setAge(times);rabbitTemplate.convertAndSend(helloSpringQueue.getName(),person);times++;
查看RabbitMQ UI可见编码类型为序列化再转base64的方式:
消费者-消费一个对象直接接收消费对象package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.alibaba.fastJSON2.JSONObject;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson对象即可@RabbitHandler()public void onMessage(Person message) {int sleep = 1000*random.nextInt(20);ThreadUtil.sleep(sleep);log.info("消费者1,休眠{}ms后处理完成信息:{},共处理{}消息",sleep,JSONObject.toJSONString(message),count++);
接收Message对象,然后自己做转换package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;* 第二种接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {int count = 1;* 并设置为手工确认@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(@Payload()Message message,Channel channel) throws Exception {int sleep = 0;//通过ObjectReader读取对象ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(message.getBody()));Object readObject = objIn.readObject();Person p = (Person) readObject;objIn.close();log.info("消费者2,休眠{}ms后处理完成信息: {},共处理{}消息",sleep,JSONObject.toJSONString(p),count++);long id = message.getMessageProperties().getDeliveryTag();channel.basicAck(id, false);//手工确认
最后显示的结果,与之前相同
最终项目结构
运行测试
两个消费者,依次接收RabbitMQ发送的消息:
关于springboot中的更多设置设置MessageConvert
默认情况下,通过rabbitTemplate.sendAndConvert(..)发送的object对象会使用Java序列化方式传输。可以在UI界面上,通过读取一个Queue中的消息的方式,查看队列中的数据格式:java-serialized。
我们可以修改这种格式,如JSON
以下注意给RabbitTemplate.setMessageConverter(..)即可以设置为JSON转换格式。
package wj.mq.config.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class HelloWorldConfig {@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic Queue helloSpringQueue() {return new Queue("HelloSpringQueue", true, false,false);@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rt =new RabbitTemplate();rt.setConnectionFactory(connectionFactory);rt.setMessageConverter(messageConverter());return rt;
设置后,查看Message的格式
接收 deliverTag
仅通过@RabbitListener的方法上,可以通过添加@Header接收具体指定的数据,如下。
注意@header的部分的代码。
package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"#{helloSpringQueue.name}"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson对象即可@RabbitHandler()public void onMessage(@Payload() Person message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) {int sleep = 1000*random.nextInt(20)*0;ThreadUtil.sleep(sleep);log.info("消费者1,休眠{}ms后处理完成信息:{},共处理{}消息,消息tag={}",sleep,JSONObject.toJSONString(message),count++,deliveryTag);
或,直接将@RabbitListener注解在方法上
可也可以接收一个Message对象
最后显示的效果如下: