TIP
本文主要是介绍 JMS规范-Spring JMS组件 。
# Spring JMS各组件详解
# JmsTemplate
用来send、receive消息(receive是同步式的,会block)。每个JmsTemplate
实例拥有自己的配置,比如:connectionFactory
、sessionTransacted
、sessionAcknowledgeMode
、deliveryMode
、timeToLive
等等,所以需根据不同场景配置提供不同的JmsTemplate
Bean而不是一个Singleton Bean通吃所有JMS操作。
下面是类图(只包含了部分关键属性):
# ConnectionFactory
Spring提供了两个javax.jms.ConnectionFactory
的实现:SingleConnectionFactory (opens new window)和CachingConnectionFactory (opens new window)。它们实际上是一种Wrapper,用来缓存如:Connection
、Session
、MessageProducer
、MessageConsumer
。
事实上JmsTemplate的Javadoc (opens new window)提到过:
NOTE: The ConnectionFactory used with this template should return pooled Connections (or a single shared Connection) as well as pooled Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations is going to suffer.
在Spring JMS文档的Caching Messaging Resources (opens new window)中也提到了需要优化资源使用以提升性能:
The standard API involves creating many intermediate objects. To send a message the following 'API' walk is performed ConnectionFactory->Connection->Session->MessageProducer->send Between the ConnectionFactory and the Send operation there are three intermediate objects that are created and destroyed. To optimise the resource usage and increase performance two implementations of
ConnectionFactory
are provided.
下面是类图(只包含了部分关键属性):
# SingleConnectionFactory
SingleConnectionFactory (opens new window)顾名思义,无论调用多少次createConnection(..)
都返回同一个Connection
实例。但是它并不缓存Session
,也就是说调用一次createSession(...)
就会创建一个新的实例。
可以通过SingleConnectionFactoryTest (opens new window)了解详情。
所以在大多数情况下,不推荐使用SingleConnectionFactory
。
# CachingConnectionFactory
CachingConnectionFactory (opens new window)继承自SingleConnectionFactory
,除了依然保留缓存同一个Connection
实例的特性外,还增加了对于Session
、MessageProducer
、MessageConsumer
的缓存。
CachingConnectionFactory
其内部维护了一个Acknowledge Mode -> List<Session>
的Map,sessionCacheSize实际上指的是List<Session>
的大小,所以最多会有4 * sessionCacheSize数量的Session
被缓存(因为JMS规定了四种Acknowledge Mode)。
并且CachingConnectionFactory
其本质不是一个Object Pool,所以不会因为实际请求Session数量超出sessionCacheSize导致block或者返回null,可以放心使用。
CachingConnectionFactory
返回的每个Session
内部都有ConsumerCacheKey -> MessageConsumer
以及DestinationCacheKey -> MessageProducer
的Map,用来缓存MessageProducer
和MessageConsumer
。
可以通过CachingConnectionFactory (opens new window)了解详情。
# MessageListenerContainer
Spring JMS中有一个特性MessageListenerContainer (opens new window),按照官方文档的说法:
A message listener container is used to receive messages from a JMS message queue and drive the MessageListener that is injected into it.
上面提到的MessageListener
就是javax.jms.MessageListener
,第一次看到这个东西感觉有点奇怪,因为MessageListener
的正规用法应该MessageConsumer.setMessageListener()
就行了。
因为MessageListenerContainer
继承自SmartLifeCycle
,所以它提供了程序启动时开启connection、session,程序关闭是关闭session、connection的功能,能够让你不用操心资源回收问题。
下面介绍一下两个实现SimpleMessageListenerContainer (opens new window)和DefaultMessageListenerContainer (opens new window)。
下面是类图(只包含了部分关键属性):
# SimpleMessageListenerContainer
SimpleMessageListenerContainer (opens new window)使用MessageConsumer.setMessageListener()
来监听消息,它不支持参与外部事务(比如PlatformTransactionManager)。
它是可以持有多个MessageConsumer
实例的。代码如下:
// ...
private int concurrentConsumers = 1;
private Set<Session> sessions;
private Set<MessageConsumer> consumers;
// ...
protected void initializeConsumers() throws JMSException {
// Register Sessions and MessageConsumers.
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.sessions = new HashSet<Session>(this.concurrentConsumers);
this.consumers = new HashSet<MessageConsumer>(this.concurrentConsumers);
Connection con = getSharedConnection();
for (int i = 0; i < this.concurrentConsumers; i++) {
Session session = createSession(con);
MessageConsumer consumer = createListenerConsumer(session);
this.sessions.add(session);
this.consumers.add(consumer);
}
}
}
}
其处理消息的方式有两种:1)传统的MessageConsumer.setMessageListener()
;2)使用Executor
。
if (this.taskExecutor != null) {
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(final Message message) {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
processMessage(message, session);
}
});
}
});
}
else {
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
processMessage(message, session);
}
});
}
# DefaultMessageListenerContainer
DefaultMessageListenerContainer (opens new window)和SimpleMessageListenerContainer (opens new window)不同,它使用MessageConsumer.receive()
来处理消息,并且支持XA transaction。
因为receive()
是同步的、blocking方法,其性能没有setMessageListener()
好,所以它非常依赖多线程(TaskExecutor
),这也也带来来dynamic scaling的好处。
请注意不要对Topic采用多线程,否则会收到重复的消息,详情见官方文档 (opens new window)。
# 异步接收消息
同步接收消息的方式有JmsTemplate.receive*()
和MessageConsumer.receive*()
,这里不多讲,重点讲异步接收消息的几种方式。
# MessageListener & MessageListenerContainer
把MessageListener
包装到MessageListenerContainer
里接收消息,例子参见官方文档Asynchronous Reception - Message-Driven POJOs (opens new window)
# SessionAwareMessageListener
SessionAwareMessageListener (opens new window)是Spring提供和MessageListener
类似的接口,MessageListenerContainer
支持这个接口,用法和MessageListener
一样。
# MessageListenerAdapter
MessageListenerAdapter (opens new window)是Spring提供的另一个异步接收消息的方式,它MessageListener
与SessionAwareMessageListener
更灵活,因为它采用反射机制来把消息传递到你的接收消息的方法上。
使用方法见官方文档the SessionAwareMessageListener interface (opens new window)。
# @JmsListener
@JmsListener (opens new window)是另一种接收消息的方法,怎么使用它可以看官方文档Annotation-driven listener endpoints (opens new window)。
@JmsListener
和MessageListener
、SessionAwareMessageListener
、MessageListenerAdapter
一样也需要一个Container,用户可以通过@JmsListener.containerFactory
属性来指定JmsListenerContainerFactory (opens new window)。
Spring提供了两种JmsListenerContainerFactory实现:
- DefaultJmsListenerContainerFactory (opens new window),用来生产DefaultMessageListenerContainer,Spring Boot提供
DefaultJmsListenerContainerFactoryConfigurer
作为配置工具 - SimpleJmsListenerContainerFactory (opens new window),用来生产SimpleMessageListenerContainer
所以在使用@JmsListener
需要仔细的选择正确的JmsListenerContainerFactory
,而不是全局采用一种配置。
# 总结
使用Spring JMS时有需要注意以下三点:
- 根据实际情况,配置合适的ConnectionFactory Bean,如有需要可以有多个ConnectionFactory Bean。
- JmsTemplate, MessageListenerContainer, JmsListenerContainerFactory需根据实际情况配置不同Bean,避免全局使用一套。
- JmsTemplate, MessageListenerContainer, JmsListenerContainerFactory选择合适的ConnectionFactory。
- 设定好合适的Executor/线程池大小,避免大量Thread block。
下面是一张各个组件的关系图。
# 参考资料
- Spring JMS (opens new window)
- Spring JMS Listener Adapters (opens new window)
- JMS Javadoc (opens new window)
spring (opens new window)jms (opens new window)
# 参考文章
- https://segmentfault.com/a/1190000012875201