JAVA线程同步 (二)notify()与notifyAll()-***
编写多线程程序需要进行线程协作,前面介绍的利用互斥来防止线程竞速是来解决线程协作的衍生危害的。编写线程协作程序的关键是解决线程之间的协调问题,在这些任务中,某些可以并行执行,但是某些步骤需要所有的任务都结束之后才能开动。
wait()与notifyAll()
- 在wait()期间对象锁是释放的
- 可以通过notify()、notifyAll(),或者令时间到期,从wait()中恢复执行。
import java.util.concurrent.*;
import static net.mindview.util.Print.*;
/*线程同步
* 对CAR对象,反复打蜡、抛光 */ class Car {
private boolean waxOn = false; public synchronized void waxed() {
waxOn = true; //打蜡完成,通知抛光线程
notifyAll();
} public synchronized void buffed() {
waxOn = false; //抛光完成,通知打蜡线程
notifyAll();
} public synchronized void waitForWaxing() throws InterruptedException {
while (waxOn == false)
wait();
} public synchronized void waitForBuffing() throws InterruptedException {
while (waxOn == true)
wait();
}
} class WaxOn implements Runnable {
private Car car; public WaxOn(Car c) {
car = c;
} public void run() {
try {
while (!Thread.interrupted()) {
printnb("Wax On! ");
TimeUnit.MILLISECONDS.sleep(200);
car.waxed();
car.waitForBuffing();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax On task");
}
} class WaxBuffed implements Runnable {
private Car car; public WaxBuffed(Car c) {
car = c;
} public void run() {
try {
while (!Thread.interrupted()) {
car.waitForWaxing();
printnb("Wax Off! ");
TimeUnit.MILLISECONDS.sleep(200);
car.buffed();
}
} catch (InterruptedException e) {
print("Exiting via interrupt");
}
print("Ending Wax Off task");
}
} public class WaxOMatic {
public static void main(String[] args) throws Exception {
Car car = new Car();
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new WaxOn(car)); //打蜡线程
exec.execute(new WaxBuffed(car)); //抛光线程
TimeUnit.SECONDS.sleep(5);
exec.shutdownNow(); //中断执行器中的所有线程
}
}
显示结果:
Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Wax Off! Wax On! Exiting via interrupt
Ending Wax On task
Exiting via interrupt
Ending Wax Off task
- 你可能有多个任务出于相同的原因在等待一个锁,而第一个唤醒任务可能已经改变这种状况(即使你没有这么做,有人也会通过继承你的类去这么做)。如果属于这种情况,那么这个任务应该被再次挂起,直至其感兴趣的条件发生变化。
- 也有可能某些任务处于不同的原因在等待你的对象上锁(在这种情况下必须使用(notifyAll))。在这种情况下,你需要检查是否已经由正确的原因唤醒,如果不是,就再次调用wait()。
notify()与notifyAll()因为在技术上,可能会有多个任务在单个Car对象上处于wait()状态,因此调用notifyAll()比调用notify()要更安全。但是,上面程序的结构只会有一个任务处于wait()状态,因此你可以使用notify()来代替notifyAll()。
notify()与notifyAll()
notify()与notifyAll()的使用,请看下面的例子来理解为什么对象需要加锁:
import java.util.concurrent.*;
import java.util.*; class Blocker {
synchronized void waitingCall() {
try {
while (!Thread.interrupted()) {
wait();
System.out.print(Thread.currentThread() + " ");
}
} catch (InterruptedException e) {
// OK to exit this way
}
} synchronized void prod() {
notify();
} synchronized void prodAll() {
notifyAll();
}
} class Task implements Runnable {
static Blocker blockerA = new Blocker(); //thread-1至thread-5 5个线程锁定blockerA public void run() {
blockerA.waitingCall();
}
} class Task2 implements Runnable {
// A separate Blocker object:
static Blocker blockerB = new Blocker(); //thread-6锁定blockerB public void run() {
blockerB.waitingCall();
}
} public class NotifyVsNotifyAll {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++)
exec.execute(new Task());
exec.execute(new Task2());
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
boolean prod = true; public void run() {
if (prod) {
System.out.print("\nnotify() ");
Task.blockerA.prod(); //定时使用notify()唤醒等待blockerA阻塞的线程
prod = false;
} else {
System.out.print("\nnotifyAll() ");
Task.blockerA.prodAll(); //定时使用notifyAll()唤醒blockerA阻塞的所有线程
prod = true;
}
}
}, 400, 400); // Run every .4 second
TimeUnit.SECONDS.sleep(5); // Run for a while...
timer.cancel();
System.out.println("\nTimer canceled");
TimeUnit.MILLISECONDS.sleep(500);
System.out.print("Task2.blocker.prodAll() ");
Task2.blockerB.prodAll(); //唤醒blockerB阻塞的线程
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("\nShutting down");
exec.shutdownNow(); // Interrupt all tasks
}
}
可以看到Thread1-5等待同一个锁资源blockerA,notify()只能唤醒一个线程,notifyAll()可以唤醒全部等待的线程,但不能唤醒等待blockerB资源的线程Task2。
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-5,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-2,5,main]
notify() Thread[pool-1-thread-1,5,main]
notifyAll() Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-4,5,main] Thread[pool-1-thread-5,5,main]
Timer canceled
Task2.blocker.prodAll() Thread[pool-1-thread-6,5,main]
Shutting down
用wait()和notifyAll()实现生产者消费者问题
import java.util.concurrent.*;
import static net.mindview.util.Print.*; class Meal {
private final int orderNum; public Meal(int orderNum) {
this.orderNum = orderNum;
} public String toString() {
return "Meal " + orderNum;
}
} class WaitPerson implements Runnable {
private Restaurant restaurant; public WaitPerson(Restaurant r) {
restaurant = r;
} public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal == null)
wait(); //等待生产
}
print("Waitperson got " + restaurant.meal);
synchronized (restaurant.chef) {
restaurant.meal = null;
restaurant.chef.notifyAll(); //已消费,通知生产者
}
}
} catch (InterruptedException e) {
print("WaitPerson interrupted");
}
}
} class Chef implements Runnable {
private Restaurant restaurant;
private int count = 0; public Chef(Restaurant r) {
restaurant = r;
} public void run() {
try {
while (!Thread.interrupted()) {
synchronized (this) {
while (restaurant.meal != null)
wait(); //等待消费
}
if (++count == 10) {
print("Out of food, closing");
restaurant.exec.shutdownNow(); //中断线程
}
printnb("Order up! ");
synchronized (restaurant.waitPerson) {
restaurant.meal = new Meal(count);
restaurant.waitPerson.notifyAll(); //已生产,通知消费者
}
TimeUnit.MILLISECONDS.sleep(100);
}
} catch (InterruptedException e) {
print("Chef interrupted");
}
}
} public class Restaurant {
Meal meal;
ExecutorService exec = Executors.newCachedThreadPool();
WaitPerson waitPerson = new WaitPerson(this); //服务员-消费者
Chef chef = new Chef(this); //厨师-生产者 public Restaurant() {
exec.execute(chef);
exec.execute(waitPerson);
} public static void main(String[] args) {
new Restaurant();
}
}
Order up! Waitperson got Meal 1
Order up! Waitperson got Meal 2
Order up! Waitperson got Meal 3
Order up! Waitperson got Meal 4
Order up! Waitperson got Meal 5
Order up! Waitperson got Meal 6
Order up! Waitperson got Meal 7
Order up! Waitperson got Meal 8
Order up! Waitperson got Meal 9
Out of food, closing
Order up! WaitPerson interrupted
Chef interrupted
上面这个程序可以证明出来是线程安全的。不过使用这种方式实在是太晦涩了,生产者消费者问题的机制需要我们去控制,实际上,java并发类库为我们提供了这种模型的实现,我们待会会用阻塞队列来重写这个问题。
使用显式的Lock和Condition对象
class Car {
private boolean waxOn = false;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void waxed() {
lock.lock();
try {
waxOn = true;
condition.signalAll();
} finally {
lock.unlock();
}
}
public void buffed( ) {
lock.lock();
try {
waxOn = false;
condition.signalAll();
} finally {
lock.unlock();
}
}
public void waitForWaxing( ) throws InterruptedException{
lock.lock();
try{
while(waxOn == false)
condition.await();
} finally {
lock.unlock();
}
}
public void waitForBuffing( ) throws InterruptedException {
lock.lock();
try {
while(waxOn == true)
condition.await( );
} finally {
lock.unlock();
}
}
}
使用BlockingQueue来解决生产者消费者问题
class Meal {
} class WaitPerson implements Runnable {
private String name;
private RestaurantBlookingQueue restaurant;
public WaitPerson(String name, RestaurantBlookingQueue res) {
this.name = name;
this.restaurant = res;
} @Override
public void run() {
try {
while (!Thread.interrupted()) {
restaurant.meals.take();
System.out.println(name + "taked a Meal");
Thread.sleep(100);
}
} catch (InterruptedException e) {
System.out.println(name + " is ended via InterruptedException !");
return;
}
System.out.println(name + " is ended via InterruptedException !");
}
} class Chef implements Runnable {
private String name;
private RestaurantBlookingQueue restaurant; public Chef(String name, RestaurantBlookingQueue res) {
this.name = name;
this.restaurant = res;
} @Override
public void run() {
try {
while (!Thread.interrupted()) {
restaurant.meals.put(new Meal());
System.out.println(this.name + "made a meal");
Thread.sleep(100);
}
} catch (InterruptedException e) {
System.out.println(name + " is ended via InterruptedException !");
return;
}
System.out.println(name + " is ended via InterruptedException !");
}
} public class RestaurantBlookingQueue {
public BlockingQueue<Meal> meals = new ArrayBlockingQueue<Meal>(10);
public List<WaitPerson> waitPersons = new ArrayList<WaitPerson>();
public List<Chef> chefs = new ArrayList<Chef>(); public static void main(String[] args) throws InterruptedException {
RestaurantBlookingQueue res = new RestaurantBlookingQueue();
ExecutorService exec = Executors.newCachedThreadPool();
Chef chef1 = new Chef("chef1", res);
Chef chef2 = new Chef("chef2", res);
res.chefs.add(chef1);
res.chefs.add(chef2);
exec.execute(chef1);
exec.execute(chef2);
WaitPerson waitPerson1 = new WaitPerson("waitPerson1", res);
WaitPerson waitPerson2 = new WaitPerson("waitPerson2", res);
res.waitPersons.add(waitPerson1);
res.waitPersons.add(waitPerson2);
exec.execute(waitPerson1);
exec.execute(waitPerson2); // TimeUnit.MILLISECONDS.sleep(3000);
// exec.shutdownNow();
}
}
任务间使用管道进行输入/输出
通过输入/输出在线程间进行通信通常很有用。提供线程功能的类库以"管道"的形式对线程的输入/输出提供了支持。它们在Java输入/输出类库中的对应物就是PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道读)。这个模型可以看成是"生产者-消费者"问题的变体。管道基本是一个阻塞队列,存在于多个引入BlookingQueue之前的Java版本。
class Sender implements Runnable {
private Random rand = new Random(47);
private PipedWriter out = new PipedWriter();
public PipedWriter getPipedWriter( ) {return out;}
public void run( ) {
try {
while(true) {
for(char c = 'A' ; c <= 'z'; c++) {
out.write(c);
TimeUnit.MILLISECONDS.sleep( rand.nextInt(500));
}
}
} catch (IOException e) {
System.out.println(e + " Sender write exception");
} catch (InterruptedException e) {
System.out.println(e + " Sender sleep exception");
}
}
} class Receiver implements Runnable {
private PipedReader in;
public Receiver(Sender sender) throws IOException {
in = new PipedReader(sender.getPipedWriter());
}
public void run( ) {
try {
while(true) {
System.out.print("Read: "+(char)in.read() + ", ");
}
} catch (IOException e) {
System.out.println(e + " Receiver read exception");
}
}
} public class PipedIO {
public static void main(String []args) throws Exception {
Sender sender = new Sender( );
Receiver receiver = new Receiver( sender );
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(sender);
exec.execute(receiver);
TimeUnit.SECONDS.sleep( 4 );
exec.shutdownNow();
}
}
死锁
死锁本是操作系统的中概念,因为操作系统中会遇到很多可能发生死锁的状况。但我们在并发程序经常也需要预防死锁,特别是多个线程在并发的访问多个对象的时候。首先,我们需要从逻辑上避免死锁发生的可能性,例如哲学家进餐问题,一般在程序中的解决方式是一次性将资源完全分配给它,为了提供并发度,需要我们进一步缩小并发锁的范围。除了逻辑上预防并发,我们还需要处理意外情况,例如获取到资源的线程中途挂掉,我们需要释放资源,在程序中即释放锁,在程序中可以通过try-catch实现。
最新文章
- 浅析MySQL复制
- Mac OS X 中一些常用的命令行技巧
- 手机开发中的AP与BP的概念
- Tomcat基本入门知识及发布,虚拟访问及启动碰到的错误,虚拟目录,虚拟路径,各种Tomcat的配置
- redis 原子增一的妙用
- URL编码知识摘抄备忘
- HTML5之FileReader的使用
- ember.js:使用笔记5 使用view
- LAMMP架构的企业级应用
- windows xp 安装mysql5.6.17-ERROR 1045 (28000): Access denied for user &#39;root&#39;@&#39;localhost&#39; (using password
- Javascript之Dom学习
- 假设我的朋友账号分别是v{1,2,3,4,5},且这五人想要共享一个目录,因此应该加入同一个群组,假设这个群组为vbird,且这五个账号的密码均为password.那该如何建置这五个账号?
- Sublime 远程连接 Linux服务器
- this.setData , that.setData , this.data.val三者之间的区别和作用
- win10安装mysql5.7.20解压版
- matplotlib figure图像-【老鱼学matplotlib】
- python大法好——mysql防注入
- PS 使用笔记 - PS 让工作台适应 当前图层
- 【算法和数据结构】_15_小算法_打印EOF的值
- Puppet主机、模块、类、资源、变量、参数、标签命名规范
热门文章
- Spring核心技术(六)——Spring中Bean的生命周期
- HttpURLConnection绕过HTTPS的SSL验证
- PS注意点
- MySQL数据库连接不上的一种可能的解决办法
- C++ stringstream的用法
- 视图 v$sql,v$sqlarea,$sqltext,v$sqltext_with_newlines 的差异
- 前端开发:JavaScript---DOM &; BOM
- MySQL数据库:SQL语句基础、库操作、表操作、数据类型、约束条件、表之间的关系
- JPA中映射关系详细说明(一对多,多对一,一对一、多对多)、@JoinColumn、mappedBy说明
- Linux下汇编语言学习笔记17 ---