上述就是MQ中有关Consumer的类图,下面来介绍一下每个类

1.MQAdmin:底层类,上篇博客已经提过,就不再此重提

2.MQConsumer:Consumer公共的接口,常用的方法如下

如果消费失败的话,消息将会返回到broker中,并且延迟一会消费的时间

void sendMessageBack(final MessageExt msg, final int delayLevel, final String brokerName)  throws RemotingException, MQBrokerException, InterruptedException, MQClientException;

3.MQPushConsumer:Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法

4.MQPullConsumer:Consumer的一种,应用通常主动调用Consumer的拉消息方法从Broker拉消息,主动权由应用控制

在上图中出现了两类的消费者分别是PushConsumer和PullConsumer,下面来看一下

PushConsumer:通过注册监听的方式来消费信息

  1. <span style="font-family:Comic Sans MS;font-size:18px;">/**
  2. * @FileName: Consumer.java
  3. * @Package:com.test
  4. * @Description: TODO
  5. * @author: LUCKY
  6. * @date:2015年12月28日 下午2:43:23
  7. * @version V1.0
  8. */
  9. package com.test;
  10. import java.util.List;
  11. import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
  12. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  13. import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  14. import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  15. import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
  16. import com.alibaba.rocketmq.common.message.Message;
  17. import com.alibaba.rocketmq.common.message.MessageExt;
  18. /**
  19. * @ClassName: Consumer
  20. * @Description: 模拟消费者
  21. * @author: LUCKY
  22. * @date:2015年12月28日 下午2:43:23
  23. */
  24. public class ConsumerTest {
  25. public static void main(String[] args) {
  26. DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
  27. consumer.setNamesrvAddr("100.66.154.81:9876");
  28. try {
  29. // 订阅PushTopic下Tag为push的消息,都订阅消息
  30. consumer.subscribe("PushTopic", "push");
  31. // 程序第一次启动从消息队列头获取数据
  32. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  33. //可以修改每次消费消息的数量,默认设置是每次消费一条
  34. // consumer.setConsumeMessageBatchMaxSize(10);
  35. //注册消费的监听
  36. consumer.registerMessageListener(new MessageListenerConcurrently() {
  37. //在此监听中消费信息,并返回消费的状态信息
  38. public ConsumeConcurrentlyStatus consumeMessage(
  39. List<MessageExt> msgs,
  40. ConsumeConcurrentlyContext context) {
  41. // msgs中只收集同一个topic,同一个tag,并且key相同的message
  42. // 会把不同的消息分别放置到不同的队列中
  43. for(Message msg:msgs){
  44. System.out.println(new String(msg.getBody()));
  45. }
  46. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  47. }
  48. });
  49. consumer.start();
  50. Thread.sleep(5000);
  51. //5秒后挂载消费端消费
  52. consumer.suspend();
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. }
  58. </span>

 PullConsumer:通过拉去的方式来消费消息

  1. <span style="font-family:Comic Sans MS;font-size:18px;">/**
  2. * @FileName: Consumer.java
  3. * @Package:com.test
  4. * @Description: TODO
  5. * @author: LUCKY
  6. * @date:2015年12月28日 下午2:43:23
  7. * @version V1.0
  8. */
  9. package com.test;
  10. import java.util.Set;
  11. import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
  12. import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
  13. import com.alibaba.rocketmq.common.message.MessageQueue;
  14. /**
  15. * @ClassName: Consumer
  16. * @Description: 模拟消费者
  17. * @author: LUCKY
  18. * @date:2015年12月28日 下午2:43:23
  19. */
  20. public class ConsumerPullTest {
  21. public static void main(String[] args) {
  22. DefaultMQPullConsumer consumer=new DefaultMQPullConsumer();
  23. consumer.setNamesrvAddr("100.66.154.81:9876");
  24. consumer.setConsumerGroup("broker");
  25. try {
  26. consumer.start();
  27. Set<MessageQueue> messageQueues=  consumer.fetchSubscribeMessageQueues("PushTopic");
  28. for(MessageQueue messageQueue:messageQueues){
  29. System.out.println(messageQueue.getTopic());
  30. }
  31. //消息队列的监听
  32. consumer.registerMessageQueueListener("", new MessageQueueListener() {
  33. @Override
  34. //消息队列有改变,就会触发
  35. public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
  36. Set<MessageQueue> mqDivided) {
  37. // TODO Auto-generated method stub
  38. }
  39. });
  40. } catch (Exception e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. }
  45. </span>

一般在应用中都会采用push的方法来自动的消费信息

最新文章

  1. Q_OBJECT
  2. 找到SQL Server的序列号
  3. js表单提交,面向对象
  4. Android 数据存储之 文件存储
  5. 『TCP/IP详解——卷一:协议』读书笔记——05
  6. OpenJudge计算概论-校门外的树
  7. WebView 获取网页点击事件
  8. Xcode 添加代码块
  9. jdbc知识问答 分类: 面试 2015-07-10 22:05 5人阅读 评论(0) 收藏
  10. [Search]swf 转mp4,未成功
  11. 【5】JAVA---地址App小软件(DeletePanel.class)(表现层)
  12. java中File类详解
  13. C语言实现ifconfig获取网卡接收和发送流量统计
  14. 如何添加“在这里打开PowerShell”到Windows中的上下文菜单
  15. Git源码管理工具使用
  16. 【Spring】Spring随笔索引
  17. 网络通信协议六之IP地址和MAC地址特征分析
  18. Swift要点:从Objective-C开发者的角度看Swift
  19. 如何使用Total Recorder录制软件发出的声音
  20. C++练习 | 求解二叉树的高度

热门文章

  1. Python并发(二)
  2. nginx的常用负载均衡算法,分别是
  3. SVR回归
  4. 聊聊、Jstack 解决生产问题
  5. [译] Pandas中根据列的值选取多行数据
  6. spring scope 属性的取值
  7. 九度oj 题目1366:栈的压入、弹出序列
  8. Linux硬件资源管理与外设设备使用、系统运行机制及用户管理
  9. [LOJ#526]「LibreOJ β Round #4」子集
  10. ——CentOS 7 安装SQL Server2019