参考 https://blog.csdn.net/asdf08442a/article/details/54882769 整理出来的测试 demo

1、produce 生产者

 1 package com.bwdz.sp.comm.util.test;
2
3 import org.apache.rocketmq.client.exception.MQBrokerException;
4 import org.apache.rocketmq.client.exception.MQClientException;
5 import org.apache.rocketmq.client.producer.DefaultMQProducer;
6 import org.apache.rocketmq.client.producer.SendResult;
7 import org.apache.rocketmq.client.producer.SendStatus;
8 import org.apache.rocketmq.common.message.Message;
9 import org.apache.rocketmq.remoting.exception.RemotingException;
10
11 import java.util.UUID;
12
13 /**
14 * Created by xy on 2018/11/16.
15 */
16 public class SyncProducer {
17 private static DefaultMQProducer producer = null;
18
19 public static void main(String[] args) {
20 System.out.print("[----------]Start\n");
21 int pro_count = 1;
22 if (args.length > 0) {
23 pro_count = Integer.parseInt(args[0]);
24 }
25 boolean result = false;
26 try {
27 ProducerStart();
28 for (int i = 1; i < pro_count; i++) {
29 String msg = "hello rocketmq "+ i+"".toString();
30 SendMessage("qch_20170706", //topic
31 "Tag"+i, //tag
32 "Key"+i, //key
33 msg); //body
34 System.out.print(msg + "\n");
35 }
36 }finally {
37 producer.shutdown();
38 }
39 System.out.print("[----------]Succeed\n");
40 }
41
42 private static boolean ProducerStart() {
43 producer = new DefaultMQProducer("pro_qch_test");
44 producer.setNamesrvAddr("192.168.69.173:9876");
45 producer.setInstanceName(UUID.randomUUID().toString());
46 try {
47 producer.start();
48 } catch(MQClientException e) {
49 e.printStackTrace();
50 return false;
51 }
52 return true;
53 }
54
55 private static boolean SendMessage(String topic,String tag,String key, String str) {
56 Message msg = new Message(topic,tag,key,str.getBytes());
57 try {
58 SendResult result = producer.send(msg);
59 SendStatus status = result.getSendStatus();
60 System.out.println("___________________________SendMessage: "+status.name());
61 } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
62 e.printStackTrace();
63 return false;
64 }
65 return true;
66 }
67 }

2、consumer 消费者

 1 package com.bwdz.sp.comm.util.test;
2
3 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
8 import org.apache.rocketmq.common.message.MessageExt;
9
10 import java.util.List;
11 import java.util.UUID;
12
13 /**
14 * Created by xy on 2018/11/16.
15 */
16 public class ConsumerTest {
17 public static void main(String[] args) {
18 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");
19 consumer.setInstanceName(UUID.randomUUID().toString());
20 consumer.setConsumeMessageBatchMaxSize(32);
21 consumer.setNamesrvAddr("192.168.69.173:9876");
22 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
23 consumer.registerMessageListener(new MessageListenerConcurrently() {
24 @Override
25 public ConsumeConcurrentlyStatus consumeMessage(
26 List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
27 for(MessageExt me : list) {
28 if("Tag1".equals(me.getTags())){
29 System.out.println("处理 Tag1 业务");
30 System.out.println(new String(me.getBody()) + "消费成功" + "\n");
31 }else if("Tag2".equals(me.getTags())){
32 System.out.println("处理 Tag2 业务");
33 System.out.println(new String(me.getBody()) + "消费成功" + "\n");
34 }else if("Tag3".equals(me.getTags())){
35 System.out.println("处理 Tag3 业务");
36 System.out.println(new String(me.getBody()) + "消费失败" + "\n");
37 return ConsumeConcurrentlyStatus.RECONSUME_LATER;
38 }else{
39 //consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");
40 System.out.println("过滤掉的业务"+ me.getKeys());
41 }
42 }
43 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
44 }
45 });
46 try {
47 consumer.subscribe("qch_20170706", "Tag1||Tag2||Tag3");
48 consumer.start();
49 } catch (Exception e) {
50 e.printStackTrace();
51 }
52 }
53 }

先运行produce,控制台输出结果:

[----------]Start
___________________________SendMessage: SEND_OK
hello rocketmq 1
___________________________SendMessage: SEND_OK
hello rocketmq 2
___________________________SendMessage: SEND_OK
hello rocketmq 3
___________________________SendMessage: SEND_OK
hello rocketmq 4
[----------]Succeed

再运行consumer,控制台输出结果:

注:消息 ”hello rocketmq 4“ 被consumer里47行代码过滤掉了,所以不会被消费;消息 “hello rocket 3” 在消费的时候被指定失败ConsumeConcurrentlyStatus.RECONSUME_LATER,表示消费失败,如果被指定失败,表明此消息下次还可以继续发送到consumer被继续消费处理,其他消息则不会被再一次消费

处理 Tag2 业务
hello rocketmq 2消费成功 处理 Tag3 业务
hello rocketmq 3消费失败 处理 Tag1 业务
hello rocketmq 1消费成功

consumer再次运行,控制台输出结果(直到被指定成功ConsumeConcurrentlyStatus.CONSUME_SUCCESS,Broker服务才不会继续发送消息):

处理 Tag3 业务
hello rocketmq 3消费失败

最新文章

  1. gitbook安装与使用之windows下搭建gitbook平台
  2. hibernate缓存机制详细分析 复制代码 内部资料 请勿转载 谢谢合作
  3. 使用开源免费类库在.net中操作Excel
  4. 安装Maven、Eclipse设置、添加地址JAR
  5. angularjs指令(二)
  6. C语言清除输入缓存方法记录[转]
  7. 远程连接centos
  8. 解码一个加密的js文件
  9. python中的model模板中的数据类型
  10. hadoop权威指南 chapter1 Meet Hadoop
  11. GNU自动补全模块readline解析
  12. 1. 初次尝试Core Data 应用程序(Core Data 应用开发实践指南)
  13. 如何使用kali的Searchsploit查找软件漏洞
  14. 58. Length of Last Word(easy, 字符串问题)
  15. RN 各种小问题
  16. springboot pom.xml记
  17. java 学习:在java中启动其他应用,由jenkins想到的
  18. 验证码处理类:UnCodebase.cs + BauDuAi 读取验证码的值(并非好的解决方案)
  19. C#绘制三角形并填充,使用winform实现qq聊天气泡
  20. Php cli模式下执行报错/usr/bin/php: /usr/local/lib/libxml2.so.2: no version information available (required by /usr/bin/php)

热门文章

  1. VMware提示Device/Credential Guard不兼容
  2. 我用go-zero开发了第一个线上项目
  3. Spark-6-如何缓解消除数据倾斜
  4. Promise对象,究竟为何物?
  5. MySQL索引与SQL注入
  6. java 的IO类库的基本架构
  7. Springboot 源码解析-自定装配
  8. Qt学习笔记-更高级的文本编辑器-完善第一版-gif动画
  9. 使用Arduino点亮ESP-01S,ESP8266-01S上的板载LED
  10. vue项目中的路由守卫