一、常用配置属性

  以下配置文件目录均为:${activemq_home}/conf/activemq.xml

  1、定期扫描清理

    ActiveMQ中有一项功能:Delete Inactive Destination。可以处理 “ 没有消费者且未处理的Destination”,也就是 queue 或者 topic 在规定时间内,没有入队记录或者有效订阅,会被清理删除。

    下面基于Queue的配置,Topic的配置类似。

     

    其中属性定义:schedulePeriodForDestinationPurge - 必填。声明扫描闲置队列的周期,单位毫秒。默认值0,为不扫描。

           gcInactiveDestinations - 必填。针对Destination的配置,声明Broker扫描闲置队列时,是否扫描此队列。默认值false

           inactiveTimoutBeforeGC - 选填。配合gcInactiveDestinations=true时才生效。声明Destination闲置多久可以进行删除,单位毫秒。默认值60。

  2、存储空间设置

    

    其中的配置标签: memoryUsage - 表示ActiveMQ使用的内存。这个值必须大于等于destinationPolicy中设置的所有Destination的内存之和。

            storeUsage - 表示持久化存储文件的大小

            tempUsage - 非持久化消息的临时内存大小

  3、kahadb方式的持久化的刷盘方式

  

    其中属性定义:journalDiskSyncStrategy - 声明持久化方式,可选值always :periodic :never 。默认always

           journalDiskSyncInterval - 声明数据落盘的时间间隔。单位毫秒

  4、JMX配置使用

    broker标签增加:

    

    managementContext标签修改:

    

  5、消息过期时间

   

    其中标签及属性定义:timeStampingBrokerPlugin - 为持久化的消息设置过期时间,zeroExpirationOverride 为没有设置有效时间的消息设置过期时间。

                ttlCeiling 表示过期时间上线,如果程序中设置的时间超过这个值,以此值为准。

              expireMessagesPeriod -  声明扫描过期消息的时间间隔,单位毫秒。

              queue=">" 、topic=">" 表示匹配所有的Destination

              queue="tp_>" 、topic="tp_>"  表示匹配所有的 tp_ 开头的Destination

二、消息的持久化

  ActiveMQ的消息,Queue中的消息会根据配置的不同实现持久化,Topic如果不存在持久化订阅者的话会直接丢弃消息。

  持久化的配置,在${activemq_home}/conf/activemq.xml配置文件中,标签 broker内部的 persistenceAdapter 中配置持久化方案

  1、kahadb

   5.4版本之后默认的持久化方式。文件型的数据库存储,配置如下:

<persistenceAdapter>
<!-- directory:保存数据的目录; journalMaxFileLength:保存消息的文件大小 -->
<kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
</persistenceAdapter>

  2、JDBC

    ActiveMQ将消息持久化保存到数据库中,没有限定使用某一个数据库。

    配置成功后,需要在数据库中建立相应的database,后续ActiveMQ启动才能正常访问。ActiveMQ正常启动后,能够自动的创建相应的数据表:

    activemq_msgs - 用于存储消息的表

    activemq_acks - 存储订阅关系

    activemq_lock - 集群环境中使用,同一时间只能有一个Master Broker

    下文配置中以MySQL为例:

    2.1、定义数据源bean(与broker同级)

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
  <property name="username" value="activemq"/>
  <property name="password" value="activemq"/>
  <!-- 属性雷同可根据具体连接池配置 -->
  <property name="maxActive" value="200"/>
  <property name="poolPreparedStatements" value="true"/>
</bean>

  其中数据连接池可以任意其他产品,只需要将相应的jar包拷贝到 ${activemq_home}/lib 目录下即可。

    2.2、配置 persistenceAdapter 标签

<persistenceAdapter>
<!-- createTablesOnStartup标签是否启动时创建数据表,默认值为true。一般第一次启动的时候true,后续修改为false -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>

 

三、ActiveMQ的桥接模式 (network)

  ${activemq_home}/conf/activemq.xml 配置文件中,broker标签中,增加如下:

<networkConnectors>
<networkConnectoruri="static://(tcp://ip:61616)"/>
</networkConnector>

  在两个ActiveMQ应用中,分别配置对方的地址信息,则两个Broker就通过一个static协议进行网络连接。一个consumer连接到Broker1上目的地A,当producer发送到Broker2上的相同目的地A的时候,消息会转发到Broker1上面供消费使用。

四、虚拟Topic(Virtual Topic)

  1、Topic的两种订阅模式

    以下测试。为了能够清晰的查看消息是否持久化,可以配置成JDBC方式的持久化方案,数据库查看。

    消息生产者

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicPublisher; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { private static String brokerURL = "tcp://localhost:61618";
private static ConnectionFactory connectionFactory = null;
private static Connection connection = null;
private static Session session = null;
private static MessageProducer messageProducer = null; public static void main(String[] args) throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(brokerURL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("TP_TEST_03");
messageProducer = session.createProducer(destination);
Message message = session.createTextMessage("test01");
messageProducer.send(message);
connection.close();
}
}

    1.1、普通订阅

      Topic不会持久化消息,如果不存在订阅关系,则Topic会直接丢弃消息,后续再订阅也不会收到之前的任何消息。

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /**
* @author cfang
* 普通订阅
*/
public class SimpleSubscribe { private static String brokerURL = "tcp://localhost:61618";
private static ConnectionFactory connectionFactory = null;
private static Connection connection = null;
private static Session session = null; public static void main(String[] args) throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(brokerURL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination topic = session.createTopic("TP_TEST_03");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println(message);
}
});
}
}

  生产者发送消息,数据库不保存:

    1.2、持久化订阅

    持久化订阅者一定得预先加载一次,也就是向broker注册这个消费订阅者。之后,无论订阅者是否在线,最终都会收到消息。不在线的话,等到再次连接的时候,会将没有受过的消息全部接收处理。

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /**
* @author cfang
* 持久订阅
*/
public class DurableSubscribe { private static String brokerURL = "tcp://localhost:61618";
private static ConnectionFactory connectionFactory = null;
private static Connection connection = null;
private static Session session = null; public static void main(String[] args) throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(brokerURL);
connection = connectionFactory.createConnection();
connection.setClientID("test01");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic("TP_TEST_03");
TopicSubscriber consumer = session.createDurableSubscriber(topic, "Test-subscriber");
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println(message);
}
});
}
}

  运行上面订阅程序,然后关闭程序,订阅关系会保存到表 activemq_acks 中:

  

  发送消息,会持久化到表 activemq_msgs中,消息会在持久化订阅者在线的时候供消费使用:

  

    1.3、离线的持久订阅的清理

  

  上述配置说明:broker每小时检查并删除离线1天以上的持久订阅者。  

  其中标签、属性定义: offlineDurableSubscriberTimeout -  删除的离线的持久订阅者时间,单位毫秒,默认 -1

             offlineDurableSubscriberTaskSchedule - 声明扫描时间间隔,单位毫秒。

  2、虚拟Topic的应用

    Virtual Topic,对生产者来说是个topic,对消费者来说是个queue。内部处理机制是由Broker将接收到的消息进行二次分发到每个queue,然后不同的queue对应不同的应用实现持久化,不同的消费端只需要关注自己的queue就可以了。这样,对每个queue可以做应用内可以做failover处理,也可达到了对topic的多consumer共同处理。

    下面是简单的代码测试

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory;
/**
* VirtualTopic生产者
*/
public class VirtualProducer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("VirtualTopic.TP");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message message = session.createTextMessage("hello world");
message.setStringProperty("keys", "value-bak");
producer.send(message);
session.close();
connection.close();
}
}

  

package com.cfang.prebo.activemq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory;
/**
* VirtualTopic 消费端
*
*/
public class VirtualConsumer { public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61618");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destinationA = session.createQueue("Consumer.A.VirtualTopic.TP");
Destination destinationB = session.createQueue("Consumer.B.VirtualTopic.TP"); MessageConsumer consumerA1 = session.createConsumer(destinationA);
consumerA1.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println("A1:"+message);
}
});
MessageConsumer consumerA2 = session.createConsumer(destinationA);
consumerA2.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println("A2:"+message);
}
}); MessageConsumer consumerB = session.createConsumer(destinationB);
consumerB.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
System.out.println("B:"+message);
}
});
System.in.read();
}
}

  其中,消费端的A1和A2为一个应用,组成内部负载和failover。

  VirtualTopic的消费端queue名称的前后缀可通过配置文件${activemq_home}/conf/activemq.xml修改,一般也不需要修改前后缀,极少应用

  在 broker标签中,加入以下内容,则前缀就修改成了VT,之前例子中的消费端,监听的queue名称就改为 “VT.A.VirtualTopic.TP” 和 “VT.B.VirtualTopic.TP”

  selectorAware 属性表明 consumer 有selector 的话,则对消息进行过滤,只有符合selector的消息才会分发到queue中。

<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name=">" prefix="VT.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>

  

  总结点VirtualTopic:

    1、虚拟Topic是一种特殊命名的Topic,系统根据命名规则将该Topic内的消息分发给当前存在的名称对应的Queue,分发是非持久化的,新加入的Queue是接收不到过去的消息的。

    2、虚拟Topic的功能完全是中间件本身额外附加的机制,对于生产者和消费者都是无感知的。

    3、虚拟Topic是非持久化的,不存在积压。
 

最新文章

  1. Spring(6)—— AOP
  2. AOJ DSL_2_D Range Update Query (RUQ)
  3. 线段树---Atlantis
  4. celery 学习笔记 01-介绍
  5. 作为一个web开发人员,哪些技术细节是在发布站点前你需要考虑到的
  6. HDU How many integers can you find 容斥
  7. oracle 自定义函数
  8. 最全面的 MySQL 索引详解
  9. MSSQLSERVER未分离LDF删除情况下的MDF附加
  10. 浅谈PCB敷铜的“弊与利”
  11. Python的对象和类型
  12. Hadoop加zookeeper构建高可靠集群
  13. Spring配置注解详解
  14. (转)JVM内存组成及分配
  15. Java货币金额转换为大写形式
  16. 声明式开发 &amp; 命令式开发
  17. HttpWebRequest using Basic authentication
  18. json文件解析
  19. [转]SpringMVC+ Mybatis 配置多数据源 + 手动切换数据源
  20. 使用 Spring Boot 搭建一套增删改查(无多余代码)

热门文章

  1. 02 http和https协议
  2. 讲解开源项目:功能强大的 JS 文件上传库
  3. Okhttp3源码解析(2)-Request分析
  4. springboot整合html时的页面的跳转404
  5. 随笔编号-08 MYSQL 存储过程,5分钟执行调用过程;
  6. 09_if条件语句的使用
  7. vue+vscode+nodejs 开发环境搭建
  8. JMeter简介及使用JMeter来访问网站
  9. B-generator 1_2019牛客暑期多校训练营(第五场)
  10. @PathVariable性能损耗分析