RocketMQ 顺序消费只消费一次 坑
2024-10-12 21:53:00
rocketMq实现顺序消费的原理
produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息
注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue
单个节点(Producer端1个、Consumer端1个)
1、Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException; /**
* Producer,发送顺序消息
*/
public class Producer {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("order_Producer");
producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD",
// "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 0); System.out.println(sendResult);
} producer.shutdown();
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2、Consumer.java (有问题)
package order;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt; /**
* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
*/
public class Consumer1 { public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
} try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) { e.printStackTrace();
}
; return ConsumeOrderlyStatus.SUCCESS;
}
}); consumer.start(); System.out.println("Consumer1 Started.");
} }
这个地方有一个大坑,注册监听类的时候,不能使用匿名内部类。不然的话,只会消费一次,然后消费者就 挂了……挂了……挂了……
监听类要单独写!!!
正确消费者写法:
自定义监听类:
MyMessageListener
public class MyMessageListener implements MessageListenerOrderly { @Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 设置自动提交
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
} try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) { e.printStackTrace();
} return ConsumeOrderlyStatus.SUCCESS;
}
}
Consumer.java
public class Consumer { public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");
consumer.setNamesrvAddr("101.200.33.225:9876"); /**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); MyMessageListener myMessageListener = new MyMessageListener();
consumer.registerMessageListener(myMessageListener); consumer.start(); System.out.println("Consumer1 Started.");
}
}
参考:https://www.cnblogs.com/antain/p/rocketmq.html
http://www.cnblogs.com/520playboy/p/6750023.html
http://dbaplus.cn/news-21-1123-1.html
最新文章
- js颠倒数组元素顺序reverse()
- 浏览器请求URL原理
- python sorted
- 虚拟机Linux和Windows之间互传文件的好帮手WinSCP
- iOS如何随意的穿插跳跃,push来pop去
- C# webservice初探
- TLS之上的HTTP
- 图片上传预览 支持html5的浏览器
- vue基础一
- ubuntu常见错误--Could not get lock /var/lib/dpkg/lock解决
- zabbix监控ssl证书到期时间
- 在被vue组件引用的 js 文件里获取组件实例this
- CentOS 7 镜像文件各个版本区别
- Neutron:ML2 Core Plugin
- (BFS) leetcode 279. Perfect Squares
- flask基础三
- scws简单中文分词
- HDU6341 Let Sudoku Rotate (杭电多校4J)
- Python基础3(2017-07-20)
- DDOS 攻击的防范