java多线程的三大设计模式

本章主要记录java常见的三大设计模式,Future、Master-Worker和生产者-消费者模式。

一、Future模式

    使用场景:数据可以不及时返回,到下一次实际要使用结果的之前,后台自动查询并返回。类似与Ajax异步加载。

    原理:客户端发起请求,结果需要返回Data对象,当服务器收到请求以后,FutureData包装类实现Data接口,不查询数据库,直接返回结果。(核心)。然后后台自己开一个线程去查询数据库,RealData真

       实数据类,也实现Data接口,并返回数据。当实际使用时。获取到返回的真实数据。

        

      代码分析:

     

 1 //  FutureClient 客户端类:
2
3       public class FutureClient {
4
5       public Data request(final String queryStr){
6           //1 我想要一个代理对象(Data接口的实现类)先返回给发送请求的客户端,告诉他请求已经接收到,可以做其他的事情
7           final FutureData futureData = new FutureData();
8           //2 启动一个新的线程,去加载真实的数据,传递给这个代理对象
9           new Thread(new Runnable() {
10           @Override
11           public void run() {
12               //3 这个新的线程可以去慢慢的加载真实对象,然后传递给代理对象
13               RealData realData = new RealData(queryStr);
14               futureData.setRealData(realData);
15               }
16           }).start();
17           return futureData;
18           }
19
20

    

 1  // Data类:
2
3       
4
5         public interface Data {
6
7           String getRequest();
8
9         }
10
11   

    

 1  // FutureData类:   
2
3         public class FutureData implements Data{
4
5         private RealData realData ;
6
7           private boolean isReady = false;
8
9           public synchronized void setRealData(RealData realData) {
10           //如果已经装载完毕了,就直接返回
11           if(isReady){
12                 return;
13                 }
14               //如果没装载,进行装载真实对象
15                  this.realData = realData;
16                  isReady = true;
17               //进行通知
18                 notify();
19              }
20
21           @Override
22           public synchronized String getRequest() {
23           //如果没装载好 程序就一直处于阻塞状态
24             while(!isReady){
25             try {
26               wait();
27               } catch (InterruptedException e) {
28                 e.printStackTrace();
29               }
30               }
31               //装载好直接获取数据即可
32               return this.realData.getRequest();
33               }
34
35
36
37            }
38
39

     

 1 // RealData类:   
2
3         public class RealData implements Data{
4
5         private String result ;
6
7         public RealData (String queryStr){
8             System.out.println("根据" + queryStr + "进行查询,这是一个很耗时的操作..");
9             try {
10               Thread.sleep(5000);
11               } catch (InterruptedException e) {
12                 e.printStackTrace();
13               }
14                 System.out.println("操作完毕,获取结果");
15                 result = "查询结果";
16               }
17
18            @Override
19               public String getRequest() {
20               return result;
21              }
22
23             }
24
25

     

 1 // Main测试类:            
2
3         public class Main {
4
5             public static void main(String[] args) throws InterruptedException {
6
7             FutureClient fc = new FutureClient();
8             Data data = fc.request("请求参数");
9             System.out.println("请求发送成功!");
10             System.out.println("做其他的事情...");
11             String result = data.getRequest();
12             System.out.println(result);
13           }
14         }

 

二:Master-Worker模式(并行计算模式)      

    使用场景:互不影响的多任务时。返回结果需要共同返回。其好处是讲一个大任务分解成若干个小任务。并行执行,提高系统的吞吐量。

    原理:核心思想是系统由两类进程协作工作;Master进程和Worker进程。Master进程负责接收和分配工作,Worker进程主要负责处理子任务。当各
       个Worker进程处理完后。会将结果返回给Master,由Master做归纳和总结,并返回。

      

      

      

    

    

    

    代码分析:

      

 1 //Worker类:
2
3       public class Worker implements Runnable {
4
5         private ConcurrentLinkedQueue<Task> workQueue;
6         private ConcurrentHashMap<String, Object> resultMap;
7
8         public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
9           this.workQueue = workQueue;
10         }
11
12         public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
13           this.resultMap = resultMap;
14         }
15
16         @Override
17         public void run() {
18           while(true){
19             Task input = this.workQueue.poll();
20               if(input == null) break;
21                 Object output = handle(input);
22                 this.resultMap.put(Integer.toString(input.getId()), output);
23               }
24             }
25
26         private Object handle(Task input) {
27             Object output = null;
28               try {
29                 //处理任务的耗时。。 比如说进行操作数据库。。。
30                 Thread.sleep(500);
31                 output = input.getPrice(); //模拟把Task类的价格做为结果返回
32                 } catch (InterruptedException e) {
33                 e.printStackTrace();
34                 }
35                 return output;
36              }
37
38         }
39
40

    

 1  //Master类:
2
3       public class Master {
4
5         //1 有一个盛放任务的容器
6         private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
7
8         //2 需要有一个盛放worker的集合
9         private HashMap<String, Thread> workers = new HashMap<String, Thread>();
10
11         //3 需要有一个盛放每一个worker执行任务的结果集合
12         private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
13
14         //4 构造方法
15         public Master(Worker worker , int workerCount){
16           worker.setWorkQueue(this.workQueue);
17           worker.setResultMap(this.resultMap);
18
19           for(int i = 0; i < workerCount; i ++){
20               this.workers.put(Integer.toString(i), new Thread(worker));
21             }
22             }
23
24           //5 需要一个提交任务的方法
25        public void submit(Task task){
26             this.workQueue.add(task);
27             }
28
29           //6 需要有一个执行的方法,启动所有的worker方法去执行任务
30       public void execute(){
31           for(Map.Entry<String, Thread> me : workers.entrySet()){
32               me.getValue().start();
33             }
34             }
35
36           //7 判断是否运行结束的方法
37       public boolean isComplete() {
38           for(Map.Entry<String, Thread> me : workers.entrySet()){
39               if(me.getValue().getState() != Thread.State.TERMINATED){
40               return false;
41                 }
42               }
43             return true;
44           }
45
46           //8 计算结果方法
47       public int getResult() {
48           int priceResult = 0;
49           for(Map.Entry<String, Object> me : resultMap.entrySet()){
50           priceResult += (Integer)me.getValue();
51           }
52           return priceResult;
53           }
54       }

    

     

 1 // Task类:
2
3         
4
5       public class Task {
6
7         private int id;
8         private int price ;
9         public int getId() {
10           return id;
11            }
12         public void setId(int id) {
13           this.id = id;
14           }
15         public int getPrice() {
16           return price;
17           }
18         public void setPrice(int price) {
19           this.price = price;
20         }
21        }
22
23

      

 1 //main测试类:
2
3          public class Main {
4
5           public static void main(String[] args) {
6
7           int Processors= Runtime.getRuntime().availableProcessors(); //获取到当前电脑的线程数
8           System.out.println("当前电脑是"+Processors+"核");
9           Master master = new Master(new Worker(),Processors );
10           //Master master = new Master(new Worker(),20 ); //开20个线程
11           Random r = new Random();
12           for(int i = 1; i <= 100; i++){
13             Task t = new Task();
14             t.setId(i);
15             t.setPrice(r.nextInt(1000));
16             master.submit(t);
17             }
18             master.execute(); //执行任务
19             long start = System.currentTimeMillis();
20
21             while(true){
22             if(master.isComplete()){
23             long end = System.currentTimeMillis() - start;
24             int priceResult = master.getResult();
25             System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
26             break;
27             }
28             }
29           }
30         }
31
32

三:生产者-消费者模式

    使用场景:消息中间件。

      

    代码分析:

       

 1 // main测试类:  
2
3           public class Main {
4
5           public static void main(String[] args) throws Exception {
6           //内存缓冲区
7           BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
8           //生产者
9           Provider p1 = new Provider(queue);
10           Provider p2 = new Provider(queue);
11           Provider p3 = new Provider(queue);
12           //消费者
13           Consumer c1 = new Consumer(queue);
14           Consumer c2 = new Consumer(queue);
15           Consumer c3 = new Consumer(queue);
16           //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)  
17
18           ExecutorService cachePool = Executors.newCachedThreadPool();
19
20           cachePool.execute(p1);
21           cachePool.execute(p2);
22           cachePool.execute(p3);
23           cachePool.execute(c1);
24           cachePool.execute(c2);
25           cachePool.execute(c3);
26
27           try {
28             Thread.sleep(3000);
29             } catch (InterruptedException e) {
30               e.printStackTrace();
31             }
32               p1.stop();
33               p2.stop();
34               p3.stop();
35           try {
36             Thread.sleep(2000);
37             } catch (InterruptedException e) {
38               e.printStackTrace();
39             }
40           }
41
42         }
43
44

     

 1  Data类:        
2
3         public final class Data {
4
5         private String id;
6         private String name;
7
8         public Data(String id, String name){
9           this.id = id;
10           this.name = name;
11           }
12
13         public String getId() {
14           return id;
15           }
16
17         public void setId(String id) {
18           this.id = id;
19           }
20
21         public String getName() {
22           return name;
23           }
24
25         public void setName(String name) {
26           this.name = name;
27           }
28
29         @Override
30         public String toString(){
31           return "{id: " + id + ", name: " + name + "}";
32           }
33
34         }
35
36

     

 1  //Provider成产者类:
2
3         
4
5         public class Provider implements Runnable{
6
7             //共享缓存区
8           private BlockingQueue<Data> queue;
9             //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
10           private volatile boolean isRunning = true;
11             //id生成器
12           private static AtomicInteger count = new AtomicInteger();
13             //随机对象
14           private static Random r = new Random();
15
16           public Provider(BlockingQueue queue){
17             this.queue = queue;
18             }
19
20           @Override
21           public void run() {
22           while(isRunning){
23             try {
24               //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
25               Thread.sleep(r.nextInt(1000));
26               //获取的数据进行累计...
27               int id = count.incrementAndGet();
28               //比如通过一个getData方法获取了
29               Data data = new Data(Integer.toString(id), "数据" + id);
30               System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
31               if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
32               System.out.println("提交缓冲区数据失败....");
33               //do something... 比如重新提交
34               }
35             } catch (InterruptedException e) {
36               e.printStackTrace();
37             }
38           }
39           }
40
41           public void stop(){
42             this.isRunning = false;
43           }
44
45         }
46
47

    

 1  // ConSumber消费者类:
2
3         
4
5         public class Consumer implements Runnable{
6
7           private BlockingQueue<Data> queue;
8
9           public Consumer(BlockingQueue queue){
10           this.queue = queue;
11           }
12
13           //随机对象
14           private static Random r = new Random();
15
16           @Override
17           public void run() {
18             while(true){
19               try {
20               //获取数据
21               Data data = this.queue.take();
22               //进行数据处理。休眠0 - 1000毫秒模拟耗时
23               Thread.sleep(r.nextInt(1000));
24               System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
25               } catch (InterruptedException e) {
26                 e.printStackTrace();
27               }
28             }
29             }
30          }
31
32

最新文章

  1. UI基础之UITextField相关
  2. 【leetcode】Populating Next Right Pointers in Each Node
  3. 【Spring】Spring系列7之Spring整合MVC框架
  4. NLTk
  5. WindowsPhone8 数据库增删改查
  6. AOP和IOC个人理解
  7. HDU4530:小Q系列故事——大笨钟
  8. BZOJ 1578: [Usaco2009 Feb]Stock Market 股票市场( 背包dp )
  9. SSIS中执行SQL任务组件参数传递的问题
  10. margin:0 auto
  11. 屏蔽ps联网激活
  12. Spring+SpringMVC+MyBatis+easyUI整合优化篇(一)System.out.print与Log
  13. Java:参数数量可变的方法
  14. 《高性能MySQL》读书笔记(上)
  15. MySQL数据库简识
  16. 6#day2总结
  17. spring boot2 整合(二)JPA(特别完整!)
  18. ORA-06553: PLS-553: character set name is not recognized, while starting Content Store
  19. Linux运维之系统性能---vmstat工具分析内存的瓶颈
  20. 关于UIImageView的显示问题——居中显示或者截取图片的中间部分显示

热门文章

  1. CAP理论和BASE理论及数据库的ACID中关于一致性及不同点的思考
  2. Hive日期函数总结(转学习使用)
  3. 数据库的查询(结合YGGL.sql)
  4. 【递推】P1028数的计算
  5. MySql创建存储过程,并使用事件定时调用
  6. 一个上传图片,预览图片的小demo
  7. 为什么.NET Standard 仍然有意义?
  8. 【JDBC核心】JDBC 概述
  9. 相同的class的各位object互为友元(friend)
  10. Head First 设计模式 —— 15. 与设计模式相处