ActiveMQ的另一种模式就SUB/HUB即发布订阅模式,是SUB/hub就是一拖N的USB分线器的意思。意思就是一个来源分到N个出口。还是上节的例子,当一个订单产生后,后台N个系统需要联动,但有一个前提是都需要收到订单信息,那么我们就需要将一个生产者的消息发布到N个消费者。

生产者:

try
{
//Create the Connection Factory
IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
using (IConnection connection = factory.CreateConnection())
{
//Create the Session
using (ISession session = connection.CreateSession())
{
//Create the Producer for the topic/queue
IMessageProducer prod = session.CreateProducer(
new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing")); //Send Messages
int i = ; while (!Console.KeyAvailable)
{
ITextMessage msg = prod.CreateTextMessage();
msg.Text = i.ToString();
Console.WriteLine("Sending: " + i.ToString());
prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue); System.Threading.Thread.Sleep();
i++;
}
}
} Console.ReadLine();
}
catch (System.Exception e)
{
Console.WriteLine("{0}", e.Message);
Console.ReadLine();
}

假设生产者每5秒发送一次消息:

消费者:

static void Main(string[] args)
{
try
{
//Create the Connection factory
IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/"); //Create the connection
using (IConnection connection = factory.CreateConnection())
{
connection.ClientId = "testing listener1";
connection.Start(); //Create the Session
using (ISession session = connection.CreateSession())
{
//Create the Consumer
IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("testing"), "testing listener1", null, false); consumer.Listener += new MessageListener(consumer_Listener); Console.ReadLine();
}
connection.Stop();
connection.Close();
}
}
catch (System.Exception e)
{
Console.WriteLine(e.Message);
}
} static void consumer_Listener(IMessage message)
{
try
{
ITextMessage msg = (ITextMessage)message;
Console.WriteLine("Receive: " + msg.Text);
}
catch (System.Exception e)
{
Console.WriteLine(e.Message);
}
}

启动一个消费者:

我们发现他是从15开始的,而不是像上节一样从头开始,再启动另一个消费者:

我们发现就是从启动时开始接受消息的,之前的消息就丢失了。

整体状态如下:

我们观察管理界面:

产生了一个testing的Topics,而订阅方有2个都订阅的是testing:

这样只需要在需要获取消息的地方订阅即可及时获得。

源代码下载

最新文章

  1. Smarty缓存的5个知识点
  2. Android进程间通信之LocalSocket通信
  3. HR外包系统 - 工资计算-几种常见账单计算规则
  4. 一些常用的C++标准函数
  5. Zabbix通过percona监控MySQL
  6. [算法]树上倍增求LCA
  7. Linux高级编程--09.线程互斥与同步
  8. js 倒计时 button不可用
  9. POJ 3744 Scout YYF I (概率dp+矩阵快速幂)
  10. content:attr()
  11. unity3D插件开发——前篇
  12. [Sdoi2017]序列计数 [矩阵快速幂]
  13. WMS二开:外挂页面开发培训
  14. repository和repertory
  15. UE4联机编译光照
  16. English Pronunciation Analysis | Advanced English Conversation
  17. 【BZOJ1055】[HAOI2008]玩具取名(动态规划)
  18. jq给单选框 radio添加或删除选中状态
  19. Linux-文件和目录属性
  20. 基于Netty4.1.29.Final的helloworld实现.使用idea

热门文章

  1. DTO学习系列之AutoMapper(五)----当EntityFramework爱上AutoMapper
  2. WCF入门教程系列四
  3. 为什么要用Math.sqrt(i)方法
  4. Mac添加或修改环境变量
  5. 让footer在底部(测试它人方法)
  6. 最大公约数(gcd):Euclid算法证明
  7. pubwin 客户端会员无法自助结账的排查方法
  8. 小试牛刀——爬topit.me的图片,附github简易上传教程
  9. LeetCode_N-Queens
  10. 【转帖】C# DllImport 系统调用使用详解 托管代码的介绍 EntryPoint的使用