<返回更多

Java如何使用消息中间件ActiveMQ?

2023-08-16    运维开发木子李
加入收藏

 

创建ActiveMQ连接:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal example: open a connection to a local ActiveMQ broker, start it, and close it.
 */
public class ActiveMQExample {
    /**
     * Connects to the broker at tcp://localhost:61616, starts the connection, then closes it.
     * Any {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Start the connection
                connection.start();
            } finally {
                // Close the connection even if start() fails, so it is never leaked
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

创建ActiveMQ会话:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: create a non-transacted, auto-acknowledge JMS session on an ActiveMQ connection.
 */
public class ActiveMQExample {
    /**
     * Opens a connection to tcp://localhost:61616, creates a session, then closes both.
     * Any {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session (not transacted, messages acknowledged automatically)
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Close the session
                session.close();
            } finally {
                // Close the connection even if session creation or close fails
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

创建ActiveMQ队列:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: obtain a queue destination named "myQueue" from a JMS session.
 */
public class ActiveMQExample {
    /**
     * Opens a connection and session, looks up the queue, then releases both resources.
     * Any {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the queue (kept in a local only to show the resulting type)
                    Queue queue = session.createQueue("myQueue");
                } finally {
                    // Close the session even if queue creation fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

创建ActiveMQ主题:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: obtain a topic destination named "myTopic" from a JMS session.
 */
public class ActiveMQExample {
    /**
     * Opens a connection and session, looks up the topic, then releases both resources.
     * Any {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the topic (kept in a local only to show the resulting type)
                    Topic topic = session.createTopic("myTopic");
                } finally {
                    // Close the session even if topic creation fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发送消息到队列:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: send a single text message to the queue "myQueue".
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, sends "Hello, ActiveMQ!" to the queue, and
     * releases the session and connection. Any {@link JMSException} is printed to
     * standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the queue
                    Queue queue = session.createQueue("myQueue");

                    // Create the message producer
                    MessageProducer producer = session.createProducer(queue);

                    // Create the message
                    TextMessage message = session.createTextMessage("Hello, ActiveMQ!");

                    // Send the message
                    producer.send(message);
                } finally {
                    // Close the session even if sending fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发送消息到主题:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: send a single text message to the topic "myTopic".
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, sends "Hello, ActiveMQ!" to the topic, and
     * releases the session and connection. Any {@link JMSException} is printed to
     * standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the topic
                    Topic topic = session.createTopic("myTopic");

                    // Create the message producer
                    MessageProducer producer = session.createProducer(topic);

                    // Create the message
                    TextMessage message = session.createTextMessage("Hello, ActiveMQ!");

                    // Send the message
                    producer.send(message);
                } finally {
                    // Close the session even if sending fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

接收队列中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: synchronously receive one message from the queue "myQueue".
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, blocks until one message arrives on the queue,
     * prints it if it is a {@link TextMessage}, and releases the session and connection.
     * Any {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the queue
                    Queue queue = session.createQueue("myQueue");

                    // Create the message consumer
                    MessageConsumer consumer = session.createConsumer(queue);

                    // Start the connection: message delivery does not begin until the
                    // connection is started, so receive() would otherwise never return
                    connection.start();

                    // Receive a message (blocks until one is available)
                    Message message = consumer.receive();
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message: " + textMessage.getText());
                    }
                } finally {
                    // Close the session even if receiving fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

接收主题中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: synchronously receive one message from the topic "myTopic".
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, blocks until one message arrives on the topic,
     * prints it if it is a {@link TextMessage}, and releases the session and connection.
     * Any {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the topic
                    Topic topic = session.createTopic("myTopic");

                    // Create the message consumer
                    MessageConsumer consumer = session.createConsumer(topic);

                    // Start the connection: message delivery does not begin until the
                    // connection is started, so receive() would otherwise never return
                    connection.start();

                    // Receive a message (blocks until one is published to the topic)
                    Message message = consumer.receive();
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message: " + textMessage.getText());
                    }
                } finally {
                    // Close the session even if receiving fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

发布-订阅模式中的发布者:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息生产者
            MessageProducer producer = session.createProducer(topic);
            
            // 创建消息
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            
            // 发送消息
            producer.send(message);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

发布-订阅模式中的订阅者:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Publish-subscribe example, subscriber side: receive one message from "myTopic".
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, subscribes to the topic, blocks until one message
     * arrives, prints it if it is a {@link TextMessage}, and releases the session and
     * connection. Any {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the topic
                    Topic topic = session.createTopic("myTopic");

                    // Create the message consumer
                    MessageConsumer consumer = session.createConsumer(topic);

                    // Start the connection: message delivery does not begin until the
                    // connection is started, so receive() would otherwise never return
                    connection.start();

                    // Receive a message (blocks until one is published to the topic)
                    Message message = consumer.receive();
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message: " + textMessage.getText());
                    }
                } finally {
                    // Close the session even if receiving fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

使用消息监听器接收队列中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建队列
            Queue queue = session.createQueue("myQueue");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(queue);
            
            // 注册消息监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("Received message: " + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            // 启动连接
            connection.start();
            
            // 等待接收消息
            Thread.sleep(5000);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用消息监听器接收主题中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQExample {
    public static void main(String[] args) {
        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            
            // 创建连接
            Connection connection = factory.createConnection();
            
            // 创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            
            // 创建主题
            Topic topic = session.createTopic("myTopic");
            
            // 创建消息消费者
            MessageConsumer consumer = session.createConsumer(topic);
            
            // 注册消息监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("Received message: " + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            
            // 启动连接
            connection.start();
            
            // 等待接收消息
            Thread.sleep(5000);
            
            // 关闭会话
            session.close();
            
            // 关闭连接
            connection.close();
        } catch (JMSException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

使用消息选择器接收队列中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: receive one message from the queue "myQueue" using the message selector
 * {@code age > 18}.
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, creates a consumer with the selector
     * {@code "age > 18"}, blocks until one message is received, prints it if it is a
     * {@link TextMessage}, and releases the session and connection. Any
     * {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the queue
                    Queue queue = session.createQueue("myQueue");

                    // Create the message consumer with a selector
                    MessageConsumer consumer = session.createConsumer(queue, "age > 18");

                    // Start the connection: message delivery does not begin until the
                    // connection is started, so receive() would otherwise never return
                    connection.start();

                    // Receive a message (blocks until one is delivered to this consumer)
                    Message message = consumer.receive();
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message: " + textMessage.getText());
                    }
                } finally {
                    // Close the session even if receiving fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

使用消息选择器接收主题中的消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: receive one message from the topic "myTopic" using the message selector
 * {@code age > 18}.
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, creates a consumer with the selector
     * {@code "age > 18"}, blocks until one message is received, prints it if it is a
     * {@link TextMessage}, and releases the session and connection. Any
     * {@link JMSException} is printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create the session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                try {
                    // Create the topic
                    Topic topic = session.createTopic("myTopic");

                    // Create the message consumer with a selector
                    MessageConsumer consumer = session.createConsumer(topic, "age > 18");

                    // Start the connection: message delivery does not begin until the
                    // connection is started, so receive() would otherwise never return
                    connection.start();

                    // Receive a message (blocks until one is delivered to this consumer)
                    Message message = consumer.receive();
                    if (message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("Received message: " + textMessage.getText());
                    }
                } finally {
                    // Close the session even if receiving fails
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

使用事务发送消息:

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Example: send a text message to the queue "myQueue" inside a JMS transaction.
 */
public class ActiveMQExample {
    /**
     * Connects to tcp://localhost:61616, sends "Hello, ActiveMQ!" in a transacted session,
     * commits, and releases the session and connection. Any {@link JMSException} is
     * printed to standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // Create the connection factory
            ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

            // Create the connection
            Connection connection = factory.createConnection();
            try {
                // Create a transacted session
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                try {
                    // Create the queue
                    Queue queue = session.createQueue("myQueue");

                    // Create the message producer
                    MessageProducer producer = session.createProducer(queue);

                    // Create the message
                    TextMessage message = session.createTextMessage("Hello, ActiveMQ!");

                    // Send the message (not visible to consumers until commit)
                    producer.send(message);

                    // Commit the transaction
                    session.commit();
                } finally {
                    // Close the session on every path; per the JMS Session contract,
                    // closing a transacted session rolls back any uncommitted work
                    session.close();
                }
            } finally {
                // Close the connection on every path
                connection.close();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

这些示例覆盖了ActiveMQ的常用用法,包括创建连接、创建会话、创建队列和主题、发送和接收消息等。你可以根据自己的需求选择相应的示例进行参考和使用。

关键词:ActiveMQ      点击(6)
声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多ActiveMQ相关>>>