之前的文章《Java分布式锁实现》中列举了分布式锁的3种实现方式,分别是基于数据库实现,基于缓存实现和基于zookeeper实现。三种实现方式各有可取之处,本篇文章就详细讲解一下Java分布式锁之基于数据库的实现方式,也是最简单最易理解的实现方式。

首先,先来阐述下“锁”的概念,锁作为一种安全防御工具,既能上锁防止别人打开,又能让持有钥匙的人打开锁,这是锁的基本功能。那再来说一下“分布式锁”,分布式锁是在分布式系统(多个独立运行系统)内的锁,相对来说,这把锁的安全级别以及作用范围更大,所以从设计上就要考虑更多东西。

现在来说,怎么基于数据库实现这把分布式锁。其实说白了就是,把锁作为数据资源存入数据库,当持有这把锁的访问者来决定是否开锁。

以下详细讲解了在多个应用服务里,怎样用数据库去实现分布式锁。

结合案例:

1.客户app取出交易(同一个客户在某一个时间点只能对某种资产做取现操作)

2.交易重试补偿(交易过程服务宕机,扫描重试补偿)

一、数据库的设计

数据库锁表的表结构如下:

 
field type comment
ID bigint 主键
OUTER_SERIAL_NO varchar 流水号
CUST_NO char 客户号
SOURCE_CODE varchar 锁操作
THREAD_NO varchar 线程号
STATUS char 锁状态
REMARK varchar 备注
CREATED_AT timestamp 创建时间
UPDATED_AT timestamp 更新时间

作为锁的必要属性有5个:系统流水号,客户号,锁操作,线程号和锁状态,下面来解释一下每种属性

流水号:锁的具体指向,比如可以是产品,可以是交易流水号(后面会说到交易同步锁、交易补偿锁的使用方式)

客户号:客户的唯一标识

锁操作:客户的某种操作,比如客户取现操作,取现补偿重试操作

线程号:当前操作线程的线程号,比如取当前线程的uuid

锁状态:P处理中,F失败,Y成功

二、代码设计

代码的目录结构如下:

主要贴一下锁操作的核心代码实现:

锁接口定义:DbLockManager.java

/**
* 锁接口 <br>
*
* @Author fugaoyang
*
*/
public interface DbLockManager { /**
* 加锁
*/
boolean lock(String outerSerialNo, String custNo, LockSource source); /**
* 解锁
*/
void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus); }

锁接口实现类:DbLockManagerImpl.java

/**
*
* 数据库锁实现<br>
*
* @author fugaoyang
*
*/
@Service
public class DbLockManagerImpl implements DbLockManager { private final Logger LOG = LoggerFactory.getLogger(this.getClass()); @Autowired
private DbSyncLockMapper lockMapper; @Transactional
public boolean lock(String outerSerialNo, String custNo, LockSource source) { boolean isLock = false;
TradeSyncLock lock = null;
try {
lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); if (null == lock) {
lock = new TradeSyncLock();
createLock(lock, outerSerialNo, custNo, source); int num = lockMapper.insert(lock);
if (num == 1) {
isLock = true;
} LOG.info(ThreadLogUtils.getLogPrefix() + "加入锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
return isLock;
} // 根据交易类型进行加锁
isLock = switchSynsLock(lock, source);
LOG.info(ThreadLogUtils.getLogPrefix() + "更新锁,客户号[{}],锁类型[{}]", custNo, source.getCode()); } catch (Exception e) {
LOG.error(ThreadLogUtils.getLogPrefix() + "交易加锁异常, 客户号:" + custNo, e);
}
return isLock;
} @Transactional
public void unLock(String outerSerialNo, String custNo, LockSource source, LockStatus targetStatus) { try {
TradeSyncLock lock = lockMapper.find(outerSerialNo, custNo, source.getCode()); if (null != lock) {
lockMapper.update(lock.getId(), targetStatus.getName(), LockStatus.P.getName(),
ThreadLogUtils.getCurrThreadUuid(), ThreadLogUtils.getCurrThreadUuid());
} LOG.info(ThreadLogUtils.getLogPrefix() + "释放锁,客户号[{}],锁类型[{}]", custNo, source.getCode());
} catch (Exception e) {
LOG.error(ThreadLogUtils.getLogPrefix() + "释放锁异常, 客户号:{}", custNo, e);
}
} /**
* 匹配加锁
*/
private boolean switchSynsLock(TradeSyncLock lock, LockSource source) {
boolean isLock = false; switch (source) {
case WITHDRAW:
;
isLock = tradeSynsLock(lock);
break;
case WITHDRAW_RETRY:
;
isLock = retrySynsLock(lock);
break;
default:
;
}
return isLock;
} /**
* 交易同步锁
*/
private boolean tradeSynsLock(TradeSyncLock lock) {
// 处理中的不加锁,即不执行交易操作
if (LockStatus.P.getName().equals(lock.getStatus())) {
return false;
} int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.S.getName(),
ThreadLogUtils.getCurrThreadUuid(), null);
if (num == 1) {
return true;
}
return false;
} /**
* 补偿同步锁
*/
private boolean retrySynsLock(TradeSyncLock lock) {
// 处理中或处理完成的不加锁,即不执行补偿操作
if (LockStatus.P.getName().equals(lock.getStatus()) || LockStatus.S.getName().equals(lock.getStatus())) {
return false;
} int num = lockMapper.update(lock.getId(), LockStatus.P.getName(), LockStatus.F.getName(),
ThreadLogUtils.getCurrThreadUuid(), null);
if (num == 1) {
return true;
}
return false;
} private void createLock(TradeSyncLock lock, String outerSerialNo, String custNo, LockSource source) {
lock.setOuterSerialNo(outerSerialNo);
lock.setCustNo(custNo);
lock.setSourceCode(source.getCode());
lock.setThreadNo(ThreadLogUtils.getCurrThreadUuid());
lock.setStatus(LockStatus.P.getName());
lock.setRemark(source.getDesc());
} }

获取当前线程号以及打印uuid工具类ThreadLogUtils.Java

/**
*
* 线程处理<br>
*
* @author fugaoyang
*
*/
public class ThreadLogUtils { private static ThreadLogUtils instance = null; private ThreadLogUtils() {
setInstance(this);
} // 初始化标志
private static final Object __noop = new Object();
private static ThreadLocal<Object> __flag = new InheritableThreadLocal<Object>() {
@Override
protected Object initialValue() {
return null;
}
}; // 当前线程的UUID信息,主要用于打印日志;
private static ThreadLocal<String> currLogUuid = new InheritableThreadLocal<String>() {
@Override
protected String initialValue() {
return UUID.randomUUID().toString()/* .toUpperCase() */;
}
}; private static ThreadLocal<String> currThreadUuid = new ThreadLocal<String>() {
@Override
protected String initialValue() {
return UUIDGenerator.getUuid();
}
}; public static void clear(Boolean isNew) {
if (isNew) { currLogUuid.remove(); __flag.remove(); currThreadUuid.remove(); }
} public static String getCurrLogUuid() {
if (!isInitialized()) {
throw new IllegalStateException("TLS未初始化");
} return currLogUuid.get();
} public static String getCurrThreadUuid() {
return currThreadUuid.get();
} public static void clearCurrThreadUuid() {
currThreadUuid.remove();
} public static String getLogPrefix() {
if (!isInitialized()) {
return "";
} return "<uuid=" + getCurrLogUuid() + ">";
} private static boolean isInitialized() {
return __flag.get() != null;
} /**
* 初始化上下文,如果已经初始化则返回false,否则返回true<br/>
*
* @return
*/
public static boolean initialize() {
if (isInitialized()) {
return false;
} __flag.set(__noop);
return true;
} private static void setInstance(ThreadLogUtils instance) {
ThreadLogUtils.instance = instance;
} public static ThreadLogUtils getInstance() {
return instance;
} }

两种锁的实现的大致思路如下:

1.交易同步锁

当一个客户在app取现,第一次进入时,会插入一条当前线程,状态是P,操作是取现的锁,取现成功后根据当前线程号会更新成功;

当一个客户同时多个取现操作时,只有一个取现操作会加锁成功,其它会加锁失败;

当一个客户已经在取现中,这时数据库已经有一条状态P的锁,该客户同时又做了取现,这个取现动作会尝试加锁而退出;

2.交易重试补偿锁

1.当一个客户取现加锁成功,因调用第三方支付接口超时时,后台会对该笔交易重新发起重试打款操作,这时会新加一条当前交易流水号,当前线程号,状态是P,操作是取现重试的锁,重试的支付结果是成功的话,更新该条锁数据为Y状态,否则更新该条数据为F状态;

2.当重试支付失败后,再去重试打款时,发现锁的状态是F,这时把F更新为P,继续重试,根据重试结果更新锁状态。

上面实现的是一个最基本的数据库分布式锁,满足的并发量也是基于数据库所能扛得住的,性能基本可以满足普通的交易量。

后续可以优化的部分:

1.当一个用户同时多次获取lock时,因为目前是用的乐观锁,只会有一个加锁成功,可以优化成加入while(true)循环获取lock,当失败次数到达指定次数时退出,当前的操作结束。

2.当锁表数据量随着时间增大时,可以考虑按用户对锁表进行分表分库,以减小数据库方面的压力。

3.对锁的操作可以抽象出来,作为抽象实现,比如具体的取现操作只关心取现这个业务实现。

因为时间有限,写的比较仓促,希望大家有问题可以提出,相互探讨~~

完整示例代码后续会更新到github。



最新文章

  1. DATETIME类型和BIGINT 类型互相转换
  2. js创建元素
  3. Android应用主界面底部菜单实现
  4. [Bhatia.Matrix Analysis.Solutions to Exercises and Problems]ExI.1.2
  5. Emoji表情在网页中显示
  6. [SQL注入2]FROM SQL INJECTION TO SHELL: POSTGRESQL EDITION
  7. hdu1754 基础线段树
  8. JqGrid 显示表
  9. 18. Scrum敏捷软件开发
  10. sqlserver 经典入门基础书籍
  11. Swift 响应式编程 浅析
  12. javac编译乱码
  13. (转)关于CNN中平移不变性的理解
  14. MVC ---- 怎删改查
  15. 关于STM32数据手册中的定时器信号
  16. Teamwork(The seventh day of the team)
  17. HttpClient和HttpURLConnection的区别
  18. Eclipse去掉对jQuery的错误提示
  19. 【算法笔记】A1060 Are They Equal
  20. ExpressRoute 常见问题

热门文章

  1. JMeter 压力测试使用CSV参数
  2. Angular5.0.0新特性
  3. Python Web框架篇:Django Model ORM(对象关系映射)
  4. HDU6127Hard challenge
  5. How Many Answers Are Wrong
  6. 0_Simple__cdpSimplePrint + 0_Simple__cdpSimpleQuicksort
  7. JavaScript 和 TypeScript 交叉口 —— 类型定义文件(*.d.ts)
  8. android 横竖屏切换不重走生命周期
  9. 基于webpack搭建的vue+element-ui框架
  10. HDnoip2017题解