1.准备工作
1) 下载安装,启动activemq
2) 下载activemq jar包导入项目
2.消息生产者
package com.activemq.demo1; import javax.jms.*; import org.apache.activemq.*; /** * 消息生产者,用于生成并发送消息 */ public class ProducerTool { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageProducer producer = null; /** * 初始化 * @throws Exception */ private void initialize() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); /* 创建Session,参数解释: 第一个参数 是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认, 没有回应则抛出异常,消息发送程序负责处理这个错误。 第二个参数 消息的确认模式: AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次, 但传输过程中可能因为错误而丢失消息。 CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法 (会通知消息提供者收到了消息) DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息 (这种确认模式不在乎接收者收到重复的消息)。*/ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); producer = session.createProducer(destination); //设置是否持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } /** * 发送消息 * @param message * @throws Exception */ public void produceMessage(String message) throws Exception { initialize(); //发送TextMessage,还可发送MapMessage,ObjectMessage,StreamMessage TextMessage msg = session.createTextMessage(message); connection.start(); System.out.println("Producer:-> send start."); producer.send(msg); System.out.println("Producer:-> send complete."); close(); } /** * 关闭连接 * @throws JMSException */ public void close() throws JMSException { System.out.println("Producer:->Closing Connection."); if (producer != null) producer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } }
3.消息消费者
package com.activemq.demo1; import javax.jms.*; import javax.jms.Message; import org.apache.activemq.*; /** * 消息消费者,用于接收消息 */ public class ConsumerTool implements MessageListener { private String user = ActiveMQConnection.DEFAULT_USER; private String password = ActiveMQConnection.DEFAULT_PASSWORD; private String url = ActiveMQConnection.DEFAULT_BROKER_URL; private String subject = "TOOL.DEFAULT"; private Destination destination = null; private Connection connection = null; private Session session = null; private MessageConsumer consumer = null; /** * 初始化 * @throws JMSException * @throws Exception */ private void initialize() throws Exception { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue(subject); consumer = session.createConsumer(destination); } /** * 消费消息 * @throws Exception */ public void consumeMessage() throws Exception { initialize(); connection.start(); System.out.println("Consumer:->Begin listening..."); // 开始监听 consumer.setMessageListener(this); } /** * 关闭连接 * @throws JMSException */ public void close() throws JMSException { System.out.println("Consumer:->Closing connection"); if (consumer != null) consumer.close(); if (session != null) session.close(); if (connection != null) connection.close(); } /** * 消息处理函数 */ public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String msg = txtMsg.getText(); System.out.println("Consumer:->Received textMessage: " + msg); } else { System.out.println("Consumer:->Received: " + message); } close(); } catch (JMSException e) { e.printStackTrace(); } } }
4.测试类
package com.activemq.demo1; import javax.jms.*; public class Test { /** * @param args */ public static void main(String[] args) throws JMSException, Exception { ConsumerTool consumer = new ConsumerTool(); ProducerTool producer = new ProducerTool(); // 开始监听 consumer.consumeMessage(); // 延时500毫秒之后发送消息 Thread.sleep(500); producer.produceMessage("Hello, world!"); } }
相关推荐
我下载的时候是 ActiveMQ 5.14.4 Release版 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是...
实战ActiveMQ集群与应用实战视频教程实战
Java高级互联网架构师系统培训班课程Java高级互联网架构师系统培训班课程Java高级互联网架构师系统培训班课程
分布式中间件技术实战:基于springboot集成redis,mysql,activemq,-component
ActiveMQ RabbitMQ RokcetMQ Kafka实战 消息队列中间件视频教程
1、介绍ActiveMQ5.x消息队列基础特性和本地快速安装 2、SpringBoot2.x整合ActiveMQ实战之点对点消息
activeMQ1.消息队列MQ的项目实战代码 参考我的微博:http://blog.csdn.net/lovelong8808/article/details/52235132
消息中间件是搭建大型企业级项目必不可少的成员,只有掌握了消息中间件的使用和应用场景,才能开发真正的大型分布式系统。ActiveMQ支持JMS,是apache的顶级项目,在全球开源消息中间件中影响力大!
java实战
JAVA企业级项目实战技术:云原生,中台策略,分布式,高并发,高可用,微服务,前后端分离项目架构,物联网项目实战,亿级项目实战,金融项目实战,项目面试实操,秒杀项目实战,租房项目实战,在线教育项目实战 ,...
ActiveMQ基于Spring完成分布式消息队列实战 Kafka Kafka基于Zookeeper搭建高可用集群实战 kafka消息处理过程剖析 Java客户端实现Kafka生产者与消费者实例 kafka的副本机制及选举原理剖析 基于kafka实现应用...
(nginx+redis+zookeeper+activemq+storm+dubbo+netty+jvm+并发编程锁+项目实战)
资源简介:SSM Java 项目集合 一、概述 在这个平台上,我们为大家带来了一系列的 JavaSSM(Spring + SpringMVC + MyBatis)项目。这些项目旨在展示SSM框架在实际应用中的魅力,同时也为开发者提供了一个快速学习和...
MQJMS以及ActiveMQ的关系理解 主动式MQ ActiveMQ的环境搭建 深入了解topic队列与Queue队列比较点到点(X英尺) PublisherSubscriber(发布订阅者)消息模式开发流程 详细讲解企业项目中ActiveMQ使用经验 ActiveMQ与...
Java全能学习面试手册——Java面试题库.zip 01 7道消息队列ActiveMQ面试题!.pdf 02 10道Java高级必备的Netty面试题!.pdf 03 10道Java面试必备的设计模式面试题!.pdf 04 10个Java经典的List面试题!.pdf 05 10个...
内容包含:并发编程,分布式项目实战视频,Dobbo,zookeeper,redis,Nginx,kafka,RocketMQ.oracle,ActiveMQ,Netty,Jvm视频
工具和中间件,包括maven、git、Intellij IDEA、Redis、WebSocket、shiro、quartz、ElasticSearch、docker、activemq、rabbitmq、SpringCloud分布式和集群、oracle、mysql等数据库教程、微信登录、java web技术栈...
1、Maven补充和Git部分 2、基本业务功能块构建部分 3、Nginx部分 4、Varnish部分 5、Memcached部分 6、ActiveMQ部分 等等
2019/06/07 周五 上午 07:01 1,335,468 Java经典面试题.pdf 2018/11/20 周二 下午 17:31 68,644 Java面试题141.docx 2018/11/17 周六 下午 12:26 12,027 Java高级工程师.docx 2018/09/16 周日 下午 16:11 35,439 jvm...
《Java并发编程实战》 《Java多线程编程核心技术》 《Java并发编程的艺术》 《Java8实战》 《HTTP权威指南》 《Spring实战》(第4版) 《看透SpringMVC源代码分析与实践》 《Redis入门指南》(第2版) 《Redis实战》...