1. 消费端集群消费(负载均衡)

 示例代码:

/**
* Producer,发送消息
*
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("message_producer");
producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
producer.start(); for (int i = 0; i < 100; i++) {
try {
Message msg = new Message("TopicTest",// topic
"Tag1",// tag
("Hello RocketMQ " + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
} producer.shutdown();
}
} /**
* Consumer,订阅消息
*/
public class Consumer1 { public Consumer1() {
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} class Listener implements MessageListenerConcurrently { @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); System.out.println("======暂停=====");
Thread.sleep(60000);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} } public static void main(String[] args) throws InterruptedException, MQClientException {
Consumer1 consumer1 = new Consumer1();
System.out.println("Consumer1 Started.");
}
} /**
* Consumer,订阅消息
*/
public class Consumer2 { public Consumer2() {
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} class Listener implements MessageListenerConcurrently { @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} } public static void main(String[] args) throws InterruptedException, MQClientException {
Consumer2 consumer2 = new Consumer2();
System.out.println("Consumer2 Started.");
}
}

一个生产者,两个消费者,注意两个消费者的组名要一样。

先启动两个消费者(customer1,customer2),通过控制台查看如下:

再启动生产者生成100条消息,消费情况如下:

生成的100条消息被customer1和customer2平均的消费了。可以通过consumer.setAllocateMessageQueueStrategy去设置分配策略。

BTW:这是默认的模式,可以通过consumer.setMessageModel设置,MessageModel.CLUSTERING | MessageModel.BROADCASTING,如果是广播消费,则每个客户端都会收到生产端的所有消息

2.消息未响应会重发

代码示例:

public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("message_producer");
producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
producer.start(); for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("TopicTest",// topic
"Tag1",// tag
("Hello RocketMQ " + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
} producer.shutdown();
}
} public class Consumer1 { public Consumer1() {
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} class Listener implements MessageListenerConcurrently { @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags); System.out.println("======暂停=====");
Thread.sleep(600000);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} } public static void main(String[] args) throws InterruptedException, MQClientException {
Consumer1 consumer1 = new Consumer1();
System.out.println("Consumer1 Started.");
}
} public class Consumer2 { public Consumer2() {
try {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} class Listener implements MessageListenerConcurrently { @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} } public static void main(String[] args) throws InterruptedException, MQClientException {
Consumer2 consumer2 = new Consumer2();
System.out.println("Consumer2 Started.");
}
}

先启动consumer1,再启动consumer2,最后启动producer

consumer1收到了消息,consumer2没有收到消息,这时把consumer1强制停止,也就是说consumer1不会给MQ返回响应,查看结果:

consumer2也收到消息了,说明在MQ没收到消费端响应的情况下,会重发消息。

3. 修改topic的队列数

默认的队列数是4个,可以从执行结果中看出:queueId都是0-3

细节可以看https://www.cnblogs.com/dyfh/p/4113677.html

可以增加设置producer.createTopic("TopicTest", "TopicTest", 8);

 

最新文章

  1. zend studio 常用快捷键
  2. Ruby学习之module
  3. paip.指针 引用 c++ java的使用总结.
  4. win95+ie3-win10+ie11 浏览器执行漏洞
  5. Android 网络连接判断与处理
  6. jmeter笔记6
  7. 嵌入式 hi3518c裸板uboot烧写、kernel烧写、fs烧写小结
  8. 知识点1-1:什么是ASP.NET MVC
  9. Oralce生成前N年的年数据
  10. hdu_5832_A water problem(模拟)
  11. 《HelloGitHub》第 14 期
  12. js中数学运算的处理
  13. [ZJOI2006]物流运输 SPFA+DP
  14. 适配器模式(Adapter Pattern)
  15. 【css】常用css
  16. yii2 Gridview网格小部件
  17. mongoose中connect()、createConnection()和connection的区别和作用
  18. 背水一战 Windows 10 (70) - 控件(控件基类): UIElement - Transform3D(3D变换), Projection(3D投影)
  19. Python3基础语法你学会了么
  20. 浅谈 equals 和 == 的区别

热门文章

  1. Go:闭包
  2. 【Codeforces 924C】Riverside Curio
  3. c++ 上机实验题
  4. gif &amp; tools
  5. hrbust oj 1536 Leonardo&#39;s Notebook 置换群问题
  6. HDU 1160 排序或者通过最短路两种方法解决
  7. RSA的共模攻击
  8. PLSQL 下载地址 Spring jar包
  9. codevs1174 靶形数独
  10. Abstract factory抽象工厂--对象创建型