前言

何为原子性?它又是通过什么原理来控制线程安全的?这里主要介绍有关Atomic原子性操作的几个类的使用场景和方法。

主体概要

  • AtomicInteger

  • AtomicLong

  • LongAdder

  • AtomicReference

  • AtomicIntegerFieldUpdater

  • AtomicStampedReference

  • AtomicBoolean

主体内容

一、线程安全性定义

定义:当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的。

线程安全性主要体现在三个方面:原子性、可见性、有序性:

  • 原子性:提供了互斥访问,同一时刻只能有一个线程来对它进行操作
  • 可见性:一个线程对主内存的修改可以及时地被其他线程观察到
  • 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

使用示例:

1.AtomicInteger

package com.controller.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger; import com.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j;
@Slf4j
@ThreadSafe
public class AtomicIntegerTest {
//请求数
public static int clientTotal=5000;
//并发数
public static int threadTotal=200;
//计数值
public static AtomicInteger count= new AtomicInteger(0); public static void main(String[] args) throws InterruptedException{
//创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量(允许并发数)
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for(int i =0;i<clientTotal;i++){
executorService.execute(()->{
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception",e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}",count.get());
} private static void add(){
//先增加操作,再获取当前的值
count.incrementAndGet();
//先获取当前的值,在增加操作
//count.getAndIncrement();
}
}

重点在于.incrementAndGet()方法中的unsafe.getAndAddInt()方法,如下所示:

/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

使用了一个叫unsafe的类,看它里面getAndAddInt的实现,标红的compareAndSwapInt方法需要注意,首先这个“paramObject”是指当前调用的对象,即上面例子中的count对象,而paramLong是当前的值,比如这里想执行2+1=3的操作,paramLong就等于2,paramInt就等于1。i是从调用这个getIntVolatile方法得到底层当前的值。那么compareAndSwapInt(paramObject, paramLong, i, i + paramInt))中变量值可以解释为compareAndSwapInt(count,2, 2, 2 +1)),如果paramLong和底层的i值相同的话,将其值更新为i+paramInt。也就是期望的值与底层值相等,才会执行+1操作,最后把底层的值覆盖掉,这就是CAS的核心。

 public final int getAndAddInt(Object paramObject, long paramLong, int paramInt)
{
int i;
do
{
i = getIntVolatile(paramObject, paramLong);
} while (!compareAndSwapInt(paramObject, paramLong, i, i + paramInt));
return i;
}

2.AtomicLong

package com.controller.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong; import com.annoations.ThreadSafe; import lombok.extern.slf4j.Slf4j;
@Slf4j
@ThreadSafe
public class AtomicLongTest {
//请求数
public static int clientTotal=5000;
//并发数
public static int threadTotal=200;
//计数值
public static AtomicLong count= new AtomicLong(0); public static void main(String[] args) throws InterruptedException{
//创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量(允许并发数)
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for(int i =0;i<clientTotal;i++){
executorService.execute(()->{
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception",e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}",count.get());
} private static void add(){
//先增加操作,再获取当前的值
count.incrementAndGet();
//先获取当前的值,在增加操作
//count.getAndIncrement();
}
}

3.LongAdder

package com.controller.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.LongAdder; import lombok.extern.slf4j.Slf4j;
@Slf4j
public class LongAddrTest {
//请求数
public static int clientTotal=5000;
//并发数
public static int threadTotal=200;
//计数值
public static LongAdder count= new LongAdder(); public static void main(String[] args) throws InterruptedException{
//创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量(允许并发数)
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for(int i =0;i<clientTotal;i++){
executorService.execute(()->{
try {
semaphore.acquire();
add();
semaphore.release();
} catch (Exception e) {
log.error("exception",e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("count:{}",count);
} private static void add(){
//先增加操作,再获取当前的值
count.increment();
//先获取当前的值,在增加操作
//count.getAndIncrement();
}
}

LongAdder 与 AtomicLong有什么区别?

(1)AtomicLong 是基于 CAS 方式自旋更新的;LongAdder 是把 value 分成若干cell,并发量低的时候,直接 CAS 更新值,成功即结束。并发量高的情况,CAS更新某个cell值和需要时对cell数据扩容,成功结束;更新失败自旋 CAS 更新 cell值。取值的时候,调用 sum() 方法进行每个cell累加。

(2)AtomicLong 包含有原子性的读、写结合的api;LongAdder 没有原子性的读、写结合的api,能保证结果最终一致性。
低并发场景AtomicLong 和 LongAdder 性能相似,高并发场景 LongAdder 性能优于 AtomicLong。

4.AtomicReference

package com.controller.atomic;
import java.util.concurrent.atomic.AtomicReference;
import com.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@ThreadSafe
public class AtomicReferenceTest {
private static AtomicReference<Integer> count = new AtomicReference<>(0); public static void main(String[] args){
//如果值为0,则结果为2
count.compareAndSet(0, 2);//2
//如果值为0,则结果为1
count.compareAndSet(0, 1);//no
//如果值为1,则结果为3
count.compareAndSet(1, 3);//no
//如果值为2,则结果为4
count.compareAndSet(2, 4);//4
//如果值为3,则结果为5
count.compareAndSet(3, 5);//no
log.info("count:{}",count.get());
}
}

5.AtomicIntegerFieldUpdater

package com.controller.atomic;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class AtomicIntegerFileUpdaterTest { private static AtomicIntegerFieldUpdater<AtomicIntegerFileUpdaterTest> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicIntegerFileUpdaterTest.class, "count");
@Getter
public volatile int count=100; public static void main(String[] args){
AtomicIntegerFileUpdaterTest AtomicIntegerFileUpdaterTest = new AtomicIntegerFileUpdaterTest(); if(updater.compareAndSet(AtomicIntegerFileUpdaterTest, 100, 120)){
log.info("update success,{}",AtomicIntegerFileUpdaterTest.getCount());
}
if(updater.compareAndSet(AtomicIntegerFileUpdaterTest, 100, 120)){
log.info("update success,{}",AtomicIntegerFileUpdaterTest.getCount());
}else{
log.info("update failed,{}",AtomicIntegerFileUpdaterTest.getCount());
}
}
}

加粗部分指代的是当前类"AtomicIntegerFileUpdaterTest"下的“count”变量,而count必须被volatile关键字修饰,不能被static修饰。

程序执行的结果为:

update success,120
update failed,120

compareAndSet(param1,param2,param3)指代的是如果param1中被AtomicIntegerFieldUpdater定义的对象的变量等于param2,则将其更新为param3。否则不更新。

6.AtomicStampedReference

解决CAS的ABA问题,每次增加操作,在原有版本号基础上加一。

关于ABA问题,这里作一个详细介绍:

在多线程场景下CAS会出现ABA问题,关于ABA问题这里简单科普下,例如有2个线程同时对同一个值(初始值为A)进行CAS操作,这三个线程如下
.线程1,期望值为A,欲更新的值为B
.线程2,期望值为A,欲更新的值为B
线程1抢先获得CPU时间片,而线程2因为其他原因阻塞了,线程1取值与期望的A值比较,发现相等然后将值更新为B,然后这个时候出现了线程3,期望值为B,欲更新的值为A,线程3取值与期望的值B比较,发现相等则将值更新为A,此时线程2从阻塞中恢复,并且获得了CPU时间片,这时候线程2取值与期望的值A比较,发现相等则将值更新为B,虽然线程2也完成了操作,但是线程2并不知道值已经经过了A->B->A的变化过程。 ABA问题带来的危害: 小明在提款机,提取了50元,因为提款机问题,有两个线程,同时把余额从100变为50
线程1(提款机):获取当前值100,期望更新为50,
线程2(提款机):获取当前值100,期望更新为50,
线程1成功执行,线程2某种原因block了,这时,某人给小明汇款50
线程3(默认):获取当前值50,期望更新为100,
这时候线程3成功执行,余额变为100,
线程2从Block中恢复,获取到的也是100,compare之后,继续更新余额为50!!!
此时可以看到,实际余额应该为100(-+),但是实际上变为了50(-+-)这就是ABA问题带来的成功提交。
解决方法: 在变量前面加上版本号,每次变量更新的时候变量的版本号都+,即A->B->A就变成了1A->2B->3A。

其中的compareAndSet()方法如下:

 public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp
== current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

7.AtomicBoolean

package com.controller.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @Slf4j
public class AtomicBooleanTest {
private static AtomicBoolean isHappend = new AtomicBoolean(false);
//请求数
public static int clientTotal=5000;
//并发数
public static int threadTotal=200; public static void main(String[] args) throws Exception{
//创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
//定义信号量(允许并发数)
final Semaphore semaphore = new Semaphore(threadTotal);
//定义计数器
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for(int i =0;i<clientTotal;i++){
executorService.execute(()->{
try {
semaphore.acquire();
test();
semaphore.release();
} catch (Exception e) {
log.error("exception",e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("isHappend:{}",isHappend.get());
}
public static void test(){
if(isHappend.compareAndSet(false, true)){
log.info("execute..");
}
}
}

结果:true

这个类的示例用于在并发情况下,想只执行一次某段逻辑,绝对不会重复。

最新文章

  1. 再次学习 java 类的编译
  2. GIS部分理论知识备忘随笔
  3. js有关时间日期的操作
  4. android 编写动画
  5. php.ini 配置文件的深入解析
  6. ASP.NET菜鸟之路之Seesion小例子
  7. [Angular 2] Using ngrx/store and Reducers for Angular 2 Application State
  8. UVA 1558 - Number Game(博弈dp)
  9. iOS 事件处理机制与图像渲染过程
  10. WinSpy涉及的windows api
  11. loadrunner解决浏览器死机问题
  12. iOS开发之Quartz2D
  13. 从狗日的Oracle上下载jdk
  14. Sum of AP series——AP系列之和
  15. JSP EL隐含对象
  16. Linux用户登录日志查询
  17. MPP-解码示例
  18. 【spring源码分析】IOC容器初始化(十二)
  19. LeakCanary 来检查 Android 内存泄漏
  20. install pycurl

热门文章

  1. Centos7忘记mysql的root用户密码
  2. scala文件通过本地命令运行
  3. USB Windows驱动 音频解决方案芯片DP108
  4. Arduino读取串口数据并进行字符串分割
  5. 利用vim查看日志,快速定位问题《转载》
  6. [LeetCode] 931. Minimum Falling Path Sum 下降路径最小和
  7. QT事件处理–notify()
  8. 利用 Python 破解 ZIP 或 RAR 文件密码
  9. 八十八、SAP中ALV事件之二,事件的定义和事件子例程
  10. [NOIP2017] T4 跳房子 DP+二分