默认情况下rabbitmq 是根据消费者多少依次投递,投递后就删除消息.

消息不会重复投递给不同的消费者.

消费者如果遇到长时间的任务,会执行完一个消息之后再执行下一个消息,

消费者持久化:

如果一个消费者断网或者宕机.这个消息就会丢失.如果想在一个消费者宕机的情况下吧消息投递给另一个

消费者需要使用:ack确认

C#代码:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

channel.BasicConsume(queue: "task_queue", autoAck: false, consumer: consumer);

使用这种方式.如果unack的消息太多,没有消费者处理,会吃掉很多内存,

可以使用rabbitmqctl messages_unacknowledged  查看unack消息.

服务端持久化:

如果一个rabbitmq Server宕机,默认里面的消息全部丢失,如果想持久化使用代码:

channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null); var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: properties,
body: body);

这里需要注意的是.需要新建一个队列来进行持久化,不能在以前已经建好的对象上修改属性.不然不会起作用

公平分派:

如果一个消费者负载很大,不能处理更多消息.另一个消费者负载小.可以使用代码,控制消费者可以接受消息数目

channel.BasicQos(, , false);

上述代码告诉rabbitmq.不能再接受新的消息

全部代码如下:

生产者:

public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null); var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties();
properties.Persistent = true; channel.BasicPublish(exchange: "",
routingKey: "task_queue",
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
} Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
} private static string GetMessage(string[] args)
{
return ((args.Length > ) ? string.Join(" ", args) : "Hello World!");
}

消费者:

public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "task_queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null); channel.BasicQos(prefetchSize: , prefetchCount: , global: false); Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] Received {0}", message); int dots = message.Split('.').Length - ;
Thread.Sleep(dots * ); Console.WriteLine(" [x] Done"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "task_queue",
autoAck: false,
consumer: consumer); Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}

最新文章

  1. TableViewCell重影问题
  2. Bzoj2683 简单题
  3. 【leetcode❤python】 414. Third Maximum Number
  4. 用JDBC访问MySQL
  5. hdu 5312 数学
  6. BlueDroid代码分析之GKI
  7. yii2 文件上传
  8. kuangbin_UnionFind J (POJ 2492)
  9. DBCP参数介绍
  10. java 服务端解决ajax跨域问题
  11. org.springframework.web.util.IntrospectorCleanupListener的用途
  12. 朴素贝页斯分类法 c++实现
  13. 基于Jmeter的接口自动化测试实践
  14. IOS 疯狂基础之 页面间跳转
  15. Linux下CenOS系统 安装Redis
  16. SDOI2018:原题识别
  17. jdk安装环境变量配置
  18. YSQL获取自增ID的四种方法(转发)
  19. 利用 log-pilot + elasticsearch + kibana 搭建 kubernetes 日志解决方案
  20. 构建高性能web之路------mysql读写分离实战(转)

热门文章

  1. CSS3 Transform的perspective属性
  2. FPGA型号解读
  3. mongodb安装建议
  4. Linux CentOS6.8 项目部署脚本实现
  5. 12--Python入门--文件读写--TXT文件
  6. 解决使用angular2路由后,页面刷新后报404错误。
  7. Logging常用handlers的使用
  8. 基于SPA的网页授权流程(微信OAuth2)
  9. Windows7下安装python3.6.3
  10. Codeforces1099F. Cookies【DP】【线段树】【贪心】【博弈】【沙比提(这是啥算法)】