1. PushConsumer

  推,Broker主动向Consumer推消息,它Consumer的一种,应用通常向对象注册一个Listener接口,一旦接收到消息,Consumer对象立刻回调Linstener接口方法。Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

  缺点:

  慢消费无疑是Push模式最大的致命伤,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。所以push适合于没有慢消费情况的场景下

  前面使用的所有示例中的Consuemr都是采用的push方式,所以这里就不在具体写示例代码了。

2. PullConsumer

  拉,Consumer主动的从Broker拉取消息,主动权由应用控制,可以实现批量的消费消息。Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

  反观Pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适消息延迟与忙等。

  这是Pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次Pull取到消息了还可以继续去Pull,如果没有Pull取到则需要等待一段时间重新Pull,在阿里中的解决是长轮询 Pull,消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好。

  使用PullConsumser的示例代码如下:

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*
*/
public class PullConsumer {
//保存上一次消费的消息位置
private static final Map offsetTable = new HashMap ();
public static void main(String[] args) throws MQClientException { //实例化pullConsumer
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_consumer_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
//拉取PullConsumerTopicTest topic下的所有消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("PullConsumerTopicTest"); //遍历消息队列
for (MessageQueue mq : mqs) {
System.err.println("Consume from the queue: " + mq);
SINGLE_MQ:
//
while (true) {
try {
//设置上次消费消息下标
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.println(pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
//根据结果状态,如果找到消息,批量消费消息
case FOUND:
List<MessageExt> messageExtList = pullResult.getMsgFoundList();
for (MessageExt m : messageExtList) {
System.out.println(new String(m.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
//保存上次消费的消息下标,这里使用了一个全局HashMap来保存
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offsetTable.put(mq, offset);
} //获取上次消费的消息的下表
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = (Long) offsetTable.get(mq);
if (offset != null) {
return offset;
}
return 0;
}
}

  Producer跟普通或其他消息发送方式一样即可。

最新文章

  1. UWP开发:APP之间的数据交互(以微信为例)
  2. java发送邮件..转
  3. MySql视图、存储过程、函数、索引
  4. memcached for windows 修改端口和最大内存,以及常用命令
  5. python Quicksort demo
  6. 常见MFC UI界面库
  7. bean的生命周期以及延迟实例化
  8. spring、mybatis事务配置和控制
  9. java_自定义标签运行原理
  10. SQL语句计算距离今天生日还差几天
  11. Redis需要多少内存预留-内存占用多少才安全
  12. JavaScript 比较好的建议
  13. array_column 函数, 以及在PHP5.5之下的替代方法
  14. Unite 2017 | 从《闹闹天宫》看MOBA游戏里的网络同步技术
  15. pyqt、webkit和qt之间的关系
  16. Weekly Contest 129
  17. poj1847 Tram(Dijkstra || Floyd || SPFA)
  18. SpringBoot SpringApplication底层源码分析与自动装配
  19. python使用SQLAlchemy对mysql操作
  20. Spring思维导图(IOC篇)

热门文章

  1. 企业级任务调度框架Quartz(4) 多个job实例注册到任务调度器上
  2. Spring Boot 项目学习 (一) 项目搭建
  3. 使用js控制文本超出部分显示省略号
  4. 训练1-D
  5. java的基本数据类型及运算符等
  6. android onConfigurationChanged的那点事
  7. Java 接口技术 Interface
  8. nodejs-安装及卸载
  9. HDU 5372 Segment Game
  10. crm2013使用图片字段