JMS 的全称是 Java Message Service,即 Java 消息服务,ActiveMQ 实现了 JMS 的接口。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息(生产者和消费者可以在同一个应用中,也可以不在同一个应用中)。应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。一个典型的应用例如用户注册后需要发送验证邮件,因为发送邮件是一个耗时任务,如果在注册的逻辑代码中发送邮件的话系统的响应就会很慢,可以在用户注册后立即返回,并把发送邮件的任务通过 JMS 发送到消息队列中,然后另一个专门负责发送邮件的服务从 MQ 里获取发送邮件的消息发送邮件。
如果是同一个程序里通讯的话,可以使用 Spring Event。
消息有两种类型:
点对点
: 一个消息只能被一个消费者接收处理
发布/订阅模式
: 一个消息能同时被多个消费者接收处理
消息生产者使用步骤:
配置 ConnectionFactory
配置 Destination,也就是队列
创建消息生产者对象
发送消息
消息消费者使用步骤:
配置 ConnectionFactory
创建消息消费者对象
配置消息监听容器
有消息到达时消息的消费者的 onMessage()
方法会被自动调用
程序运行前当然要先启动 ActiveMQ
下面介绍 JMS 的使用,消息的生产者是一个 Web 应用,通过访问 URL 发送消息,消费者是一个普通的 Java 应用程序。
Gradle 依赖 1 2 3 4 5 6 compile( "org.springframework:spring-jms:4.3.0.RELEASE" , "org.apache.activemq:activemq-core:5.7.0" , "org.apache.activemq:activemq-pool:5.14.1" , "javax.jms:javax.jms-api:2.0.1" )
消息生产者 项目目录 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 └── main ├── java │ └── com │ └── xtuer │ ├── controller │ │ └── ProducerController.java │ └── jms │ └── MessageProducer.java ├── resources │ └── config │ ├── jms.xml │ └── spring-mvc.xml └── webapp └── WEB-INF ├── view └── web.xml
jms.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation =" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" > <bean id ="targetConnectionFactory" class ="org.apache.activemq.ActiveMQConnectionFactory" > <property name ="brokerURL" value ="tcp://localhost:61616" /> </bean > <bean id ="pooledConnectionFactory" class ="org.apache.activemq.pool.PooledConnectionFactory" > <property name ="connectionFactory" ref ="targetConnectionFactory" /> <property name ="maxConnections" value ="10" /> </bean > <bean id ="connectionFactory" class ="org.springframework.jms.connection.SingleConnectionFactory" > <property name ="targetConnectionFactory" ref ="pooledConnectionFactory" /> </bean > <bean id ="queueDestination" class ="org.apache.activemq.command.ActiveMQQueue" > <constructor-arg value ="testQueue" /> </bean > <bean id ="topicDestination" class ="org.apache.activemq.command.ActiveMQTopic" > <constructor-arg value ="testTopic" /> </bean > <bean id ="jmsTemplate" class ="org.springframework.jms.core.JmsTemplate" > <property name ="connectionFactory" ref ="connectionFactory" /> </bean > <bean id ="messageProducer" class ="com.xtuer.jms.MessageProducer" > <property name ="jmsTemplate" ref ="jmsTemplate" /> </bean > </beans >
MessageProducer.java
使用 jmsTemplate
进行消息发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.xtuer.jms;import org.springframework.jms.core.JmsTemplate;import org.springframework.jms.core.MessageCreator;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.Session;public class MessageProducer { private JmsTemplate jmsTemplate; public void sendMessage (Destination destination, final String message) { System.out.println("---------------生产者发了一个消息:" + message); jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage (Session session) throws JMSException { return session.createTextMessage(message); } }); } public void sendMessage (final String message) { System.out.println("---------------生产者发了一个消息:" + message); jmsTemplate.send("default-destination" , new MessageCreator() { @Override public Message createMessage (Session session) throws JMSException { return session.createTextMessage(message); } }); } public void setJmsTemplate (JmsTemplate jmsTemplate) { this .jmsTemplate = jmsTemplate; } }
ProducerController.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.xtuer.controller;import com.xtuer.jms.MessageProducer;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.ResponseBody;import javax.annotation.Resource;import javax.jms.Destination;@Controller public class ProducerController { @Resource(name = "queueDestination") private Destination queueDestination; @Resource(name = "topicDestination") private Destination topicDestination; @Resource(name = "messageProducer") private MessageProducer producer; @GetMapping("/test-queue") @ResponseBody public String testQueue () { String message = "Queue: " + System.nanoTime(); producer.sendMessage(queueDestination, message); return message; } @GetMapping("/test-topic") @ResponseBody public String testTopic () { String message = "Topic: " + System.nanoTime(); producer.sendMessage(topicDestination, message); return message; } }
消息消费者 项目目录 1 2 3 4 5 6 7 8 9 └── main ├── java │ ├── Main.java │ └── com │ └── xtuer │ └── jms │ └── MessageConsumer.java └── resources └── jms.xml
jms.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns ="http://www.springframework.org/schema/beans" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms ="http://www.springframework.org/schema/jms" xsi:schemaLocation =" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd" > <bean id ="targetConnectionFactory" class ="org.apache.activemq.ActiveMQConnectionFactory" > <property name ="brokerURL" value ="tcp://localhost:61616" /> </bean > <bean id ="pooledConnectionFactory" class ="org.apache.activemq.pool.PooledConnectionFactory" > <property name ="connectionFactory" ref ="targetConnectionFactory" /> <property name ="maxConnections" value ="10" /> </bean > <bean id ="connectionFactory" class ="org.springframework.jms.connection.SingleConnectionFactory" > <property name ="targetConnectionFactory" ref ="pooledConnectionFactory" /> <property name ="reconnectOnException" value ="true" /> </bean > <bean id ="messageConsumer" class ="com.xtuer.jms.MessageConsumer" /> <jms:listener-container connection-factory ="connectionFactory" destination-type ="queue" > <jms:listener destination ="testQueue" ref ="messageConsumer" /> </jms:listener-container > <jms:listener-container connection-factory ="connectionFactory" destination-type ="topic" > <jms:listener destination ="testTopic" ref ="messageConsumer" /> </jms:listener-container > </beans >
一个消息的消费者可以同时接收多个队列里的消息。
上面使用了 jms:listener-container
定义消息监听容器,这种方式比较简洁,推荐使用:
目的地的名字直接使用字符串即可,不需要定义目的地的对象
配置队列的类型: destination-type
配置 acknowledge: <jms:listener-container connection-factory="connectionFactory" acknowledge ="client">
Acknowledge 的使用: Controlling Message Acknowledgment
According to Spring Javadoc :
“sessionAcknowledgeMode” set to “AUTO_ACKNOWLEDGE” (default): Automatic message acknowledgment before listener execution; no redelivery in case of exception thrown.
“sessionAcknowledgeMode” set to “CLIENT_ACKNOWLEDGE”: Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown. 经测试: Consumer 抛异常后没有 ack,Broker 会重发,不抛异常则自动 ack,效率也很高
“sessionAcknowledgeMode” set to “DUPS_OK_ACKNOWLEDGE”: Lazy message acknowledgment during or after listener execution; potential redelivery in case of exception thrown. 经测试: Consumer 抛异常后,Broker 没有重发,效率非常高
“sessionTransacted” set to “true”: Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of exception thrown. 每个消费者线程每秒处理 100 条消息左右,但保证消息一定会被消费者处理
If your session is not transacted you risk message loss in the event your JVM goes down unexpectedly.
当然也可以使用下面的方式定义消息监听容器:
1 2 3 4 5 6 7 8 9 10 11 <bean id ="jmsContainer" class ="org.springframework.jms.listener.DefaultMessageListenerContainer" > <property name ="connectionFactory" ref ="connectionFactory" /> <property name ="messageListener" ref ="messageConsumer" /> <property name ="destination" ref ="queueDestination" /> </bean > 或者 <bean id ="jmsContainer" class ="org.springframework.jms.listener.DefaultMessageListenerContainer" > <property name ="connectionFactory" ref ="connectionFactory" /> <property name ="messageListener" ref ="messageConsumer" /> <property name ="destinationName" value ="testQueue" /> </bean >
MessageConsumer.java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.xtuer.jms;import com.alibaba.fastjson.JSON;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.TextMessage;public class MessageConsumer implements MessageListener { public void onMessage (Message message) { TextMessage textMsg = (TextMessage) message; try { System.out.println("接收到一个纯文本消息:" + textMsg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
Main.java 1 2 3 4 5 6 7 8 import org.springframework.context.support.ClassPathXmlApplicationContext;public class Main { public static void main (String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:jms.xml" ); context.start(); } }
测试