【孔乙已】生产者消费者有四样写法,android 插件化和组件化
Lock
中的Condition
可以实现上面Object
的wait
,notify
一样的效果
await
对应wait
,signal
对应notify
,signalAll
对应notifyAll
下面直接来看看实现,代码与使用wait
,notify
基本上是一样的,只是同步方式不同
public class ProductorConsumerDemo2 {
private static ReentrantLock lock = new ReentrantLock();private static Condition full = lock.newCondition();private static Condition empty = lock.newCondition();
public static void main(String[] args) {LinkedList linkedList = new LinkedList();ExecutorService service = Executors.newFixedThreadPool(15);for (int i = 0; i < 5; i++) {service.submit(new Productor(linkedList, 8, lock));}for (int i = 0; i < 10; i++) {service.submit(new Consumer(linkedList, lock));}
}
static class Productor implements Runnable {
private List<Integer> list;private int maxLength;private Lock lock;
public Productor(List list, int maxLength, Lock lock) {this.list = list;this.maxLength = maxLength;this.lock = lock;}
@Overridepublic void run() {while (true) {lock.lock();try {while (list.size() == maxLength) {System.out.println("生产者" + Thread.currentThread().getName() + " list 以达到最大容量,进行 wait");full.await();System.out.println("生产者" + Thread.currentThread().getName() + " 退出 wait");}Random random = new Random();int i = random.nextInt();System.out.println("生产者" + Thread.currentThread().getName() + " 生产数据" + i);list.add(i);empty.signalAll();Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}}
static class Consumer implements Runnable {
private List<Integer> list;private Lock lock;
public Consumer(List list, Lock lock) {this.list = list;this.lock = lock;}
@Overridepublic void run() {while (true) {lock.lock();try {while (list.isEmpty()) {System.out.println("消费者" + Thread.currentThread().getName() + " list 为空,进行 wait");empty.await();System.out.println("消费者" + Thread.currentThread().getName() + " 退出 wait");}Integer element = list.remove(0);System.out.println("消费者" + Thread.currentThread().getName() + " 消费数据:" + element);full.signalAll();Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}}
}
3.BlockingQueue
方式实现
由于BlockingQueue
内部实现就附加了两个阻塞操作。
即当队列已满时,阻塞向队列中插入数据的线程,直至队列中未满;当队列为空时,阻塞从队列中获取数据的线程,直至队列非空时为止.
所以使用BlockingQueue
来实现生产者消费者模式非常简单方便,关于BlockingQueue
的更多细节可见:[并发容器之 BlockingQueue 详解](
)
下面直接看下生产者消费者实现
public class ProductorConsumerDmoe3 {
private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(15);for
(int i = 0; i < 5; i++) {service.submit(new Productor(queue));}for (int i = 0; i < 10; i++) {service.submit(new Consumer(queue));}}
static class Productor implements Runnable {private BlockingQueue queue;
public Productor(BlockingQueue queue) {this.queue = queue;}
@Overridepublic void run() {try {while (true) {Random random = new Random();int i = random.nextInt();System.out.println("生产者" + Thread.currentThread().getName() + "生产数据" + i);queue.put(i);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}}
static class Consumer implements Runnable {private BlockingQueue queue;
public Consumer(BlockingQueue queue) {this.queue = queue;}
@Overridepublic void run() {try {while (true) {Integer element = (Integer) queue.take();System.out.println("消费者" + Thread.currentThread().getName() + "正在消费数据" + element);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}}
}
4.Kotlin Channel
方式实现
随着Kotlin
的普及,使用协程来处理并发也变成了一个更加方便的选择
使用Kotlin Channel
同样可以实现生产者消费者模式
1.一个 Channel
是一个和 BlockingQueue
非常相似的概念。
2.Channel
相比BlockingQueue
代替了阻塞的 put
操作并提供了挂起的 send
,还替代了阻塞的 take
操作并提供了挂起的 receive
3.相比BlockingQueue
的阻塞,Channel
的挂起性能更好
4.Channel
还有个特点是阻塞队列没有,它可以随时关闭,当发送者接收到关闭指令,将立即停止发送。
实现如下:
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {for (x in 1..5) {val item = x * xprintln("生产者生产了:$item")send(x * x)delay(1000)}}
fun main() = runBlocking {val squares = produceSquares()squares.consumeEach {println("消费者消费了 $it")}println("Done!")}通过Channel
方式,可以比较方便的实现生产者消费者模式总结
本文主要介绍了生产者消费者模式的四种写法
1.wait/notify
方式实现
2.Lock
方式实现
3.BlockingQueue
方式实现
4.Kotlin Channel
方式实现
评论