1 api

    java.util.concurrent包下的新类。LinkedBlockingQueue就是其中之一,是一个阻塞的线程安全的队列,底层采用链表实现。
     
      LinkedBlockingQueue构造的时候若没有指定大小,则默认大小为Integer.MAX_VALUE,当然也可以在构造函数的参数中指定大小。LinkedBlockingQueue不接受null。

添加元素的方法有三个:add,put,offer,且这三个元素都是向队列尾部添加元素的意思。

区别:

add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:

public static void main(String args[]){
try {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);

queue.add("hello");
queue.add("world");
queue.add("yes");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
//运行结果:
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(Unknown Source)
at com.wjy.test.GrandPather.main(GrandPather.java:12)

xxxxxxxxxx
16
16
 
1
public static void main(String args[]){  
2
        try {  
3
            LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);  
4
              
5
            queue.add("hello");  
6
            queue.add("world");  
7
            queue.add("yes");  
8
        } catch (Exception e) {  
9
            // TODO: handle exception  
10
            e.printStackTrace();  
11
        }  
12
    }  
13
//运行结果:  
14
java.lang.IllegalStateException: Queue full  
15
    at java.util.AbstractQueue.add(Unknown Source)  
16
    at com.wjy.test.GrandPather.main(GrandPather.java:12)  

put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素。

public static void main(String args[]){
try {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);

queue.put("hello");
queue.put("world");
queue.put("yes");

System.out.println("yes");
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
//运行结果:
//在queue.put("yes")处发生阻塞
//下面的“yes”无法输出

xxxxxxxxxx
17
17
 
1
public static void main(String args[]){  
2
        try {  
3
            LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);  
4
              
5
            queue.put("hello");  
6
            queue.put("world");  
7
            queue.put("yes");  
8
              
9
            System.out.println("yes");  
10
        } catch (Exception e) {  
11
            // TODO: handle exception  
12
            e.printStackTrace();  
13
        }  
14
    }  
15
//运行结果:  
16
//在queue.put("yes")处发生阻塞  
17
//下面的“yes”无法输出  
    offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false。     
    
public static void main(String args[]){
try {
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);

boolean bol1=queue.offer("hello");
boolean bol2=queue.offer("world");
boolean bol3=queue.offer("yes");

System.out.println(queue.toString());
System.out.println(bol1);
System.out.println(bol2);
System.out.println(bol3);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
}
}
//运行结果:
[hello, world]
true
true
false

xxxxxxxxxx
22
22
 
1
public static void main(String args[]){  
2
        try {  
3
            LinkedBlockingQueue<String> queue=new LinkedBlockingQueue(2);  
4
              
5
            boolean bol1=queue.offer("hello");  
6
            boolean bol2=queue.offer("world");  
7
            boolean bol3=queue.offer("yes");  
8
              
9
            System.out.println(queue.toString());  
10
            System.out.println(bol1);  
11
            System.out.println(bol2);  
12
            System.out.println(bol3);  
13
        } catch (Exception e) {  
14
            // TODO: handle exception  
15
            e.printStackTrace();  
16
        }  
17
    }  
18
//运行结果:  
19
[hello, world]  
20
true  
21
true  
22
false  
    从队列中取出并移除头元素的方法有:poll,remove,take。     

poll: 若队列为空,返回null。

remove:若队列为空,抛出NoSuchElementException异常。

take:若队列为空,发生阻塞,等待有元素。

2基于LinkedBlockingQueue的生产者和消费者

package com.queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueTest1 {
public static void main(String[] args) {
LinkedBlockingQueueTest1 test = new LinkedBlockingQueueTest1();
// 建立一个装苹果的篮子
Basket basket = test.new Basket();
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = test.new Producer("生产者001", basket);
Producer producer2 = test.new Producer("生产者002", basket);
Consumer consumer = test.new Consumer("消费者001", basket);
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序运行5s后,所有任务停止
try {
Thread.sleep(1000 * 5);
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdownNow();
}

//定义篮子
public class Basket {
// 篮子,能够容纳3个苹果
BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
// 生产苹果,放入篮子
public void produce() throws InterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置
basket.put("An apple");
}
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException {
// take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
return basket.take();
}
}
// 定义苹果生产者
class Producer implements Runnable {
private String instance;
private Basket basket;
public Producer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 生产苹果
System.out.println(instance + "生产苹果");
basket.produce();
// 休眠300ms
Thread.sleep(300);
}
} catch (InterruptedException ex) {
System.out.println("Producer Interrupted");
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
private String instance;
private Basket basket;
public Consumer(String instance, Basket basket) {
this.instance = instance;
this.basket = basket;
}
public void run() {
try {
while (true) {
// 消费苹果
System.out.println(instance + "消费苹果" + basket.consume());
// 休眠1000ms
Thread.sleep(150);
}
} catch (InterruptedException ex) {
System.out.println("Consumer Interrupted");
}
}
}
}

85
 
1
package com.queue;
2
import java.util.concurrent.BlockingQueue;
3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5
import java.util.concurrent.LinkedBlockingQueue;
6
public class LinkedBlockingQueueTest1 {
7
    public static void main(String[] args) {
8
        LinkedBlockingQueueTest1 test = new LinkedBlockingQueueTest1();
9
        // 建立一个装苹果的篮子
10
        Basket basket = test.new Basket();
11
        ExecutorService service = Executors.newCachedThreadPool();
12
        Producer producer = test.new Producer("生产者001", basket);
13
        Producer producer2 = test.new Producer("生产者002", basket);
14
        Consumer consumer = test.new Consumer("消费者001", basket);
15
        service.submit(producer);
16
        service.submit(producer2);
17
        service.submit(consumer);
18
        // 程序运行5s后,所有任务停止
19
        try {
20
            Thread.sleep(1000 * 5);
21
        } catch (InterruptedException e) {
22
            e.printStackTrace();
23
        }
24
        service.shutdownNow();
25
    }
26
    
27
    //定义篮子
28
    public class Basket {
29
        // 篮子,能够容纳3个苹果
30
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
31
        // 生产苹果,放入篮子
32
        public void produce() throws InterruptedException {
33
            // put方法放入一个苹果,若basket满了,等到basket有位置
34
            basket.put("An apple");
35
        }
36
        // 消费苹果,从篮子中取走
37
        public String consume() throws InterruptedException {
38
            // take方法取出一个苹果,若basket为空,等到basket有苹果为止(获取并移除此队列的头部)
39
            return basket.take();
40
        }
41
    }
42
    // 定义苹果生产者
43
    class Producer implements Runnable {
44
        private String instance;
45
        private Basket basket;
46
        public Producer(String instance, Basket basket) {
47
            this.instance = instance;
48
            this.basket = basket;
49
        }
50
        public void run() {
51
            try {
52
                while (true) {
53
                    // 生产苹果
54
                    System.out.println(instance + "生产苹果");
55
                    basket.produce();
56
                    // 休眠300ms
57
                    Thread.sleep(300);
58
                }
59
            } catch (InterruptedException ex) {
60
                System.out.println("Producer Interrupted");
61
            }
62
        }
63
    }
64
    // 定义苹果消费者
65
    class Consumer implements Runnable {
66
        private String instance;
67
        private Basket basket;
68
        public Consumer(String instance, Basket basket) {
69
            this.instance = instance;
70
            this.basket = basket;
71
        }
72
        public void run() {
73
            try {
74
                while (true) {
75
                    // 消费苹果
76
                    System.out.println(instance + "消费苹果" + basket.consume());
77
                    // 休眠1000ms
78
                    Thread.sleep(150);
79
                }
80
            } catch (InterruptedException ex) {
81
                System.out.println("Consumer Interrupted");
82
            }
83
        }
84
    }
85
}

3示例2

 并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。

 下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,
 另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的put和take操作会阻塞,为了使线程退出,特在队列中添加了一个“标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。
public class LinkedBlockingQueueTest {
static long randomTime() {
return (long) (Math.random() * 1000);
}
@Test
public void testName() throws Exception {
AtomicInteger rc = new AtomicInteger();
int incrementAndGet = rc.incrementAndGet();
System.out.println(incrementAndGet);
}

public static void main(String[] args) {
// 能容纳100个文件
final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
// 线程池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new File("D:\\JavaLib");
// 完成标志
final File exitFile = new File("");
// 读个数
final AtomicInteger rc = new AtomicInteger();
// 写个数
final AtomicInteger wc = new AtomicInteger();
// 读线程
Runnable read = new Runnable() {
public void run() {
scanFile(root);
scanFile(exitFile);
}
public void scanFile(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return pathname.isDirectory() || pathname.getPath().endsWith(".java");
}
});
for (File one : files)
scanFile(one);
} else {
try {
int index = rc.incrementAndGet();
System.out.println("Read0: " + index + " " + file.getPath());
queue.put(file);
} catch (InterruptedException e) {
}
}
}
};
exec.submit(read);
// 四个写线程
for (int index = 0; index < 4; index++) {
// write thread
final int NO = index;
Runnable write = new Runnable() {
String threadName = "Write" + NO;
public void run() {
while (true) {
try {
Thread.sleep(randomTime());
int index = wc.incrementAndGet();
File file = queue.take();
// 队列已经无对象
if (file == exitFile) {
// 再次添加"标志",以让其他线程正常退出
queue.put(exitFile);
break;
}
System.out.println(threadName + ": " + index + " " + file.getPath());
} catch (InterruptedException e) {
}
}
}
};
exec.submit(write);
}
exec.shutdown();
}
}

​x
79
                    }
 
1
public class LinkedBlockingQueueTest {
2
    static long randomTime() {
3
        return (long) (Math.random() * 1000);
4
    }
5
    @Test
6
    public void testName() throws Exception {
7
        AtomicInteger rc = new AtomicInteger();
8
        int incrementAndGet = rc.incrementAndGet();
9
        System.out.println(incrementAndGet);
10
    }
11
    
12
    
13
    public static void main(String[] args) {
14
        // 能容纳100个文件
15
        final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
16
        // 线程池
17
        final ExecutorService exec = Executors.newFixedThreadPool(5);
18
        final File root = new File("D:\\JavaLib");
19
        // 完成标志
20
        final File exitFile = new File("");
21
        // 读个数
22
        final AtomicInteger rc = new AtomicInteger();
23
        // 写个数
24
        final AtomicInteger wc = new AtomicInteger();
25
        // 读线程
26
        Runnable read = new Runnable() {
27
            public void run() {
28
                scanFile(root);
29
                scanFile(exitFile);
30
            }
31
            public void scanFile(File file) {
32
                if (file.isDirectory()) {
33
                    File[] files = file.listFiles(new FileFilter() {
34
                        public boolean accept(File pathname) {
35
                            return pathname.isDirectory() || pathname.getPath().endsWith(".java");
36
                        }
37
                    });
38
                    for (File one : files)
39
                        scanFile(one);
40
                } else {
41
                    try {
42
                        int index = rc.incrementAndGet();
43
                        System.out.println("Read0: " + index + " " + file.getPath());
44
                        queue.put(file);
45
                    } catch (InterruptedException e) {
46
                    }
47
                }
48
            }
49
        };
50
        exec.submit(read);
51
        // 四个写线程
52
        for (int index = 0; index < 4; index++) {
53
            // write thread
54
            final int NO = index;
55
            Runnable write = new Runnable() {
56
                String threadName = "Write" + NO;
57
                public void run() {
58
                    while (true) {
59
                        try {
60
                            Thread.sleep(randomTime());
61
                            int index = wc.incrementAndGet();
62
                            File file = queue.take();
63
                            // 队列已经无对象
64
                            if (file == exitFile) {
65
                                // 再次添加"标志",以让其他线程正常退出
66
                                queue.put(exitFile);
67
                                break;
68
                            }
69
                            System.out.println(threadName + ": " + index + " " + file.getPath());
70
                        } catch (InterruptedException e) {
71
                        }
72
                    }
73
                }
74
            };
75
            exec.submit(write);
76
        }
77
        exec.shutdown();
78
    }
79
}
















最新文章

  1. Oracle 判断某個字段的值是不是数字
  2. Android ListView添加多种类型的ItemView
  3. IOS
  4. 【收藏】Android屏幕适配全攻略(最权威的Google官方适配指导)
  5. Sql Server事务简单用法
  6. elasticsearch分词插件的安装
  7. NovaMind使用教程
  8. 观摩制作小游戏(js应用)
  9. SimPholders2 模拟器 App 文件路径查看工具
  10. 计算几何基础——矢量和叉积 &amp;&amp; 叉积、线段相交判断、凸包(转载)
  11. 核心概念 &mdash;&mdash; 服务容器
  12. 异常关闭MyEclipse 8.6后,不能重启
  13. cocos2D-x 3.5 引擎解析之--引用计数(Ref),自己主动释放池(PoolManager),自己主动释放池管理器( AutoreleasePool)
  14. 安卓开源框架SlidingMenu使用
  15. 《CSS动画实用技巧》课程笔记
  16. spring-cloud-sleuth 和 分布式链路跟踪系统
  17. 后端for循环补充
  18. 4.3 thymeleaf模板引擎的使用
  19. 理解 Python 中的可变参数 *args 和 **kwargs:
  20. Vue使用Typescript开发编译时提示“ERROR in ./src/main.ts Module build failed: TypeError: Cannot read property &#39;afterCompile&#39; of undefined”的解决方法

热门文章

  1. 孤荷凌寒自学python第二十七天python的datetime模块及初识datetime.date模块
  2. [DM8168]Linux下SPI驱动测试
  3. FileZilla修改文件大小格式
  4. deeplearning4j——卷积神经网络对验证码进行识别
  5. Dom的深度优先遍历和广度优先遍历
  6. Codeforces Round #325 (Div. 2) B
  7. 3.5 实例讲解Lucene索引的结构设计
  8. vs2012 有效产品密钥
  9. 【ZOJ4070】Function and Function(签到)
  10. SQL Server 中使用 Try Catch 处理异常