1、RabbitMQListener,自定义消息监听器

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions; namespace MQ_Receive
{
/// <summary>
/// RabbitMq消息监听器
/// </summary>
public class RabbitMqListener
{
private ConnectionFactory _factory;
private IConnection _con;
private IModel _channel;
private EventingBasicConsumer _consumer; private readonly string _rabbitMqUri;
private readonly string _exchangeType;
private readonly string _exchangeName;
private readonly string _queueName;
private readonly string _routeKey;
private Func<string, bool> _messageHandler; /// <summary>
/// RabbitMQ消息监听器,若指定的队列不存在,则自动创建队列。并在消息交换机上绑定指定的消息路由规则(路由key)
/// </summary>
/// <param name="rabbitMqUri">连接串,如 amqp://guest:guest@localhost:5672/</param>
/// <param name="exchangeName">消息交换机</param>
/// <param name="exchangeType">交换机类型,如 ExchangeType.Direct</param>
/// <param name="queueName">要监听的队列</param>
/// <param name="routeKey">消息路由key</param>
public RabbitMqListener(string rabbitMqUri, string exchangeName, string exchangeType, string queueName, string routeKey = "")
{
this._rabbitMqUri = rabbitMqUri;
this._exchangeName = exchangeName;
this._exchangeType = exchangeType;
this._queueName = queueName;
this._routeKey = routeKey;
} /// <summary>
/// 创建连接
/// </summary>
private void CreateConnection()
{
_factory = new ConnectionFactory
{
Uri = new Uri(_rabbitMqUri),
RequestedHeartbeat = ,
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds()
}; _con = _factory.CreateConnection();
_con.ConnectionShutdown += (_sender, _e) => ReMessageListen();//掉线重新连接并监听队列消息
} /// <summary>
/// 创建信道
/// </summary>
private void CreateChannel()
{
_channel = _con.CreateModel();
_channel.ExchangeDeclare(_exchangeName, _exchangeType, true, false, null);
_channel.QueueDeclare(_queueName, true, false, false, null); //创建一个消息队列,用来存储消息
_channel.QueueBind(_queueName, _exchangeName, _routeKey, null);
_channel.BasicQos(, , true); //在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息
} /// <summary>
/// 监听队列消息
/// </summary>
/// <param name="messageHandler">消息处理器,当监测到队列消息时回调该处理器</param>
/// <returns>监听状态</returns>
public bool MessageListen(Func<string, bool> messageHandler)
{
try
{
this.CreateConnection();
this.CreateChannel(); _consumer = new EventingBasicConsumer(_channel); //基于事件的消息推送方式
_consumer.Received += (_sender, _e) =>
{
string msg = Encoding.UTF8.GetString(_e.Body);
if (messageHandler != null)
{
this._messageHandler = messageHandler;
try
{
var isOk = this._messageHandler(msg);
if (isOk)
{
_channel.BasicAck(_e.DeliveryTag, false);
}
}
catch (Exception ex)
{
LoggerManager.ErrorLog.Error("消息处理器执行异常:" + ex.Message, ex);
}
}
}; _channel.BasicConsume(_queueName, false, _consumer); //手动确认
return true;
}
catch (Exception ex)
{
LoggerManager.ErrorLog.Error("尝试监听队列消息出现错误:" + ex.Message, ex);
}
return false;
} public void ReMessageListen()
{
try
{
//清除连接及频道
CleanupResource(); var mres = new ManualResetEventSlim(false); //初始化状态为false
while (!mres.Wait()) //每3秒监测一次状态,直到状态为true
{
if (MessageListen(_messageHandler))
{
mres.Set(); //设置状态为true并跳出循环
}
}
}
catch (Exception ex)
{
LoggerManager.ErrorLog.Error("尝试连接RabbitMQ服务器出现错误:" + ex.Message, ex);
}
} /// <summary>
/// 清理资源
/// </summary>
private void CleanupResource()
{
if (_channel != null && _channel.IsOpen)
{
try
{
_channel.Close();
}
catch (Exception ex)
{
LoggerManager.ErrorLog.Error("尝试关闭RabbitMQ信道遇到错误", ex);
}
_channel = null;
} if (_con != null && _con.IsOpen)
{
try
{
_con.Close();
}
catch (Exception ex)
{
LoggerManager.ErrorLog.Error("尝试关闭RabbitMQ连接遇到错误", ex);
}
_con = null;
}
}
}
}

2、调用代码

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms; namespace MQ_Receive
{
public partial class Form1 : Form
{
private delegate void ChangeText(string text);
private readonly ChangeText _changeText; private static string rabbitHostUri = "amqp://guest:guest@localhost:5672/";
private static string exchangeName = "order-exchange";
private static string queueName = "order-message-test-queue";
private static string routeKey = "order-message-routeKey";
private static readonly object lockObj = new object(); private static RabbitMQListener _listener;
public static RabbitMQListener RabbitMQListener
{
get
{
if (_listener == null)
{
lock (lockObj)
{
if (_listener == null)
{
_listener = new RabbitMQListener(rabbitHostUri, exchangeName, ExchangeType.Direct, queueName, routeKey);
}
}
} return _listener;
}
}
private Func<string, bool> MessageHandler
{
get {
return (msg) =>
{
this.label1.Invoke(_changeText, new object[] { msg });
return true;
};
}
} public Form1()
{
InitializeComponent();
this.label1.Text = "";
this._changeText = SetText;
} private void Form1_Load(object sender, EventArgs e)
{
RabbitMQListener.MessageListen(MessageHandler);
} private void SetText(string text)
{
this.label1.Text += text + "\n";
}
}
}

最新文章

  1. 【转】Caffe初试(六)激活层及参数
  2. 工作中的sql语句总结
  3. window7下安装第三方包报错及解决
  4. BZOJ 1061 志愿者招募(最小费用最大流)
  5. Linux下ps -ef和ps aux的区别及格式详解
  6. jquery冲突
  7. MySQL多实例配置
  8. Apache让一台虚拟主机接受多域名解析(转)
  9. UVA-514 Rails (栈)
  10. web系统数据导出功能设计实现(导出excel2003/2007 word pdf zip等)
  11. openstack controller ha测试环境搭建记录(八)——配置nova(控制节点)
  12. Redis Crackit漏洞防护
  13. 最新的爬虫工具requests-html
  14. 七、Json格式的对象都可以通过遍历来获得里面的value值
  15. 性能测试四十一:sql案例之慢sql配置、执行计划和索引
  16. 2017年天梯赛LV2题目汇总小结
  17. Yarn &amp;&amp; npm设置镜像源
  18. MFC笔记1
  19. 《深入理解Java虚拟机》读书笔记:Java内存区域
  20. c#实现word,excel转pdf代码及部分Office 2007文件格式转换为xps和pdf代码整理

热门文章

  1. Kubernetes Pod 资源限制
  2. vsdbg 下载方法 使用下载工具下载后手动安装
  3. PERFORM参数传递
  4. Visual Studio2017使用EF添加Mysql
  5. jmeter-分布式压测部署之负载机的设置
  6. badboy录制过程中出现当前页面的脚本发现错误
  7. java SSM 框架 微信自定义菜单 快递接口 SpringMVC mybatis redis shiro ehcache websocket
  8. [b0017] python 归纳 (三)_类名当参数传入
  9. vue 开发系列(九) VUE 动态组件的应用
  10. Linux设备管理(四)_从sysfs回到ktype【转】