并发程序设计之Future模式

一)、使用Future模式的原因

当某一段程序提交了一个请求,期待得到一个答复,但服务程序对这个请求的处理可能很慢,在单线程的环境中,调用函数是同步的,必须等到服务程序返回结果后才能进行其他处理,在这段时间里,客户端一直处于等待状态。

二)、Future模式

使用并发的设计思想,解决客户端发送请求到应用程序,等待响应数据时间过长的问题.

三)、Future模式的核心结构

1.main: 系统启动类

作用:调用Client,发送请求。

2.client: 发送请求类

 作用:返回Data对象,即FutureData对象,并开启一个线程给返回的FutureData对象装配RealData。

3.Data:数据接口

 作用:FutureData和RealData共同实现了这个接口。

4.FutureData: 虚假数据类

 作用:Future数据,构造快,虚假数据,装配RealData对象,相当于RealData的一个代理。

5.RealData: 真实数据类

   作用:返回服务程序处理的真实数据。

四)、Future模式的实现流程

1).用户发送client请求服务到应用程序,先返回一个虚假的数据FutureData类,并启一个新的线程,用于装配真正的数据返回对象。

2).提供一个接口类Data,类中提供一个接口方法,虚假类FutureData和真实类RealData分别实现对应的接口方法。

  1. .将新的线程装配成功的RealData对象加入FutureData中,FutureData相当于RealData的代理对象,在接口方法中调用RealData,的到应用程序真实的响应数据。

五)、Future模式的简单复现

模拟用户在淘宝购买商品的场景:

购买香水,用户购买商品后返回一个商品订单(相当于FutureData),并没有马上返回商品,用户需等待物流才能获得商品在等待物流派送的这段时间用户可以做其他事情(用户执行其他的业务逻辑),当货物到达再取货。当用户想要取货时,可以通过物流信息来监控货物是否到达(状态值isReady),若物流状态为未到达,用户不可取货,只能等待,当物流到达,用户取货(result())。

系统启动类:ClientService

/**
* 系统启动类
*/
public class ClientService {
public static void main(String[] args) {
//调用Client,发送请求
Client client = new Client();
//得到一个FutureData对象
Data data = client.request("channel香水 x 1:购买");
System.out.println("我在执行其他的业务");
//模拟客户端发送请请求后,继续执行其他的业务逻辑
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//用户的到真实的数据对象
System.out.println(data.result());
}
}

客户端请求类:Client

public class Client {
/**
* 客户端发送请求后,先返回一个FutureData,并开启一个线程,装配一个RealData到返回的FutureData中
*
*/
public Data request(String requestStr){
//创建一个Future对象
FutureData future = new FutureData();
//开启一个线程,创建RealData对象,并将该对象装配到future中
new Thread(){
@Override
public void run(){
//创建RealData对象
RealData realData = new RealData(requestStr);
//装配realData对象到future中
future.setRealData(realData);
}
}.start();
//用户请求后立即返回一个FutureData对象,执行其它的业务逻辑
return future;
}
}

接口类:Data

/**
* FutureData和RealData的共同接口
*/
public interface Data {
String result();
}

虚假数据类:FutureData

该类需要使用到线程的唤醒机制

1).使用isReady来监控对象是否被注入,isReady = false,对象未注入,线程请求

    request(),使用wait()让线程进入等待状态。
  1. .isReady = true,注入对象,使用notifyAll()唤醒等待线程。

原因: RealData的方法构造很慢,当调用FutureData的result()来获取真实数据

          时,RealData对象未被注入,因为FutureData的result()是调用了RealData

          的的result()方法,此时,对象未注入,抛出NullPointException。

注:wait()和notify()/notifyAll要配合synchonized使用。

/**
* 该类需要使用到线程的唤醒机制:
* 使用isReady来监控对象是否被注入,isReady = false,对象未注 入,线程请求request(),使用wait()让线程进入等待状态
* isReady = true,注入对象,使用notifyAll()唤醒等待线程。
*
* 原因: RealData的方法构造很慢,当调用FutureData的result()来获取真实数据时,RealData对象未被注入,因为FutureData的result()
* 是调用了RealData的result()方法,此时,对象未注入,抛出NullPointException。
*/
public class FutureData implements Data {
/**
* 装配对象
*/
RealData realData = null; /**
* 装配标识符,监控装配对象是否成功注入
*/
Boolean isReady = false; /**
* 注入RealData
* @return
*/
public synchronized void setRealData(RealData realData){
if(isReady){
return;
}
this.realData = realData;
//注入对象后,将isReady设置为true
isReady = true;
//唤醒等待的线程
notifyAll();
} /**
* 实际上使用RealData获取真实的数据并返回结果
* @return
*/
@Override
public synchronized String result() {
//当对象未被注入时,等待对象的注入
if(!isReady){
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData.result();
}
}

真实数据类:RealData

/**
* 真正的数据返回类
*/
public class RealData implements Data{
private String requestStr; /**
* 模拟在构造RealData对象时要花费很多时间
*/
public RealData(String requestStr) {
StringBuffer sb = new StringBuffer();
for(int i = 0; i < 100; i++){
sb.append(requestStr);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} @Override
public String result() {
return "亲~ ,您的货物已经到件了哦!";
}
}

结果:

我在执行其他的业务
亲~ ,您的货物已经到件了哦!

六)、JDK内置实现

JDK提供了一个FutureTask和Callable接口来实现程序的异步操作。

1)、FutureTask类:

定义: 是一个线程类,能做为一个单独的线程运行。

1.实现RunableFuture接口

2.RunableFuture 继承了Runable、Future两个接口

2)、主要属性:

private Callable<V> callable;
//call()方法的返回值
private Object outcome;
//call()方法的执行状态
private volatile int state;
//新创建状态
private static final int NEW = 0;
//完成状态
private static final int COMPLETING = 1;
//标准状态
private static final int NORMAL = 2;
//异常状态
private static final int EXCEPTIONAL = 3;
//取消状态
private static final int CANCELLED = 4;
//正在中断状态
private static final int INTERRUPTING = 5;
//中断状态
private static final int INTERRUPTED = 6;

3)、构造方法new FutureTask(Callable callable) :

接收一个Callable类型的对象

public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

4)、Callable接口:接口提供了一个call()方法。

在FutureTask的run()方法中会调用这个方法。call()方法中实现并发线程的主要逻辑。

public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

5)、FutureTask的run()方法

实现逻辑:

1.调用Callable的call()方法,并获取其返回值。

2.将返回值赋值给outcome对象。

3.设置state状态值 state = NORMAL = 2。

public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//调用call()方法,并获取返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
} //将返回值赋值给outcome变量
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

6)、FutureTask的get()方法

1).判断call()方法的状态。

2).若sate <=COMPLETING = 2,线程会进入等待状态。

3).相反,则返回outcome值,即call()方法的返回值。

public V get() throws InterruptedException, ExecutionException {
int s = state;
//判断call()方法的执行状态,判断是否进入等待状态
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
} //返回outcome的值
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

7)、使用JDK自带的Future模式复现商品购买场景:

系统启动类:

/**
* 模拟淘宝下单:
* 购买香水,用户购买商品后返回一个商品订单,并没有马上返回商品,用户需 * 等待物流才能获得商品在等待物流派送的这段时间用户可以做其他事情,当货 * 物到达再取货。当用户想要取货时,可以通过物流信息来监控货物是否到达, * 若物流状态为未到达,用户不可取货,只能等待。
*/
public class JDK_ClientService {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建一个FutureTask对象,相当于上文FutureData,该对象是一个线程类
FutureTask future = new FutureTask(new RealData("chanta(nel香水 x 1: 购买!"));
//创将线程池执行器,返回一个ThreadPool对象
ExecutorService executor = Executors.newFixedThreadPool(1);
//开启线程,发送请求,执行future线程的run()方法,执行请求的具体逻辑
executor.submit(future);
System.out.println("我要去执行其他业务了");
//模拟程序执行其它逻辑
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//得到请求的真实返回数据,get方法会判断监控call()的状态,当call()的状态未执行或未执行完毕,线程进入等待状态。
System.out.println(future.get());
} }

真实类:实现Callabel接口,复写call()方法,定义Run方法的具体逻辑

public class RealData implements Callable {
private String requestStr;
/**
* 轻量级的构造方法
*/
public RealData(String requestStr) {
this.requestStr = requestStr;
} /**
* 将复杂的业务逻辑都放在call方法中
* @return
* @throws Exception
*/
@Override
public Object call(){
StringBuffer sb = new StringBuffer();
for(int i = 0; i < 10; i++){
sb.append(requestStr);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//指定FutureTask的返回对象
return "亲~, 您的香水已到货!";
}
}

结果:

我要去执行其他业务了
亲~, 您的香水已到货!

结论:

Jdk中Future模式的实现主要是依赖Callable接口,FutureTask类。

首先,在创建FutureTask对象的时候传入一个Callable的子类对象,获取线程池执行器,通过线程池执行器来开启FutureTask的线程,执行run()方法,在run()方法中调用Callable的子类对象的call()方法,并使用一个outcome变量来存储call()方法的返回值,使用一个int类型的变量state来监控call()方法的执行情况,用户可以通过get()方法来获取call()方法的返回值,get()方法根据state值来决定程序是否进入等待状态,若state = normal = 2 = 正常值,返回call()的返回结果。

最新文章

  1. 高级查询---嵌套and分页
  2. Centos6.5 下安装PostgreSQL9.4数据库
  3. ofbiz进击 。 ofbiz 退货流程(包含获取可退货项流程分析 以及 取消退货项的过程分析)
  4. encodeURIComponent编码后java后台的解码
  5. 禁止指定目录执行php文件
  6. MLlib-协同过滤
  7. 《Programming WPF》翻译 第7章 2.图形
  8. #Eureka 客户端和服务端间的交互
  9. 小白都会超详细--ELK日志管理平台搭建教程
  10. PHP变量传值赋值和引用赋值,变量销毁
  11. ansible的logging模块用来写日志
  12. Python机器学习(基础篇---监督学习(支持向量机))
  13. python 的zip 函数小例子
  14. 新建一个Windows Service的方法
  15. 用Redis实现分布式锁 与 实现任务队列
  16. eclipse git 创建新分支 合并分支 删除分支
  17. CentOS Tomcat启动 Neither the JAVA_HOME nor the JRE_HOME environment variable is defined
  18. 【SLAM】安装 g2o_viewer
  19. angular学习笔记(十二)-控制器
  20. BZOJ4659:lcm

热门文章

  1. mybatis 使用redis实现二级缓存(spring boot)
  2. scp -本地文件上传服务器,指定端口
  3. java script三大组成部分
  4. Python能做什么,自学Python效果怎么样?
  5. nginx基于uwsgi部署Django
  6. 项目spring boot 写es hbase 运行内存溢出
  7. 【IntelliJ IDEA】 常用快捷键列表
  8. Django实现WebSSH操作物理机或虚拟机
  9. iOS开发高级分享 - iOS上的设备标识符和指纹
  10. 如何往Spark社区做贡献,贡献代码