1.什么是ActiveMQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。

JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息。

     JMS和MQ的关系:JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

2.ActiveMQ与Spring的集成

首先将ActiveMQ如下的jar包导入项目中。

配置activemq的spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--(嵌入配置)activeMq消息容器-->
<amq:broker useJmx="true" persistent="true" >
<amq:managementContext>
<amq:managementContext createConnector="false"/>
</amq:managementContext>
<amq:persistenceAdapter>
<amq:kahaDB directory="${jmsDir}" >
</amq:kahaDB>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://${jms.ip}:${jms.port}" />
</amq:transportConnectors>
</amq:broker> <!-- wireFormat.maxInactivityDuration=0&amp; --> <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://${jms.ip}?jms.useAsyncSend=true" />
</bean> <bean id="simpleJmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory"/>
<property name="sessionCacheSize" value="100" />
<property name="cacheConsumers" value="true"></property>
<property name="exceptionListener" ref="jmsExceptionListener"/>
</bean> <bean id="jmsExceptionListener" class="com.ibms.core.jms.JmsExceptionListener"></bean> <!-- Message 转换器 -->
<bean id="activeMqMessageConverter" class="com.ibms.core.jms.ActiveMqMessageConverter"/> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="simpleJmsConnectionFactory" />
<property name="defaultDestinationName" value="messageQueue"/>
</bean> <!-- 消息对象队列 -->
<amq:queue id="messageQueue" name="messageQueue" physicalName="messageQueue" />
<!-- 消息生产者 -->
<bean id="messageProducer" class="com.ibms.core.jms.MessageProducer"/> <!--
消息消费者
map配置的是队列中消息处理类。
键:队列中消息类 对应的类 全路径 如: com.ibms.core.model.MailModel
值:消息处理类,需要实现接口类IJmsHandler 。如:com.ibms.oa.service.jms.impl.MailHandler
用户也可以配置自己的处理方式,配置到这里。
--> <bean name="messageConsumer" class="com.ibms.core.jms.MessageConsumer">
<property name="handlers">
<map>
<entry key="com.ibms.oa.service.jms.MessageModel">
<bean class="com.ibms.oa.service.jms.MessageHandler"></bean>
</entry>
</map>
</property>
</bean> <!--
map配置的是队列中消息处理类。
键:1,2,3
值:消息处理类,需要实现接口类IMessageHandler 。
用户也可以配置自己的处理方式,配置到这里。
每增加一种消息方式的时候,需要增加对应的处理器(如下,如mailMessageHandler实现IMessageHandler 接口)--> <bean id="mailMessageHandler" class="com.ibms.oa.service.jms.MailMessageHandler"></bean>
<bean id="innerMessageHandler" class="com.ibms.oa.service.jms.InnerMessageHandler"></bean> <bean id="messageHandlerContainer " class="com.ibms.oa.service.jms.MessageHandlerContainer">
<property name="handlersMap" ref="handlersMap"/>
</bean> <bean id="handlersMap" class="java.util.LinkedHashMap">
<constructor-arg>
<map>
<entry key="1" value-ref="mailMessageHandler" />
<entry key="3" value-ref="innerMessageHandler" />
</map>
</constructor-arg>
</bean> <!--消息监听容器 -->
<bean id="messageListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="simpleJmsConnectionFactory" />
<property name="destination" ref="messageQueue" />
<property name="messageListener" ref="messageMsgListener" />
</bean> <!-- 邮件消息消费监听器 -->
<bean id="messageMsgListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<ref bean="messageConsumer"/>
</constructor-arg>
<property name="messageConverter" ref="activeMqMessageConverter" />
<property name="defaultListenerMethod" value="sendMessage" />
</bean> <bean id="messageEngine" class="com.ibms.core.engine.MessageEngine">
<property name="mailSender" ref="mailSender"/>
<property name="fromUser" value="${mail.from}"/>
</bean> </beans>

然后依次实现消息的转换器ActiveMqMessageConverter

public class ActiveMqMessageConverter implements MessageConverter {

    /**
* 转换发送消息。
*/
@Override
public Message toMessage(Object object, Session session)
throws JMSException { } /**
* 转换接收消息。
*/
@Override
public Object fromMessage(Message msg) throws JMSException { } }

消息的生产者MessageProducer

public class MessageProducer
{
private static final Log logger=LogFactory.getLog(MessageProducer.class);
@Resource(name="messageQueue")
private Queue messageQueue; @Resource
private JmsTemplate jmsTemplate; public void send(Object model)
{
logger.debug("procduce the message");
//产生邮件\短信\消息发送的消息,加到消息队列中去 jmsTemplate.convertAndSend(messageQueue, model);
}
}

消息的消费者MessageConsumer(可扩展)

/**
* 从消息队列中读取对象,并且进行消息发送。
*
* @author zxh
*
*/
public class MessageConsumer { /**
* 处理消息
*/
private Map<String, IJmsHandler> handlers = new HashMap<String, IJmsHandler>(); protected Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
public void setHandlers(Map<String, IJmsHandler> handlers) {
this.handlers = handlers;
} /**
* 发送消息
*
* @param model
* 发送的对象
* @throws Exception
*/
public void sendMessage(Object model) throws Exception {
if (BeanUtils.isNotEmpty(handlers) && BeanUtils.isNotEmpty(model)) {
IJmsHandler jmsHandler = handlers.get(model.getClass().getName());
if(jmsHandler!=null){
jmsHandler.handMessage(model);
}
else{
logger.info(model.toString());
}
} else {
throw new Exception("Object:[" + model + "] is not entity Object ");
}
}
}

3.ActiveMQ应用场景

a.非均匀应用集成
      ActiveMQ 中间件用Java语言编写,因此自然提供Java客户端 API。但是ActiveMQ  也为C/C++、.NET、Perl、PHP、Python、Ruby 和一些其它语言提供客户端。在你考虑如何集成不同平台不同语言编写应用的时候,ActiveMQ 拥有巨大优势。在这样的例子中,多种客户端API通过ActiveMQ 发送和接受消息成为可能,无论使用的是什么语言。此外,ActiveMQ 还提供交叉语言功能,该功能整合这种功能,无需使用远程过程调用(RPC)确实是个优势,因为消息协助应用解耦。

b.作为RPC的替代

         应用使用RPC同步调用十分普遍。假设大多数客户端服务器应用使用RPC,包括ATM、大多数WEB应用、信用卡系统、销售点系统等等。尽管很多系统很成功,转换使用异步消息可以带来很多好处,而且也不会放弃响应保证。系统依赖同步需求典型地限制了扩展,因为最终需求将开始起作用,从而放慢整个系统。取而代之这种不好的体验,使用异步消息,附加的消息接收器可以轻松添加,假设你的应用可以解耦。

c.两个应用之间解耦

正如之前讨论的,紧耦合架构可以导致很多问题,尤其是如果他们是分布的。松耦合架构,在另一方面,证实了更少的依赖性,能够更好地处理不可预见的改变。你不见可以在系统中改变组件而不影响整个系统,而且组件交互也相当的简单。取代使用同步方案的组件交互,组件利用异步通信。这样的松耦合遍及系统被称之为事件驱动架构(EDA)。

d.作为事件驱动架构的主干

在之前的观点中,解耦、异步风格架构允许软件本身进一步扩展(水平的可扩展性),而不是依赖硬件的可扩展性(垂直的可扩展)。想象一下一种难以置信的流量、电子商务网站像亚马逊。但一个用户在亚马逊上购买,有许多分开的阶段贯穿,订单需要履行包括订单配置、创建发票、支付流程、订单完成、运输等。然而,但一个用户实际上提交了一个订单,用户立即得到一个页面说明,“感谢您的订单”不仅如此,没有任何延迟。用户也收到了订单已经收到的邮件说明,订单配置流程由亚马逊雇佣就是个很好的例子,第一步在一种更大的、异步流程中。每一个订单步骤直接由分开的服务奋力地处理。但用户下了订单,异步调用提交订单,但是全部订单流程不会落后于通过网页浏览器进行的同步调用。反之,订单被接受并立即被确认。这个流程中剩余的步骤一步地被处理。如果发生了问题。组织流程进行,用户会被通知。这样的异步流程提供大量的可扩展性。

e.改善应用可扩展性

许多应用利用事件驱动架构,为了提供大量的可扩展性,包括像电子商务、政府、制造业和在线游戏等领域。使用异步消息在业务领域分离一个应用,许多其它可能性开始合并。考虑使用服务为特定任务设计应用的能力。这正是面向服务架构(SOA)的主干。每一个服务实现一个独立的功能,而且只是那个功能。应用通过这些服务构成来创建,在服务间使用异步消息实现通信。这种风格的应用设计被称之为复杂事件处理(CEP)。使用CEP,系统中组件之间的交互可以被进一步的分析跟踪。在考虑异步消息在系统的组件之间添加一种迂回的时候,这些可能性是无止境的。

最新文章

  1. Java进行post和get传参数
  2. 使用轻量级ORM Dapper进行增删改查
  3. FQ技术
  4. Color the Ball[HDU1199]
  5. linux ascii艺术与ansi艺术
  6. The equation - SGU 106(扩展欧几里得)
  7. 【开源项目】Android 手写记事 App(半成品)
  8. java InputStream使用
  9. CodeForces 83B Doctor
  10. loadrunner常用计数器分析
  11. Ajax禁止重复提交
  12. chrome浏览器使用技巧
  13. Dede 删除文档同时文章中的图片的方法
  14. 微信小程序 base64 图片 canvas 画布 drawImage 实现
  15. linux时间戳和时间格式的转化
  16. 【剑指offer】数组中只出现一次的数字
  17. python 模块 - 序列化 json 和 pickle
  18. c语言递归函数的调用
  19. jspSmartUpload使用初步
  20. Deep Learning of Graph Matching 阅读笔记

热门文章

  1. 一些牛人分享的ios技巧,保留着
  2. oc 是否允许远程通知
  3. CSS传统布局之页面布局实例
  4. 转载 Deep learning:二(linear regression练习)
  5. HDU3371--Connect the Cities(最小生成树)
  6. SpringMvc处理post请求乱码的filter
  7. java本地方法
  8. android 进程(复习)
  9. 数据结构——求单向链表的倒数第K个节点
  10. JavaScript高级程序设计:第一章