
My Take on Java: Thread & ThreadLocal

awen
Published: February 27, 2021



Note: based on JDK 11.


Thread


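A Thread is a thread of execution in a program. The JVM allows an application to have multiple threads running concurrently.

Every thread has a priority. Threads with higher priority are executed in preference to threads with lower priority. A thread can also be marked as a daemon thread. When a thread is created, its priority is initially set equal to the priority of the thread that creates it.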


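There are usually the following ways to create a Thread:


  1. Extend the Thread class and override the run() method

  2. Implement the Runnable interface and implement its run() method

  3. Use an anonymous class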
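
The three approaches listed above can be sketched side by side as follows. This is only a minimal illustration: the class names `CreateThreadDemo`, `MyThread` and `MyTask` and the shared `LOG` list are made up for this sketch and do not come from the JDK. Each thread is started and joined in turn so that the order of the log entries is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CreateThreadDemo {
    // Hypothetical shared log, used only to record which thread body ran
    private static final List<String> LOG = new CopyOnWriteArrayList<>();

    // 1. Extend Thread and override run()
    static class MyThread extends Thread {
        @Override
        public void run() {
            LOG.add("subclass");
        }
    }

    // 2. Implement Runnable and pass the instance to a Thread
    static class MyTask implements Runnable {
        @Override
        public void run() {
            LOG.add("runnable");
        }
    }

    public static List<String> runAll() {
        LOG.clear();
        Thread t1 = new MyThread();
        Thread t2 = new Thread(new MyTask());
        // 3. Anonymous class (a lambda would work too, since Runnable is a functional interface)
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                LOG.add("anonymous");
            }
        });
        try {
            // Start and join one thread at a time to keep the order deterministic
            for (Thread t : new Thread[] {t1, t2, t3}) {
                t.start();
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        System.out.println(runAll()); // [subclass, runnable, anonymous]
    }
}
```

Implementing Runnable is usually the more flexible choice, because the task stays independent of the Thread class and can also be handed to an executor.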


Interfaces implemented


  1. Runnable: an interface annotated with @FunctionalInterface. It declares the public abstract void run() method for implementing classes to implement.


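Some important member variables


private volatile String name; The thread's name, declared volatile. Every thread has a name, which helps with debugging; names do not have to be unique. The default is "Thread-" + nextThreadNum().

private boolean daemon = false; Whether this is a daemon thread. The field defaults to false; a newly created thread takes over the daemon status of the thread that creates it.

private boolean stillborn = false;

private long eetop;

private Runnable target; The target to run

private ThreadGroup group; The thread group. Defaults to security.getThreadGroup() if a security manager is installed, otherwise to the parent thread's group.

private ClassLoader contextClassLoader;

private AccessControlContext inheritedAccessControlContext;

private static int threadInitNumber; Appended to "Thread-" to form the default thread name. private static synchronized int nextThreadNum() increments it with threadInitNumber++.

ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

private final long stackSize; The stack size requested for this thread. The default is 0. The effect depends on the VM implementation; some VMs ignore this setting entirely.

private long nativeParkEventPointer;

private final long tid; The ID of this thread

private static long threadSeqNumber; The thread ID counter. private static synchronized long nextThreadID() increments it with ++threadSeqNumber.

private volatile int threadStatus;

volatile Object parkBlocker;

private volatile Interruptible blocker;

private final Object blockerLock = new Object();

public static final int MIN_PRIORITY = 1; The minimum priority a thread can have

public static final int NORM_PRIORITY = 5; The default priority of a thread

public static final int MAX_PRIORITY = 10; The maximum priority a thread can have

Thread priorities are mapped to the priorities of the underlying operating system, so the JVM does not necessarily schedule threads strictly according to the priority that was set.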
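
To make the defaults described above concrete, here is a small sketch that creates a child thread from a parent whose priority and daemon flag were changed, and then inspects the child. The class name `ThreadDefaultsDemo` and the method `describeChild` are invented for this illustration. It assumes the parent's thread group allows priority 7, which is the case for the default group.

```java
public class ThreadDefaultsDemo {

    /** Returns "<default-name-prefix-ok>,<priority>,<daemon>" for a freshly created child thread. */
    public static String describeChild() {
        final String[] result = new String[1];
        Thread parent = new Thread(() -> {
            // The child is created (but never started) by the parent thread
            Thread child = new Thread(() -> { });
            result[0] = child.getName().startsWith("Thread-")
                    + "," + child.getPriority()
                    + "," + child.isDaemon();
        });
        parent.setPriority(7);   // between NORM_PRIORITY (5) and MAX_PRIORITY (10)
        parent.setDaemon(true);
        parent.start();
        try {
            parent.join();       // join() also makes result[0] visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(describeChild()); // true,7,true
    }
}
```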

Exception handling

// Uncaught-exception handler of this thread, declared volatile

private volatile UncaughtExceptionHandler uncaughtExceptionHandler;

// Default uncaught-exception handler for all threads, declared static volatile

private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;


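Some important methods


  1. start(): starts the thread; the JVM then calls the thread's run() method

  2. stop(): stops the thread; deprecated

  3. interrupt(): interrupts the thread

  4. join(): lets the target thread "cut in line" and blocks the calling thread until the target thread has terminated; implemented as a loop on isAlive() combined with wait()

  5. suspend() and resume() (both deprecated) must be used in pairs. If thread A is suspended while it holds the lock on some resource x, no other thread can access x until A is resumed.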
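
The "loop + wait" idea behind join() can be sketched with an ordinary monitor. This is not the JDK code, only an illustration of the same pattern: the `Completion` class below is a hypothetical stand-in for the thread's "still alive" state, and `markDone()` plays the role of the notifyAll() that is invoked when a thread terminates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class JoinSketch {

    /** Hypothetical helper: a flag guarded by its own monitor. */
    static class Completion {
        private boolean done;

        synchronized void markDone() {
            done = true;
            notifyAll();            // wake up every waiting caller
        }

        synchronized void await() throws InterruptedException {
            while (!done) {         // loop: re-check the condition after each wake-up
                wait();             // releases the monitor while waiting
            }
        }
    }

    public static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Completion completion = new Completion();
        Thread worker = new Thread(() -> {
            events.add("worker finished");
            completion.markDone();
        });
        worker.start();
        try {
            completion.await();     // blocks like join() until the worker signals completion
            events.add("caller resumed");
            worker.join();          // the real join(), to be sure the thread has ended
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [worker finished, caller resumed]
    }
}
```

The condition is checked in a loop rather than with a single if, so that a wake-up without the condition being true simply leads to another wait().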


Thread states and state transitions


State definition

  public enum State {
      NEW,
      RUNNABLE,
      BLOCKED,
      WAITING,
      TIMED_WAITING,
      TERMINATED;
  }

State diagram

[State diagram image: _2020_01_06_11_42_57]
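
As a complement to the state definition, the following sketch drives one thread through NEW, BLOCKED, WAITING and TERMINATED and records what getState() reports. The class `ThreadStateDemo` and its members are invented for this illustration. RUNNABLE and TIMED_WAITING are left out here to keep the sequence deterministic.

```java
import java.util.ArrayList;
import java.util.List;

public class ThreadStateDemo {
    private static final Object LOCK = new Object();
    private static boolean released;               // guarded by LOCK

    public static List<Thread.State> observeStates() {
        List<Thread.State> seen = new ArrayList<>();
        Thread t = new Thread(() -> {
            synchronized (LOCK) {                  // blocks while the caller holds LOCK
                while (!released) {
                    try {
                        LOCK.wait();               // WAITING until notified
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        seen.add(t.getState());                    // NEW: created but not started

        synchronized (LOCK) {
            released = false;
            t.start();
            spinUntil(t, Thread.State.BLOCKED);    // t cannot enter the monitor yet
            seen.add(Thread.State.BLOCKED);
        }

        spinUntil(t, Thread.State.WAITING);        // t entered the monitor and called wait()
        seen.add(Thread.State.WAITING);

        synchronized (LOCK) {
            released = true;
            LOCK.notifyAll();
        }
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        seen.add(t.getState());                    // TERMINATED after join() returns
        return seen;
    }

    // Busy-waits until the thread reports the expected state
    private static void spinUntil(Thread t, Thread.State expected) {
        while (t.getState() != expected) {
            Thread.onSpinWait();
        }
    }

    public static void main(String[] args) {
        System.out.println(observeStates()); // [NEW, BLOCKED, WAITING, TERMINATED]
    }
}
```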

Example:

package chapter02;

public class TestThread {

    public static void main(String[] args) throws InterruptedException {
        // thread0 sleeps for 5 s; thread1 interrupts it after about 2 s
        final Thread thread0 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("进入run"); // "entering run"
                try {
                    System.out.printf("enter run(), thread0' state: %s\n", Thread.currentThread().getState());
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("异常处理"); // "handling the exception"
                    System.out.printf("on catch interrupt, thread0 isInterrupted or not ? %s \n", Thread.currentThread().isInterrupted());
                    System.out.printf("on catch interrupt, thread0' state: %s\n", Thread.currentThread().getState());
                    return;
                }
                System.out.println("退出run"); // "leaving run"
            }
        });

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("进入thread1's run"); // "entering thread1's run"
                try {
                    Thread.sleep(1000);
                    System.out.printf("before interrupt, thread0 isInterrupted or nott ? %s \n", thread0.isInterrupted());
                    System.out.printf("enter thread1's run(), thread0' state: %s\n", thread0.getState());
                    Thread.sleep(1000);
                    thread0.interrupt();
                    System.out.printf("after interrupt, thread0 isInterrupted or not ? %s \n", thread0.isInterrupted());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("退出thread1's run"); // "leaving thread1's run"
            }
        });

        System.out.printf("after new(), thread0' state: %s\n", thread0.getState());
        thread0.start();
        System.out.printf("after start(), thread0' state: %s\n", thread0.getState());
        thread1.start();
        thread0.join();
        System.out.printf("after join(), thread0' state: %s\n", thread0.getState());
        System.out.println("退出"); // "exit"
    }
}

The output is as follows (the stack trace printed to stderr is not shown). Inside the catch block isInterrupted() returns false, because Thread.sleep() clears the interrupted status when it throws InterruptedException:

after new(), thread0' state: NEW
after start(), thread0' state: RUNNABLE
进入run
enter run(), thread0' state: RUNNABLE
进入thread1's run
before interrupt, thread0 isInterrupted or nott ? false
enter thread1's run(), thread0' state: TIMED_WAITING
after interrupt, thread0 isInterrupted or not ? false
退出thread1's run
异常处理
on catch interrupt, thread0 isInterrupted or not ? false
on catch interrupt, thread0' state: RUNNABLE
after join(), thread0' state: TERMINATED
退出


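Catching exceptions


Notes:

// Uncaught-exception handler of this thread, declared volatile

private volatile UncaughtExceptionHandler uncaughtExceptionHandler;

// Default uncaught-exception handler for all threads, declared static volatile

private static volatile UncaughtExceptionHandler defaultUncaughtExceptionHandler;

When a thread terminates because of an uncaught exception, its own handler is used if one has been set. Otherwise the exception is passed to the thread's ThreadGroup, which eventually falls back to the default handler.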

Example:

package chapter02;

public class TestThread {

    public static void main(String[] args) throws InterruptedException {
        // Global (default) exception handler
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("-" + Thread.currentThread().getName());
                String threadName = t.getName();
                System.out.printf("global exception handler >> : current thread's name is %s, ", threadName);
                System.out.printf("the error is %s \n", e.getLocalizedMessage());
            }
        });

        final Thread thread0 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("进入thread0's run"); // "entering thread0's run"
                System.out.printf("enter run(), thread0' state: %s\n", Thread.currentThread().getState());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // Rethrow unchecked so that the uncaught-exception handler is invoked
                    throw new RuntimeException(e);
                }
                System.out.println("退出thread0's run"); // "leaving thread0's run"
            }
        });

        // Exception handler for thread0 only; it takes precedence over the global handler
        thread0.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("-" + Thread.currentThread().getName());
                String threadName = t.getName();
                System.out.printf("thread0 exception handler >> : current thread's name is %s, ", threadName);
                System.out.printf("the error is %s \n", e.getLocalizedMessage());
            }
        });

        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("进入thread1's run"); // "entering thread1's run"
                try {
                    Thread.sleep(1000);
                    System.out.printf("before interrupt, thread0 isInterrupted or nott ? %s \n", thread0.isInterrupted());
                    System.out.printf("enter thread1's run(), thread0' state: %s\n", thread0.getState());
                    Thread.sleep(1000);
                    thread0.interrupt();
                    System.out.printf("after interrupt, thread0 isInterrupted or not ? %s \n", thread0.isInterrupted());
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("退出thread1's run"); // "leaving thread1's run"
            }
        });

        System.out.printf("after new(), thread0' state: %s\n", thread0.getState());
        thread0.start();
        System.out.printf("after start(), thread0' state: %s\n", thread0.getState());
        thread1.setDaemon(true);
        thread1.start();
        thread0.join();
        thread1.join();
        System.out.printf("after join(), thread0' state: %s\n", thread0.getState());
        System.out.println("退出"); // "exit"
    }
}

The output is as follows:

after new(), thread0' state: NEW
after start(), thread0' state: RUNNABLE
进入thread0's run
enter run(), thread0' state: RUNNABLE
进入thread1's run
before interrupt, thread0 isInterrupted or nott ? false
enter thread1's run(), thread0' state: TIMED_WAITING
after interrupt, thread0 isInterrupted or not ? true
-Thread-0
thread0 exception handler >> : current thread's name is Thread-0, the error is java.lang.InterruptedException: sleep interrupted
退出thread1's run
after join(), thread0' state: TERMINATED
退出


ThreadLocal


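Notes:

Since JDK 1.2, ThreadLocal has offered a different approach to concurrency problems in multithreaded programs. With this utility class, clean multithreaded programs can be written quite concisely. ThreadLocal is not a Thread; it is a variable local to a Thread, meaning that each thread that accesses it works with its own independent copy of the value.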
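
A minimal usage sketch may help here. Two threads increment the same ThreadLocal counter, and each one only ever sees its own value. The names `ThreadLocalDemo`, `COUNTER` and `countTo` are made up for this example; `ThreadLocal.withInitial`, `get`, `set` and `remove` are the regular JDK methods.

```java
public class ThreadLocalDemo {
    // One ThreadLocal instance, but every thread gets its own Integer value
    private static final ThreadLocal<Integer> COUNTER = ThreadLocal.withInitial(() -> 0);

    // Increments the calling thread's private counter n times and returns it
    static int countTo(int n) {
        try {
            for (int i = 0; i < n; i++) {
                COUNTER.set(COUNTER.get() + 1);
            }
            return COUNTER.get();
        } finally {
            COUNTER.remove();   // good habit, especially with pooled threads
        }
    }

    public static int[] demo() {
        int[] results = new int[3];
        Thread a = new Thread(() -> results[0] = countTo(3));
        Thread b = new Thread(() -> results[1] = countTo(5));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // The calling thread never incremented its own copy, so it still sees the initial value
        results[2] = COUNTER.get();
        COUNTER.remove();
        return results;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo())); // [3, 5, 0]
    }
}
```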

Source code:

// Abridged from java.lang.ThreadLocal (JDK 11); most Javadoc omitted.
public class ThreadLocal<T> {

    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode = new AtomicInteger();

    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

    protected T initialValue() {
        return null;
    }

    public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
        return new SuppliedThreadLocal<>(supplier);
    }

    public ThreadLocal() {
    }

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

    boolean isPresent() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        return map != null && map.getEntry(this) != null;
    }

    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
        if (this instanceof TerminatingThreadLocal) {
            TerminatingThreadLocal.register((TerminatingThreadLocal<?>) this);
        }
        return value;
    }

    // Sets the current thread's value. If the thread already has a ThreadLocalMap,
    // the entry is written (overwriting any existing value); otherwise a new
    // ThreadLocalMap is created and assigned to the thread's threadLocals field.
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            map.set(this, value);
        } else {
            createMap(t, value);
        }
    }

    // Removes the current thread's value. Even without this call the value is
    // reclaimed by the JVM after the thread has terminated. After remove(), the
    // next get() calls initialValue() again, so initialValue() may be invoked
    // more than once per thread.
    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null) {
            m.remove(this);
        }
    }

    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

    static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
    }

    T childValue(T parentValue) {
        throw new UnsupportedOperationException();
    }

    static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
    }

    static class ThreadLocalMap {

        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

        private static final int INITIAL_CAPACITY = 16;

        private Entry[] table;

        private int size = 0;

        private int threshold; // Default to 0

        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];

            for (Entry e : parentTable) {
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }

        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            while (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == key)
                    return e;
                if (k == null)
                    expungeStaleEntry(i);
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }

        private void set(ThreadLocal<?> key, Object value) {

            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones, in which case, a fast
            // path would fail more often than not.

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                if (k == key) {
                    e.value = value;
                    return;
                }

                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            // Back up to check for prior stale entry in current run.
            // We clean out whole runs at a time to avoid continual
            // incremental rehashing due to garbage collector freeing
            // up refs in bunches (i.e., whenever the collector runs).
            int slotToExpunge = staleSlot;
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            // Find either the key or trailing null slot of run, whichever
            // occurs first
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                // If we find key, then we need to swap it
                // with the stale entry to maintain hash table order.
                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
                if (k == key) {
                    e.value = value;

                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // Start expunge at preceding stale entry if it exists
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // If we didn't find stale entry on backward scan, the
                // first stale entry seen while scanning for key is the
                // first still present in the run.
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // If key not found, put new entry in stale slot
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

        /**
         * Expunge a stale entry by rehashing any possibly colliding entries
         * lying between staleSlot and the next null slot.  This also expunges
         * any other stale entries encountered before the trailing null.  See
         * Knuth, Section 6.4
         *
         * @param staleSlot index of slot known to have null key
         * @return the index of the next null slot after staleSlot
         * (all between staleSlot and this slot will have been checked
         * for expunging).
         */
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;

            // expunge entry at staleSlot
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;

            // Rehash until we encounter null
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    int h = k.threadLocalHashCode & (len - 1);
                    if (h != i) {
                        tab[i] = null;

                        // Unlike Knuth 6.4 Algorithm R, we must scan until
                        // null because multiple entries could have been stale.
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }

        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len);
                Entry e = tab[i];
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    i = expungeStaleEntry(i);
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }

        private void rehash() {
            expungeStaleEntries();

            // Use lower threshold for doubling to avoid hysteresis
            if (size >= threshold - threshold / 4)
                resize();
        }

        private void resize() {
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;

            for (Entry e : oldTab) {
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }

            setThreshold(newLen);
            size = count;
            table = newTab;
        }

        private void expungeStaleEntries() {
            Entry[] tab = table;
            int len = tab.length;
            for (int j = 0; j < len; j++) {
                Entry e = tab[j];
                if (e != null && e.get() == null)
                    expungeStaleEntry(j);
            }
        }
    }
}
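
One detail of the source that is easy to overlook is the constant HASH_INCREMENT = 0x61c88647 combined with the mask `hash & (len - 1)`. The sketch below reproduces just that arithmetic outside the JDK to show how consecutive hash codes are spread over a power-of-two table. The class `HashSpreadDemo` and its methods are invented for this illustration. In a real JVM the counter is shared by all ThreadLocal instances, so a single map normally sees only some of the consecutive values.

```java
import java.util.Arrays;
import java.util.BitSet;

public class HashSpreadDemo {
    // Same constant as ThreadLocal.HASH_INCREMENT
    private static final int HASH_INCREMENT = 0x61c88647;

    /** Slot index of each of the first tableSize consecutive hash codes (tableSize must be a power of two). */
    public static int[] slots(int tableSize) {
        int[] result = new int[tableSize];
        int hash = 0;                              // like the initial value of nextHashCode
        for (int i = 0; i < tableSize; i++) {
            result[i] = hash & (tableSize - 1);    // same masking as in ThreadLocalMap
            hash += HASH_INCREMENT;                // int overflow simply wraps around
        }
        return result;
    }

    /** True if the first tableSize consecutive hash codes all land in different slots. */
    public static boolean collisionFree(int tableSize) {
        BitSet used = new BitSet(tableSize);
        for (int slot : slots(tableSize)) {
            if (used.get(slot)) {
                return false;
            }
            used.set(slot);
        }
        return true;
    }

    public static void main(String[] args) {
        // [0, 7, 14, 5, 12, 3, 10, 1, 8, 15, 6, 13, 4, 11, 2, 9]
        System.out.println(Arrays.toString(slots(16)));
        System.out.println(collisionFree(16) + " " + collisionFree(64)); // true true
    }
}
```

Because the increment is odd, multiplying it by 0, 1, ..., n-1 yields n different residues modulo any power of two n, which is why the first n keys never collide in this sketch. Collisions that do occur in practice are resolved by the linear probing seen in nextIndex().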

