1. 客户端怎样显式地使用事务?

producer 开启事务(代码片段):

ActiveMQSession session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 开启事务
// 发送 TransactionInfo 消息 BEGIN
session.getTransactionContext().begin(); for (int i = 0; i < 2; i++) {
// Create a message
String text = "zhang";
TextMessage message = session.createTextMessage(text);
producer.send(message);
}
// session.getTransactionContext().rollback();
//提交事务
// 发送 TransactionInfo 消息 COMMIT_ONE_PHASE
session.getTransactionContext().commit();

2. broker 处理事务的入口:

TransportConnection.processBeginTransaction
TransportConnection.processCommitTransactionOnePhase
TransportConnection.processCommitTransactionTwoPhase

broker 处理事务的逻辑在 TransactionBroker 类中。

那么,具体在 Queue 中是怎样体现事务的呢?

ActiveMQ 客户端默认不会开启事务,而如果客户端显式地开启了事务,则 Queue 中可能会存在多个事务,一个事务中必然会有一个消息列表,当客户端提交事务时,Queue 接收事务对应的消息列表,而如果客户端回滚事务,则 Queue 会删除这些消息。

Queue 中的事务变量:

// 键是Transaction,值是对应的消息列表
final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();

Queue 内部类 SendSync 封装了消息和同步操作:

class SendSync extends Synchronization {

    class MessageContext {
public Message message;
public ConnectionContext context; public MessageContext(ConnectionContext context, Message message) {
this.context = context;
this.message = message;
}
} final Transaction transaction;
// 这就是我要找的消息列表
List<MessageContext> additions = new ArrayList<MessageContext>(); public SendSync(Transaction transaction) {
this.transaction = transaction;
} public void add(ConnectionContext context, Message message) {
additions.add(new MessageContext(context, message));
} @Override
public void beforeCommit() throws Exception {
synchronized (orderIndexUpdates) {
orderIndexUpdates.addLast(transaction);
}
} @Override
public void afterCommit() throws Exception {
ArrayList<SendSync> syncs = new ArrayList<SendSync>(200);
sendLock.lockInterruptibly();
try {
synchronized (orderIndexUpdates) {
Transaction next = orderIndexUpdates.peek();
while( next!=null && next.isCommitted() ) {
syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst()));
next = orderIndexUpdates.peek();
}
}
for (SendSync sync : syncs) {
sync.processSend();
}
} finally {
sendLock.unlock();
}
for (SendSync sync : syncs) {
sync.processSent();
}
} // called with sendLock
private void processSend() throws Exception { for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) {
MessageContext messageContext = iterator.next();
// It could take while before we receive the commit
// op, by that time the message could have expired..
if (broker.isExpired(messageContext.message)) {
broker.messageExpired(messageContext.context, messageContext.message, null);
destinationStatistics.getExpired().increment();
iterator.remove();
continue;
}
sendMessage(messageContext.message);
messageContext.message.decrementReferenceCount();
}
} private void processSent() throws Exception {
for (MessageContext messageContext : additions) {
messageSent(messageContext.context, messageContext.message);
}
} @Override
public void afterRollback() throws Exception {
try {
for (MessageContext messageContext : additions) {
messageContext.message.decrementReferenceCount();
}
} finally {
sendSyncs.remove(transaction);
}
}
}

3. 那么 XA 事务又是什么呢?ActiveMQ 实现了分布式事务,当系统中存在多数据源的情况下,也许会需要使用 XA ,为了方便,只提供一个单数据源的例子:

Xid xid = new MyXid(1, new byte[]{0x01}, new byte[]{0x02});
session.getTransactionContext().start(xid, XAResource.TMSUCCESS);
// 操作mq
session.getTransactionContext().end(xid, XAResource.TMSUCCESS);
int prepare = session.getTransactionContext().prepare(xid);
System.out.println("prepare:" + prepare);
// 根据prepare结果决定是否提交
session.getTransactionContext().commit(xid, false);

这些操作步骤,和 MySQL的 XA 是一样的,也是 start,end,prepare,commit,实现的都是javax transaction 那一套接口。

public class MyXid implements Xid {
private int formatId;
private byte[] globalTid;
private byte[] branchQ; public MyXid(int formatId, byte[] globalTid, byte[] branchQ) {
this.formatId = formatId;
this.globalTid = globalTid;
this.branchQ = branchQ;
} public byte[] getBranchQualifier() {
return this.branchQ;
} public int getFormatId() {
return formatId;
} public byte[] getGlobalTransactionId() {
return this.globalTid;
}
}

最新文章

  1. JAVA语言的本质优势
  2. 浏览器桌面通知Notification探究
  3. MEF
  4. ASP.NET-【缓存】-使用ASP.NET缓存
  5. 黄聪:wordpress源码解析-目录结构-文件调用关系(转)
  6. HDU 4597 Play Game 记忆化DP
  7. 一、MongoDB的下载、安装与部署
  8. 《Prism 5.0源码走读》UnityBootstrapper
  9. 浅析Objective-C字面量
  10. 微软Hololens学院教程-Hologram 211-Gestures(手势)【微软教程已经更新,本文是老版本】
  11. ArcGIS 网络分析[2.5] VRP(车辆配送)【较难】
  12. springMVC实现登陆
  13. scrollReveal.js – 页面滚动显示动画JS
  14. 关于hover和after、before合用
  15. java abstract构造函数调用
  16. Extjs实现Grid表格显示【一】
  17. SELECT查询结果集INSERT到数据表
  18. Python编写API接口
  19. Using ADB and fastboot
  20. 你的组织使用了 windows defender 应用程序控制来阻止此应用

热门文章

  1. Python学习笔记3-string
  2. 【MySQL】【一】shell
  3. GYM 101064 2016 USP Try-outs G. The Declaration of Independence 主席树
  4. Intellij idea 2017 图标含义
  5. springboot + mybatis 的项目,实现简单的CRUD
  6. 日期时间函数 mysql 和sqlserver 中对于常用函数的日期和时间函数的区别
  7. &quot;不是内部或外部命令&quot;
  8. 学习笔记43—Linux基础集
  9. QT绘制饼图
  10. Codeforces 995 E - Number Clicker