C# .net 使用rabbitmq消息队列——EasyNetQ插件介绍
EasyNetQ 是一个简洁而适用的RabbitMQ .NET类库,本质上是一个在RabbitMQ.Client之上提供服务的组件集合。
应用使用rabbitmq需要经过总线接口IBus或者IAdvanceBus,大部分时候我们使用的是IBus,它提供了三种消息模式:Publish/Subscribe, Request/Response和 Send/Receive,可以满足大多数需求。
EasyNetQ规定,每一个你想发送的消息必须用一个class表示,简单说就是一个实体类。默认情况下,当你发布一个消息,EasyNetQ会检查消息类型,基于类型名称、命名空间和程序集创建交换机和队列,当然也可以使用指定的名称去创建交换机和队列,而消息默认使用Newtonsoft.Json 序列化成JSON格式,当然我们也可以替换成自己序列化方式,实现很简单,比如我们要使用xml格式序列化,因为EasyNetQ内部使用Ioc容器组织在一起,那么我们只需要实现ISerializer接口并注册到DI容器中即可。
1、EasyNetQ安装
使用Nuget搜索EasyNetQ直接安装即可:
2、EasyNetQ连接
using System; namespace EasyNetQ.Demo
{
public class TextMessage
{
public string Text { get; set; }
}
class Program
{
static void Main(string[] args)
{
//方式1
var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60";
var bus1 = RabbitHutch.CreateBus(connectionString);
//方式2
HostConfiguration host = new HostConfiguration();
host.Host = "192.168.18.129";
host.Port = 5672; ConnectionConfiguration connection = new ConnectionConfiguration();
connection.Port = 5672;
connection.Password = "123456";
connection.UserName = "admin";
connection.VirtualHost = "/";
connection.Timeout = 60;
connection.Hosts = new HostConfiguration[] { host }; var bus2 = RabbitHutch.CreateBus(connection, services => { }); //使用bus实现业务
//关闭连接字需要调用bus的Dispose方法
Console.ReadKey();
}
}
}
其实EasyNetQ连接过程其实就是IBus的创建过程,IBus的创建有两种方式,一种是以连接字符串的形式,类似数据库的连接,一种是通过ConnectionConfiguration来创建,其实连接字符串最终也是构建这个对象来连接rabbitmq,而连接字符串的连接类型主要有一下几个:
host 这个字段是必选的。如要具体指定你要连接服务器端口,你用标准格式 host:port。假如你省略了端口号,AMQP默认端口是5672。如果连接到RabbitMQ集群,需要指定每一个集群节点用逗号(.)分隔
virtualhost 默认虚拟主机是’/’
username 默认是'guest'
password 默认为'guest'
requestedHearbeat 默认为10秒钟。没有心跳设置为0
prefetchcount 默认为50.这个值是在EasyNetQ发送ack之前发送给RabbitMQ的消息数。不限制设置为0(不推荐). 为了在消费者之间保持公平和平衡设置为1.
persistentMessages 默认为true。这个决定了在发送消息时采用什么样的delivery_mode。设置为true,RabbitMQ将会把消息持久化到磁盘,并且在服务器重启后仍会存在。设置为false可以提高性能收益。
timeout 模式值为10秒。不限制超时时间设置为0.当超时事时抛出System.TimeoutException.
关闭连接字需要调用Dispose方法即可,所以,在使用IBus时,切记不要无脑的使用using
3、EasyNetQ模式:Publish/Subscribe
using System; namespace EasyNetQ.Demo
{
public class TextMessage
{
public string Text { get; set; }
}
class Program
{
static void Main(string[] args)
{
var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60"; //订阅/Subscribe
{
var bus = RabbitHutch.CreateBus(connectionString);
bus.Subscribe<TextMessage>("subscriptionId", tm =>
{
Console.WriteLine("Recieve Message: {0}", tm.Text);
});
} //发布/Publish
using (var bus = RabbitHutch.CreateBus(connectionString, registerServices => { }))
{
var input = "";
Console.WriteLine("Please enter a message. 'q'/'Q' to quit.");
while ((input = Console.ReadLine()).ToLower() != "q")
{
bus.Publish(new TextMessage
{
Text = input
});
}
}
}
}
}
这个例子是EasyNetQ三种模式中的Publish/Subscribe,上面的代码执行后,在rabbitmq中会创建一个名为 EasyNetQ.Demo.TextMessage, EasyNetQ.Demo 的交换机和一个名为 EasyNetQ.Demo.TextMessage, EasyNetQ.Demo_subscriptionId 的队列,当然,交换机和队列的名称也可以自定义。
发布订阅模式有主要有以下特点:
1、发布消息使用Publish方法
bus.Publish(new TextMessage
{
Text = input
});
一个消息类型对应一个交换机,如果交换机不存在,会自动创建一个topic类型的交换机,默认情况下,交换机名称为消息类型的全限定名(命名空间.类型,程序集名),例如上面的例子将会创建名为EasyNetQ.Demo.TextMessage, EasyNetQ.Demo 的交换机,当然,名称可以自定义。
2、订阅消息调用Subscribe方法
bus.Subscribe<TextMessage>("subscriptionId", tm =>
{
Console.WriteLine("Recieve Message: {0}", tm.Text);
});
每次调用Subscribe方法都会去创建一个新的队列(如果不存在), Subscribe方法有一个subscriptionId参数,默认情况下,队列名称为全限定名(命名空间.类型,程序集名)+subscriptionId,而且队列将绑定到对应的交换机,默认绑定的路由是#,即队列可以接受所有发布到交换机的消息,例如上面的例子会创建名为 EasyNetQ.Demo.TextMessage, EasyNetQ.Demo_subscriptionId 的队列,绑定的路由是#。队列名和路由均可自定义。
因为Subscribe方法的subscriptionId参数,在未明确指定队列的情况下,如果使用同一个subscriptionId参数,则表示从同一个队列消费,因此可以使用subscriptionId来区分队列。
注:因为队列是在Subscribe方法执行后创建的,如果未调用Subscribe方法,即队列还未创建,那么调用Publish方法时,只会生成一个交换机,而且发布的消息将会丢失。
3、队列名有两种方式自定义。
一种是使用QueueAttribute特性标识消息类型,指定队列名和交换机名,这个也是交换机自定义名称的方式
[Queue("my_queue_name", ExchangeName = "my_exchange_name")]
public class TextMessage
{
public string Text { get; set; }
}
当使用特性声明队列名称时,真实创建的队列名其实是:my_queue_name + subscriptionId,如上面的例子会创建名为my_queue_name_subscriptionId的队列。
另一种是在使用Subscribe方法订阅消息是指定,当在这里指定时,subscriptionId就无效了,生成的队列名就是自定义的名称,不带subscriptionId。
bus.Subscribe<TextMessage>("subscriptionId", tm =>
{
Console.WriteLine("Recieve Message: {0}", tm.Text);
}, cfg =>
{
cfg.WithQueueName("my_queue_name")//生成的队列名是my_queue_name,不带subscriptionId
.WithTopic("a#");//路由匹配以a开头的路由,WithTopic是可以多次调用,让队列可以以不同的路由绑定到交换机
});
另外,这种方式还可以指定队列绑定到交换机的路由,当订阅处指定路由后,需要在发布消息时也指定对应路由才能将消息发布到队列。
bus.Publish(new TextMessage
{
Text = input
}, "a#");//以a#为路由发布消息
或者
bus.Publish(new TextMessage
{
Text = input
}, cfg =>
{
cfg.WithTopic("a#");//以a#为路由发布消息
});
4、可以订阅消息,当然也可以取消订阅,当使用Subscribe订阅消息时,会返回一个ISubscriptionResult,取消订阅只需要调用ISubscriptionResult的Dispose方法即可
subscriptionResult.Dispose();
// 这个等价与 subscriptionResult.ConsumerCancellation.Dispose();
调用Dispose方法将停止EasyNetQ对队列的消费,并且关闭这个消费者的channel。
注意:IBus和IAndvancedBus的dispose,能够取消所有消费者,并关闭对RabbitMQ的连接。
5、消息的发布订阅也提供了异步操作,我们可以根据自己的需求来决定是否使用
//异步发布
bus.PublishAsync(new TextMessage
{
Text = input
});
//异步订阅
bus.SubscribeAsync<TextMessage>("subscriptionId", async tm =>
{
await Task.Run(() =>
{
Console.WriteLine("Recieve Message: {0}", tm.Text);
});
});
6、消息的发布与订阅过程不必再同一个项目中,也不需要使用同一个IBus,只需要注意发布订阅过程中的交换机和队列即可,因为默认情况下,交换机名和队列名都是根据消息类型生成,所以此时最好具体指明交换机名和队列名。
4、EasyNetQ模式:Request/Response
using System; namespace EasyNetQ.Demo
{
public class MyRequest
{
public string Text { get; set; }
}
public class MyResponse
{
public string Text { get; set; }
}
class Program
{
static void Main(string[] args)
{
var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60"; //响应/Respond
{
var bus = RabbitHutch.CreateBus(connectionString);
bus.Respond<MyRequest, MyResponse>(request =>
{
return new MyResponse { Text = "Respond: " + request.Text };
});
}//请求/Request
using (var bus = RabbitHutch.CreateBus(connectionString))
{
var input = "";
Console.WriteLine("Please enter a message. 'q'/'Q' to quit.");
while ((input = Console.ReadLine()).ToLower() != "q")
{
var response = bus.Request<MyRequest, MyResponse>(new MyRequest
{
Text = input
}, cfg => { });
Console.WriteLine(response.Text);
}
} }
}
}
Request/Response模式类似于请求响应的过程——发送一个请求到服务器,服务器然后处理请求后返回一个响应。上述代码执行后,会在rabbitmq中创建一个名为easy_net_q_rpc的交换机,但是类型是direct,另外还会创建两个队列,名为EasyNetQ.Demo.MyRequest, EasyNetQ.Demo和easynetq.response.xxxxxxxxxxxxxxx。
请求响应模式的特点:
1、通过Request方法发送请求
var response = bus.Request<MyRequest, MyResponse>(new MyRequest
{
Text = input
}, cfg => { });
请求之后会创建一个名为easy_net_q_rpc的交换机(如果不存在),类型是direct,同时,当前线程会阻塞,等待消息被消费处理,或者等待一段时间后(如例子中连接字符串中的timeout=60),会抛出System.TimeoutException异常。另外这里还会生成一个easynetq.response.xxxxxxxxxxxxxxx的队列,这是一个auto-delete类型的队列,同时与当前连接绑定,当连接关闭后,这个队列就会自动删除,而且这个队列将会自动绑定到easy_net_q_rpc交换机,路由名就是队列名。。
2、通过Respond方法处理请求发送的消息
bus.Respond<MyRequest, MyResponse>(request =>
{
return new MyResponse { Text = "Respond: " + request.Text };
});
Respond方法执行后,会生成一个队列,队列名是请求消息的全限定名(命名空间.类型,程序集名),同时队列将自动绑定到easy_net_q_rpc交换机,路由名就是队列名,之后所有该请求类型的消息都将发送到这个队列。而且Respond方法也将从这个队列获取消息消费。另外,我们是可以指定这个队列名:
bus.Respond<MyRequest, MyResponse>(request =>
{
return new MyResponse { Text = "Respond: " + request.Text };
}, cfg =>
{
cfg.WithQueueName("my_queue_name");
});
只有当请求发送的消息被Respond方法中的处理过程处理后,Request才能得到结果继续执行,所以,尽可能的不要在Respond方法中写过多的耗时操作。
注:因为队列是在Respond方法执行后创建的,如果在未调用Respond方法时,即队列未创建,那么Request方法发送的消息将会丢失,而Request方法发送消息后将会造成线程阻塞,那么最终的结果就是一直等待到System.TimeoutException异常抛出。
3、Request方法和Respond方法都提供了异步操作
//异步请求
var task = bus.RequestAsync<MyRequest, MyResponse>(new MyRequest
{
Text = input
}, cfg => { });
task.Wait();
//异步响应
bus.RespondAsync<MyRequest, MyResponse>(async request =>
{
return await Task.Run(() => new MyResponse { Text = "Respond: " + request.Text });
});
4、同样的,请求和响应过程可以在不同项目中,但是要注意所对应的队列,因为队列名和交换机转发路由都是根据请求消息生成的,所以此时建议使用自定义队列名的方式。
5、EasyNetQ模式:Send/Receive
using System; namespace EasyNetQ.Demo
{
public class MyMessage
{
public string Text { get; set; }
}
class Program
{
static void Main(string[] args)
{
var connectionString = "host=192.168.18.129;virtualHost=/;username=admin;password=123456;timeout=60"; //接收/Receive
{
var bus = RabbitHutch.CreateBus(connectionString);
bus.Receive<MyMessage>("my_queue_name", r => Console.WriteLine("Receive:" + r.Text));
} //发送/Send
using (var bus = RabbitHutch.CreateBus(connectionString))
{
var input = "";
Console.WriteLine("Please enter a message. 'q'/'Q' to quit.");
while ((input = Console.ReadLine()).ToLower() != "q")
{
bus.Send("my_queue_name", new MyMessage
{
Text = input
});
}
} }
}
}
上述代码执行完成后,会创建一个名为my_queue_name的队列,但是不会创建交换机,也就是说Send/Receive不是基于交换机,为什么有了Publish/Subsrcibe和Request/Response模式,还要添加一个Send/Receive模式?发送接收模式主要是针对队列,前面说了,EasyNetQ规定每个发送的消息必须是一个实体类型,消息经过序列化之后放到rabbitmq中去的,而Publish/Subsrcibe和Request/Response模式默认都是一个队列对应一个实体,也就是说,一个队列中只能存放一个类型序列化后的数据。而Send/Receive模式允许一个队列存放多种类型格式化后的数据,在接收时再根据类型匹配。
发送接收模式特点:
1、发送消息使用Send方法
bus.Send("my_queue_name", new MyMessage
{
Text = input
});
Send方法执行后会创建队列(如果不存在),而且可以往同一队列中发送不同类型的消息
bus.Send("my_queue_name", new CatMessage
{
Miao = "Miao"
});
bus.Send("my_queue_name", new DogMessage
{
Wang = "Wang"
});
2、接收消息使用Receive方法
bus.Receive<MyMessage>("my_queue_name", r =>
{
Console.WriteLine("Receive:" + r.Text);
});
Receive方法也会创建队列(如果不存在),当队列中有多种类型的消息时,可以在Receive方法中的hander中添加不同类型的接收者
bus.Receive("my_queue_name", hander =>
{
hander.Add<CatMessage>(r =>
{
Console.WriteLine("Cat:" + r.Miao);
});
hander.Add<DogMessage>(r =>
{
Console.WriteLine("Dog:" + r.Wang);
});
});
3、每次调用Receive方法都会创建一个消费者,多次调用Receive方法后,同一队列的消息会分发到不同消费者中。对于一个消费者,如果消息送达了消费者的接收队列,会根据添加的类型进行匹配,如果匹配成功,则用对应的逻辑处理,如果没有匹配到任何接收者,EasyNetQ将会把消息带着一个异常“No handler for message type写进EasyNetQ的错误队列”。
4、同样的,发送和接收可以在不同项目中,但是由于接收过程是根据类型匹配的,那么就需要发送和接收中使用的是同一类型,而且,每一个Receive方法中添加的接收器要尽可能的覆盖所接收的队列中所有的消息类型,这样才能保证不会导致消息匹配不到接受者。
5、Send方法和Receive方法也有异步操作。
bus.SendAsync("my_queue_name", new MyMessage
{
Text = input
});
bus.Receive<MyMessage>("my_queue_name", async r =>
{
await Task.Run(() =>
{
Console.WriteLine("Receive:" + r.Text);
});
});
最新文章
- Unity性能优化(4)-官方教程Optimizing graphics rendering in Unity games翻译
- shell script
- mysql 常用知识
- [HDU 4821] String (字符串哈希)
- ASP.NET页面跳转的三种方法比较
- 关于中文乱码的解决方法(URL方式)
- MySQL外键的作用和创建
- 分享整理的免费API接口
- 自动化测试KPI考评的一种方法
- spring 方法验证参数
- Web请求过程
- solr中重建索引(转)
- PTA——猴子吃桃
- JSONP方法解决跨域请求
- ML(4.2): R CART
- BOM的节点方法和属性
- Oracle 序列(sequence)的创建、修改及删除
- CentOS7 防火墙配置firewall-cmd
- php模板引擎smarty
- cms-详细页面-1