最近,项目中使用到了ActiveMQ获取第三方推送过来的数据。具体背景是:公司需要监控全国各地车辆实时运行的GPS数据,但监控本身不是公司做的,而是交给第三方公司做,第三方采集GPS数据后推送给我们。全国各地,近万台车辆,每台车辆每隔几秒就发送一次GPS位置数据,如果我们提供API给第三方公司去调用,显然无论是第三方还是我们这边,服务器都是是扛不住的,这么做也是不合理的,于是,便采取了消息队列,第三方采集到的数据直接推送到消息队列代理服务器,而己方从消息队列服务器取数据处理。以下对项目实践及其中遇到的一些问题及解决进行概要总结。

1、ActiveMQ NMS简介

关于NMS,这里主要谈两点。

NMS API:ActiveMQ定义的一套API接口规范,你可以理解为一个API的接口,它指明了生产者或消费者如何与消息队列服务器通信。

NMS Providers:NMS API的具体实现,基于Windows或ActiveMQ下的各种协议,提供了各种实现,目前提供了ActiveMQ、STOMP、MSMQ、EMS、WCF、AMQP、MQTT、XMS几种实现。具体项目中,我采取的是ActiveMQ实现。

至于消息队列涉及到的其他概念,什么Broker、Queue、Topic、Producer、Consumer,这里不做介绍,各位可以自己查资料,这些 概念本身也不难理解的。

2、关于异常下的Broker重连

  这个异常,可能是由于网络异常,也可能是长时间没有通信,Broker把Client给断掉了,不去管它。起初,这个项目是从一位离职员工的手头接过来的,给的说法是只需要维护就够了,基本上不用调整。当时虽然说是做了重连,后来发现,就跟没做一样。发现这个,是起源于第三方频繁通知,MQ队列有积压,通知我们尽快处理。项目拿到手一看,我勒个去,直接起了一个Timer在那儿定时监控Connection状态,如果状态不对立刻重新打开连接。先不说Socket连接的浪费情况,及Timer这个.NET中近乎Bug的一个东西,这种做法实际中行之无效,因为连接异常情况下再打开,往往是打开失败的,比如上次异常连接没有关闭,状态不对,或者ClientID暂时没被Broker释放等。

  于是,针对重连,开始做第一次优化。查了IConnection元数据,发现有个ExceptionListener可用,于是便想到利用这个事件来监听并重连。改完上线,可第二天一大早过来,发现MQ又挤压了,重连时效了,打开日志看到,记录了ExceptionListener事件日志,但重连没有成功,具体原因,我想可能和优化前是一样的吧。这折腾前后完全没区别。这时候,我想,不能在现有做法里边去整了,必须回到NMS本身去整,堂堂Apache开源项目,一定有更好的重连机制,放着不用自己整,是不是傻。。。

  于是,打开官网,如愿以偿,找到了failover这个东西:

根据描述,链接异常时,随机从配置的Broker列表总选取一个进行重连。这是个好东西,于是,Broker的链接配置,由tcp://183.56.131.224:61616调整为failover:(tcp://183.56.131.224:61616)。这里没有多个Broker,只有一个,所以配置一个也是没有问题的,我们重点是利用failover。与此同时,OpenWire上发现了maxInactivityDuration这个配置项,官网描述如下:

这个也不错,配置Connection闲置多久被Broker断掉。我这里比较狠,反正Broker那个队列出了第三方往里边推,就我这儿一个人在消费,直接配置0算了,永不被杀,出问题了重连,岂不爽哉。于是,Broker进一步调整为:failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0。此机制我也自己写Demo验证过,无论是Broker突然停掉再开启,还是Producer停掉再开启,Consumer均能成功重连的。至此,MQ的可靠重连问题算是解决了。

3、进程重启导致Consumer链接失败

  具体情境是这样的:MQ消费者进程是寄宿在Windows服务中的,运维那边做测试或维护,会在MQ运行正常的情况下直接重启服务,有时候会重启失败,过阵子启动,又成功了。我过去,打开Windows事件日志,说是ClientID被占用,也就是说瞬间重启时候,Broker端暂时没来得及断开或者释放该ClientID对应的Connection,而我们系统中ClientID是配置死的。我又验证了下,正常运行下,先关闭服务,过几秒再开启,就没这问题,也印证了自己的推断。问题是找到了,但总不能告诉运维,每次先停止服务,再打开,不能用重启吧,哪个开发要是这样跟我说,那他妈也太不靠谱了。

  解决方案就是,ClientID动态生成,每次启动都不一样,这个ClientID仅仅是Broker用来标识一个连接端的,随便什么都无所谓,只要跟上次不一样。项目中取的是当前时分秒字符串。如下:

_connection.ClientId = DateTime.Now.ToString("yyyyMMddHHmmss");

调整完上线,再试验,那问题再无复现。

4、服务启动时间过长的问题

  随着各种奇葩情况继续出现,我这里继续被操。具体场景是:鉴于是跟第三方合作,各种第三方服务器宕机,各种网络不靠谱,你懂的。如果是消费者进程已经启动成功了,那第三方或者网络不靠谱了,我们利用2中的重连机制就已经可以了,无非就是等他们靠谱了我们自动重连上就是了。可问题是,如果第三方不靠谱,或者网络不靠谱时,我们在启动消费者Windows服务,那会出现什么情况呢?给大家实际演示下:

目前,我我的服务安装后,是这样的:

假设正常链接配置是这样的:

failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0

为了模拟外界异常或不可达的情况,我手动设置为如下:

failover:(tcp://183.56.131.200:61616)?wireFormat.maxInactivityDuration=0

大家注意,那个Broker地址是不可达的。

开启服务,其结果如下:

这个启动界面,你就等着吧,等个两三分钟,结果如下:

更要命的是,点击确定后,服务启动结果如下:

这就比较操蛋了,你启动失败就失败把,别给我整成启动状态啊,这不误导人么。

  一般,这种情况,就属于启动进程一直卡主,当服务启动超时时,就会出现这种情况, 启动强行被Windows终止,但那个标记为启动状态这个就不好理解,也比较坑了。这是必须要处理的事情,否则极易造成误导。对于Windows服务本身启动机制,你是没办法做任何事情的,那只能从MQ链接机制去干事情。最终, 经过查询官网文档, 再次如愿以偿,找到了以下两个配置项:

这两个配置项分别代表,启动时最大重连尝试次数,默认值0,代表无限重连,我们出问题就出现在这里,链接不上时无限重试,无限重试无限连接不上,无限链接不上再无限重试。。。然后,进程阻塞,阻塞到一定时间,Windows服务重启失败。这个我也在Connection open时候打断点调试过,确实阻塞了。那么第二个配置项代表一项操作超时时间。问题找到了,那么自然也就有解决方案了,现把链接配置为如下:

failover:(tcp://183.56.131.200:61616)?wireFormat.maxInactivityDuration=0&transport.startupMaxReconnectAttempts=3&transport.timeout=3000

这里有两点要注意:

1)原本,这里配置应该是failover:(tcp://183.56.131.224:61616)?wireFormat.maxInactivityDuration=0&transport.startupMaxReconnectAttempts=3&transport.timeout=3000,但在配置文件中, &符号是不支持的,必须转义或替换,这里采取了实体替换,具体的是&这个鬼实体符;

2)NMS.ActiveMQ v1.4.0以上版本和以前以及其他语言实现版本不大相同,1.4以上版本配置这两项参数时必须有transport前缀。这里当时也是吃过亏的。

配置调整完毕后,我们再用 这个无效地址启动服务,在经过60S以内的启动时间,画风变成了这样:

点击确定:

这个时间,和transport.timeout、transport.initialReconnectDelay、transport.startupMaxReconnectAttempts等几项配置有关。但起码时间不会像之前那样很久,并且最终Windows服务状态显示为启动了。

5、总结

  鉴于这是公司实际运作项目,就不上传代码了,如果是自己的Demo,一定毫不保留,望各位见谅。实际上,也没什么特别的,大家平时遇到这种难缠的问题,多查官网文档,官网文档搞不定,再查源码,配合动手实践,一般都不会是问题的。幸运的是,虽然很多官网文档都是英文,但绝大部分都通俗易懂,我们看上去,也都不费事儿的。

附:ActiveMQ生产者及消费者示例代码

生产者:

/// <summary>
/// 生产者启动器
/// </summary>
public class ProducerBootstrap
{
#region Private Fields private readonly IConnectionFactory _connectionFactory = null;
private IConnection _connection = null;
private IMessageProducer _producer = null; #endregion #region Constructors public ProducerBootstrap()
{
_connectionFactory = new ConnectionFactory("tcp://localhost:61616");
} #endregion #region Public Methods public void Start()
{
_connection = _connectionFactory.CreateConnection();
_connection.ExceptionListener += _connection_ExceptionListener;
_connection.Start();
ISession sesison = _connection.CreateSession();
_producer = sesison.CreateProducer(new ActiveMQQueue("guokun"));
} public void Stop()
{
_connection.Stop();
_connection.Close();
_connection.Dispose();
} public void SendMessage()
{
while (true)
{
ITextMessage message = _producer.CreateTextMessage();
message.Text = string.Format("数据:{0}", DateTime.Now);
_producer.Send(message);
Thread.Sleep();
Console.WriteLine(message.Text);
}
} #endregion #region Private Methods private void _connection_ExceptionListener(Exception exception)
{
Console.WriteLine("生产者发生异常:{0}", exception);
} #endregion
}

消费者:

public class ConsumerBootstrap
{
#region Private Fields private readonly IConnectionFactory _connectionFactory = null;
private IConnection _connection = null;
private IMessageConsumer _consumer = null; #endregion #region Constructors public ConsumerBootstrap()
{
_connectionFactory = new ConnectionFactory("failover:(tcp://localhost:61616)?wireFormat.maxInactivityDuration=0&transport.timeout=3000&transport.startupMaxReconnectAttempts=2");
} #endregion #region Public Methods public void Start()
{
_connection = _connectionFactory.CreateConnection();
_connection.ClientId = "guokun";
_connection.ExceptionListener += _connection_ExceptionListener;
_connection.Start();
ISession session = _connection.CreateSession();
_consumer = session.CreateConsumer(new ActiveMQQueue("guokun"));
_consumer.Listener += _consumer_Listener; Console.WriteLine("消费者启动成功...");
} public void Stop()
{
_connection.Stop();
_connection.Close();
_connection.Dispose();
} #endregion #region Private Methods /// <summary>
/// 消息监听处理
/// </summary>
/// <param name="message"></param>
private void _consumer_Listener(IMessage message)
{
ITextMessage textMessage = message as ITextMessage;
Console.WriteLine("{0}-{1}", DateTime.Now, textMessage.Text);
} private void _connection_ExceptionListener(Exception exception)
{
Console.WriteLine("生产者发生异常:{0}", exception);
} #endregion
}

最新文章

  1. [SharePoint 2013] Automatic deployment script
  2. JCEF3——谷歌浏览器内核Java版实现(一):使用jawt获取窗体句柄
  3. 第十二篇 SQL Server代理多服务器管理
  4. 使 div 元素看上去像一个按钮
  5. 【转】c++内存泄露检测,长文慎入!
  6. 【SQL server】安装和配置
  7. ###Linux基础 - 3
  8. 孤陋寡闻又一遭:ReportEvent API函数(有微软Service官方例子为例)
  9. [Javascript] Using console.count to Count Events
  10. sim卡中电话本(ADN)的简要格式
  11. [jstips]undefined和null的区别
  12. Spring 3.0 Aop 入门
  13. linux.go
  14. applicationContext.xml 模板
  15. mybatis拦截器处理
  16. 渗透常用dos命令,http协议及数据提交方式。 hack 某某
  17. 洛谷P3225 HNOI2012 矿场搭建
  18. tls 流量画像——直接使用图像处理的思路探索,待进一步观察
  19. 3、eclipse集成svn
  20. spring中获取applicationContext(2)

热门文章

  1. jQuery数字加减插件
  2. 怎样在Upstart机制下的系统中加入upstart事件型的任务
  3. solr主从复制
  4. Scala 的 Web 框架 Lift 开始 3.0 版本开发
  5. 必须掌握的JavaScript基本知识
  6. C#泛型回顾点滴
  7. 第三方控件netadvantage UltraWebGrid如何生成带加号多级表数据也就是带子表
  8. DEBUG不能进断点 “exited with code -1073741515”
  9. C#继承关系中【方发表】的创建和调用
  10. javascript-无间缝滚动,封装