什么是线程
现代操作系统在运行一个程序时会为其创建一个进程,例如启动一个Java程序,操作系统就会创建一个Java进程。现代操作系统调度的最小单元是线程,也叫轻量级进程,在一个进程里面可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时运行。
一个Java程序从main()方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程参与,但实际上Java程序天生就是多线程程序,因为执行main()方法的是一个名称为main的线程,可以使用下面代码查看一个普通的Java程序包含哪些线程:
public class MultiThread {
public static void main(String[] args) {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadBean.dumpAllThreads(false, false);
for (ThreadInfo threadInfo : threadInfos) {
System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());
}
}
}
运行结果:
[5] Attach Listener
[4] Signal Dispatcher
[3] Finalizer
[2] Reference Handler
[1] main
可以看到,一个Java程序不仅仅是main()方法的运行,而是main线程和多个其它线程同时在运行。
为什么要用多线程
只执行了一个简单的Java程序,却要启动那么多"无关"的线程,是不是把简单的问题复杂化了?当然不是,因为正确的使用多线程,总是会带来显著的好处,而使用多线程的原因主要有一下几点:
更多的处理器核心
随着处理器上的核心数越来越多,以及超线程技术的广泛运用,现在的计算机都比以前更擅长并行计算,而处理器性能的提升方式,也从更高的主频向更多的核心发展。
线程是大多数系统调度的基本单元,一个程序作为一个进程来运行,程序运行过程中能够创建多个线程,而一个线程在同一时刻只能运行在一个处理器上。试想一下,一个单线程程序在运行时只能使用一个处理器,那么即使计算机有再多的处理器也无法显著提高程序运行效率。相反,如果使用多线程,将计算逻辑分配到多个处理器上,就会显著减少程序处理时间,而且可以随着更多处理器的加入而变得更加有效率
更快的响应时间
有时候我们会写比较复杂的代码,例如创建一笔订单,它包括插入订单数据、生成订单快照、发送邮件通知卖家和记录货品销售数量等。用户从单击提交按钮开始,就要等待这些操作全部完成后才能看到订购成功后的结果,但是这么多业务,如何能够让其更快的完成呢?
在上面场景中,我们可以使用多线程,即将数据一致性不强的操作派发给其它线程去处理,如生成订单快照、发送邮件等等,这样做的好处是响应用户请求的线程能够尽可能快的完成,缩短了响应时间,提升了用户体验
更好的编程模型
Java为多线程编程提供了良好的编程模型,使开发人员更加专注于问题的解决,即为所遇到的问题建立合适的模型,而不是绞尽脑汁地考虑如何将其多线程化。一旦开发人员建立好了模型,稍做修改总是能够方便地映射到Java提供的多线程编程模型上。
线程的状态
Java线程在运行的生命周期中可能处于下表中列出的6中状态,在给定的一个时刻,线程只能处于其中一种状态:
|状态名称|说明 |
|--|--|
|NEW |初始状态,线程被构建,但还没有调用start()方法 |
|RUNNABLE| 运行状态,Java线程将操作系统中的就绪和运行两种状态笼统地称作"运行中"|
|BLOCKED|阻塞状态,表示线程阻塞于锁|
|WAITING|等待状态,表示线程进入等待,进入该状态表示线程需要等待其他线程作出一些特殊动作(通知或中断)|
|TIME_WAITING|超时等待状态,该状态不同于WAITING,他是可以在指定的时间自行返回的|
|TERMINATED|终止状态,表示当前线程已执行完毕|
下面我们可以使用jstack命令尝试查看示例代码运行时的线程信息:
public class ThreadState {
public static void main(String[] args) {
new Thread(() -> {
while(true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "TimeWaitingThread").start();
new Thread(new Waiting(), "WaitingThread").start();
new Thread(new Blocked(), "BlockedThread - 1").start();
new Thread(new Blocked(), "BlockedThread - 2").start();
}
static class Waiting implements Runnable {
@Override
public void run() {
while(true) {
synchronized(Waiting.class) {
try {
Waiting.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
static class Blocked implements Runnable {
@Override
public void run() {
synchronized (Blocked.class) {
while(true) {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
打开命令终端,键入jps,输出如下:
可以看到运行示例对应的线程ID是1764,接着再输入jstack 1764,部分输出入下所示:
C:\Users\xxx>jstack 1764
2019-05-14 08:52:54
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.112-b15 mixed mode):
"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x000000000231f000 nid=0x2838 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"BlockedThread - 2" #12 prio=5 os_prio=0 tid=0x000000005870d000 nid=0x2660 waiting for monitor entry [0x000000005908f000]
java.lang.Thread.State: BLOCKED (on object monitor)
at com.yrk.concurrent.ThreadState$Blocked.run(ThreadState.java:46)
waiting to lock <0x00000000d6031f38> (a java.lang.Class for com.yrk.concurrent.ThreadState$Blocked)
at java.lang.Thread.run(Thread.java:745)
"BlockedThread - 1" #11 prio=5 os_prio=0 tid=0x0000000058706000 nid=0x32cc waiting on condition [0x0000000058f5e000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.yrk.concurrent.ThreadState$Blocked.run(ThreadState.java:46)
locked <0x00000000d6031f38> (a java.lang.Class for com.yrk.concurrent.ThreadState$Blocked)
at java.lang.Thread.run(Thread.java:745)
"WaitingThread" #10 prio=5 os_prio=0 tid=0x0000000058705800 nid=0x2824 in Object.wait() [0x000000005896f000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
waiting on <0x00000000d6030690> (a java.lang.Class for com.yrk.concurrent.ThreadState$Waiting)
at java.lang.Object.wait(Object.java:502)
at com.yrk.concurrent.ThreadState$Waiting.run(ThreadState.java:30)
locked <0x00000000d6030690> (a java.lang.Class for com.yrk.concurrent.ThreadState$Waiting)
at java.lang.Thread.run(Thread.java:745)
"TimeWaitingThread" #9 prio=5 os_prio=0 tid=0x00000000586ea000 nid=0x318c waiting on condition [0x0000000058d7e000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:340)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
at com.yrk.concurrent.ThreadState.lambda$0(ThreadState.java:11)
at com.yrk.concurrent.ThreadState$$Lambda$1/834600351.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
通过示例,我们了解到Java程序运行中线程状态的具体含义,线程在自身的生命周期中, 并不是固定地处于某个状态,而是随着代码的执行,在不同的状态之间进行切换,Java线程状态变迁如下图:
由上图可以看到,线程创建之后调用start()方法开始运行,当线程调用wait()方法之后,线程进入等待状态;进入等待状态的线程需要依靠其它线程的通知才能返回到运行状态,而超时等待状态相当于在等待状态的基础上增加了超时限制,也就是超时时间到达时将返回运行状态。当线程调用同步方法,在没有获取到锁的情况下,线程会进入到阻塞状态;线程在执行Runnable的run方法之后将会进入到终止状态。
线程的启动
线程对象在初始化后,调用start()方法就可以启动这个线程;线程start()方法的含义是当前线程告诉JVM,启动调用start()方法的线程。
我们可能会比较疑惑一个问题,启动一个线程为什么是调用start方法,而不是run方法?先简单看一下start方法的定义:
public class Thread implements Runnable {
public synchronized void start() {
This method is not invoked for the main method thread or "system"
group threads created/set up by the VM. Any new functionality added
to this method in the future may have to also be added to the VM.
*
A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();
so that it can be added to the group's list of threads
and the group's unstarted count can be decremented. /
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0();
}
我们可以看到start方法实际上是调用一个native方法start0()来启动一个线程,start0()这个方法是在Thread的静态块中注册的:
public class Thread implements Runnable {
/ Make sure registerNatives is the first thing <clinit> does. /
private static native void registerNatives();
static {
registerNatives();
}
}
registerNatives的本地方法的定义是在文件Thread.c; Thread.c定义了各个操作系统平台要用的关于线程的公共数据和操作,以下是Thread.c的全部内容:
http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/00cd9dc3c2b5/src/share/native/java/lang/Thread.c
static JNINativeMethod methods[] = {
{"start0", "()V", (void *)&JVM_StartThread},
{"stop0", "(" OBJ ")V", (void *)&JVM_StopThread},
{"isAlive", "()Z", (void *)&JVM_IsThreadAlive},
{"suspend0", "()V", (void *)&JVM_SuspendThread},
{"resume0", "()V", (void *)&JVM_ResumeThread},
{"setPriority0", "(I)V", (void *)&JVM_SetThreadPriority},
{"yield", "()V", (void *)&JVM_Yield},
{"sleep", "(J)V", (void *)&JVM_Sleep},
{"currentThread", "()" THD, (void *)&JVM_CurrentThread},
{"countStackFrames", "()I", (void *)&JVM_CountStackFrames},
{"interrupt0", "()V", (void *)&JVM_Interrupt},
{"isInterrupted", "(Z)Z", (void *)&JVM_IsInterrupted},
{"holdsLock", "(" OBJ ")Z", (void *)&JVM_HoldsLock},
{"getThreads", "()[" THD, (void *)&JVM_GetAllThreads},
{"dumpThreads", "([" THD ")[[" STE, (void *)&JVM_DumpThreads},
};
#undef THD
#undef OBJ
#undef STE
JNIEXPORT void JNICALL
JavajavalangThreadregisterNatives(JNIEnv *env, jclass cls)
{
(*env)->RegisterNatives(env, cls, methods, ARRAY_LENGTH(methods));
}
从上面代码可以看出,start0()实际会执行JVM_StartThread方法,这个方法是做什么的呢?从名字上看是在JVM中启动一个线程,我们去从jvm源码中找一下答案;需要先下载HotSpot源码 http://hg.openjdk.java.net/jdk8u/jdk8u60/hotspot 然后找到jvm.cpp这个文件:
JVMENTRY(void, JVMStartThread(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_StartThread");
......
sizet sz = size > 0 ? (sizet) size : 0;
nativethread = new JavaThread(&threadentry, sz);
.......
在这个方法中我们new了一个JavaThread,在thread.cpp文件中我们找到Thread::Java
JavaThread::JavaThread(ThreadFunction entrypoint, sizet stack_sz) :
Thread()
#if INCLUDEALLGCS
, satbmarkqueue(&satbmarkqueue_set),
dirtycardqueue(&dirtycardqueue_set)
#endif
{
if (TraceThreadEvents) {
tty->print_cr("creating thread %p", this);
}
initialize();
jniattachstate = notattachingvia_jni;
setentrypoint(entry_point);
os::ThreadType thrtype = os::javathread;
thrtype = entrypoint == &compilerthreadentry ? os::compiler_thread :
os::java_thread;
os::createthread(this, thrtype, stack_sz);
}
这个方法有两个参数,第一个是函数名字,线程创建成功后会根据这个函数名字调用对应的函数,第二个是当前进程内已有的线程数量。最后我们重点关注一下 os::create_thread,实际就是调用平台创建线程的方法来创建线程。接下来就是调用Thread.cpp文件中的Thread::start(Thread* thread)方法:
void Thread::start(Thread* thread) {
trace("start", thread);
if (!DisableStartThread) {
if (thread->isJavathread()) {
javalangThread::setthreadstatus(((JavaThread*)thread)->threadObj(),
javalangThread::RUNNABLE);
}
os::start_thread(thread);
}
}
start 方法中会去调用os::start_thread()方法,调用平台启动线程的方法,最终会回调Thread.cpp中的JavaThread.run()方法。
线程的终止
线程的启动我们已经熟悉了,那么如何终止一个线程呢?
线程的终止,并不是简单的调用stop命令去,虽然stop这个api仍然可以调用,但是它和其它线程的控制方法如suspend、resume一样都是过期了的,不建议使用。就拿stop来说,stop方法在结束一个线程时并不会保证线程的资源正常释放,因此会导致程序出现一些不确定的状态。
要优雅的去中断一个线程,在线程中提供一个interrupt方法。当其他线程通过调用当前线程的interrupt方法,表示向当前线程打了个招呼,告诉他可以中断线程的执行了,至于什么时候中断,取决于线程自己。线程通过检查自身是否被中断来进行响应,可以通过isInterrupted()方法来判断是否被中断。
public class InterruptedDemo {
private static int i;
public static void main(String[] args) throws Exception {
Thread thread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
i++;
}
}, "interruptedThread");
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
}
}
这种通过标识位或者中断操作的方式能够使线程在终止的时候有机会去清理资源,而不是武断地将线程终止,因此这种方式显得更加安全和优雅。
上面的案例中,通过interrupt设置了一个标识告诉线程可以终止了,线程中还提供了一个静态方法Thread.interrupted()对设置中断标识的线程复位。比如在下面的例子中,外面的线程调用Thread.interrupt()来设置中断标识,而里面的线程又通过Thread.interrupted()方法把线程中断标识进行复位:
public class InterruptedDemo {
private static int i;
public static void main(String[] args) throws Exception {
Thread thread = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("before interrputed(): " + Thread.currentThread().isInterrupted());
Thread.currentThread().interrupted();
System.out.println("after interrputed(): " + Thread.currentThread().isInterrupted());
}
}
}, "interruptedThread");
thread.start();
TimeUnit.SECONDS.sleep(1);
thread.interrupt();
}
}
除了Thread.interrupted()方法对线程中断标识位进行复位以外,还有一种被动复位的场景,就是抛出InterruptedException异常的方法,在InterruptedException抛出之前,JVM会先把线程的中断标识位清除,然后才会抛InterruptedException,这时如果调用isInterrupted方法,将会返回false。
为什么要复位
Thread.interrupted()是属于当前线程的,是当前线程对外界中断信号的一个响应,表示自己已经得到了中断信号,但不会立刻中断自己,具体什么时候中断由自己决定,让外界知道在自身中断前,他的中断状态任然是false,这就是要复位的原因。
线程终止的原理
我们来看一下Thread.interrupt()方法做了什么事情:
public class Thread implements Runnable {
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0();
b.interrupt(this);
return;
}
}
interrupt0();
}
}
在interrupt()方法里面调用了interrupt0(),这个方法我们前面分析start方法的时候见过,也是个native方法,同样,我们找到jvm.cpp文件中JVM_Interrupt的定义:
JVMENTRY(void, JVMInterrupt(JNIEnv* env, jobject jthread))
JVMWrapper("JVM_Interrupt");
oop javathread = JNIHandles::resolvenon_null(jthread);
MutexLockerEx ml(thread->threadObj() == javathread ? NULL : Threadslock);
JavaThread* thr = javalangThread::thread(JNIHandles::resolvenonnull(jthread));
if (thr != NULL) {
Thread::interrupt(thr);
}
这个方法比较简单,直接调用了Thread::interrupt(thr)方法,这个方法的定义在thread.cpp文件中,代码如下:
void Thread::interrupt(Thread* thread) {
trace("interrupt", thread);
debugonly(checkfordanglingthread_pointer(thread);)
os::interrupt(thread);
}
Thread::interrupt方法调用了os::interrupt方法,这个是调用平台的interrupt方法,这个方法的实现是在os.cpp文件中,其中代表的是不同平台,因为JVM是跨平台的,所以对于不同的平台,线程调度的方式是不一样的。我们以oslinux.cpp为例:
void os::interrupt(Thread* thread) {
assert(Thread::current() == thread || Threadslock->ownedby_self(),
"possibility of dangling Thread pointer");
OSThread* osthread = thread->osthread();
if (!osthread->interrupted()) {
osthread->set_interrupted(true);
OrderAccess::fence();
ParkEvent * const slp = thread->_SleepEvent ;
if (slp != NULL) slp->unpark() ;
}
if (thread->isJavathread())
((JavaThread*)thread)->parker()->unpark();
ParkEvent * ev = thread->_ParkEvent ;
if (ev != NULL) ev->unpark() ;
}
setinterrupted(true)实际上就是调用osThread.cpp中的setinterrupted()方法,在osThread中定义了一个成员属性volatile jint_interrupted;
通过上面的分析可以知道,thread.interrupt()方法实际就是设置一个interrupted状态标识位true,并且通过ParkEvent的unpark方法来唤醒线程。
为什么Object.wait、Thread.sleep、Thread.join都会抛出InterruptedException?
其实这几个方法有一个共同点,就是都是属于阻塞的方法。而阻塞方法的释放会取决于一些外部事件,但阻塞方法可能因为等不到外部的触发事件而导致无法终止,所以它允许一个线程请求自己来停止它正在做的事情。当一个方法抛出InterruptedException时,它会尝试停止正在做的事情,并且通过抛出InterruptedException表示提前返回。
所以这个异常的意思是表示一个阻塞被其它线程中断了,然后由于线程调用了interrupt()中断方法,那么Object.wait、Thread.sleep()等被阻塞的线程被唤醒以后通过is_interrupted方法判断中断标识的状态变化,如果发现中断为true,则先清除中断标识,然后抛出InterruptedException。
需要注意的是,InterruptedException异常的抛出并不意味着线程必须终止,而是提醒当前线程有中断操作发生,至于接下来怎么处理取决于线程本身。
评论