package com.bfxy.rocketmq.model;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.bfxy.rocketmq.constants.Const;

public class Consumer1 {

public Consumer1() {
try {
String group_name = "test_model_consumer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagA");
consumer.setMessageModel(MessageModel.CLUSTERING);
//consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}

class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagA")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

public static void main(String[] args) {
Consumer1 c1 = new Consumer1();
System.out.println("c1 start..");

}
}

//==========================================================

package com.bfxy.rocketmq.model;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.bfxy.rocketmq.constants.Const;

public class Consumer2 {

public Consumer2() {
try {
String group_name = "test_model_consumer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
consumer.subscribe("test_model_topic2", "TagA");
consumer.setMessageModel(MessageModel.CLUSTERING);
//consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.registerMessageListener(new Listener());
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}

class Listener implements MessageListenerConcurrently {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
//if(tags.equals("TagA")) {
System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
//}
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

}

public static void main(String[] args) {
Consumer2 c2 = new Consumer2();
System.out.println("c2 start..");

}
}

最新文章

  1. 【sql】之查询昨天的记录
  2. Windows Azure Cloud Service (44) 将Cloud Service加入Virtual Network Subnet,并固定Virtual IP Address(VIP)
  3. java 28 - 7 JDK8的新特性 之 接口可以使用方法
  4. 取得ascii的例子
  5. 独立版Jexus
  6. mqtt实现自动监听服务器消息
  7. let 和 const
  8. 重置Visual Studio 2017的配置
  9. 汉诺塔I
  10. W3bsafe]SQLmap过狗命令的利用+教程
  11. 深度学习框架PyTorch一书的学习-第一/二章
  12. VMware Workstation 11 搭建windows server 2012 之sql server 2012集群常见问题整理
  13. [CodeForces - 919B] Perfect Number
  14. Python学习---------登陆系统代码实现
  15. Phonegap集成angular/bootstrap/animate.css教程
  16. 902. Numbers At Most N Given Digit Set
  17. python socket编程入门(编写server实例)-乾颐堂
  18. 基于微信小程序的用户列表点赞功能
  19. 洛谷 P2577 [ZJOI2005]午餐
  20. 12:Web及MySQL服务异常监测案例

热门文章

  1. jQuery EasyUI中DataGird动态生成列的方法
  2. mint-ui下拉加载(demo实例)
  3. 第五篇.python进阶
  4. SQL优化策略
  5. linux命令详解——top
  6. Google Chrome Keyboard Shortcuts
  7. kotlin面向对象入门
  8. P3731 二分图匹配必经边
  9. Summer training #11
  10. Zabbix Agent 安装指南和 Zabbix Server 设置自动发现