一、批量发送消息

  即多条消息放入List,一次发送,从而减少网络传输,提高效率

DefaultMQProducer producer = new DefaultMQProducer("batch_send_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "batchTopic";
List<Message> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) {
Message msg = new Message(topic,"TAG1","ORDER" + i, "Hello world".getBytes());
messageList.add(msg);
}
try{
producer.send(messageList);
}catch (Exception e){
e.printStackTrace();
} producer.shutdown();

二、消息发送队列自主选择

例:可以将同一订单(不同操作,例如下单、付款、出库、订单完成等操作)发送到同一个queue中,来保证一个订单不同操作的顺序性

DefaultMQProducer producer = new DefaultMQProducer("select_queue_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "selectQueueTopic"; String[] tags = new String[]{"TAG","TAG2","TAG3","TAG4","TAG5"}; int orderId = 41;
int orderId1 = 42;
try{
for (int i = 0; i < 10; i++) {
Message msg = new Message(topic,tags[i%tags.length],"KEY" + i, ("Hello world"+i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get((Integer) arg % mqs.size());
}
},orderId1);
System.out.println(orderId1 + "=======" + sendResult); sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get((Integer) arg % mqs.size());
}
},orderId);
System.out.println(orderId + "=======" + sendResult);
} }catch (Exception e){
e.printStackTrace();
} producer.shutdown();

输入如下:

可以看到订单ID为41的消息,全部发送到queueId为1的队列中,订单ID为42的消息,全部发送到QueueId为2的队列中

三、订单过滤

1、TAG模式过滤(多个tag使用 || 区分)

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876"); producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "TagFilterTopic1";
String tagA = "TagA";
String tagB = "TagB";
for (int i = ; i < ; i++) {
try {
Message msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg); msg = new Message(topic,tagB,("tagA==========22222222222").getBytes(RemotingHelper.DEFAULT_CHARSET));
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep();
}
}
producer.shutdown();

消息消费

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFileterConcumer");

        consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumerGroup("tagFileterConcumerGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.subscribe("TagFilterTopic1", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start(); System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("tagFileterConcumer1"); consumer1.setNamesrvAddr("127.0.0.1:9876");
consumer1.setConsumerGroup("tagFileterConcumerGroup1");
consumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer1.subscribe("TagFilterTopic1", "TagA || TagB");
consumer1.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA&TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer1.start();
System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("tagFileterConcumer2"); consumer2.setNamesrvAddr("127.0.0.1:9876");
consumer2.setConsumerGroup("tagFileterConcumerGroup2");
consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer2.subscribe("TagFilterTopic1", "TagB");
consumer2.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer2.start();
System.out.printf("Consumer Started.%n");

执行结果:可以看到只需要TagA的消费者,只输出了一条消息;只需要TagB的消费者,也只输出了一条消息

2、SQL过滤

需要开启支持sql92:在broker.conf文件中添加如下配置:enablePropertyFilter=true

发送者:

DefaultMQProducer producer = new DefaultMQProducer("sql_filter_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "SqlFilterTopic1";
String tagA = "TagA";
for (int i = ; i < ; i++) {
try {
Message msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","mm");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep();
}
}
producer.shutdown();

消费者:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFileterConcumer");

        consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumerGroup("tagFileterConcumerGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.subscribe("SqlFilterTopic1", MessageSelector.bySql("(orderStatus = '1' and userName = 'lcl' and orderId > 0)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start(); System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("tagFileterConcumer1"); consumer1.setNamesrvAddr("127.0.0.1:9876");
consumer1.setConsumerGroup("tagFileterConcumerGroup1");
consumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer1.subscribe("TagFilterTopic1", "TagA || TagB");
consumer1.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA&TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer1.start();
System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("tagFileterConcumer2"); consumer2.setNamesrvAddr("127.0.0.1:9876");
consumer2.setConsumerGroup("tagFileterConcumerGroup2");
consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer2.subscribe("TagFilterTopic1", "TagB");
consumer2.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer2.start();
System.out.printf("Consumer Started.%n");

消费者输出结果:

3、类过滤模式

最新文章

  1. Swift 写纯洁的TableviewCell
  2. PHP Fatal error: Class &#39;DOMDocument&#39; not found
  3. Qt 自定义消息窗口
  4. C# 值类型和引用类型
  5. python 定义实例方法
  6. css常用中文字体的英文名称写法
  7. python aes加解密
  8. eclupse启动报 Failed to load JavaHL Library.错
  9. 2016 - 1 - 22 HTTP(二)
  10. Spark RDDRelation
  11. 02 - 用wxStreamToTextRedirector和wxTextCtrl输出std::cout
  12. uva 10453 - Make Palindrome(dp, 记录路径)
  13. leetcode&mdash;sqrt
  14. iOS9开发之新增通知行为详解
  15. Eclipse清除SVN密码
  16. 【原创】只学到二维数组和结构体,不用链表也能写一个C贪食蛇?(四)
  17. MVC4加载zTree树小控件
  18. IO流的操作规律。
  19. P1582 倒水
  20. C++ 标准 和 C 标准 (截止到2019年03月)

热门文章

  1. 【HIVE】(3)联合查询join、时间戳函数、字符串函数
  2. MySQL 高级—— 锁机制
  3. Java实现 蓝桥杯 算法训练 Rotatable Number(暴力)
  4. Java实现 LeetCode 674 最长连续递增序列(暴力)
  5. Java实现 蓝桥杯 乘积最大
  6. Java实现 LeetCode 18 四数之和
  7. Java实现微生物增殖
  8. Java实现第八届蓝桥杯字母组串
  9. class 类前向声明
  10. iOS-AutoLayout中动画使用的细节 和 iOS layout机制