<返回更多

微服务与中间件系列——RabbitMQ,SpringAMQP使用

2022-08-26    Java互联网技术栈
加入收藏

同步通讯

发送方发出数据后,等接收方发回响应以后才发下一个数据包的通讯方式

同步调用的时效性强,可以立即获取结果

同步调用的问题

我们以前在使用Feign或OpenFeign时,就是使用的同步调用

  1. 代码耦合度高:每次加入新的需求,都要修改原来的代码
  2. 性能低:调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
  3. 资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
  4. 级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障

异步通讯

发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式

其常见的实线就是事件驱动模式

可以实现服务解耦的问题,性能得到提升,吞吐量提高,服务没有强依赖性,不必担心级联失败问题,实现服务削峰

异步通信的问题

  1. 依赖于Broker的可靠性、安全性、吞吐能力
  2. 架构复杂了,业务没有明显的流程线,不好追踪管理

什么是MQ

N (Message Quene):翻译为j消息队列,通过典型的生产者和消费者模型生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入轻松地实现系统间解耦。别名为消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

1.ActiveMQ

ActiveM 是A4pache出品,最流行的,能力强劲的开源消息总线。它是一个完全支持/8规范的的消息中间件。丰富的API,多种集群架构模式让认kctiveMA在业界成为老牌的消息中间件,在中小型企业颇受欢迎!

2.Kafka

Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特点是基于Pu11的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。8.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。

3.RocketMQ

RocketNQ是阿里开源的消息中间件,它是纯JAVA开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketNO思路起源于Kafka,但并不是Kafka的一个Copy,它对消息的可靠传输及事务性做了优化,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。

4.RabbitMQ

RabbitNQ是使用Erlang语言开发的开源消息队列系统,基于ANQP协议来实现。ANQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。ANQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

Docker安装RabbitMQ

docker pull rabbitmq
//启动
docker run 
-e RABBITMQ_DEFAULT_USER=syf20020816 
-e RABBITMQ_DEFAULT_PASS=20020816 
--name mq 
--hostname mq1 
-p 15672:15672 
-p 5672:5672 
-d 
rabbitmq:3.10-management

 

访问15672端口

 

 

RabbitMq架构

 

  • channel:操作MQ的工具
  • exchange:路由消息到队列中
  • queue:缓存消息
  • virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

QuickStart

简单项目中(点对点的简单模型)

 

1.引入依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.15.0</version>
</dependency>

2.建立虚拟主机

 

添加完成后如图:

 

3.建立生产者

/**
 * 直连式连接
 */
public class Publisher {
 
    public static void main(String[] args) throws IOException, TimeoutException {
 
        //创建连接mq的连接工厂对象
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机
        connectionFactory.setHost("192.168.112.101");
        //设置端口号
        connectionFactory.setPort(5672);
        //设置连接虚拟主机
        connectionFactory.setVirtualHost("/test");
        //设置虚拟主机的用户名密码
        connectionFactory.setUsername("ssf2asdas6");
        connectionFactory.setPassword("204545454");
        //获取连接对象
        final Connection connection = connectionFactory.newConnection();
        //获取连接中的通道
        final Channel channel = connection.createChannel();
        //通道绑定对应的消息队列
        //参数1:队列名称,不存在则创建
        //参数2:定义队列特性是否需要持久化,true:持久化,false:非持久化
        //参数3:是否独占队列
        //参数4:是否在消费完成后删除队列
        //参数5:拓展附加参数
        channel.queueDeclare("demo1",false,false,false,null);
        //发布消息
        //参数1:交换机名称(exchange)
        //参数2:队列名称
        //参数3:传递消息额外设置
        //参数4:消息的具体内容
        channel.basicPublish("","demo1",null,"hello world".getBytes());
        //关闭资源
        channel.close();
        connection.close();

    }
}

你可以看到在队列里就多了一条消息了

 

4.建立消费者

public class Consumer {
 
    public static void main(String[] args) throws IOException, TimeoutException {
 
        //创建链接工厂
        final ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.112.101");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/test");
        connectionFactory.setUsername("ssf2asdas6");
        connectionFactory.setPassword("204545454");
        //创建连接对象
        final Connection connection = connectionFactory.newConnection();
        //创建通道
        final Channel channel = connection.createChannel();
        channel.queueDeclare("demo1",false,false,false,null);
        //消费消息
        //参数1:消费队列的名称
        //参数2:开启消息的自动确认机制
        //参数3:消费时的回调接口
        channel.basicConsume("demo1",true, new DefaultConsumer(channel){
 
            //最后一个参数:消息队列中取出的消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
                System.out.println(new String(body));
            }
        });
        //关闭资源,若不关闭则一直进行监听
        channel.close();
        connection.close();
    }
}

 

消费后就看到这里队列里的消息就清零了

 

SpringAMQP

Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

官方网址

https://spring.io/projects/spring-amqp

QuickStart

1. 初始化一个简单微服务项目

 

结构如下

 

2.编写yaml配置

无论你的消息发布者还是消息消费者都需要使用以下yaml配置

spring:
  rabbitmq:
    host: 192.168.112.101
    port: 5672
    virtual-host: /test
    username: sysdaa6
    password: 20asdsa16

3.发送消息

这里注意的是,你的队列一定要是存在的

@SpringBootTest
class PublisherApplicationTests {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads() {
 
        rabbitTemplate.convertAndSend("demo1","hello springAMQP!");

    }
}

4.接收消息(消费)

@Component
public class SpringRabbitListener {
 
    @RabbitListener(queues = "demo1")
    public void listenQueue(String msg){
 
        System.out.println(msg);
    }
}

5.启动测试

 

WorkQueue模型

使用工作队列模型

工作队列,可以提高消息处理速度,避免队列消息堆积

 

消息预取机制

由于消息预取机制的存在,消息会平均地分配给每一个消费者

 

修改消费者的yaml修改预取机制

spring:
  rabbitmq:
    host: 192.168.112.101
    port: 5672
    virtual-host: /test
    username: your username
    password: your password
    listener:
      simple:
        prefetch: 1 #表示每次只能获取一条消息,处理完才能获取下一条

消息发送者

@SpringBootTest
class PublisherApplicationTests {
 
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void testWorkQueue() throws InterruptedException {
 
        String queueName = "demo1";
        String msg = "this is the msg:";
        for (int i = 0; i <= 50; i++) {
 
            rabbitTemplate.convertAndSend(queueName,msg+i);
            Thread.sleep(50);
        }
    }
}

消息消费者

@Component
public class SpringRabbitListener {
 

    @RabbitListener(queues = "demo1")
    public void listenWorkQueue(String msg) throws InterruptedException {
 
        System.out.println("=====consumer 1:=====|"+ LocalDateTime.now());
        System.out.println(msg);
        Thread.sleep(50);
    }

    @RabbitListener(queues = "demo1")
    public void listenWorkQueue2(String msg) throws InterruptedException {
 
        System.err.println("=====consumer 2:=====|"+ LocalDateTime.now());
        System.err.println(msg);
        Thread.sleep(500);
    }
}

 

发布订阅模式

发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)

exchange负责消息路由,而不是存储,路由失败则消息丢失

 

常见exchange类型包括

  1. Fanout:广播
  2. Direct:路由
  3. Topic:话题

交换机的作用

  1. 接收publisher发送的消息
  2. 将消息按照规则路由到与之绑定的队列
  3. 不能缓存消息,路由失败,消息丢失
  4. FanoutExchange的会将消息路由到每个绑定的队列

消息消费者

定义一个 FanoutConfig 配置类进行交换机和队列的绑定

@Configuration
public class FanoutConfig {
 
    //交换机
    @Bean
    public FanoutExchange fanoutExchange() {
 
        return new FanoutExchange("test.fanout");
    }

    //队列1
    @Bean
    public Queue fanoutQueue1() {
 
        return new Queue("fanout.queue1");
    }

    //绑定队列1到交换机
    @Bean
    public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
 
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    //队列2
    @Bean
    public Queue fanoutQueue2() {
 
        return new Queue("fanout.queue2");
    }

    //绑定队列2到交换机
    @Bean
    public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
 
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

消息发送者

@Test
    void testSendFanoutExchange(){
 
        //交换机名称
        String exchangeName = "test.fanout";
        //消息
        String msg = "test for send FanoutExchange";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"",msg);
    }

 


 

 

路由模式

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

 

消息发送者

消息发送者中指明routerKey

@Test
    void testSendDirectExchange(){
 
        //交换机名称
        String exchangeName = "test.direct";
        //消息
        String msg = "test for router to black queue";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"black",msg);
    }
    @Test
    void testSendDirectExchange2(){
 
        //交换机名称
        String exchangeName = "test.direct";
        //消息
        String msg = "test for router to white queue";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"white",msg);
    }

消息消费者

定义一个监听组件使用注解形式指定队列名称,交换机名称和类型(默认direct),以及路由通道

@Component
public class SpringRabbitListener {
 

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
            key = {
 "black","white"}
    ))
    public void listenDirectQueue1(String msg){
 
        System.out.println("direct queue1:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "test.direct",type = ExchangeTypes.DIRECT),
            key = {
 "black","green"}
    ))
    public void listenDirectQueue2(String msg){
 
        System.out.println("direct queue2:"+msg);
    }
}

 


 


 

话题模式

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。

如: person.zhangsan

Queue与Exchange指定BindingKey时可以使用通配符:

  • #:代指0个或多个单词
  • *:代指一个单词

 

消息发送者

@Test
    public void  testTopicExchange(){
 
        //交换机名称
        String exchangeName = "test.topic";
        //消息
        String msg = "test for topic in china shanghai";
        //发送消息
        rabbitTemplate.convertAndSend(exchangeName,"china.shanghai",msg);
    }

消息消费者

@RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
            key = "china.shanghai"
    ))
    public void listenTopicQueue1(String msg){
 
        System.out.println("topic:china.shanghai:"+msg);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name ="test.topic",type = ExchangeTypes.TOPIC),
            key = "american.newyork"
    ))
    public void listenTopicQueue2(String msg){
 
        System.out.println("topic:american.newyork:"+msg);
    }
结果

 


 

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>