package org.rui.thread.block2;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue; import org.rui.thread.LiftOff; /**
* 生产者-消费者与队列
*
* @author lenovo
*
*/ class LiftOffRunner implements Runnable { private BlockingQueue<LiftOff> rockets; public LiftOffRunner(BlockingQueue<LiftOff> b) {
rockets = b;
} //加入一个任务到队列
public void add(LiftOff lo) {
//将指定元素插入此队列中(假设马上可行且不会违反容量限制),
try {
rockets.put(lo);
} catch (InterruptedException e) {
e.printStackTrace();
} } @Override
public void run() { try {
while (!Thread.interrupted()) {
// 获取并移除此队列的头部,在元素变得可用之前一直等待(假设有必要)。
LiftOff rocket = rockets.take();
rocket.run();
} } catch (InterruptedException e) {
System.out.println("中断退出");
}
System.out.println("x exiting liftOffRunner"); }
} public class TestBlockingQueues { static void getkey() {
try {
// compensate for windows/linux difference in the
// 回车键产生的结果
new BufferedReader(new InputStreamReader(System.in)).readLine();
} catch (IOException e) {
e.printStackTrace();
}
} static void getkey(String message) {
System.out.println(message);
getkey();
} static void tets(String msg, BlockingQueue<LiftOff> queue) {
System.out.println(msg);
LiftOffRunner runner = new LiftOffRunner(queue); //启动一个线程
Thread t = new Thread(runner);
t.start(); for (int i = 0; i < 5; i++) {
//加入任务到LiftOffRunner队列中
runner.add(new LiftOff(5));
} //输入控制台
getkey("press 'enter' (" + msg + ")");
t.interrupt();
System.out.println(" 完了 " + msg + "test"); } public static void main(String[] args) {
tets("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>());// unlimited // size
tets("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3));// fied // size
tets("SynchronousQueue", new SynchronousQueue<LiftOff>());// size of 1 } }

package org.rui.thread.block2;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * A numbered slice of toast used by the BlockingQueue toast demo.
 *
 * @author lenovo
 */
class Toast {
    /** The states a slice can be in. */
    public enum Status {
        DRY,      // nothing applied yet
        BUTTERED, // butter applied
        JAMMED    // jam applied
    }

    private final int id;
    private Status status = Status.DRY;

    /**
     * @param idn the number identifying this slice
     */
    public Toast(int idn) {
        this.id = idn;
    }

    /** Marks this slice as buttered. */
    public void butter() {
        this.status = Status.BUTTERED;
    }

    /** Marks this slice as jammed. */
    public void jam() {
        this.status = Status.JAMMED;
    }

    /** @return the current status of this slice */
    public Status getStatus() {
        return this.status;
    }

    /** @return the number given at construction */
    public int getId() {
        return this.id;
    }

    /** @return {@code "Toast <id>:<status>"} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Toast ");
        text.append(id).append(':').append(status);
        return text.toString();
    }
}

/**
 * A blocking queue of toast.
 *
 * @author lenovo
 */
class ToastQueue extends LinkedBlockingQueue<Toast> {
}

/**
 * Creates consecutively numbered slices of toast and puts them on a queue,
 * pausing 100-599 ms before each one, until the thread is interrupted.
 */
class Toaster implements Runnable {
    private final ToastQueue toastQueue;
    private final Random rand = new Random(47);
    private int count = 0;

    /**
     * @param tq the queue that receives each new slice
     */
    public Toaster(ToastQueue tq) {
        this.toastQueue = tq;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                long pauseMillis = 100 + rand.nextInt(500);
                TimeUnit.MILLISECONDS.sleep(pauseMillis);

                // Make the next slice and announce it.
                Toast slice = new Toast(count);
                count++;
                System.out.println(slice);

                // Hand it to the queue.
                toastQueue.put(slice);
            }
        } catch (InterruptedException e) {
            System.out.println("Toaster interrupted");
        }
        System.out.println("toaster off");
    }
}
// apply butter to toast
/**
 * Takes slices from one queue, butters them, prints them and puts them on a
 * second queue, until the thread is interrupted.
 */
class Butterer implements Runnable {
    private final ToastQueue dryQueue;
    private final ToastQueue butteredQueue;

    /**
     * @param dry      queue the slices are taken from
     * @param buttered queue the buttered slices are put on
     */
    public Butterer(ToastQueue dry, ToastQueue buttered) {
        this.dryQueue = dry;
        this.butteredQueue = buttered;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                // Blocks until the next piece of toast is available.
                Toast slice = dryQueue.take();
                slice.butter();
                System.out.println(slice);
                butteredQueue.put(slice);
            }
        } catch (InterruptedException e) {
            System.out.println("涂黄油 interrupted");
        }
        System.out.println("涂黄油 off");
    }
}
// apply jam to buttered toast
/**
 * Takes slices from one queue, puts jam on them, prints them and puts them on
 * a second queue, until the thread is interrupted.
 */
class Jammer implements Runnable {
    private final ToastQueue butteredQueue;
    private final ToastQueue finishedQueue;

    /**
     * @param butteredQueue queue the slices are taken from
     * @param finishedQueue queue the jammed slices are put on
     */
    public Jammer(ToastQueue butteredQueue, ToastQueue finishedQueue) {
        this.butteredQueue = butteredQueue;
        this.finishedQueue = finishedQueue;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                // Blocks until the next piece of toast is available.
                Toast slice = butteredQueue.take();
                slice.jam();
                System.out.println(slice);
                finishedQueue.put(slice);
            }
        } catch (InterruptedException e) {
            System.out.println("涂果酱 interrupted");
        }
        System.out.println("涂果酱 off");
    }
}
// consume the toast
/**
 * Takes slices from a queue and checks each one: ids must arrive in counting
 * order starting at 0, and every slice must be JAMMED. On the first violation
 * it prints an error and exits the JVM with status 1.
 */
class Eater implements Runnable {
    private final ToastQueue finishedQueue;
    private int counter = 0;

    /**
     * @param finished queue the slices are taken from
     */
    public Eater(ToastQueue finished) {
        this.finishedQueue = finished;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                if (Thread.interrupted()) {
                    break;
                }
                Toast slice = finishedQueue.take();

                // The expected id advances on every slice, matching or not.
                int expectedId = counter++;
                boolean inOrder = slice.getId() == expectedId;
                boolean jammed = slice.getStatus() == Toast.Status.JAMMED;

                if (inOrder && jammed) {
                    System.out.println("吃!" + slice);
                    continue;
                }
                System.out.println("===>>>>error" + slice);
                System.exit(1);
            }
        } catch (InterruptedException e) {
            System.out.println("食者 interrupted");
        }
        System.out.println(" 食者 off");
    }
}

/**
 * Entry point of the toast demo.
 *
 * @author lenovo
 */
public class ToastOMatic {

    /**
     * Connects the four stages with three queues, runs them on a cached
     * thread pool for five seconds, then interrupts them all.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ToastQueue dryQueue = new ToastQueue();
        ToastQueue butteredQueue = new ToastQueue();
        ToastQueue finishedQueue = new ToastQueue();

        Runnable[] stages = {
            new Toaster(dryQueue),                 // make toast
            new Butterer(dryQueue, butteredQueue), // apply butter
            new Jammer(butteredQueue, finishedQueue), // apply jam
            new Eater(finishedQueue),              // eat
        };

        ExecutorService exec = Executors.newCachedThreadPool();
        for (Runnable stage : stages) {
            exec.execute(stage);
        }

        TimeUnit.SECONDS.sleep(5);
        exec.shutdownNow();
    }
}
/**output:
Toast 0:DRY
Toast 0:BUTTERED
Toast 0:JAMMED
吃!Toast 0:JAMMED
Toast 1:DRY
Toast 1:BUTTERED
Toast 1:JAMMED
吃!Toast 1:JAMMED
Toast 2:DRY
Toast 2:BUTTERED
Toast 2:JAMMED
吃!Toast 2:JAMMED
...
...
Toast 10:DRY
Toast 10:BUTTERED
Toast 10:JAMMED
吃!Toast 10:JAMMED
Toast 11:DRY
Toast 11:BUTTERED
Toast 11:JAMMED
吃!Toast 11:JAMMED
Toast 12:DRY
Toast 12:BUTTERED
Toast 12:JAMMED
吃!Toast 12:JAMMED
Toast 13:DRY
Toast 13:BUTTERED
Toast 13:JAMMED
吃!Toast 13:JAMMED
Toast 14:DRY
Toast 14:BUTTERED
Toast 14:JAMMED
吃!Toast 14:JAMMED
食者 interrupted
Toaster interrupted
食者 off
涂果酱 interrupted
涂果酱 off
涂黄油 interrupted
涂黄油 off
toaster off */

package org.rui.thread.block2;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; /**
* 任务间使用管道进行输入、输出
*
* @author lenovo
*
*/
/**
 * Writes characters into a pipe, pausing a random 0-499 ms after each one,
 * until it is interrupted or the pipe fails.
 */
class Sender implements Runnable {
    private final Random rand = new Random(47);
    private final PipedWriter out = new PipedWriter();

    /** @return the writing end of the pipe, for a reader to connect to */
    public PipedWriter getPipedWriter() {
        return out;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // 'A'..'z' also covers the punctuation between 'Z' and 'a'.
                for (char c = 'A'; c <= 'z'; c++) {
                    out.write(c);
                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                }
            }
        } catch (IOException e) {
            System.out.println(e + " sender write Exception");
        } catch (InterruptedException e) {
            System.out.println(e + " sender sleep interrupted");
        } finally {
            // Close the writing end so the reader sees end-of-stream instead
            // of having to detect that the writer thread has died.
            try {
                out.close();
            } catch (IOException e) {
                System.out.println(e + " sender close Exception");
            }
        }
    }
}

/**
 * Reads characters from a {@link Sender}'s pipe and prints each one, until
 * the end of the stream or a read failure.
 */
class Receiver implements Runnable {
    private final PipedReader in;

    /**
     * @param sender the sender whose pipe this receiver connects to
     * @throws IOException if the pipe cannot be connected
     */
    public Receiver(Sender sender) throws IOException {
        in = new PipedReader(sender.getPipedWriter());
    }

    @Override
    public void run() {
        try {
            int c;
            // read() blocks until a character is available and returns -1
            // once the writing end has been closed; stop there rather than
            // printing (char) -1 forever.
            while ((c = in.read()) != -1) {
                System.out.println("Read:" + (char) c + ",");
            }
        } catch (IOException e) {
            System.out.println(e+"receiver read execption");
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                System.out.println(e + " receiver close Exception");
            }
        }
    }
}

/**
 * Using pipes for input/output between tasks: runs a {@link Sender} and a
 * {@link Receiver} for four seconds, then interrupts both.
 */
public class PipedIO {
    public static void main(String[] args) throws IOException, InterruptedException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);

        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(receiver);

        TimeUnit.SECONDS.sleep(4);
        exec.shutdownNow();
    }
}
/** Sample output (the closing lines depend on thread timing and on how the pipe is shut down):
Read:A,
Read:B,
Read:C,
Read:D,
Read:E,
Read:F,
Read:G,
Read:H,
Read:I,
Read:J,
Read:K,
Read:L,
Read:M,
Read:N,
Read:O,
Read:P,
java.lang.InterruptedException: sleep interrupted sender sleep interrupted
Read:Q,
java.io.IOException: Write end deadreceiver read execption */

最新文章

  1. mysql函数大全
  2. sqlserver数据以及日志文件的设置小结
  3. 利用python的双向队列(Deque)数据结构实现回文检测的算法
  4. 对button或radiobutton制作样式
  5. linux 查找目录或文件详解
  6. SQL使用开窗函数与CTE查询每月销售额的前几名
  7. [Xamarin] 關於發出Notification 的大小事 (转帖)
  8. LeetCode 3 Longest Substring Without Repeating Characters 解题报告
  9. 让Windows7运行速度更快的BIOS优化设置教程
  10. JavaWeb项目开发案例精粹-第4章博客网站系统-003Dao层
  11. 防止跨域(jsonp详解)
  12. ARC - strong和weak指针
  13. PAT (Advanced Level) 1017. Queueing at Bank (25)
  14. NodeJS的Cluster模块使用
  15. 201521123002 《Java程序设计》第2周学习总结
  16. selenium2 webdriver 常用的python 函数
  17. CheckForIllegalCrossThreadCalls = false 是干嘛的?
  18. php学习----运算符
  19. css解决td单元格内文字溢出
  20. bugfree3.0.1-BUG解决方案修改

热门文章

  1. BZOJ 1537 cdq分治
  2. 9.13[XJOI] NOIP训练32
  3. C#缓存
  4. Windows 文件自动同步共享工具
  5. Windows系统开发常用类-------------Environment类
  6. (转载)Android 方法数超过64k、编译OOM、编译过慢解决方案。
  7. TypeScript简单的代码片段
  8. FBX骨骼坐标系与模型坐标系的关系
  9. Memory management in RxSwift – DisposeBag
  10. 路飞学城Python-Day78