Uwl.Admin.Core中使用RabbitMQ消息队列:

本文负责讲解RabbitMQ的使用

Uwl.Admin.Core使用的技术有:

  *、Async和Await 异步编程

  *、Repository + Service 仓储模式编程;仓储模式支持工作单元

  *、Swagger 前后端文档说明,基于RESTful风格编写接口

  *、Cors 简单的跨域解决方案

  *、JWT自定义策略授权权限验证

  *、依赖注入选择的是官方自带的DI注入,没有使用第三方框架,ORM使用EF Core,数据库使用的是Sql server,(后期会扩展MySql版本);

  *、AutoMapper 自动对象映射、

  *、Linq To Sql \ lambda表达式树查询;(表达式树查询是个人扩展的,表达式树的使用方法请参考Uwl.Data.Server.MenuServer的多条件查询)

  *、登录认证方式使用JWT认证方式,后台接口使用SwaggerUI展示,角色权限使用  自定义权限处理器PermissionHandler 继承与微软官方 IAuthorizationRequirement;

  *、Excel导入导出使用的是Epplus第三方框架,导入导出只需要配置Attribute特性就好,不需要在自己写列名;导出只支持List导出,暂时不支持Datatable;(Excel使用方法请参考UserController控制器)

  *、Rabbit MQ消息队列(目前暂无业务使用场景后期准备用来记录日志)

  *、Redis 轻量级分布式缓存;(Redis使用方法请参考Uwl.Data.Server.MenuServer类)

  *、QuartzNet第三方任务框架;(使用方法请参考类库Uwl.ScheduledTask.Job.TestJobOne类)

  *、IdentityServer4授权模式已开发完成,未发布演示服务器代码在github;(Identityserver4Auth分支)

RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。

RabbitMQ安装

请参考我的第一篇博客:

安装完成之后访问Web控制台

http://服务器ip:15672/ 注意配置防火墙,默认用户名密码都是guest,若新建用户一定要记得配置权限。guest仅限localhost访问,外网无法使用此账号!

.NET Core 使用RabbitMQ

通过nuget安装:https://www.nuget.org/packages/RabbitMQ.Client/

定义生产者.

本文的代码生产者是基础的消息队列生产者,源代码请看我的开源项目 UWl.Admin.Core

public class RabbitServer: IRabbitMQ
{
private IConnection connection;
private ConnectionFactory connectionFactory;
public RabbitServer()
{
try
{
connectionFactory = new ConnectionFactory()
{
UserName = Appsettings.app(new string[] { "RabbitMQConfig", "UserName" }),
Password = Appsettings.app(new string[] { "RabbitMQConfig", "Password" }),
HostName = Appsettings.app(new string[] { "RabbitMQConfig", "HostName" }),
AutomaticRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "AutomaticRecoveryEnabled" })),
TopologyRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "TopologyRecoveryEnabled" })),
};
}
catch (Exception)
{
throw;
}
} public IConnection GetConnection()
{
return this.connectionFactory.CreateConnection();
}
/// <summary>
/// RabbitMQ指定队列名称模式发送消息
/// </summary>
/// <param name="queuename">队列名字</param>
/// <param name="obj">传输数据</param>
public void SendData(string queuename, object obj)
{
connection = GetConnection();
if (obj == null)
return;
if (connection == null)
return;
if (queuename.IsNullOrEmpty())
return;
using (connection)
{
using (var channel= connection.CreateModel())
{
//声明一个队列 //队列模式 一共有四种
channel.QueueDeclare(queuename, false, false, false, null);
//第一个参数:预计大小,第二个参数每次读取几个,第三个参数是否本地
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
//交付模式
var prop = channel.CreateBasicProperties();
// 非持久性(1)或持久性(2)。
prop.DeliveryMode = 2;
//将对象转化为json字符串
var json = JsonConvert.SerializeObject(obj);
//将字符串转换为二进制
var bytes= Encoding.UTF8.GetBytes(json);
//开始传送
channel.BasicPublish("", queuename, prop,bytes);
}
}
}
}
定义消费者.

消费者我是使用.Net Core控制台程序来写的源代码放到了百度网盘请自行下载 RebbitMQDemo 链接: https://pan.baidu.com/s/1n9CaSiAuB9t63Fh_YIU78A 提取码: 3939

//创建连接工厂
ConnectionFactory factory = new ConnectionFactory
{
UserName = "wzw",
Password = "wzw",
HostName = "localhost"
};
//创建连接
var connection = factory.CreateConnection();
//创建通道
var channel = connection.CreateModel(); //接收到的消息处理事件
EventingBasicConsumer Recipient = new EventingBasicConsumer(channel);
Recipient.Received += (ch, ea) =>
{
var RecipientMsg = Encoding.UTF8.GetString(ea.Body);
Console.WriteLine($"后台处理方法收到消息:{RecipientMsg}");
//确认该消息已被处理
channel.BasicAck(ea.DeliveryTag, false);
Console.WriteLine($"消息已经处理【{ea.DeliveryTag}】");
};
channel.BasicConsume("hello", false, Recipient);
Console.WriteLine("后台处理方法已启动");
Console.ReadKey();
channel.Dispose();
connection.Close();

RabbitMQ消费失败的处理

RabbitMQ采用消息应答机制,即消费者收到一个消息之后,需要发送一个应答,然后RabbitMQ才会将这个消息从队列中删除,如果消费者在消费过程中出现异常,断开连接切没有发送应答,那么RabbitMQ会将这个消息重新投递。

使用RabbitMQ的Exchange

前面我们可以看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的情况是,生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)

AMQP协议中的核心思想就是生产者和消费者隔离,生产者从不直接将消息发送给队列。生产者通常不知道是否一个消息会被发送到队列中,只是将消息发送到一个交换机。先由Exchange来接收,然后Exchange按照特定的策略转发到Queue进行存储。同理,消费者也是如此。Exchange 就类似于一个交换机,转发各个消息分发到相应的队列中。

最新文章

  1. Oracle 11g RAC停止和启动步骤
  2. UE4 创建进程,打开额外程序 方法 笔记
  3. (转)IC验证概述
  4. NodeJS实例系列~环境搭建,Hello world归来!
  5. ReactNative环境配置
  6. Maven是如何工作的
  7. C#将HTML导出Excel
  8. Android开发之onClick事件的三种写法(转)
  9. SCOI2015题解 &amp;&amp; 考试小结
  10. sql server 2000 对应 sql server 2005的row_number()、rank()、DENSE_RANK( )、ntile( )等用法
  11. Java之JSON数据
  12. SpringBoot初步
  13. 数十种TensorFlow实现案例汇集:代码+笔记(转)
  14. [UVa 1326]Jurassic Remains
  15. Git与SVN的区别(面试常问)
  16. GIMP使用笔记
  17. Github上搭建个人博客记录
  18. 第161天:CSS3实现兼容性的渐变背景(gradient)效果
  19. CodeForces - 1015D
  20. lightoj 1306 - Solutions to an Equation 扩展的欧几里得

热门文章

  1. 洛谷1052——过河(DP+状态压缩)
  2. 云南农职《JavaScript交互式网页设计》 综合机试试卷⑤——简单分类菜单
  3. getRequestDispatcher 中请求转发和请求包含的使用说明
  4. Oracle 查询NULL字段/空字符串
  5. tomcat 可部署4种方式
  6. gitlab新增ssh
  7. Ubuntu 18.04 server安装+搭建Seacms v10.1网站
  8. 《剑指offer》面试题53 - I. 在排序数组中查找数字 I
  9. leetcode 347. 前 K 个高频元素
  10. CMake语法—普通变量与包含、宏(Normal Variable And Include、Macro)