深入理解RocketMQ(九)---实战(代码)
2024-08-28 22:00:16
一、批量发送消息
即多条消息放入List,一次发送,从而减少网络传输,提高效率
DefaultMQProducer producer = new DefaultMQProducer("batch_send_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "batchTopic";
List<Message> messageList = new ArrayList<>(); for (int i = 0; i < 10; i++) {
Message msg = new Message(topic,"TAG1","ORDER" + i, "Hello world".getBytes());
messageList.add(msg);
}
try{
producer.send(messageList);
}catch (Exception e){
e.printStackTrace();
} producer.shutdown();
二、消息发送队列自主选择
例:可以将同一订单(不同操作,例如下单、付款、出库、订单完成等操作)发送到同一个queue中,来保证一个订单不同操作的顺序性
DefaultMQProducer producer = new DefaultMQProducer("select_queue_producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "selectQueueTopic"; String[] tags = new String[]{"TAG","TAG2","TAG3","TAG4","TAG5"}; int orderId = 41;
int orderId1 = 42;
try{
for (int i = 0; i < 10; i++) {
Message msg = new Message(topic,tags[i%tags.length],"KEY" + i, ("Hello world"+i).getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get((Integer) arg % mqs.size());
}
},orderId1);
System.out.println(orderId1 + "=======" + sendResult); sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get((Integer) arg % mqs.size());
}
},orderId);
System.out.println(orderId + "=======" + sendResult);
} }catch (Exception e){
e.printStackTrace();
} producer.shutdown();
输入如下:
可以看到订单ID为41的消息,全部发送到queueId为1的队列中,订单ID为42的消息,全部发送到QueueId为2的队列中
三、订单过滤
1、TAG模式过滤(多个tag使用 || 区分)
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876"); producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "TagFilterTopic1";
String tagA = "TagA";
String tagB = "TagB";
for (int i = ; i < ; i++) {
try {
Message msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg); msg = new Message(topic,tagB,("tagA==========22222222222").getBytes(RemotingHelper.DEFAULT_CHARSET));
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep();
}
}
producer.shutdown();
消息消费
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFileterConcumer"); consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumerGroup("tagFileterConcumerGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.subscribe("TagFilterTopic1", "TagA");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start(); System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("tagFileterConcumer1"); consumer1.setNamesrvAddr("127.0.0.1:9876");
consumer1.setConsumerGroup("tagFileterConcumerGroup1");
consumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer1.subscribe("TagFilterTopic1", "TagA || TagB");
consumer1.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA&TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer1.start();
System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("tagFileterConcumer2"); consumer2.setNamesrvAddr("127.0.0.1:9876");
consumer2.setConsumerGroup("tagFileterConcumerGroup2");
consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer2.subscribe("TagFilterTopic1", "TagB");
consumer2.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer2.start();
System.out.printf("Consumer Started.%n");
执行结果:可以看到只需要TagA的消费者,只输出了一条消息;只需要TagB的消费者,也只输出了一条消息
2、SQL过滤
需要开启支持sql92:在broker.conf文件中添加如下配置:enablePropertyFilter=true
发送者:
DefaultMQProducer producer = new DefaultMQProducer("sql_filter_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start(); String topic = "SqlFilterTopic1";
String tagA = "TagA";
for (int i = ; i < ; i++) {
try {
Message msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","mm");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult); msg = new Message(topic,tagA,("tagA==========11111111111").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("orderStatus","");
msg.putUserProperty("userName","lcl");
msg.putUserProperty("orderId","");
sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep();
}
}
producer.shutdown();
消费者:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFileterConcumer"); consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumerGroup("tagFileterConcumerGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer.subscribe("SqlFilterTopic1", MessageSelector.bySql("(orderStatus = '1' and userName = 'lcl' and orderId > 0)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start(); System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer1 = new DefaultMQPushConsumer("tagFileterConcumer1"); consumer1.setNamesrvAddr("127.0.0.1:9876");
consumer1.setConsumerGroup("tagFileterConcumerGroup1");
consumer1.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer1.subscribe("TagFilterTopic1", "TagA || TagB");
consumer1.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagA&TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer1.start();
System.out.printf("Consumer Started.%n"); DefaultMQPushConsumer consumer2 = new DefaultMQPushConsumer("tagFileterConcumer2"); consumer2.setNamesrvAddr("127.0.0.1:9876");
consumer2.setConsumerGroup("tagFileterConcumerGroup2");
consumer2.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
consumer2.subscribe("TagFilterTopic1", "TagB");
consumer2.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s TagB Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer2.start();
System.out.printf("Consumer Started.%n");
消费者输出结果:
3、类过滤模式
最新文章
- Swift 写纯洁的TableviewCell
- PHP Fatal error: Class &#39;DOMDocument&#39; not found
- Qt 自定义消息窗口
- C# 值类型和引用类型
- python 定义实例方法
- css常用中文字体的英文名称写法
- python aes加解密
- eclupse启动报 Failed to load JavaHL Library.错
- 2016 - 1 - 22 HTTP(二)
- Spark RDDRelation
- 02 - 用wxStreamToTextRedirector和wxTextCtrl输出std::cout
- uva 10453 - Make Palindrome(dp, 记录路径)
- leetcode&mdash;sqrt
- iOS9开发之新增通知行为详解
- Eclipse清除SVN密码
- 【原创】只学到二维数组和结构体,不用链表也能写一个C贪食蛇?(四)
- MVC4加载zTree树小控件
- IO流的操作规律。
- P1582 倒水
- C++ 标准 和 C 标准 (截止到2019年03月)