写点什么

面试卡在多线程?那就分享几道 Java 多线程高频面试题,面试不用愁

作者:钟奕礼
  • 2022-11-29
    湖南
  • 本文字数:12880 字

    阅读完需:约 42 分钟

  1. 多线程中的忙循环是什么?忙循环就是程序员用循环让一个线程等待,不像传统方法 wait()、 sleep() 或 yield(),它们都放弃了 CPU 控制,而忙循环不会放弃 CPU,它就是在运行一个空循环。


这么做的目的是为了保留 CPU 缓存,在多核系统中,一个等待线程醒来的时候可能会在另一个内核运行,这样会重建缓存。为了避免重建缓存和减少等待重建的时间就可以使用它了。


  1. 什么是自旋锁?没有获得锁的线程一直循环在那里看是否该锁的保持者已经释放了锁,这就是自旋锁。

  2. 什么是互斥锁?互斥锁:从等待到解锁过程,线程会从 sleep 状态变为 running 状态,过程中有线程上下文的切换,抢占 CPU 等开销。

  3. 自旋锁的优缺点?自旋锁不会引起调用者休眠,如果自旋锁已经被别的线程保持,调用者就一直循环在那里看是否该自旋锁的保持者释放了锁。由于自旋锁不会引起调用者休眠,所以自旋锁的效率远高于互斥锁。


虽然自旋锁效率比互斥锁高,但它会存在下面两个问题: 1、自旋锁一直占用 CPU,在未获得锁的情况下,一直运行,如果不能在很短的时间内获得锁,会导致 CPU 效率降低。 2、试图递归地获得自旋锁会引起死锁。递归程序决不能在持有自旋锁时调用它自己,也决不能在递归调用时试图获得相同的自旋锁。


由此可见,我们要慎重的使用自旋锁,自旋锁适合于锁使用者保持锁时间比较短并且锁竞争不激烈的情况。正是由于自旋锁使用者一般保持锁时间非常短,因此选择自旋而不是睡眠是非常必要的,自旋锁的效率远高于互斥锁。


  1. 如何在两个线程间共享数据?同一个 Runnable,使用全局变量。


第一种:将共享数据封装到一个对象中,把这个共享数据所在的对象传递给不同的 Runnable


第二种:将这些 Runnable 对象作为某一个类的内部类,共享的数据作为外部类的成员变量,对共享数据的操作分配给外部类的方法来完成,以此实现对操作共享数据的互斥和通信,作为内部类的 Runnable 来操作外部类的方法,实现对数据的操作


class ShareData {private int x = 0;


public synchronized void addx(){x++;System.out.println("x++ : "+x);}public synchronized void subx(){x--;System.out.println("x-- : "+x);}}


public class ThreadsVisitData {


public static ShareData share = new ShareData();


public static void main(String[] args) {//final ShareData share = new ShareData();new Thread(new Runnable() {public void run() {for(int i = 0;i<100;i++){share.addx();}}}).start();new Thread(new Runnable() {public void run() {for(int i = 0;i<100;i++){share.subx();}}}).start();}}


  1. Java 中 Runnable 和 Callable 有什么不同?Runnable 和 Callable 都是接口, 不同之处: 1.Callable 可以返回一个类型 V,而 Runnable 不可以 2.Callable 能够抛出 checked exception,而 Runnable 不可以。 3.Runnable 是自从 java1.1 就有了,而 Callable 是 1.5 之后才加上去的 4.Callable 和 Runnable 都可以应用于 executors。而 Thread 类只支持 Runnable.


import java.util.concurrent.Callable;


import java.util.concurrent.ExecutionException;


import java.util.concurrent.ExecutorService;


import java.util.concurrent.Executors;


import java.util.concurrent.Future;


public class ThreadTestB {


public static void main(String[] args) {


ExecutorService e=Executors.newFixedThreadPool(10);


Future f1=e.submit(new MyCallableA());


Future f2=e.submit(new MyCallableA());


Future f3=e.submit(new MyCallableA());


System.out.println("--Future.get()....");


try {


System.out.println(f1.get());


System.out.println(f2.get());


System.out.println(f3.get());


} catch (InterruptedException e1) {


e1.printStackTrace();


} catch (ExecutionException e1) {


e1.printStackTrace();


}


e.shutdown();


}


}


class MyCallableA implements Callable<String>{


public String call() throws Exception {


System.out.println("开始执行 Callable");


String[] ss={"zhangsan","lisi"};


long[] num=new long[2];


for(int i=0;i<1000000;i++){


num[(int)(Math.random()*2)]++;


}


    if(num[0]>num[1]){          return ss[0];      }else if(num[0]<num[1]){          throw new Exception("弃权!");      }else{          return ss[1];      }  } 
复制代码


}


7. Java 中 CyclicBarrier 和 CountDownLatch 有什么不同?CountDownLatch 和 CyclicBarrier 都能够实现线程之间的等待,只不过它们侧重点不同:


CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后,它才执行;CyclicBarrier 一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行;另外,CountDownLatch 是不能够重用的,而 CyclicBarrier 是可以重用的。CountDownLatch 的用法:


public class Test {public static void main(String[] args) {


final CountDownLatch latch = new CountDownLatch(2);


     new Thread(){         public void run() {             try {                 System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");                Thread.sleep(3000);                System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");                latch.countDown();            } catch (InterruptedException e) {                e.printStackTrace();            }         };     }.start();           new Thread(){         public void run() {             try {                 System.out.println("子线程"+Thread.currentThread().getName()+"正在执行");                 Thread.sleep(3000);                 System.out.println("子线程"+Thread.currentThread().getName()+"执行完毕");                 latch.countDown();            } catch (InterruptedException e) {                e.printStackTrace();            }         };     }.start();           try {         System.out.println("等待2个子线程执行完毕...");        latch.await();        System.out.println("2个子线程已经执行完毕");        System.out.println("继续执行主线程");    } catch (InterruptedException e) {        e.printStackTrace();    } }
复制代码


}CyclicBarrier 用法:


public class Test {public static void main(String[] args) {int N = 4;CyclicBarrier barrier = new CyclicBarrier(N,new Runnable() {@Overridepublic void run() {System.out.println("当前线程"+Thread.currentThread().getName());


}});


    for(int i=0;i<N;i++)        new Writer(barrier).start();}static class Writer extends Thread{    private CyclicBarrier cyclicBarrier;    public Writer(CyclicBarrier cyclicBarrier) {        this.cyclicBarrier = cyclicBarrier;    }
@Override public void run() { System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); }}
复制代码


}8. Java 中 interrupted 和 isInterruptedd 方法的区别?interrupt 方法用于中断线程。调用该方法的线程的状态为将被置为"中断"状态。


注意:线程中断仅仅是置线程的中断状态位,不会停止线程。需要用户自己去监视线程的状态为并做处理。支持线程中断的方法(也就是线程中断后会抛出 interruptedException 的方法)就是在监视线程的中断状态,一旦线程的中断状态被置为“中断状态”,就会抛出中断异常。


isInterrupted 只是简单的查询中断状态,不会对状态进行修改。


  1. concurrentHashMap 的源码理解以及内部实现原理,为什么他是同步的且效率高 ConcurrentHashMap 分析


ConcurrentHashMap 的结构是比较复杂的,都深究去本质,其实也就是数组和链表而已。我们由浅入深慢慢的分析其结构。


先简单分析一下,ConcurrentHashMap 的成员变量中,包含了一个 Segment 的数组(final Segment<K,V>[] segments;),而 Segment 是 ConcurrentHashMap 的内部类,然后在 Segment 这个类中,包含了一个 HashEntry 的数组(transient volatile HashEntry<K,V>[] table;)。而 HashEntry 也是 ConcurrentHashMap 的内部类。HashEntry 中,包含了 key 和 value 以及 next 指针(类似于 HashMap 中 Entry),所以 HashEntry 可以构成一个链表。


所以通俗的讲,ConcurrentHashMap 数据结构为一个 Segment 数组,Segment 的数据结构为 HashEntry 的数组,而 HashEntry 存的是我们的键值对,可以构成链表。


首先,我们看一下 HashEntry 类。


HashEntry


HashEntry 用来封装散列映射表中的键值对。在 HashEntry 类中,key,hash 和 next 域都被声明为 final 型,value 域被声明为 volatile 型。其类的定义为:


static final class HashEntry<K,V> {final int hash;final K key;volatile V value;volatile HashEntry<K,V> next;


    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {        this.hash = hash;        this.key = key;        this.value = value;        this.next = next;    }    ...    ...
复制代码


}HashEntry 的学习可以类比着 HashMap 中的 Entry。我们的存储键值对的过程中,散列的时候如果发生“碰撞”,将采用“分离链表法”来处理碰撞:把碰撞的 HashEntry 对象链接成一个链表。


Segment


Segment 的类定义为 static final class Segment<K,V> extends ReentrantLock implements Serializable。其继承于 ReentrantLock 类,从而使得 Segment 对象可以充当锁的角色。Segment 中包含 HashEntry 的数组,其可以守护其包含的若干个桶(HashEntry 的数组)。Segment 在某些意义上有点类似于 HashMap 了,都是包含了一个数组,而数组中的元素可以是一个链表。


table:table 是由 HashEntry 对象组成的数组如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表 table 数组的数组成员代表散列映射表的一个桶每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16。


count 变量是计算器,表示每个 Segment 对象管理的 table 数组(若干个 HashEntry 的链表)包含的 HashEntry 对象的个数。之所以在每个 Segment 对象中包含一个 count 计数器,而不在 ConcurrentHashMap 中使用全局的计数器,是为了避免出现“热点域”而影响并发性。


/**


  • Segments are specialized versions of hash tables. This

  • subclasses from ReentrantLock opportunistically, just to

  • simplify some locking and avoid separate construction./static final class Segment<K,V> extends ReentrantLock implements Serializable {/*

  • The per-segment table. Elements are accessed via

  • entryAt/setEntryAt providing volatile semantics.*/transient volatile HashEntry<K,V>[] table;

  • /**

  • The number of elements. Accessed only either within locks

  • or among other volatile reads that maintain visibility./transient int count;transient int modCount;/*

  • 装载因子*/final float loadFactor;}ConcurrentHashMap


ConcurrentHashMap 的结构中包含的 Segment 的数组,在默认的并发级别会创建包含 16 个 Segment 对象的数组。通过我们上面的知识,我们知道每个 Segment 又包含若干个散列表的桶,每个桶是由 HashEntry 链接起来的一个链表。如果 key 能够均匀散列,每个 Segment 大约守护整个散列表桶总数的 1/16。


并发写操作


在 ConcurrentHashMap 中,当执行 put 方法的时候,会需要加锁来完成。我们通过代码来解释一下具体过程: 当我们 new 一个 ConcurrentHashMap 对象,并且执行 put 操作的时候,首先会执行 ConcurrentHashMap 类中的 put 方法,该方法源码为:


/**


  • Maps the specified key to the specified value in this table.

  • Neither the key nor the value can be null.

  • <p> The value can be retrieved by calling the <tt>get</tt> method

  • with a key that is equal to the original key.

  • @param key key with which the specified value is to be associated

  • @param value value to be associated with the specified key

  • @return the previous value associated with <tt>key</tt>, or

  • @throws NullPointerException if the specified key or value is null*/@SuppressWarnings("unchecked")public V put(K key, V value) {Segment<K,V> s;if (value == null)throw new NullPointerException();int hash = hash(key);int j = (hash >>> segmentShift) & segmentMask;if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegments = ensureSegment(j);return s.put(key, hash, value, false);}我们通过注释可以了解到,ConcurrentHashMap 不允许空值。该方法首先有一个 Segment 的引用 s,然后会通过 hash() 方法对 key 进行计算,得到哈希值;继而通过调用 Segment 的 put(K key, int hash, V value, boolean onlyIfAbsent)方法进行存储操作。该方法源码为:


final V put(K key, int hash, V value, boolean onlyIfAbsent) {//加锁,这里是锁定的 Segment 而不是整个 ConcurrentHashMapHashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);V oldValue;try {HashEntry<K,V>[] tab = table;//得到 hash 对应的 table 中的索引 indexint index = (tab.length - 1) & hash;//找到 hash 对应的是具体的哪个桶,也就是哪个 HashEntry 链表 HashEntry<K,V> first = entryAt(tab, index);for (HashEntry<K,V> e = first;;) {if (e != null) {K k;if ((k = e.key) == key ||(e.hash == hash && key.equals(k))) {oldValue = e.value;if (!onlyIfAbsent) {e.value = value;++modCount;}break;}e = e.next;}else {if (node != null)node.setNext(first);elsenode = new HashEntry<K,V>(hash, key, value, first);int c = count + 1;if (c > threshold && tab.length < MAXIMUM_CAPACITY)rehash(node);elsesetEntryAt(tab, index, node);++modCount;count = c;oldValue = null;break;}}} finally {//解锁 unlock();}return oldValue;}关于该方法的某些关键步骤,在源码上加上了注释。


需要注意的是:加锁操作是针对的 hash 值对应的某个 Segment,而不是整个 ConcurrentHashMap。因为 put 操作只是在这个 Segment 中完成,所以并不需要对整个 ConcurrentHashMap 加锁。所以,此时,其他的线程也可以对另外的 Segment 进行 put 操作,因为虽然该 Segment 被锁住了,但其他的 Segment 并没有加锁。同时,读线程并不会因为本线程的加锁而阻塞。


正是因为其内部的结构以及机制,所以 ConcurrentHashMap 在并发访问的性能上要比 Hashtable 和同步包装之后的 HashMap 的性能提高很多。在理想状态下,ConcurrentHashMap 可以支持 16 个线程执行并发写操作(如果并发级别设置为 16),及任意数量线程的读操作。


总结


在实际的应用中,散列表一般的应用场景是:除了少数插入操作和删除操作外,绝大多数都是读取操作,而且读操作在大多数时候都是成功的。正是基于这个前提,ConcurrentHashMap 针对读操作做了大量的优化。通过 HashEntry 对象的不变性和用 volatile 型变量协调线程间的内存可见性,使得 大多数时候,读操作不需要加锁就可以正确获得值。这个特性使得 ConcurrentHashMap 的并发性能在分离锁的基础上又有了近一步的提高。


ConcurrentHashMap 是一个并发散列映射表的实现,它允许完全并发的读取,并且支持给定数量的并发更新。相比于 HashTable 和用同步包装器包装的 HashMap(Collections.synchronizedMap(new HashMap())),ConcurrentHashMap 拥有更高的并发性。在 HashTable 和由同步包装器包装的 HashMap 中,使用一个全局的锁来同步不同线程间的并发访问。同一时间点,只能有一个线程持有锁,也就是说在同一时间点,只能有一个线程能访问容器。这虽然保证多线程间的安全并发访问,但同时也导致对容器的访问变成串行化的了。


ConcurrentHashMap 的高并发性主要来自于三个方面:


用分离锁实现多个线程间的更深层次的共享访问。用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求。通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性。使用分离锁,减小了请求 同一个锁的频率。


通过 HashEntery 对象的不变性及对同一个 Volatile 变量的读 / 写来协调内存可见性,使得 读操作大多数时候不需要加锁就能成功获取到需要的值。由于散列映射表在实际应用中大多数操作都是成功的 读操作,所以 2 和 3 既可以减少请求同一个锁的频率,也可以有效减少持有锁的时间。通过减小请求同一个锁的频率和尽量减少持有锁的时间 ,使得 ConcurrentHashMap 的并发性相对于 HashTable 和用同步包装器包装的 HashMap 有了质的提高。


  1. BlockingQueue 的使用?BlockingQueue 的原理


阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。


BlockingQueue 的核心方法:


add(E e): 添加元素,如果 BlockingQueue 可以容纳,则返回 true,否则报异常 offer(E e): 添加元素,如果 BlockingQueue 可以容纳,则返回 true,否则返回 false.put(E e): 添加元素,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续.poll(long timeout, TimeUnit timeUnit): 取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 timeout 参数规定的时间,取不到时返回 nulltake(): 取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止 BlockingQueue 常用实现类


ArrayBlockingQueue: 有界的先入先出顺序队列,构造方法确定队列的大小.LinkedBlockingQueue: 无界的先入先出顺序队列,构造方法提供两种,一种初始化队列大小,队列即有界;第二种默认构造方法,队列无界(有界即 Integer.MAX_VALUE)SynchronousQueue: 特殊的 BlockingQueue,没有空间的队列,即必须有取的方法阻塞在这里的时候才能放入元素。PriorityBlockingQueue: 支持优先级的阻塞队列 ,存入对象必须实现 Comparator 接口 (需要注意的是 队列不是在加入元素的时候进行排序,而是取出的时候,根据 Comparator 来决定优先级最高的)。BlockingQueue<> 队列的作用


BlockingQueue 实现主要用于生产者-使用者队列,BlockingQueue 实现是线程安全的。所有排队方法都可以使用内部锁或其他形式的并发控制来自动达到它们的目的


这是一个生产者-使用者场景的一个用例。注意,BlockingQueue 可以安全地与多个生产者和多个使用者一起使用 此用例来自 jdk 文档


//这是一个生产者类 class Producer implements Runnable {private final BlockingQueue queue;Producer(BlockingQueue q) {queue = q;}public void run() {try {while(true) {queue.put(produce());}} catch (InterruptedException ex) {... handle ...}}Object produce() {...}}


//这是一个消费者类 class Consumer implements Runnable {private final BlockingQueue queue;Consumer(BlockingQueue q) { queue = q; }public void run() {try {while(true) {consume(queue.take());}} catch (InterruptedException ex) {... handle ...}}void consume(Object x) {...}}


//这是实现类 class Setup {void main() {//实例一个非阻塞队列 BlockingQueue q = new SomeQueueImplementation();//将队列传入两个消费者和一个生产者中 Producer p = new Producer(q);Consumer c1 = new Consumer(q);Consumer c2 = new Consumer(q);new Thread(p).start();new Thread(c1).start();new Thread(c2).start();}}11. ThreadPool 的深入考察?引言


合理利用线程池能够带来三个好处。


降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。


线程池的使用


我们可以通过 ThreadPoolExecutor 来创建一个线程池。


new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);创建一个线程池需要输入几个参数:


corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的 prestartAllCoreThreads 方法,线程池会提前创建并启动所有基本线程。runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。PriorityBlockingQueue:一个具有优先级的无限阻塞队列。maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。以下是 JDK1.5 提供的四种策略。AbortPolicy:直接抛出异常。CallerRunsPolicy:只用调用者所在线程来运行任务。DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。DiscardPolicy:不处理,丢弃掉。 当然也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化不能处理的任务。keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。向线程池提交任务


我们可以使用 execute 提交的任务,但是 execute 方法没有返回值,所以无法判断任务是否被线程池执行成功。通过以下代码可知 execute 方法输入的任务是一个 Runnable 类的实例。


threadsPool.execute(new Runnable() {@Overridepublic void run() {// TODO Auto-generated method stub}});我们也可以使用 submit 方法来提交任务,它会返回一个 future,那么我们可以通过这个 future 来判断任务是否执行成功,通过 future 的 get 方法来获取返回值,get 方法会阻塞住直到任务完成,而使用 get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。


Future<Object> future = executor.submit(harReturnValuetask);try {Object s = future.get();} catch (InterruptedException e) {// 处理中断异常} catch (ExecutionException e) {// 处理无法执行任务异常} finally {// 关闭线程池 executor.shutdown();}线程池的关闭


我们可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池,它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。


只要调用了这两个关闭方法的其中一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow。


源码分析


上面的流程分析让我们很直观的了解了线程池的工作原理,让我们再通过源代码来看看是如何实现的。线程池执行任务的方法如下:


public void execute(Runnable command) {if (command == null)throw new NullPointerException();//如果线程数小于基本线程数,则创建线程并执行当前任务 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}//如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。else if (!addIfUnderMaximumPoolSize(command))//抛出 RejectedExecutionException 异常 reject(command); // is shutdown or saturated}}工作线程。线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们可以从 Worker 的 run 方法里看到这点:


public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);}}合理的配置线程池


要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:


任务的性质:CPU 密集型任务,IO 密集型任务和混合型任务。任务的优先级:高,中和低。任务的执行时间:长,中和短。任务的依赖性:是否依赖其他系统资源,如数据库连接。任务性质不同的任务可以用不同规模的线程池分开处理。CPU 密集型任务配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池。IO 密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如 2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的 CPU 个数。


优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。


执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。


依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,如果等待的时间越长 CPU 空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用 CPU。


建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。


线程池的监控


通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用


taskCount:线程池需要执行的任务数量。completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于 taskCount。largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不+getActiveCount:获取活动的线程数。通过扩展线程池进行监控。通过继承线程池并重写线程池的 beforeExecute,afterExecute 和 terminated 方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:


protected void beforeExecute(Thread t, Runnable r) { }12. Java 中 Semaphore 是什么?Java 中的 Semaphore 是一种新的同步类,它是一个计数信号。


从概念上讲,信号量维护了一个许可集合。如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可。每个 release()添加一个许可,从而可能释放一个正在阻塞的获取者。


但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。


信号量常常用于多线程的代码中,比如数据库连接池。


  1. 同步方法和同步代码块的区别是什么?同步方法默认用 this 或者当前类 class 对象作为锁; 同步代码块可以选择以什么来加锁,比同步方法要更细颗粒度,我们可以选择只同步会发生同步问题的部分代码而不是整个方法; 同步方法使用关键字 synchronized 修饰方法,而同步代码块主要是修饰需要进行同步的代码,用 synchronized(object){代码内容}进行修饰;

  2. 同步方法和同步代码块的区别是什么?同步方法默认用 this 或者当前类 class 对象作为锁; 同步代码块可以选择以什么来加锁,比同步方法要更细颗粒度,我们可以选择只同步会发生同步问题的部分代码而不是整个方法; 同步方法使用关键字 synchronized 修饰方法,而同步代码块主要是修饰需要进行同步的代码,用 synchronized(object){代码内容}进行修饰;

  3. 如何确保 N 个线程可以访问 N 个资源同时又不导致死锁?使用多线程的时候,一种非常简单的避免死锁的方式就是:指定获取锁的顺序,并强制线程按照指定的顺序获取锁。因此,如果所有的线程都是以同样的顺序加锁和释放锁,就不会出现死锁了。


Java 读者福利:笔者把近一年经历过的 Java 岗位面试,和一些刷过的面试题都做成了 PDF,PDF 都是免费分享,需要的小伙伴可以+ VX: mxk6072


用户头像

钟奕礼

关注

还未添加个人签名 2021-03-24 加入

还未添加个人简介

评论

发布
暂无评论
面试卡在多线程?那就分享几道Java多线程高频面试题,面试不用愁_Java_钟奕礼_InfoQ写作社区