构建高可用的ActiveMQ系统在生产环境中是非常重要的,对于这个apache的消息中间件实现高可用非常简单,只要在Apache ActiveMQ单点基本配置基础上做一次配置变更(如果在一台设备上部署多个AMQ,需要修改对应端口号),即可实现

AMQ实现高可用部署有三种方案: 
1、Master-Slave 
2、SharedFile System Master Slave 
3、JDBCMaster Slave

第一种方案由于只可以由两个AMQ实例组件,实际应用场景并不广泛; 
第三种方案支持N个AMQ实例组网,但他的性能会受限于数据库; 
第二种方案同样支持N个AMQ实例组网,但由于他是基于kahadb存储策略,亦可以部署在分布式文件系统上,应用灵活、高效且安全。

shared filesystem Master-Slave部署方式主要是通过共享存储目录来实现master和slave的热备,所有的ActiveMQ应用都在不断地获取共享目录的控制权,哪个应用抢到了控制权,它就成为master。

多个共享存储目录的应用,谁先启动,谁就可以最早取得共享目录的控制权成为master,其他的应用就只能作为slave。

Apache ActiveMQ单点基本配置的原配置内容:

<persistenceAdapter> 
            <kahaDB directory="${activemq.data}/kahadb"/> 
</persistenceAdapter>

修改为:

<persistenceAdapter> 
             <kahaDB directory="D:\\ActiveMQ Cluster\\shareBrokerData" enableIndexWriteAsync="true"  enableJournalDiskSyncs="false"/> 
</persistenceAdapter>

在D:\\ActiveMQ Cluster目录先创建shareBrokerData文件夹。

注意:

1.前面提到如果在一台设备上部署多个AMQ,需要修改对应端口号,如AMQ对外的监听端口61616和jetty的监听端口8161等。 
2.如果多套AMQ部署在不同的设备上,这里的directory应该指向一个远程的系统目录(分布式文件系统) 
3.客户端通过failover方式进行连接,多个AMQ实例地址使用英文逗号隔开,当某个实例断开时会自动重连,但如果所有实例都失效,failover默认情况下会无限期的等待下去,不会有任何提示。

下面为在一台设备上部署两个AMQ示例: 
ActiveMQ A 
1.activemq.xml修改监听端口:

<transportConnectors> 
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
<!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

</transportConnectors>

2.jetty.xml修改监听端口:

<property name="connectors"> 
            <list> 
                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                    <property name="port" value="8166" /> 
                </bean> 
                <!-- 
                    Enable this connector if you wish to use https with web console 
                --> 
                <!-- 
                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                    <property name="port" value="8162" /> 
                    <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                    <property name="password" value="password" /> 
                </bean> 
                --> 
            </list> 
</property>

ActiveMQ B 
1.activemq.xml修改监听端口:

<transportConnectors> 
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
<!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

</transportConnectors>

2.jetty.xml修改监听端口:

<property name="connectors"> 
            <list> 
                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                    <property name="port" value="8166" /> 
                </bean> 
                <!-- 
                    Enable this connector if you wish to use https with web console 
                --> 
                <!-- 
                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                    <property name="port" value="8162" /> 
                    <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                    <property name="password" value="password" /> 
                </bean> 
                --> 
            </list> 
</property>

Java测试程序代码: 
1.Producer:

import javax.jms.Connection; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory; 
   
public class ProducerTool { 
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageProducer producer = null;    
   
    // 初始化 
    private void initialize() throws JMSException, Exception {    
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0,tcp://172.16.30.11:61617?wireFormat.maxInactivityDuration=0)");    
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        producer = session.createProducer(destination);    
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
         
    }    
   
    // 发送消息    
    public void produceMessage(String message) throws JMSException, Exception {    
        initialize();    
        TextMessage msg = session.createTextMessage(message);    
        connection.start();    
        System.out.println("Producer:->Sending message: " + message);    
        producer.send(msg);    
        System.out.println("Producer:->Message sent complete!");    
    }    
   
    // 关闭连接     
    public void close() throws JMSException {    
        System.out.println("Producer:->Closing connection");    
        if (producer != null)    
            producer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
   }    
}

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory; 
   
public class ConsumerTool implements MessageListener {      
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageConsumer consumer = null;    
   
    // 初始化    
    private void initialize() throws JMSException, Exception {    
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    
            
    }    
   
    // 消费消息       
    public void consumeMessage() throws JMSException, Exception {    
        initialize();    
        connection.start();    
            
        System.out.println("Consumer:->Begin listening...");    
        // 
        consumer.setMessageListener(this);    
        // Message message = consumer.receive();    
    }    
   
    // 关闭连接   
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
   
    // 消息处理函数  
    public void onMessage(Message message) {    
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            }    
        } catch (JMSException e) {    
            // TODO Auto-generated catch block    
            e.printStackTrace();    
        }    
    }    
}

2.Consumer:

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory; 
   
public class ConsumerTool implements MessageListener {      
   
    private String subject = "TOOL.DEFAULT";    
   
    private Destination destination = null;    
   
    private Connection connection = null;    
   
    private Session session = null;    
   
    private MessageConsumer consumer = null;    
   
    // 初始化    
    private void initialize() throws JMSException, Exception {    
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    
            
    }    
   
    // 消费消息       
    public void consumeMessage() throws JMSException, Exception {    
        initialize();    
        connection.start();    
            
        System.out.println("Consumer:->Begin listening...");    
        // 
        consumer.setMessageListener(this);    
        // Message message = consumer.receive();    
    }    
   
    // 关闭连接   
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    
   
    // 消息处理函数  
    public void onMessage(Message message) {    
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            }    
        } catch (JMSException e) {    
            // TODO Auto-generated catch block    
            e.printStackTrace();    
        }    
    }    
}

3.Main

import javax.jms.JMSException; 
   
public class Test {    
   
    /**   
     * @param args   
     */   
    public static void main(String[] args) throws JMSException, Exception { 
    
        
        ConsumerTool consumer = new ConsumerTool();    
        ProducerTool producer = new ProducerTool();    
        // 开始监听    
        consumer.consumeMessage();    
            
        // 延时500毫秒之后发送消息    
        Thread.sleep(500);    
        producer.produceMessage("Hello, world!");    
        producer.close();    
            
        // 延时500毫秒之后停止接受消息    
        Thread.sleep(500);    
        consumer.close();    
    
    }    
}

ActiveMQ A 启动界面:

ActiveMQ B 启动界面:

AMQ A先启动,先锁文件,当AMQ B启动是,不能锁文件,但会不断的监听等待。

运行Java Test程序日志:

10:22:43.745 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616 
Consumer:->Begin listening... 
10:22:45.623 INFO  [] org.apache.activemq.transport.failover.FailoverTransport - Successfully connected to tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0 
Producer:->Sending message: Hello, world! 
Producer:->Message sent complete! 
Producer:->Closing connection 
Consumer:->Received: Hello, world! 
Consumer:->Closing connection

ActiveMQ A 管理界面:

异常处理:

配置好ActiveMQ后,前几次都启动成功。有一天启动时发现启动不成功,查看报错日志发现出现如下提示: 
Failed to start Apache ActiveMQ (localhost, ID:*-PC-*-*-0:1). Reason: java.io.IOException: Transport Connector could not be registered in JMX: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.BindException: Address already in use: JVM_Bind。

1.先去查看是不是端口被占用,用netstat -ano命令查看端口使用情况,发现没有端口被占用。 
2.在控制面板的服务里把正在运行的Internet Connection Sharing (ICS)为家庭和小型办公网络提供网络地址转换、寻址、名称解析和/或入侵保护服务关了,他占用着端口。 
3.把此服务关了后再启动ActvieMQ成功了。

最新文章

  1. Win10系统菜单打不开问题的解决,难道是Win10的一个Bug ?
  2. Git Push 避免用户名和密码方法
  3. Introducing Windows 10 Editions(Windows10版本介绍)
  4. 【uTenux实验】邮箱
  5. ubuntu 16.04软件源
  6. Python文件操作详解
  7. 2015年第2本(英文第1本):《The Practice of Programming》
  8. zoj 3513 Human or Pig 博弈论
  9. wust 1061 链表的合并
  10. iOS 用命令实现简单的打包过程
  11. ActiveMQ中Session设置的相关理解
  12. 根据控件Id得到控件
  13. Eclipse无法打开“Failed to load the JNI shared library”
  14. Linux中Samba详细安装
  15. route命令实例练习
  16. IT智力面试题
  17. Linux内核态用户态相关知识 &amp; 相互通信
  18. 一文让你完全弄懂Stegosaurus
  19. .net连接ORACLE数据库
  20. Keepalived + Nginx + Tomcat 的高可用负载均衡架构搭建

热门文章

  1. 九、springcloud之服务网关zuul(二)
  2. 十、springboot之web开发打包生产
  3. Gradle教程链接
  4. idea中JDK失效
  5. echarts 移动端地图数据可视化开发教程
  6. Nginx - 压缩模块
  7. CxGrid 表格列内容居中
  8. ZOJ 3469 Food Delivery(区间DP好题)
  9. (三)Rest风格的资源URL
  10. sp_executesql动态执行sql语句并将结果赋值给一变量