java架构《并发线程中级篇》
2024-09-22 07:06:51
java多线程的三大设计模式
本章主要记录java常见的三大设计模式,Future、Master-Worker和生产者-消费者模式。
一、Future模式
使用场景:数据可以不及时返回,到下一次实际要使用结果的之前,后台自动查询并返回。类似与Ajax异步加载。
原理:客户端发起请求,结果需要返回Data对象,当服务器收到请求以后,FutureData包装类实现Data接口,不查询数据库,直接返回结果。(核心)。然后后台自己开一个线程去查询数据库,RealData真
实数据类,也实现Data接口,并返回数据。当实际使用时。获取到返回的真实数据。
代码分析:
1 // FutureClient 客户端类:
2
3 public class FutureClient {
4
5 public Data request(final String queryStr){
6 //1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情
7 final FutureData futureData = new FutureData();
8 //2 启动一个新的线程,去加载真实的数据,传递给这个代理对象
9 new Thread(new Runnable() {
10 @Override
11 public void run() {
12 //3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象
13 RealData realData = new RealData(queryStr);
14 futureData.setRealData(realData);
15 }
16 }).start();
17 return futureData;
18 }
19
20
1 // Data类:
2
3
4
5 public interface Data {
6
7 String getRequest();
8
9 }
10
11
1 // FutureData类:
2
3 public class FutureData implements Data{
4
5 private RealData realData ;
6
7 private boolean isReady = false;
8
9 public synchronized void setRealData(RealData realData) {
10 //如果已经装载完毕了,就直接返回
11 if(isReady){
12 return;
13 }
14 //如果没装载,进行装载真实对象
15 this.realData = realData;
16 isReady = true;
17 //进行通知
18 notify();
19 }
20
21 @Override
22 public synchronized String getRequest() {
23 //如果没装载好 程序就一直处于阻塞状态
24 while(!isReady){
25 try {
26 wait();
27 } catch (InterruptedException e) {
28 e.printStackTrace();
29 }
30 }
31 //装载好直接获取数据即可
32 return this.realData.getRequest();
33 }
34
35
36
37 }
38
39
1 // RealData类:
2
3 public class RealData implements Data{
4
5 private String result ;
6
7 public RealData (String queryStr){
8 System.out.println("根据" + queryStr + "进行查询,这是一个很耗时的操作..");
9 try {
10 Thread.sleep(5000);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 System.out.println("操作完毕,获取结果");
15 result = "查询结果";
16 }
17
18 @Override
19 public String getRequest() {
20 return result;
21 }
22
23 }
24
25
1 // Main测试类:
2
3 public class Main {
4
5 public static void main(String[] args) throws InterruptedException {
6
7 FutureClient fc = new FutureClient();
8 Data data = fc.request("请求参数");
9 System.out.println("请求发送成功!");
10 System.out.println("做其他的事情...");
11 String result = data.getRequest();
12 System.out.println(result);
13 }
14 }
二:Master-Worker模式(并行计算模式)
使用场景:互不影响的多任务时。返回结果需要共同返回。其好处是讲一个大任务分解成若干个小任务。并行执行,提高系统的吞吐量。
原理:核心思想是系统由两类进程协作工作;Master进程和Worker进程。Master进程负责接收和分配工作,Worker进程主要负责处理子任务。当各
个Worker进程处理完后。会将结果返回给Master,由Master做归纳和总结,并返回。
代码分析:
1 //Worker类:
2
3 public class Worker implements Runnable {
4
5 private ConcurrentLinkedQueue<Task> workQueue;
6 private ConcurrentHashMap<String, Object> resultMap;
7
8 public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
9 this.workQueue = workQueue;
10 }
11
12 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
13 this.resultMap = resultMap;
14 }
15
16 @Override
17 public void run() {
18 while(true){
19 Task input = this.workQueue.poll();
20 if(input == null) break;
21 Object output = handle(input);
22 this.resultMap.put(Integer.toString(input.getId()), output);
23 }
24 }
25
26 private Object handle(Task input) {
27 Object output = null;
28 try {
29 //处理任务的耗时。。 比如说进行操作数据库。。。
30 Thread.sleep(500);
31 output = input.getPrice(); //模拟把Task类的价格做为结果返回
32 } catch (InterruptedException e) {
33 e.printStackTrace();
34 }
35 return output;
36 }
37
38 }
39
40
1 //Master类:
2
3 public class Master {
4
5 //1 有一个盛放任务的容器
6 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
7
8 //2 需要有一个盛放worker的集合
9 private HashMap<String, Thread> workers = new HashMap<String, Thread>();
10
11 //3 需要有一个盛放每一个worker执行任务的结果集合
12 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
13
14 //4 构造方法
15 public Master(Worker worker , int workerCount){
16 worker.setWorkQueue(this.workQueue);
17 worker.setResultMap(this.resultMap);
18
19 for(int i = 0; i < workerCount; i ++){
20 this.workers.put(Integer.toString(i), new Thread(worker));
21 }
22 }
23
24 //5 需要一个提交任务的方法
25 public void submit(Task task){
26 this.workQueue.add(task);
27 }
28
29 //6 需要有一个执行的方法,启动所有的worker方法去执行任务
30 public void execute(){
31 for(Map.Entry<String, Thread> me : workers.entrySet()){
32 me.getValue().start();
33 }
34 }
35
36 //7 判断是否运行结束的方法
37 public boolean isComplete() {
38 for(Map.Entry<String, Thread> me : workers.entrySet()){
39 if(me.getValue().getState() != Thread.State.TERMINATED){
40 return false;
41 }
42 }
43 return true;
44 }
45
46 //8 计算结果方法
47 public int getResult() {
48 int priceResult = 0;
49 for(Map.Entry<String, Object> me : resultMap.entrySet()){
50 priceResult += (Integer)me.getValue();
51 }
52 return priceResult;
53 }
54 }
1 // Task类:
2
3
4
5 public class Task {
6
7 private int id;
8 private int price ;
9 public int getId() {
10 return id;
11 }
12 public void setId(int id) {
13 this.id = id;
14 }
15 public int getPrice() {
16 return price;
17 }
18 public void setPrice(int price) {
19 this.price = price;
20 }
21 }
22
23
1 //main测试类:
2
3 public class Main {
4
5 public static void main(String[] args) {
6
7 int Processors= Runtime.getRuntime().availableProcessors(); //获取到当前电脑的线程数
8 System.out.println("当前电脑是"+Processors+"核");
9 Master master = new Master(new Worker(),Processors );
10 //Master master = new Master(new Worker(),20 ); //开20个线程
11 Random r = new Random();
12 for(int i = 1; i <= 100; i++){
13 Task t = new Task();
14 t.setId(i);
15 t.setPrice(r.nextInt(1000));
16 master.submit(t);
17 }
18 master.execute(); //执行任务
19 long start = System.currentTimeMillis();
20
21 while(true){
22 if(master.isComplete()){
23 long end = System.currentTimeMillis() - start;
24 int priceResult = master.getResult();
25 System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
26 break;
27 }
28 }
29 }
30 }
31
32
三:生产者-消费者模式
使用场景:消息中间件。
代码分析:
1 // main测试类:
2
3 public class Main {
4
5 public static void main(String[] args) throws Exception {
6 //内存缓冲区
7 BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
8 //生产者
9 Provider p1 = new Provider(queue);
10 Provider p2 = new Provider(queue);
11 Provider p3 = new Provider(queue);
12 //消费者
13 Consumer c1 = new Consumer(queue);
14 Consumer c2 = new Consumer(queue);
15 Consumer c3 = new Consumer(queue);
16 //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
17
18 ExecutorService cachePool = Executors.newCachedThreadPool();
19
20 cachePool.execute(p1);
21 cachePool.execute(p2);
22 cachePool.execute(p3);
23 cachePool.execute(c1);
24 cachePool.execute(c2);
25 cachePool.execute(c3);
26
27 try {
28 Thread.sleep(3000);
29 } catch (InterruptedException e) {
30 e.printStackTrace();
31 }
32 p1.stop();
33 p2.stop();
34 p3.stop();
35 try {
36 Thread.sleep(2000);
37 } catch (InterruptedException e) {
38 e.printStackTrace();
39 }
40 }
41
42 }
43
44
1 Data类:
2
3 public final class Data {
4
5 private String id;
6 private String name;
7
8 public Data(String id, String name){
9 this.id = id;
10 this.name = name;
11 }
12
13 public String getId() {
14 return id;
15 }
16
17 public void setId(String id) {
18 this.id = id;
19 }
20
21 public String getName() {
22 return name;
23 }
24
25 public void setName(String name) {
26 this.name = name;
27 }
28
29 @Override
30 public String toString(){
31 return "{id: " + id + ", name: " + name + "}";
32 }
33
34 }
35
36
1 //Provider成产者类:
2
3
4
5 public class Provider implements Runnable{
6
7 //共享缓存区
8 private BlockingQueue<Data> queue;
9 //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
10 private volatile boolean isRunning = true;
11 //id生成器
12 private static AtomicInteger count = new AtomicInteger();
13 //随机对象
14 private static Random r = new Random();
15
16 public Provider(BlockingQueue queue){
17 this.queue = queue;
18 }
19
20 @Override
21 public void run() {
22 while(isRunning){
23 try {
24 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
25 Thread.sleep(r.nextInt(1000));
26 //获取的数据进行累计...
27 int id = count.incrementAndGet();
28 //比如通过一个getData方法获取了
29 Data data = new Data(Integer.toString(id), "数据" + id);
30 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
31 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
32 System.out.println("提交缓冲区数据失败....");
33 //do something... 比如重新提交
34 }
35 } catch (InterruptedException e) {
36 e.printStackTrace();
37 }
38 }
39 }
40
41 public void stop(){
42 this.isRunning = false;
43 }
44
45 }
46
47
1 // ConSumber消费者类:
2
3
4
5 public class Consumer implements Runnable{
6
7 private BlockingQueue<Data> queue;
8
9 public Consumer(BlockingQueue queue){
10 this.queue = queue;
11 }
12
13 //随机对象
14 private static Random r = new Random();
15
16 @Override
17 public void run() {
18 while(true){
19 try {
20 //获取数据
21 Data data = this.queue.take();
22 //进行数据处理。休眠0 - 1000毫秒模拟耗时
23 Thread.sleep(r.nextInt(1000));
24 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
25 } catch (InterruptedException e) {
26 e.printStackTrace();
27 }
28 }
29 }
30 }
31
32
最新文章
- UI基础之UITextField相关
- 【leetcode】Populating Next Right Pointers in Each Node
- 【Spring】Spring系列7之Spring整合MVC框架
- NLTk
- WindowsPhone8 数据库增删改查
- AOP和IOC个人理解
- HDU4530:小Q系列故事——大笨钟
- BZOJ 1578: [Usaco2009 Feb]Stock Market 股票市场( 背包dp )
- SSIS中执行SQL任务组件参数传递的问题
- margin:0 auto
- 屏蔽ps联网激活
- Spring+SpringMVC+MyBatis+easyUI整合优化篇(一)System.out.print与Log
- Java:参数数量可变的方法
- 《高性能MySQL》读书笔记(上)
- MySQL数据库简识
- 6#day2总结
- spring boot2 整合(二)JPA(特别完整!)
- ORA-06553: PLS-553: character set name is not recognized, while starting Content Store
- Linux运维之系统性能---vmstat工具分析内存的瓶颈
- 关于UIImageView的显示问题——居中显示或者截取图片的中间部分显示