消费者角色:

1. 推式(一般建议用推式)

2. 拉式

消费模式:

1. 集群(cluster)                --均衡负载消费

2. 广播(broadcasting) --发布和订阅者模式

MQ的消费不会清除broker中的数据,broker数据一直存在队列中,队列offset会一直递增,因此可以通过回查来获取到丢失数据。这个时候我们可以采用pull形式较好。

push形式,MQ会记录访问的偏移量,即使宕机下次重启也会按照顺序继续消费,不会出现重复消费。

RocketMQ入门(生产者)_2中已经写过一个推式的代码,接下来就看下拉式。

/**
* 普通拉式消费者,代码编写
* @author DennyZhao
*
*/
public class PullConsumer { /**
* 暂时以map作为offset入库看待。<queueId, offset>
*/
private static Map<String, Long> offsetMap = new HashMap<String, Long>(); public static void main(String[] args) throws UnsupportedEncodingException {
//创建拉式消费者
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("pullConsumerGroup");
pullConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
try {
pullConsumer.start();
Set<MessageQueue> mqSet= pullConsumer.fetchSubscribeMessageQueues("fruit");
while(true) {
//循环队列
for(MessageQueue mq: mqSet) {
// 从队列中获取固定偏移值
PullResult pullResult = pullConsumer.pullBlockIfNotFound(mq, "*", getOffset(mq), 32);
setOffset(mq, pullResult.getNextBeginOffset());
switch(pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
for(MessageExt msg : msgFoundList) {
String fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println(fruit + " -----fruit");
}
break;
case NO_NEW_MSG:
break;
case NO_MATCHED_MSG:
break;
}
}
Thread.sleep(2000);
}
} catch (MQClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (RemotingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (MQBrokerException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} /**
* set offset
* @param mq
* @param nextBeginOffset
*/
private static void setOffset(MessageQueue mq, long nextBeginOffset) {
String queueId = mq.getBrokerName() + mq.getTopic() + mq.getQueueId();
offsetMap.put(queueId, nextBeginOffset);
} /**
* 获取固定偏移值
* @param mq queueId
* @return int
*/
private static long getOffset(MessageQueue mq) {
String queueId = mq.getBrokerName() + mq.getTopic() + mq.getQueueId();
Long offset = offsetMap.get(queueId);
if(offset == null) {
offset = 0l;
}
System.out.println(offset + "---------------");
return offset;
} }

使用Schedule拉式:

/**
* ScheduleService 進行數據拉取
* @author DennyZhao
*
*/
public class PullScheduleService { public static void main(String[] args) throws MQClientException {
MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("scheduleConsumers");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.setPullThreadNums(4);
DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer("pullConsumer");
defaultMQPullConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
scheduleService.setDefaultMQPullConsumer(defaultMQPullConsumer);
scheduleService.registerPullTaskCallback("fruit", new PullTaskCallback() {
/**
* 数据处理
*/
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
MQPullConsumer pullConsumer = context.getPullConsumer();
try {
long offset = pullConsumer.fetchConsumeOffset(mq, false);
PullResult pull = pullConsumer.pull(mq, "*", offset, 32);
switch(pull.getPullStatus()) {
case FOUND:
// 结果输出
List<MessageExt> msgFoundList = pull.getMsgFoundList();
for(MessageExt msg : msgFoundList) {
String fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("result: " + fruit);
}
break;
case NO_MATCHED_MSG:
break;
default: }
// 获取下一个循环的offset
pullConsumer.updateConsumeOffset(mq, pull.getNextBeginOffset());
// 设置下次访问时间
context.setPullNextDelayTimeMillis(1000);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
});
scheduleService.start();
} }

参数说明:

//push主要参数
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("pushConsumerGroup");
// 从何地开始,默认(CONSUME_FROM_LAST_OFFSET)
pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
pushConsumer.setConsumeThreadMin(2); //最小线程数
pushConsumer.setConsumeThreadMax(8); //最大线程数
pushConsumer.setConsumeTimeout(5000); //连接超时
pushConsumer.setMessageModel(MessageModel.CLUSTERING);//消息模式(集群CLUSTERING和广播BROADCASTING,default:cluster)
pushConsumer.setConsumeConcurrentlyMaxSpan(1000);//单队列最大消费数1000
pushConsumer.setConsumeMessageBatchMaxSize(1); //批量消费数1
pushConsumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");//集群IP
pushConsumer.setHeartbeatBrokerInterval(2000); //心跳监测
pushConsumer.setMaxReconsumeTimes(3);//重复消费次数,用于失败后重试
pushConsumer.queryMessage(topic, key, maxNum, begin, end); //获取消息
pushConsumer.fetchSubscribeMessageQueues(topic);//订阅topic
pushConsumer.registerMessageListener(new MessageListenerConcurrently());//及时普通消费型
pushConsumer.registerMessageListener(new MessageListenerOrderly()); //严格顺序消费型;
        // pull常用参数
//消息模式(集群CLUSTERING和广播BROADCASTING,default:cluster)
pullConsumer.setMessageModel(MessageModel.CLUSTERING);
pullConsumer.fetchSubscribeMessageQueues(topic); //订阅主题
pullConsumer.fetchConsumeOffset(mq, false); //获取queue当前offset位置
pullConsumer.pullBlockIfNotFound(mq, subExpression, offset, maxNums);//获取消费内容
pullConsumer.updateConsumeOffset(mq, offset); //更新消费位置
pullConsumer.setConsumerPullTimeoutMillis(5000); //连接超时

最新文章

  1. Linux学习笔记(7)-进程
  2. 循序渐进做项目系列(5):制作安装包,谁人都可以!——VS制作安装包简明教程
  3. Hadoop学习笔记—21.Hadoop2的改进内容简介
  4. WPF 保存image控件里的图片
  5. 关于Highcharts图表组件动态修改属性的方法(API)总结之Series
  6. NAND flash NOR flash SDRAM区别
  7. 10.27 noip模拟试题
  8. sublime text 配置文件中文说明
  9. 根据指定的key,将二维数组的value转换为string,适用于mysql的in查询
  10. vs2010 vs2013等vs中如何统计整个项目的代码行数
  11. PAT甲级1026 Table Tennis【模拟好题】
  12. firewall-cmd.man
  13. 426. Convert Binary Search Tree to Sorted Doubly Linked List把bst变成双向链表
  14. Web上传文件的原理及实现
  15. JAVA基础知识总结:十六
  16. bootstrap 4 移除Glyphicons
  17. python学习笔记11-文件操作方法
  18. Android HandlerThread和IntentService
  19. Java NIO Socket编程实例
  20. ajax file upload 修改

热门文章

  1. 使用VMWare虚拟机打开MFC报错:不支持16位系统
  2. tomcat之jsp连接mysql数据库
  3. Netty中的基本组件及关系
  4. VUE踩坑之路
  5. MSIL 教程
  6. Eclipse导入hadoop源码
  7. maven编译下载源码
  8. spring ref history Design philosophy
  9. CSS3之动画模块实现云朵漂浮效果
  10. Ubuntu、CenOS、Debian等不同版本简单概念与不同