【高并发】从源码角度深度解析线程池是如何实现优雅退出的

大家好,我是冰河~~

在【高并发专题】中,我们从源码角度深度分析了线程池中那些重要的接口和抽象类、深度解析了线程池是如何创建的,ThreadPoolExecutor类有哪些属性和内部类,以及它们对线程池的重要作用。深度分析了线程池的整体核心流程,以及如何拆解Worker线程的执行代码,深度解析Worker线程的执行流程。

本文,我们就来从源码角度深度解析线程池是如何优雅的退出程序的。首先,我们来看下ThreadPoolExecutor类中的shutdown()方法。

shutdown()方法

当使用线程池的时候,调用了shutdown()方法后,线程池就不会再接受新的执行任务了。但是在调用shutdown()方法之前放入任务队列中的任务还是要执行的。此方法是非阻塞方法,调用后会立即返回,并不会等待任务队列中的任务全部执行完毕后再返回。我们看下shutdown()方法的源代码,如下所示。

public void shutdown() {  //获取线程池的全局锁  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {   //检查是否有关闭线程池的权限   checkShutdownAccess();   //将当前线程池的状态设置为SHUTDOWN   advanceRunState(SHUTDOWN);   //中断Worker线程   interruptIdleWorkers();   //为ScheduledThreadPoolExecutor调用钩子函数   onShutdown(); // hook for   } finally {   //释放线程池的全局锁   mainLock.unlock();  }  //尝试将状态变为TERMINATED  tryTerminate(); } 

总体来说,shutdown()方法的代码比较简单,首先检查了是否有权限来关闭线程池,如果有权限,则再次检测是否有中断工作线程的权限,如果没有权限,则会抛出SecurityException异常,代码如下所示。

//检查是否有关闭线程池的权限 checkShutdownAccess(); //将当前线程池的状态设置为SHUTDOWN advanceRunState(SHUTDOWN); //中断Worker线程 interruptIdleWorkers(); 

其中,checkShutdownAccess()方法的实现代码如下所示。

private void checkShutdownAccess() {  SecurityManager security = System.getSecurityManager();  if (security != null) {   security.checkPermission(shutdownPerm);   final ReentrantLock mainLock = this.mainLock;   mainLock.lock();   try {    for (Worker w : workers)     security.checkAccess(w.thread);   } finally {    mainLock.unlock();   }  } } 

对于checkShutdownAccess()方法的代码理解起来比较简单,就是检测是否具有关闭线程池的权限,期间使用了线程池的全局锁。

接下来,我们看advanceRunState(int)方法的源代码,如下所示。

private void advanceRunState(int targetState) {  for (;;) {   int c = ctl.get();   if (runStateAtLeast(c, targetState) ||    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))    break;  } } 

advanceRunState(int)方法的整体逻辑就是:判断当前线程池的状态是否为指定的状态,在shutdown()方法中传递的状态是SHUTDOWN,如果是SHUTDOWN,则直接返回;如果不是SHUTDOWN,则将当前线程池的状态设置为SHUTDOWN。

接下来,我们看看showdown()方法调用的interruptIdleWorkers()方法,如下所示。

private void interruptIdleWorkers() {  interruptIdleWorkers(false); } 

可以看到,interruptIdleWorkers()方法调用的是interruptIdleWorkers(boolean)方法,继续看interruptIdleWorkers(boolean)方法的源代码,如下所示。

private void interruptIdleWorkers(boolean onlyOne) {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {   for (Worker w : workers) {    Thread t = w.thread;    if (!t.isInterrupted() && w.tryLock()) {     try {      t.interrupt();     } catch (SecurityException ignore) {     } finally {      w.unlock();     }    }    if (onlyOne)     break;   }  } finally {   mainLock.unlock();  } } 

上述代码的总体逻辑为:获取线程池的全局锁,循环所有的工作线程,检测线程是否被中断,如果没有被中断,并且Worker线程获得了锁,则执行线程的中断方法,并释放线程获取到的锁。此时如果onlyOne参数为true,则退出循环。否则,循环所有的工作线程,执行相同的操作。最终,释放线程池的全局锁。

接下来,我们看下shutdownNow()方法。

shutdownNow()方法

如果调用了线程池的shutdownNow()方法,则线程池不会再接受新的执行任务,也会将任务队列中存在的任务丢弃,正在执行的Worker线程也会被立即中断,同时,方法会立刻返回,此方法存在一个返回值,也就是当前任务队列中被丢弃的任务列表。

shutdownNow()方法的源代码如下所示。

public List<Runnable> shutdownNow() {  List<Runnable> tasks;  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {   //检查是否有关闭权限   checkShutdownAccess();   //设置线程池的状态为STOP   advanceRunState(STOP);   //中断所有的Worker线程   interruptWorkers();   //将任务队列中的任务移动到tasks集合中   tasks = drainQueue();  } finally {   mainLock.unlock();  }  /尝试将状态变为TERMINATED  tryTerminate();  //返回tasks集合  return tasks; } 

shutdownNow()方法的源代码的总体逻辑与shutdown()方法基本相同,只是shutdownNow()方法将线程池的状态设置为STOP,中断所有的Worker线程,并且将任务队列中的所有任务移动到tasks集合中并返回。

可以看到,shutdownNow()方法中断所有的线程时,调用了interruptWorkers()方法,接下来,我们就看下interruptWorkers()方法的源代码,如下所示。

private void interruptWorkers() {  final ReentrantLock mainLock = this.mainLock;  mainLock.lock();  try {   for (Worker w : workers)    w.interruptIfStarted();  } finally {   mainLock.unlock();  } } 

interruptWorkers()方法的逻辑比较简单,就是获得线程池的全局锁,循环所有的工作线程,依次中断线程,最后释放线程池的全局锁。

在interruptWorkers()方法的内部,实际上调用的是Worker类的interruptIfStarted()方法来中断线程,我们看下Worker类的interruptIfStarted()方法的源代码,如下所示。

void interruptIfStarted() {  Thread t;  if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {   try {    t.interrupt();   } catch (SecurityException ignore) {   }  } } 

发现其本质上调用的还是Thread类的interrupt()方法来中断线程。

awaitTermination(long, TimeUnit)方法

当线程池调用了awaitTermination(long, TimeUnit)方法后,会阻塞调用者所在的线程,直到线程池的状态修改为TERMINATED才返回,或者达到了超时时间返回。接下来,我们看下awaitTermination(long, TimeUnit)方法的源代码,如下所示。

public boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException {  //获取距离超时时间剩余的时长  long nanos = unit.toNanos(timeout);  //获取Worker线程的的全局锁  final ReentrantLock mainLock = this.mainLock;  //加锁  mainLock.lock();  try {   for (;;) {    //当前线程池状态为TERMINATED状态,会返回true    if (runStateAtLeast(ctl.get(), TERMINATED))     return true;    //达到超时时间,已超时,则返回false    if (nanos <= 0)     return false;    //重置距离超时时间的剩余时长    nanos = termination.awaitNanos(nanos);   }  } finally {   //释放锁   mainLock.unlock();  } } 

上述代码的总体逻辑为:首先获取Worker线程的独占锁,后在循环判断当前线程池是否已经是TERMINATED状态,如果是则直接返回true,否则检测是否已经超时,如果已经超时,则返回false。如果未超时,则重置距离超时时间的剩余时长。接下来,进入下一轮循环,再次检测当前线程池是否已经是TERMINATED状态,如果是则直接返回true,否则检测是否已经超时,如果已经超时,则返回false。如果未超时,则重置距离超时时间的剩余时长。以此循环,直到线程池的状态变为TERMINATED或者已经超时。

好了,今天就到这儿吧,我是冰河,我们下期见~~

商匡云商
Logo
注册新帐户
对比商品
  • 合计 (0)
对比
0
购物车