写点什么

Java 并发编程实战(4)- 死锁

发布于: 2021 年 01 月 15 日
Java并发编程实战(4)- 死锁

在这篇文章中,我们主要讨论一下死锁及其解决办法。

概述

在上一篇文章中,我们讨论了如何使用一个互斥锁去保护多个资源,以银行账户转账为例,当时给出的解决方法是基于 Class 对象创建互斥锁。

这样虽然解决了同步的问题,但是能在现实中使用吗?答案是不可以,尤其是在高并发的情况下,原因是我们使用的互斥锁的范围太大,以转账为例,我们的做法会锁定整个账户 Class 对象,这样会导致转账操作只能串行进行,但是在实际场景中,大量的转账操作业务中的双方是不相同的,直接在 Class 对象级别上加锁是不能接受的。

那如果在对象实例级别上加锁,使用细粒度锁,会有什么问题?可能会发生死锁。

我们接下来看一下造成死锁的原因和可能的解决方案。

死锁案例

什么是死锁?

死锁是指一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。

一般来说,当我们使用细粒度锁时,它在提升性能的同时,也可能会导致死锁。

我们还是以银行转账为例,来看一下死锁是如何发生的。

首先,我们先定义个 BankAccount 对象,来存储基本信息,代码如下。

public class BankAccount {	private int id;	private double balance;	private String password;	public int getId() {		return id;	}	public void setId(int id) {		this.id = id;	}	public double getBalance() {		return balance;	}	public void setBalance(double balance) {		this.balance = balance;	}}
复制代码

接下来,我们使用细粒度锁来尝试完成转账操作,代码如下。

public class BankTransferDemo {		public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {		synchronized(sourceAccount) {			synchronized(targetAccount) {				if (sourceAccount.getBalance() > amount) {					System.out.println("Start transfer.");					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));					sourceAccount.setBalance(sourceAccount.getBalance() - amount);					targetAccount.setBalance(targetAccount.getBalance() + amount);					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));				}			}		}	}}
复制代码

我们用下面的代码来做简单测试。

	public static void main(String[] args) throws InterruptedException {		BankAccount sourceAccount = new BankAccount();		sourceAccount.setId(1);		sourceAccount.setBalance(50000);				BankAccount targetAccount = new BankAccount();		targetAccount.setId(2);		targetAccount.setBalance(20000);				BankTransferDemo obj = new BankTransferDemo();				Thread t1 = new Thread(() ->{			for (int i = 0; i < 10000; i++) {				obj.transfer(sourceAccount, targetAccount, 1);			}		});				Thread t2 = new Thread(() ->{			for (int i = 0; i < 10000; i++) {				obj.transfer(targetAccount, sourceAccount, 1);			}		});				t1.start();		t2.start();				t1.join();		t2.join();				System.out.println("Finished.");	}
复制代码

测试代码中包含了 2 个线程,其中 t1 线程循环从 sourceAccount 向 targetAccount 转账,而 t2 线程会循环从 targetAccount 向 sourceAccount 转账。

从运行结果来看,t1 线程中的循环在运行 600 次左右时,t2 线程也创建好,开始循环转账了,这时就会发生死锁,导致 t1 线程和 t2 线程都无法继续执行。

我们可以用下面的资源分配图来更直观的描述死锁。



死锁的原因和预防

并发程序一旦死锁,一般没有特别好的办法,很多时候我们只能重启应用,因此,解决死锁问题的最好办法是规避死锁。

我们先来看一下死锁发生的条件,一个叫Coffman的牛人,于 1971 年在 ACM Computing Surveys 发表了一篇名为System Deadlocks的文章,他总结了只有以下四个条件全部满足的情况下,才会发生死锁:

  • 互斥,共享资源 X 和 Y 只能被一个线程占用。

  • 占有且等待,线程 t1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X。

  • 不可抢占,其他线程不能强行抢占线程 t1 占有的资源。

  • 循环等待,线程 t1 等待线程 t2 占有的资源,线程 t2 等待线程 t1 占有的资源,就是循环等待。

通过上述描述,我们能够推导出,只要破坏上面其中一个条件,就可以避免死锁的发生。

但是第一个条件互斥,是不可以被破坏的,否则我们就没有用锁的必要了,那么我们来看如何破坏其他三个条件。

破坏占用且等待条件

如果要破坏占用且等待条件,我们可以尝试一次性申请全部资源,这样就不需要等待了。

在实现过程中,我们需要创建一个新的角色,负责同时申请和同时释放全部资源,我们可以将其称为 Allocator。

我们来看一下具体的代码实现。

public class Allocator {		private volatile static Allocator instance;		private Allocator() {}		public static Allocator getInstance() {		if (instance == null) {			synchronized(Allocator.class) {				if (instance == null) {					instance = new Allocator();				}			}		}				return instance;	}		private Set<Object> lockObjs = new HashSet<Object>();		public synchronized boolean apply(Object... objs) {		for (Object obj : objs) {			if (lockObjs.contains(obj)) {				return false;			}		}		for (Object obj : objs) {			lockObjs.add(obj);		}				return true;	}		public synchronized void free(Object... objs) {		for (Object obj : objs) {			if (lockObjs.contains(obj)) {				lockObjs.remove(obj);			}		}	}}
复制代码

Allocator 是一个单例模式,它会使用一个 Set 对象来保存所有需要处理的资源,然后使用 apply()和 free()来同时锁定或者释放所有资源,它们会接收不固定参数。

我们来看一下新的 transfer()方法应该怎么写。

	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {		Allocator allocator = Allocator.getInstance();		while(!allocator.apply(sourceAccount, targetAccount));		try {			synchronized(sourceAccount) {				synchronized(targetAccount) {					if (sourceAccount.getBalance() > amount) {						System.out.println("Start transfer.");						System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));						sourceAccount.setBalance(sourceAccount.getBalance() - amount);						targetAccount.setBalance(targetAccount.getBalance() + amount);						System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));					}				}			}		}		finally {			allocator.free(sourceAccount, targetAccount);		}	}
复制代码

我们可以看到,transfer()方法中,首先获取 Allocator 实例,然后调用 apply(),传入 sourceAccount 和 targetAccount 实例,请注意这里使用了 while 循环,即直到 apply()返回 true,才会退出循环,此时,Allocator 已经锁定了 sourceAccount 和 targetAccount,接下来,我们使用 synchronized 关键字来锁定 sourceAccount 和 targetAccount,然后执行转账的业务逻辑。这里并不是必须要用 synchronized,但是这样做可以避免其他操作来影响转账操作,例如如果转账的过程中对 sourceAccount 实例进行取钱操作,如果不用 synchronized,就有可能引发并发问题。

下面是测试代码。

	public static void main(String[] args) throws InterruptedException {		BankAccount sourceAccount = new BankAccount();		sourceAccount.setId(1);		sourceAccount.setBalance(50000);				BankAccount targetAccount = new BankAccount();		targetAccount.setId(2);		targetAccount.setBalance(20000);				BankTransferDemo obj = new BankTransferDemo();				Thread t1 = new Thread(() ->{			for (int i = 0; i < 10000; i++) {				obj.transfer(sourceAccount, targetAccount, 1);			}		});				Thread t2 = new Thread(() ->{			for (int i = 0; i < 10000; i++) {				obj.transfer(targetAccount, sourceAccount, 1);			}		});				t1.start();		t2.start();				t1.join();		t2.join();				System.out.println("Finished.");	}
复制代码

程序是可以正常执行的,结果和我们预期一致。

在这里,我们需要保证锁对象的不可变性,对于 BankAccount 对象来说,id 属性可以看做是其主键,id 相同的 BankAccount 实例,从业务角度来说,指向的都是同一个账户,但是对于锁对象来说,id 相同的不同实例,会产生不同的锁,从而引发并发问题。

我们来看下面修改后的测试代码。

public static void main(String[] args) throws InterruptedException {						BankTransferDemo obj = new BankTransferDemo();				Thread t1 = new Thread(() ->{			for (int i = 0; i < 10000; i++) {				// 这里应该从后端获取账户实例,此处只做演示。				BankAccount sourceAccount = new BankAccount();				sourceAccount.setId(1);				sourceAccount.setBalance(50000);								BankAccount targetAccount = new BankAccount();				targetAccount.setId(2);				targetAccount.setBalance(20000);				obj.transfer(sourceAccount, targetAccount, 1);			}		});				Thread t2 = new Thread(() ->{			for (int i = 0; i < 10000; i++) {				// 这里应该从后端获取账户实例,此处只做演示。				BankAccount sourceAccount = new BankAccount();				sourceAccount.setId(1);				sourceAccount.setBalance(50000);								BankAccount targetAccount = new BankAccount();				targetAccount.setId(2);				targetAccount.setBalance(20000);				obj.transfer(targetAccount, sourceAccount, 1);			}		});				t1.start();		t2.start();				t1.join();		t2.join();				System.out.println("Finished.");	}
复制代码

上述代码中,每次转账都创建新的 BankAccount 实例,然后将其传入 Allocator,这样做,是不能够正常处理的,因为每次使用的互斥锁都作用在不同的实例上,这一点,需要特别注意。

破坏不可抢占条件

破坏不可抢占条件很简单,解决的关键在于能够主动释放它占有的资源,但是 synchronized 是不能做到这一点的。

synchronized 申请资源的时候,如果申请失败,线程会直接进入阻塞状态,什么都不能做,已经锁定的资源也无法释放。

我们可以使用 java.util.concurrent 包中的 Lock 对象来实现这一点,相关代码如下。

    private Lock lock = new ReentrantLock();	    public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {        try {            lock.lock();            if (sourceAccount.getBalance() > amount) {                System.out.println("Start transfer.");                System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));                sourceAccount.setBalance(sourceAccount.getBalance() - amount);                targetAccount.setBalance(targetAccount.getBalance() + amount);                System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));            }        }        finally {            lock.unlock();        }    }
复制代码

破坏循环条件

破坏循环条件,需要对资源进行排序,然后按序申请资源。

我们来看下面的代码。

	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {		BankAccount left = sourceAccount;		BankAccount right = targetAccount;		if (sourceAccount.getId() > targetAccount.getId()) {			left = targetAccount;			right = sourceAccount;		}		synchronized(left) {			synchronized(right) {				if (sourceAccount.getBalance() > amount) {					System.out.println("Start transfer.");					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));					sourceAccount.setBalance(sourceAccount.getBalance() - amount);					targetAccount.setBalance(targetAccount.getBalance() + amount);					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));				}			}		}	}
复制代码

在这里,我们假设 BankAccount 中的id是主键,我们按照id对 sourceAccount 和 targetAccount 进行排序,之后按照id从小到大申请资源,这样就不会有死锁发生了。

我们在解决并发问题的时候,可能会有多种方式,我们需要评估一下各个解决方案,从中选择一个成本最低的方案。

对于我们一直谈论的转账示例,破坏循环条件可能是一个比较好的解决方法。

使用等待-通知机制

我们上面在破坏占用且等待条件时,使用了如下的死循环:

    while(!allocator.apply(sourceAccount, targetAccount));
复制代码

在并发量不高的情况下,这样写没有问题,但是在高并发的情况下,这样写可能需要循环太多次才能拿到锁,太消耗 CPU 了,属于蛮干型。

在这种情况下,一种合理的方案是:如果线程要求的条件不满足,那么线程阻塞自己,进入等待状态,当线程要求的条件满足后,通知等待的线程重新执行,这里线程阻塞就避免了循环消耗 CPU 的问题。

这就是我们要讨论的等待-通知机制。

Java 中的等待-通知机制

Java 中的等待-通知机制流程是怎样的?

线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁。

Java 使用 synchronized 关键字配合 wait()、notify()、notifyAll()三个方法实现等待-通知机制。

在并发程序中,当一个线程进入临界区后,由于某些条件没有满足,需要进入等待状态,Java 对象的 wait()方法能够实现这一点。当线程要求的条件满足时,Java 对象的 notify()和 notifyAll()方法就可以通知等待的线程,它会告诉线程,你需要的条件曾经满足过,之所以说曾经,是因为 notify()只能保证在通知的那一时刻,条件是满足的,而被通知线程的执行时刻和通知时刻一般不会重合,所以在线程开始执行的时候,可能条件又不满足了。

另外需要注意,被通知的线程重新执行时,还需要获取互斥锁,因为之前在调用 wait()方法时,互斥锁已经被释放了。

wait()、notify()和 notifyAll()三个方法能够被调用的前提是已经获取了响应的互斥锁,所以这三个方法都是在 synchronized{}内部被调用的。

下面我们来看一下修改后的 Allocator,其中 apply()和 free()方法的代码如下。

	public synchronized void apply(Object... objs) {		for (Object obj : objs) {			while (lockObjs.contains(obj)) {				try {					this.wait();				} catch (InterruptedException e) {					System.out.println(e.getMessage());				}			}		}		for (Object obj : objs) {			lockObjs.add(obj);		}	}		public synchronized void free(Object... objs) {		for (Object obj : objs) {			if (lockObjs.contains(obj)) {				lockObjs.remove(obj);			}		}		this.notifyAll();	}
复制代码

对应的 transfer()方法的代码如下。

	public void transfer(BankAccount sourceAccount, BankAccount targetAccount, double amount) {	Allocator allocator = Allocator.getInstance();	allocator.apply(sourceAccount, targetAccount);	try {		synchronized(sourceAccount) {			synchronized(targetAccount) {				if (sourceAccount.getBalance() > amount) {					System.out.println("Start transfer.");					System.out.println(String.format("Before transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));					sourceAccount.setBalance(sourceAccount.getBalance() - amount);					targetAccount.setBalance(targetAccount.getBalance() + amount);					System.out.println(String.format("After transfer, source balance:%s, target balance:%s", sourceAccount.getBalance(), targetAccount.getBalance()));				}			}		}	}	finally {		allocator.free(sourceAccount, targetAccount);	}}
复制代码

运行结果和我们期望是一致的。

条件曾经满足

在上述代码中,我们可以发现,apply()方法中的判断条件之前是 if,现在改成了 while, while (lockObjs.contains(obj)),这样做可以解决条件曾经满足的问题。

因为当 wait()返回时,有可能条件已经发生了变化,曾经条件满足,但是现在已经不满足了,所以要重新检验条件是否满足。

这是一种范式,是一种经典的做法。

notify() vs notifyAll()

notify()和 notifyAll()有什么区别?

notify()会随机的通知等待队列中的一个线程, 而 notifyAll()会通知等待队列中的所有线程。

我们尽量使用 notifyAll()方法,因为 notify()可能会导致某些线程永远不会被通知到。

假设我们有一个实例,它有资源 A、B、C、D,我们使用实例对象来创建互斥锁。

  • 线程 t1 申请到了 A、B

  • 线程 t2 申请到了 C、D

  • 线程 t3 试图申请 A、B,失败,进入等待队列

  • 线程 t4 试图申请 C、D,失败,进入等待队列

  • 此时,线程 t1 执行结束,释放锁

  • 线程 t1 调用实例的 notify()来通知等待队列中的线程,有可能被通知的是线程 t4,但线程 t4 申请的是 C、D 还被线程 t2 占用,所以线程 t4 只能继续等待

  • 此时,线程 t2 执行结束,释放锁

  • 线程 t2 调用实例的 notify()来通知等待队列中的线程,t3 或者 t4 只能有 1 个被唤醒并正常执行,另外 1 个则再也没有机会被唤醒

wait()和 sleep()的区别

wait()方法与 sleep()方法的不同之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的 wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了 notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的 notifyAll()方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。

sleep()方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。但是 sleep()方法不会释放“锁标志”,也就是说如果有 synchronized 同步块,其他线程仍然不能访问共享数据。

总结一下,wait()和 sleep()区别如下。

  • wait()释放资源,sleep()不释放资源

  • wait()需要被唤醒,sleep()不需要

  • wait()是 object 顶级父类的方法,sleep()则是 Thread 的方法

wait()和 sleep()都会让渡 CPU 执行时间,等待再次调度!

发布于: 2021 年 01 月 15 日阅读数: 17
用户头像

点滴技术感悟,记录人生成长 2017.10.25 加入

还未添加个人简介

评论

发布
暂无评论
Java并发编程实战(4)- 死锁