写点什么

手写线程池实战

作者:Java高工P7
  • 2021 年 11 月 12 日
  • 本文字数:1710 字

    阅读完需:约 6 分钟

package com.lizba.p3.threadpool;


import java.util.ArrayList;


import java.util.Collections;


import java.util.LinkedList;


import java.util.List;


import java.util.concurrent.atomic.AtomicLong;


/**


  • <p>

  • </p>

  • @Author: Liziba

  • @Date: 2021/6/17 22:34


*/


public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {


/** 线程池最大工作者线程数量 */


private static final int MAX_WORKER_SIZE = 20;


/** 线程池默认工作者线程数量 */


private static final int DEFAULT_WORKER_SIZE = 5;


/** 线程池最小工作者线程数量 */


private static final int MIN_WORKER_SIZE = 5;


/** 工作队列,也称任务队列,用来存放客户端提交的任务 */


private final LinkedList<Job> jobs = new LinkedList<>();


/** 工作者列表,需要具有同步性质,支持并发操作 */


private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());


/** 工作线程的数量 */


private int workerNum = DEFAULT_WORKER_SIZE;


/** 线程编号生成器 */


private AtomicLong threadNum = new AtomicLong();


public DefaultThreadPool() {


initWorker(DEFAULT_WORKER_SIZE);


}


/**


  • 初始化线程工作者,并启动

  • @param size 初始化工作着大小


*/


private void initWorker(int size) {


for (int i = 0; i < size; i++) {


Worker worker = new Worker();


workers.add(worker);


Thread thread = new Thread(


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());


thread.start();


}


}


@Override


public void execute(Job job) {


if (job != null) {


// 添加一个任务,然后通知等待在 jobs 上的 worker


synchronized (jobs) {


jobs.add(job);


jobs.notifyAll();


}


}


}


@Override


public void shutdown() {


workers.forEach(worker -> worker.shutdown());


}


@Override


public void addWorkers(int num) {


// 此处要锁住 jobs,因为 worker 会从 jobs 获取任务,需要 jobs 通知等待中的 worker


synchronized (jobs) {


// 不允许工作者线程数操作最大值


if (num + this.workerNum > MAX_WORKER_SIZE) {


num = MAX_WORKER_SIZE - this.workerNum;


}


initWorker(num);


this.workerNum += num;


}


}


@Override


public void removeWorkers(int num) {


synchronized (jobs) {


if (num > this.workerNum) {


throw new IllegalArgumentException("超出工作者数目!");


}


int count = 0;


while (count < num) {


Worker worker = workers.get(count);


// 如果移除成功则关闭工作者,工作者将不会继续获取任务执行


if (workers.remove(worker)) {


worker.shutdown();


count++;


}


this.workerNum -= count;


}


}


}


@Override


public int getJobSize() {


return jobs.size();


}


/**


  • <p>

  • </p>

  • @Author: Liziba

  • @Date: 2021/6/17 22:41


*/


class Worker implements Runnable {


/** 是否工作 */


private volatile boolean running = Boolean.TRUE;


@Override


public void run() {


while (running) {


Job job = null;


synchronized (jobs) {


while (jobs.isEmpty()) {


try {


jobs.wait();


} catch (InterruptedException e) {


// 如果感应到外部的中断通知,则自己主动中断返回


Thread.currentThread().interrupt();


return;


}


}


// 取出任务队列的第一个任务


job = jobs.removeFirst();


}


// 执行任务


if (job != null) {


try {


job.run();


} catch (Exception e) {


e.printStackTrace();


}


}


}


}


/**


  • 关闭 worker,全部关闭意味着线程池关闭


*/


public void shutdown() {


running = false;


}


}


}


?


测试


package com.lizba.p3.threadpool;


import com.lizba.p2.SleepUtil;


/**


  • <p>

  • </p>

  • @Author: Liziba

  • @Date: 2021/6/17 23:19


*/


public class PoolTest {


public static void main(String[] args) {


DefaultThreadPool pool = new DefaultThreadPool();


// 提交 10 个任务


int size = 10;


for (int i = 0; i < size; i++) {


Thread job = new Thread(new Runnable() {


@Override


public void run() {


SleepUtil.sleepSecond(1);


System.out.println(Thread.currentThread().getName() + " 执行 Job 任务");


}


});


pool.execute(job);


}

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
手写线程池实战