今天的博客有点多,因为前几天一直用笔记录,今天都补上了。后面的博客先停一段时间,后面还有dubbo、storm、kafka、solor、nginx、keepalived、fastdfs等内容,只是因为最近准备跳槽,停更一段时间,等到新公司后再继续更新。

场景1:支付宝转1w到余额宝,支付宝扣了1w,服务挂了怎么办?余额还没有加上

场景2:订单系统和库存系统如何保持一致

如果是本地的话很好解决

  • begin transaction
      update 支付宝 - 1w;
      update 余额宝 + 1W;
    end transaction
  • 用Spring的话,方法上加 @Transaction注释

那如果是跨系统的呢?该如何解决?

有一种思路是这样的:

  1. client发送转账请求给事务协调器
  2. 事务协调器先发送扣款请求给支付宝,返回执行结果(这时并没有提交)
  3. 事务协调器在发送加款请求给余额宝,返回执行结果(这时也没有提交)
  4. 事务协调器看两个执行结果都返回OK 就执行第四步,提交2和3没有提交的更新请求。

但是这个有个问题,那就是性能很受影响,主要卡在事务协调器这里。

RocketMQ的实现方式如下(图片来自网络):

支付宝先生成 扣款信息 --> 消息队列 --> 余额宝消费消息

发送消息:

 import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message; /**
* 发送事务消息例子
*
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
// 事务回查最小并发数
producer.setCheckThreadPoolMinSize(2);
// 事务回查最大并发数
producer.setCheckThreadPoolMaxSize(2);
// 队列数
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
producer.start(); String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 100; i++) {
try {
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.println(sendResult); Thread.sleep(10);
}
catch (MQClientException e) {
e.printStackTrace();
}
} for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
} producer.shutdown(); }
}

执行本地事务

 import java.util.concurrent.atomic.AtomicInteger;

 import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message; /**
* 执行本地事务
*/
public class TransactionExecuterImpl implements LocalTransactionExecuter {
private AtomicInteger transactionIndex = new AtomicInteger(1); @Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
int value = transactionIndex.getAndIncrement(); if (value == 0) {
throw new RuntimeException("Could not find db");
}
else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
} return LocalTransactionState.UNKNOW;
}
}

服务器回查客户端(这个功能在开源版本中已经被咔掉了,但是我们还是要写,不然报错)

 import java.util.concurrent.atomic.AtomicInteger;

 import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt; /**
* 未决事务,服务器回查客户端
*/
public class TransactionCheckListenerImpl implements TransactionCheckListener {
private AtomicInteger transactionIndex = new AtomicInteger(0); @Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("server checking TrMsg " + msg.toString()); int value = transactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
}
else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
} return LocalTransactionState.UNKNOW;
}
}

到这就完了,为什么只介绍发送不介绍接收呢?因为一旦消息提交到MQ就不用管了, 要相信MQ会把消息送达consumer,如果消息未能被成功消费的话,那么Producer也会回滚

如何保证分布式系统的全局性事务?

因为阿里在3.2.6版本后,砍掉了消息回查的功能,也就是consumer端是否成功消费,Producer端并不知道,所以如果要保证全局性事务,我们要有自己的实现机制:

最新文章

  1. ora-01652无法通过128(在表空间temp中)扩展temp段
  2. SQL*Loader之CASE4
  3. oracle 彻底删除用户及表空间
  4. O365 &quot;打开或关闭脚本&quot;功能
  5. GET请求中URL的最大长度限制总结
  6. [TypeScript] Configuring a New TypeScript Project
  7. poj1742 Coins(多重背包+单调队列优化)
  8. [POJ 3734] Blocks (矩阵高速幂、组合数学)
  9. fstab 介绍
  10. ASP.NET MVC+EF框架+EasyUI实现权限管理系列(11)-验证码实现和底层修改
  11. 跨域的另一种解决方案CORS(CrossOrigin Resource Sharing)跨域资源共享
  12. 深入理解null的原理
  13. react dnd demo
  14. Java作业五(2017-10-15)
  15. UVA11996 Jewel Magic
  16. 不熟,不会,未a的题列表
  17. easyGUI 用法介绍
  18. Python3学习之路~2.7 文件操作
  19. FreeSWITCH呼叫参数之sip_cid_type
  20. oozie 安装过程详解

热门文章

  1. How to Get Vertical Line from Point and Line
  2. 想成为一名成功的UX设计师吗?做好这13件事情吧
  3. vue项目 菜单侧边栏随着右侧内容盒子的高度实时变化
  4. mongo学习- mapReduce操作事例
  5. task:scheduled cron 合法
  6. Hadoop-2.7.2分布式安装手册
  7. Thrift编译错误('::malloc' has not been declared)
  8. Centos 下安装tomcat多实例
  9. 从极速飞艇源码 VantComponent 谈 小程序维护
  10. mysqldump的简单使用