写点什么

Java 线程池趣味事:这不是线程池

用户头像
Java王路飞
关注
发布于: 2021 年 02 月 20 日
Java线程池趣味事:这不是线程池

要想写出高性能高并发的应用,自然有许多关键,如 io,算法,异步,语言特性,操作系统特性,队列,内存,cpu,分布式,网络,数据结构,高性能组件。

胡说一通先。

回到主题,线程池。如果说多线程是提高系统并发能力的利器之一,那么线程池就是让这个利器更容易控制的一种工具。如果我们自己纯粹使用多线程基础特性编写,那么,必然需要相当老道的经验,才能够驾驭复杂的环境。而线程池则不需要,你只需知道如何使用,即可轻松掌控多线程,安全地为你服务。


1. 常见线程池的应用样例

线程池,不说本身很简单,但应用一定是简单的。

线程池有许多的实现,但我们只说 ThreadPoolExecutor 版本,因其应用最广泛,别无其他。当然了,还有一个定时调度线程池 ScheduledThreadPoolExecutor 另说,因其需求场景不同,无法比较。

下面,我就几个应用级别,说明下我们如何快速使用线程池。(走走过场而已,无关其他)

1.1. 初级线程池

初级版本的使用线程池,只需要借助一个工具类即可: Executors . 它提供了许多静态方法,你只需随便选一个就可以使用线程池了。比如:

// 创建固定数量的线程池Executors.newFixedThreadPool(8);// 创建无限动态创建的线程池Executors.newCachedThreadPool();// 创建定时调度线程池Executors.newScheduledThreadPool(2);// 还有个创建单线程的就不说了,都一样
复制代码

使用上面这些方法创建好的线程池,直接调用其 execute() 或者 submit() 方法,就可以实现多线程编程了。没毛病!

1.2. 中级线程池

我这里所说的中级,实际就是不使用以上超级简单方式使用线程池的方式。即你已经知道了 ThreadPoolExecutor 这个东东了。这不管你的出发点是啥!

// 自定义各线程参数ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 20, 20, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
复制代码

具体参数解释就不说了,咱们不扫盲。总之,使用这玩意儿,说明你已经开始有点门道了。

1.3. 高级线程池

实际上,这个版本就没法具体说如何做了。

但它可能是,你知道你的线程池应用场景的,你清楚你的硬件运行环境的,你会使用线程池命名的,你会定义你的队列大小的,你会考虑上下文切换的,你会考虑线程安全的,你会考虑锁性能的,你可能会自己造个轮子的。。。

2. 这不是线程池

我们通常理解的线程池,就是能够同时跑多个任务的地方。但有时候线程池不一像线程池,而像一个单线程。来看一个具体的简单的线程池的使用场景:

    // 初始化线程池    private ExecutorService executor            = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),                Runtime.getRuntime().availableProcessors(),                0L, TimeUnit.SECONDS,                new ArrayBlockingQueue<>(50),                new NamedThreadFactory("test-pool"),                new ThreadPoolExecutor.CallerRunsPolicy());    // 使用线程池处理任务    public Integer doTask(String updateIntervalDesc) throws Exception {        long startTime = System.currentTimeMillis();        List<TestDto> testList;        AtomicInteger affectNum = new AtomicInteger(0);        int pageSize = 1000;        AtomicInteger pageNo = new AtomicInteger(1);        Map<String, Object> condGroupLabel = new HashMap<>();        log.info("start do sth:{}", updateIntervalDesc);        List<Future<?>> futureList = new ArrayList<>();        do {            PageHelper.startPage(pageNo.getAndIncrement(), pageSize);            List<TestDto> list                    = testDao.getLabelListNew(condGroupLabel);            testList = list;            // 循环向线程池中提交任务            for (TestDto s : list) {                Future<?> future = executor.submit(() -> {                    try {                        // do sth...                        affectNum.incrementAndGet();                    }                    catch (Throwable e) {                        log.error("error:{}", pageNo.get(), e);                    }                });                futureList.add(future);            }        } while (testList.size() >= pageSize);        // 等待任务完成        int i = 0;        for (Future<?> future : futureList) {            future.get();            log.info("done:+{} ", i++);        }        log.info("doTask done:{}, num:{}, cost:{}ms",                updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);        return affectNum.get();    }
复制代码

主要业务就是,从数据库中取出许多任务,放入线程池中运行。因为任务又涉及到 db 等的 io 操作,所以使用多线程处理,非常合理。

然而,有一种情况的出现,也许会打破这个平衡:那就是当单个任务能够快速执行完成时,而且快到刚上一任务提交完成,还没等下一次提交时,就任务就已被执行完成。这时,你就可能会看到一个神奇的现象,即一直只有一个线程在运行任务。这不是线程池该干的事,更像是单线程任务在跑。

然后,我们可能开始怀疑:某个线程被阻塞了?线程调度不公平了?队列选择不正确了?触发 jdk bug 了?线程池未完全利用的线程了?等等。。。

然而结果并非如此,究其原因只是当我们向线程池提交任务时,实际上只是向线程池的队列中添加了任务。即上面显示的 ArrayBlockingQueue 添加了任务,而线程池中的各 worker 负责从队列中获取任务进行执行。而当任务数很少时,自然只有一部分 worker 会处理执行中了。至于为什么一直是同一个线程在执行,则可能是由于 jvm 的调度机制导致。事实上,是受制于 ArrayBlockingQueue.poll() 的公平性。而这个 poll()的实现原理,则是由 wait/notify 机制的公平性决定的。

如下,是线程池的 worker 工作原理:

    // java.util.concurrent.ThreadPoolExecutor#runWorker    /**     * Main worker run loop.  Repeatedly gets tasks from queue and     * executes them, while coping with a number of issues:     *     * 1. We may start out with an initial task, in which case we     * don't need to get the first one. Otherwise, as long as pool is     * running, we get tasks from getTask. If it returns null then the     * worker exits due to changed pool state or configuration     * parameters.  Other exits result from exception throws in     * external code, in which case completedAbruptly holds, which     * usually leads processWorkerExit to replace this thread.     *     * 2. Before running any task, the lock is acquired to prevent     * other pool interrupts while the task is executing, and then we     * ensure that unless pool is stopping, this thread does not have     * its interrupt set.     *     * 3. Each task run is preceded by a call to beforeExecute, which     * might throw an exception, in which case we cause thread to die     * (breaking loop with completedAbruptly true) without processing     * the task.     *     * 4. Assuming beforeExecute completes normally, we run the task,     * gathering any of its thrown exceptions to send to afterExecute.     * We separately handle RuntimeException, Error (both of which the     * specs guarantee that we trap) and arbitrary Throwables.     * Because we cannot rethrow Throwables within Runnable.run, we     * wrap them within Errors on the way out (to the thread's     * UncaughtExceptionHandler).  Any thrown exception also     * conservatively causes thread to die.     *     * 5. After task.run completes, we call afterExecute, which may     * also throw an exception, which will also cause thread to     * die. According to JLS Sec 14.20, this exception is the one that     * will be in effect even if task.run throws.     *     * The net effect of the exception mechanics is that afterExecute     * and the thread's UncaughtExceptionHandler have as accurate     * information as we can provide about any problems encountered by     * user code.     *     * @param w the worker     */    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            // worker 不停地向队列中获取任务,然后执行            // 其中获取任务的过程,可能被中断,也可能不会,受到线程池伸缩配置的影响            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }    /**     * Performs blocking or timed wait for a task, depending on     * current configuration settings, or returns null if this worker     * must exit because of any of:     * 1. There are more than maximumPoolSize workers (due to     *    a call to setMaximumPoolSize).     * 2. The pool is stopped.     * 3. The pool is shutdown and the queue is empty.     * 4. This worker timed out waiting for a task, and timed-out     *    workers are subject to termination (that is,     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})     *    both before and after the timed wait, and if the queue is     *    non-empty, this worker is not the last thread in the pool.     *     * @return task, or null if the worker must exit, in which case     *         workerCount is decremented     */    private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
// Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { // 可能调用超时方法,也可能调用阻塞方法 // 固定线程池的情况下,调用阻塞 take() 方法 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
复制代码

即线程池 worker 持续向队列获取任务,执行即可。而队列任务的获取,则由两个读写锁决定:

    // java.util.concurrent.ArrayBlockingQueue#take    public E take() throws InterruptedException {        final ReentrantLock lock = this.lock;        // 此处锁,保证执行线程安全性        lock.lockInterruptibly();        try {            while (count == 0)                // 此处释放锁等待,再次唤醒时,要求必须重新持有锁                notEmpty.await();            return dequeue();        } finally {            lock.unlock();        }    }    //     /**     * Inserts the specified element at the tail of this queue, waiting     * for space to become available if the queue is full.     *     * @throws InterruptedException {@inheritDoc}     * @throws NullPointerException {@inheritDoc}     */    public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)                notFull.await();            enqueue(e);        } finally {            lock.unlock();        }    }    /**     * Inserts element at current put position, advances, and signals.     * Call only when holding lock.     */    private void enqueue(E x) {        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        // 通知取等线程,唤醒        notEmpty.signal();    }
复制代码

所以,具体谁取到任务,就是要看谁抢到了锁。而这,可能又涉及到 jvm 的高效调度策略啥的了吧。(虽然不确定,但感觉像) 至少,任务运行的表象是,所有任务被某个线程一直抢到。

3. 回归线程池

线程池的目的,在于处理一些异步的任务,或者并发的执行多个无关联的任务。在于让系统减负。而当任务的提交消耗,大于了任务的执行消耗,那就没必要使用多线程了,或者说这是错误的用法了。我们应该线程池做更重的活,而不是轻量级的。如上问题,执行性能必然很差。但我们稍做转变,也许就不一样了。

    // 初始化线程池    private ExecutorService executor            = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),                Runtime.getRuntime().availableProcessors(),                0L, TimeUnit.SECONDS,                new ArrayBlockingQueue<>(50),                new NamedThreadFactory("test-pool"),                new ThreadPoolExecutor.CallerRunsPolicy());    // 使用线程池处理任务    public Integer doTask(String updateIntervalDesc) throws Exception {        long startTime = System.currentTimeMillis();        List<TestDto> testList;        AtomicInteger affectNum = new AtomicInteger(0);        int pageSize = 1000;        AtomicInteger pageNo = new AtomicInteger(1);        Map<String, Object> condGroupLabel = new HashMap<>();        log.info("start do sth:{}", updateIntervalDesc);        List<Future<?>> futureList = new ArrayList<>();        do {            PageHelper.startPage(pageNo.getAndIncrement(), pageSize);            List<TestDto> list                    = testDao.getLabelListNew(condGroupLabel);            testList = list;            // 一批任务只向线程池中提交任务            Future<?> future = executor.submit(() -> {                for (TestDto s : list) {                    try {                        // do sth...                        affectNum.incrementAndGet();                    }                    catch (Throwable e) {                        log.error("error:{}", pageNo.get(), e);                    }                }            });            futureList.add(future);        } while (testList.size() >= pageSize);        // 等待任务完成        int i = 0;        for (Future<?> future : futureList) {            future.get();            log.info("done:+{} ", i++);        }        log.info("doTask done:{}, num:{}, cost:{}ms",                updateIntervalDesc, affectNum.get(), System.currentTimeMillis() - startTime);        return affectNum.get();    }
复制代码

即,让每个线程执行的任务足够重,以至于完全忽略提交的消耗。这样才能够发挥多线程的作用。

原文链接: http://www.cnblogs.com/yougewe/p/14421826.html

如果觉得本文对你有帮助,可以关注一下我公众号,回复关键字【面试】即可得到一份 Java 核心知识点整理与一份面试大礼包!另有更多技术干货文章以及相关资料共享,大家一起学习进步!



发布于: 2021 年 02 月 20 日阅读数: 16
用户头像

Java王路飞

关注

需要资料添加小助理vx:17375779923 即可 2021.01.29 加入

Java领域;架构知识;面试心得;互联网行业最新资讯

评论 (1 条评论)

发布
用户头像
Java 线程池趣味事:这不是线程池
2021 年 02 月 20 日 21:32
回复
没有更多了
Java线程池趣味事:这不是线程池