<返回更多

热门的消息队列框架比较、使用方法、优缺点,提供示例代码

2023-10-11    架构师老卢
加入收藏
热门的消息队列框架比较、使用方法、优缺点,提供示例代码

 

消息队列(Message Queue)是一种在分布式系统中用于消息传递的通信模式。它可以将消息发送者和接收者解耦,提高系统的可靠性、可扩展性和可维护性。下面将详细介绍3-5个常用的消息队列框架,包括RabbitMQ、Kafka、ActiveMQ、RocketMQ和NATS。

  1. RabbitMQ:
    RabbitMQ是一个开源的消息队列中间件,基于AMQP(Advanced Message Queuing Protocol)协议。它具有高度的可靠性、可扩展性和灵活性,广泛应用于分布式系统中。

特性:

使用方法:
首先需要安装RabbitMQ服务器,并启动它。然后可以使用JAVA或C#等编程语言通过RabbitMQ的客户端库来发送和接收消息。

示例代码(Java):

// 发送消息
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    String queueName = "hello";
    channel.queueDeclare(queueName, false, false, false, null);
    String message = "Hello, RabbitMQ!";
    channel.basicPublish("", queueName, null, message.getBytes());
    System.out.println("Sent message: " + message);
}

// 接收消息
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    String queueName = "hello";
    channel.queueDeclare(queueName, false, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            System.out.println("Received message: " + message);
        }
    };
    channel.basicConsume(queueName, true, consumer);
}

优缺点:

  1. Kafka:
    Kafka是一个分布式的流处理平台,也是一个高吞吐量的分布式消息队列系统。它具有持久化、可扩展和高性能的特点,广泛应用于大数据领域。

特性:

使用方法:
首先需要安装Kafka服务器,并启动它。然后可以使用Java或C#等编程语言通过Kafka的客户端库来发送和接收消息。

示例代码(Java):

// 发送消息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.Apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
    String topic = "my-topic";
    String message = "Hello, Kafka!";
    producer.send(new ProducerRecord<>(topic, message));
    System.out.println("Sent message: " + message);
}

// 接收消息
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
    String topic = "my-topic";
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            String message = record.value();
            System.out.println("Received message: " + message);
        }
    }
}

优缺点:

  1. ActiveMQ:
    ActiveMQ是一个开源的消息队列中间件,支持多种消息协议,包括AMQP、STOMP和OpenWire等。它具有可靠性、可扩展性和高性能的特点,广泛应用于企业级应用。

特性:

使用方法:
首先需要安装ActiveMQ服务器,并启动它。然后可以使用Java或C#等编程语言通过ActiveMQ的客户端库来发送和接收消息。

示例代码(Java):

// 发送消息
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try (Connection connection = factory.createConnection();
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
    Destination destination = session.createQueue("my-queue");
    MessageProducer producer = session.createProducer(destination);
    TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
    producer.send(message);
    System.out.println("Sent message: " + message.getText());
}

// 接收消息
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try (Connection connection = factory.createConnection();
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
    Destination destination = session.createQueue("my-queue");
    MessageConsumer consumer = session.createConsumer(destination);
    connection.start();
    Message message = consumer.receive();
    if (message instanceof TextMessage) {
        TextMessage textMessage= (TextMessage) message;
        System.out.println("Received message: " + textMessage.getText());
    }
}

优缺点:

  1. RocketMQ:
    RocketMQ是由阿里巴巴开发的分布式消息队列中间件,它具有高吞吐量、低延迟和高可靠性的特点。RocketMQ支持消息的顺序传输和事务消息,并且具有灵活的消息模式和多样化的消息存储方式。

特性:

使用方法:
首先需要安装RocketMQ服务器,并启动它。然后可以使用Java或其他编程语言通过RocketMQ的客户端库来发送和接收消息。

示例代码(Java):

// 发送消息
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
try {
    Message message = new Message("topic", "Hello, RocketMQ!".getBytes());
    SendResult result = producer.send(message);
    System.out.println("Sent message: " + result.getSendStatus());
} finally {
    producer.shutdown();
}

// 接收消息
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
        for (MessageExt message : messages) {
            System.out.println("Received message: " + new String(message.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
  1. NATS:
    NATS是一个轻量级的、高性能的消息队列中间件,由Cloud Native Computing Foundation(CNCF)维护。NATS具有简单易用、快速和可靠的特点,适用于微服务架构和云原生应用。

特性:

使用方法:
首先需要安装NATS服务器,并启动它。然后可以使用Java或其他编程语言通过NATS的客户端库来发送和接收消息。

示例代码(Java):

// 发送消息
Connection connection = Nats.connect("nats://localhost:4222");
try {
    connection.publish("subject", "Hello, NATS!".getBytes());
    System.out.println("Sent message");
} finally {
    connection.close();
}

// 接收消息
Connection connection = Nats.connect("nats://localhost:4222");
try {
    Dispatcher dispatcher = connection.createDispatcher((msg) -> {
        System.out.println("Received message: " + new String(msg.getData()));
    });
    dispatcher.subscribe("subject");
    connection.flush(Duration.ZERO);
    System.out.println("Listening for messages...");
    Thread.sleep(10000);
} finally {
    connection.close();
}

欢迎分享你的使用经验!

关键词:框架      点击(7)
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多框架相关>>>