线程通讯指的是多个线程之间通过共享内存或消息传递等方式来协调和同步它们的执行。在多线程编程中,通常会出现多个线程需要共同完成某个任务的情况,这时就需要线程之间进行通讯,以保证任务能够顺利地执行。
线程通讯的实现方式主要有以下两种:
常见场景
线程通讯的常见场景有以下几个:
多个线程共同完成某个任务:例如一个爬虫程序需要多个线程同时抓取不同的网页,然后将抓取结果合并保存到数据库中。这时需要线程通讯来协调各个线程的执行顺序和共享数据。
避免资源冲突:多个线程访问共享资源时可能会引发竞争条件,例如多个线程同时读写一个文件或数据库。这时需要线程通讯来同步线程之间的数据访问,避免资源冲突。
保证顺序执行:在某些情况下,需要保证多个线程按照一定的顺序执行,例如一个多线程排序算法。这时需要线程通讯来协调各个线程的执行顺序。
线程之间的互斥和同步:有些场景需要确保只有一个线程能够访问某个共享资源,例如一个计数器。这时需要使用线程通讯机制来实现线程之间的互斥和同步。
实现方法
线程通讯的实现方法有以下几种:
等待和通知机制:使用 Object 类的 wait() 和 notify() 方法来实现线程之间的通讯。当一个线程需要等待另一个线程执行完某个操作时,它可以调用 wait() 方法使自己进入等待状态,同时释放占有的锁,等待其他线程调用 notify() 或 notifyAll() 方法来唤醒它。被唤醒的线程会重新尝试获取锁并继续执行。
信号量机制:使用 Java 中的 Semaphore 类来实现线程之间的同步和互斥。Semaphore 是一个计数器,用来控制同时访问某个资源的线程数。当某个线程需要访问共享资源时,它必须先从 Semaphore 中获取一个许可证,如果已经没有许可证可用,线程就会被阻塞,直到其他线程释放了许可证。
栅栏机制:使用 Java 中的 CyclicBarrier 类来实现多个线程之间的同步,它允许多个线程在指定的屏障处等待,并在所有线程都达到屏障时继续执行。
锁机制:使用 Java 中的 Lock 接口和 Condition 接口来实现线程之间的同步和互斥。Lock 是一种更高级的互斥机制,它允许多个条件变量(Condition)并支持在同一个锁上等待和唤醒。
具体代码实现
等待通知实现
以下是一个简单的 wait() 和 notify() 方法的等待通知示例:
public class WaitNotifyDemo {
public static void main(String[] args) {
Object lock = new Object();
ThreadA threadA = new ThreadA(lock);
ThreadB threadB = new ThreadB(lock);
threadA.start();
threadB.start();
}
static class ThreadA extends Thread {
private Object lock;
public ThreadA(Object lock) {
this.lock = lock;
}
public void run() {
synchronized (lock) {
System.out.println("ThreadA start...");
try {
lock.wait(); // 线程A等待
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ThreadA end...");
}
}
}
static class ThreadB extends Thread {
private Object lock;
public ThreadB(Object lock) {
this.lock = lock;
}
public void run() {
synchronized (lock) {
System.out.println("ThreadB start...");
lock.notify(); // 唤醒线程A
System.out.println("ThreadB end...");
}
}
}
}
复制代码
在这个示例中,定义了一个共享对象 lock,ThreadA 线程先获取 lock 锁,并调用 lock.wait() 方法进入等待状态。ThreadB 线程在获取 lock 锁之后,调用 lock.notify() 方法唤醒 ThreadA 线程,然后 ThreadB 线程执行完毕。
运行以上程序的执行结果如下:
ThreadA start...
ThreadB start...
ThreadB end...
ThreadA end...
信号量实现
在 Java 中使用 Semaphore 实现信号量,Semaphore 是一个计数器,用来控制同时访问某个资源的线程数。当某个线程需要访问共享资源时,它必须先从 Semaphore 中获取一个许可证,如果已经没有许可证可用,线程就会被阻塞,直到其他线程释放了许可证。它的示例代码如下:
import java.util.concurrent.Semaphore;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 5; i++) {
new Thread(new Worker(i, semaphore)).start();
}
}
static class Worker implements Runnable {
private int id;
private Semaphore semaphore;
public Worker(int id, Semaphore semaphore) {
this.id = id;
this.semaphore = semaphore;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Worker " + id + " acquired permit.");
Thread.sleep(1000);
System.out.println("Worker " + id + " released permit.");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
复制代码
在这个示例中,创建了一个 Semaphore 对象,并且设置了许可数为 2。然后创建了 5 个 Worker 线程,每个 Worker 线程需要获取 Semaphore 的许可才能执行任务。每个 Worker 线程在执行任务之前先调用 semaphore.acquire() 方法获取许可,如果没有许可则会阻塞,直到 Semaphore 释放许可。执行完任务之后调用 semaphore.release() 方法释放许可。 运行以上程序的执行结果如下:
Worker 0 acquired permit.
Worker 1 acquired permit.
Worker 1 released permit.
Worker 0 released permit.
Worker 2 acquired permit.
Worker 3 acquired permit.
Worker 2 released permit.
Worker 4 acquired permit.
Worker 3 released permit.
Worker 4 released permit.
栅栏实现
在 Java 中,可以使用 CyclicBarrier 或 CountDownLatch 来实现线程的同步,它们两个使用类似,接下来我们就是 CyclicBarrier 来演示一下线程的同步,CyclicBarrier 的示例代码如下:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("All threads have reached the barrier.");
}
});
for (int i = 1; i <= 3; i++) {
new Thread(new Worker(i, cyclicBarrier)).start();
}
}
static class Worker implements Runnable {
private int id;
private CyclicBarrier cyclicBarrier;
public Worker(int id, CyclicBarrier cyclicBarrier) {
this.id = id;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println("Worker " + id + " is working.");
Thread.sleep((long) (Math.random() * 2000));
System.out.println("Worker " + id + " has reached the barrier.");
cyclicBarrier.await();
System.out.println("Worker " + id + " is continuing the work.");
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
}
复制代码
在这个示例中,创建了一个 CyclicBarrier 对象,并且设置了参与线程数为 3。然后创建了 3 个 Worker 线程,每个 Worker 线程会先执行一些任务,然后等待其他线程到达 Barrier。所有线程都到达 Barrier 之后,Barrier 会释放所有线程并执行设置的 Runnable 任务。 运行以上程序的执行结果如下:
Worker 2 is working.
Worker 3 is working.
Worker 1 is working.
Worker 3 has reached the barrier.
Worker 1 has reached the barrier.
Worker 2 has reached the barrier.
All threads have reached the barrier.
Worker 2 is continuing the work.
Worker 3 is continuing the work.
Worker 1 is continuing the work.
从以上执行结果可以看出,CyclicBarrier 保证了所有 Worker 线程都到达 Barrier 之后才能继续执行后面的任务,这样可以保证线程之间的同步和协作。在本示例中,所有线程都在 Barrier 处等待了一段时间,等所有线程都到达 Barrier 之后才继续执行后面的任务。
锁机制实现
以下是一个使用 Condition 的示例:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
private volatile boolean flag = false;
public static void main(String[] args) {
ConditionDemo demo = new ConditionDemo();
new Thread(demo::waitCondition).start();
new Thread(demo::signalCondition).start();
}
private void waitCondition() {
lock.lock();
try {
while (!flag) {
System.out.println(Thread.currentThread().getName() + " is waiting for signal.");
condition.await();
}
System.out.println(Thread.currentThread().getName() + " received signal.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void signalCondition() {
lock.lock();
try {
Thread.sleep(3000); // 模拟等待一段时间后发送信号
flag = true;
System.out.println(Thread.currentThread().getName() + " sends signal.");
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
复制代码
在这个示例中,创建了一个 Condition 对象和一个 Lock 对象,然后创建了两个线程,一个线程等待 Condition 信号,另一个线程发送 Condition 信号。
等待线程在获得锁后,判断标志位是否为 true,如果为 false,则等待 Condition 信号;如果为 true,则继续执行后面的任务。
发送线程在获得锁后,等待一段时间后,将标志位设置为 true,并且发送 Condition 信号。 运行以上程序的执行结果如下:
Thread-0 is waiting for signal.
Thread-1 sends signal.
Thread-0 received signal.
从上面执行结果可以看出,等待线程在等待 Condition 信号的时候被阻塞,直到发送线程发送了 Condition 信号,等待线程才继续执行后面的任务。Condition 对象提供了一种更加灵活的线程通信方式,可以精确地控制线程的等待和唤醒。
小结
线程通讯指的是多个线程之间通过共享内存或消息传递等方式来协调和同步它们的执行,它的实现方法有很多:比如 wait() 和 notify() 的等待和通知机制、Semaphore 信号量机制、CyclicBarrier 栅栏机制,以及 Condition 的锁机制等。
评论