1. MessageDispatch: message dispatch information

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;

    protected ConsumerId consumerId;
    protected ActiveMQDestination destination;
    protected Message message;
    protected int redeliveryCounter;
    protected transient long deliverySequenceId;
    protected transient Object consumer;
    protected transient Runnable transmitCallback;
    protected transient Throwable rollbackCause;

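2. Automatic acknowledgement. The consumer acknowledges a message after receiving it and before handing it back to the application.

In the afterMessageIsConsumed method, it first calls deliveredMessages.clear(); and then session.sendAck(ack);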

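3. After a message is read from the TCP connection, it is placed in unconsumedMessages to wait for consumption.

4. After a message is taken out of unconsumedMessages and pre-processed, the beforeMessageIsConsumed method adds it to deliveredMessages.

unconsumedMessages: where the consumer stores messages it has received from the broker but has not yet consumed.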
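The flow in points 2 to 4 can be sketched with two plain collections. This is a simplified model for illustration, not ActiveMQ's real classes: ConsumerBuffersSketch and its methods dispatch, receive and acknowledgeAll are hypothetical names, and messages are reduced to strings.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedList;

// Hypothetical, simplified model of the consumer-side buffers; not ActiveMQ code.
class ConsumerBuffersSketch {
    // Messages received from the broker but not yet handed to the application.
    final Deque<String> unconsumedMessages = new ArrayDeque<>();
    // Messages taken out for consumption but not yet acknowledged (newest first).
    final LinkedList<String> deliveredMessages = new LinkedList<>();

    // Point 3: a message arriving from the transport is queued for consumption.
    void dispatch(String message) {
        unconsumedMessages.addLast(message);
    }

    // Point 4: receive takes the oldest message and records it as delivered.
    String receive() {
        String message = unconsumedMessages.pollFirst();
        if (message != null) {
            deliveredMessages.addFirst(message);
        }
        return message;
    }

    // Point 2: an acknowledgement covers everything delivered so far,
    // then the delivered list is cleared. Returns how many messages it covered.
    int acknowledgeAll() {
        int count = deliveredMessages.size();
        deliveredMessages.clear();
        return count;
    }
}
```

In this model a message is always in exactly one of the two collections until it is acknowledged, which is the property the real consumer relies on when it builds an ack from deliveredMessages.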

5. Inside receive, before the message is consumed

    private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(session.getNextDeliveryId());
        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!isAutoAcknowledgeBatch()) {
            synchronized(deliveredMessages) {
                deliveredMessages.addFirst(md);
            }
            if (session.getTransacted()) {
                if (transactedIndividualAck) {
                    immediateIndividualTransactedAck(md);
                } else {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
        }
    }
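The branches of the method above may be easier to follow as a small decision function. The sketch below only restates the control flow; BeforeConsumeSketch, Action and decide are hypothetical names, and the three booleans stand in for isAutoAcknowledgeBatch(), session.getTransacted() and transactedIndividualAck.

```java
// Hypothetical summary of the branches in beforeMessageIsConsumed; not ActiveMQ code.
class BeforeConsumeSketch {
    enum Action {
        NOT_TRACKED,                   // message is not added to deliveredMessages
        TRACKED_ONLY,                  // added to deliveredMessages, no ack yet
        TRACKED_AND_INDIVIDUAL_TX_ACK, // added, then immediateIndividualTransactedAck
        TRACKED_AND_DELIVERED_ACK      // added, then ackLater with DELIVERED_ACK_TYPE
    }

    static Action decide(boolean autoAckBatch, boolean transacted,
                         boolean transactedIndividualAck) {
        if (autoAckBatch) {
            return Action.NOT_TRACKED;
        }
        if (!transacted) {
            return Action.TRACKED_ONLY;
        }
        return transactedIndividualAck
                ? Action.TRACKED_AND_INDIVIDUAL_TX_ACK
                : Action.TRACKED_AND_DELIVERED_ACK;
    }
}
```

In every case the original method also sets the delivery sequence id and lastDeliveredSequenceId first; the sketch leaves that part out.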

6. Inside receive, after the message is consumed

    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
            stats.getExpiredMessageCount().increment();
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;
                                // AMQ-3956 evaluate both expired and normal msgs as
                                // otherwise consumer may get stalled
                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)
                                        || (optimizeAcknowledgeTimeOut > 0
                                            && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                    // AMQ-3956 - as further optimization send
                                    // ack for expired msgs when there are any.
                                    // This resets the deliveredCounter to 0 so that
                                    // we won't sent standard acks with every msg just
                                    // because the deliveredCounter just below
                                    // 0.5 * prefetch as used in ackLater()
                                    if (pendingAck != null && deliveredCounter > 0) {
                                        session.sendAck(pendingAck);
                                        pendingAck = null;
                                        deliveredCounter = 0;
                                    }
                                }
                            } else {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            } else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }
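The condition guarding the optimizeAcknowledge branch above is dense, so here it is isolated as a pure function. This is only a sketch: OptimizeAckSketch and shouldSendAck are hypothetical names, and the fields used by the original method are passed in as parameters.

```java
// Hypothetical helper that isolates the threshold test; not ActiveMQ code.
class OptimizeAckSketch {
    static boolean shouldSendAck(int ackCounter, int deliveredCounter, int prefetchSize,
                                 long timeoutMillis, long lastAckMillis, long nowMillis) {
        // Enough messages are waiting for an ack: at least 65% of the prefetch size.
        boolean enoughPending = ackCounter + deliveredCounter >= prefetchSize * .65;
        // Or the optional timeout (disabled when it is 0 or less) has elapsed
        // since the last optimized ack was sent.
        boolean timedOut = timeoutMillis > 0 && nowMillis >= lastAckMillis + timeoutMillis;
        return enoughPending || timedOut;
    }
}
```

With a prefetch size of 1000 and the timeout disabled, a standard ack is batched until roughly 650 messages are pending, so shouldSendAck(100, 0, 1000, 0, 0, 0) is false and shouldSendAck(700, 0, 1000, 0, 0, 0) is true. With a non-zero timeout, the ack is also sent once that much time has passed, however few messages are pending.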
