百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 编程网 > 正文

理解线程池,一篇就够

yuyutoo 2024-12-12 15:54 1 浏览 0 评论

为什么使用线程池

在多线程和高并发场景中,需要创建大量的线程来进行业务处理,我们通常创建线程的方式有三种,通过继承Thread类,实现Runnable接口以及实现Callable接口,我们创建这三种线程在运行结束后都会被虚拟机销毁,如果数量多的话,创建和销毁线程时需要消耗比较多的系统资源,这个时候可以使用线程池,线程池可以将使用完的线程进行回收利用,如果系统需要大量使用线程,线程池可以减少创建线程数量,从而节省系统资源。

线程池的生命周期

线程池从诞生到死亡,中间会经历RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五个生命周期状态。

  • RUNNING 表示线程池处于运行状态,能够接受新提交的任务且能对已添加的任务进行处理。RUNNING状态是线程池的初始化状态,线程池一旦被创建就处于RUNNING状态。
  • SHUTDOWN 线程处于关闭状态,不接受新任务,但可以处理已添加的任务。RUNNING状态的线程池调用shutdown后会进入SHUTDOWN状态。
  • STOP 线程池处于停止状态,不接收任务,不处理已添加的任务,且会中断正在执行任务的线程。RUNNING状态的线程池调用了shutdownNow后会进入STOP状态。
  • TIDYING 当所有任务已终止,且任务数量为0时,线程池会进入TIDYING。当线程池处于SHUTDOWN状态时,阻塞队列中的任务被执行完了,且线程池中没有正在执行的任务了,状态会由SHUTDOWN变为TIDYING。当线程处于STOP状态时,线程池中没有正在执行的任务时则会由STOP变为TIDYING。
  • TERMINATED 线程终止状态。处于TIDYING状态的线程执行terminated()后进入TERMINATED状态。

根据上述线程池生命周期状态的描述,可以画出如下所示的线程池生命周期状态流程示意图。

线程池工作机制

我们通常可以使用newSingleThreadExecutor(),newFixedThreadPool(),newCachedThreadPool()的方式创建线程池,这些都是通过ThreadPoolExecutor类中构造函数传入不同的参数封装的对象,所以想要了解线程池,我们需要研究一下线程池中最重要类ThreadPoolExecutor。

ThreadPoolExecutor类中创建线程池的构造函数如下:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler)

  • corePoolSize:核心池大小,核心线程数(线程池不会销毁)。
  • maximumPoolSize:线程池最多线程数。
  • keepAliveTime:存活时间,当线程池数量超corePoolSize时,多余的空闲线程的存活时间,即超过corePoolSize的空闲线程,在多长时间内会被销毁。
  • unit:单位(存活时间)。
  • workQueue:任务队列,被提交但尚未被执行的任务。
  • threadFactory:线程工厂,用于创建线程,一般用默认的即可。
  • handler:拒绝策略。当任务太多来不及处理时,如何拒绝任务。

核心池大小,最多线程数和存活时间共同管理这线程的创建与销毁。核心池大小是目标大小;线程池的实现试图维护线程池的大小,即是没有任务执行,池的大小也等于核心池的大小,并且在工作队列充满前,线程池都不会创建更多的线程。最多线程数是可同时活动的线程数的上限。如果一个线程已经闲置的时间超过了存活时间,它将被线程池回收。

线程池工作流程

线程池提交任务是从execute方法开始的,我们可以从execute方法来分析线程池的工作流程。

1.当execute方法提交一个任务时,如果线程池中线程数小于corePoolSize,那么不管线程池中是否有空闲的线程,都会创建一个新的线程来执行任务。

2.当execute方法提交一个任务时,线程池中的线程数已经达到了corePoolSize,且此时没有空闲的线程,那么则会将任务存储到workQueue中。

3.如果execute提交任务时线程池中的线程数已经到达了corePoolSize,并且workQueue已满,那么则会创建新的线程来执行任务,但总线程数应该小于maximumPoolSize。

4.如果线程池中的线程执行完了当前的任务,则会尝试从workQueue中取出第一个任务来执行。如果workQueue为空则会阻塞线程。

5.如果execute提交任务时,线程池中的线程数达到了maximumPoolSize,且workQueue已满,此时会执行拒绝策略来拒绝接受任务。

6.如果线程池中的线程数超过了corePoolSize,那么空闲时间超过keepAliveTime的线程会被销毁,但程池中线程个数会保持为corePoolSize。

7.如果线程池存在空闲的线程,并且设置了allowCoreThreadTimeOut为true。那么空闲时间超过keepAliveTime的线程都会被销毁。

任务队列-workQueue

参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,根据队列功能分类,在ThreadPoolExecutor类的构造函数中可以使用以下几种BlockingQueue接口。

1.直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,则提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建新的线程,如果进程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。

2.有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue类实现。ArrayBlockingQueue类的构造函数必须带一个容量参数,表示该队列的最大容量:

public ArrayBlockingQueue(int capacity)

当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建新的线程,若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入。则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则要确保核心线程数维持在corePoolSize。

3.无界的任务队列:无界任务队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize时,线程就不会继续增加了。若后续任由新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

4.优先任务队列:优先任务队列是带有执行优先级的任务队列。它通过PriorityBlockingQueue类实现,可以控制任务的执行先后顺序。他是一个特殊的无界队列。无论是有界队列ArrayBlockingQueue类,还是未指定大小的无界队列LinkedBlockingQueue类都是按照先进先出算法处理任务的。而PriorityBlockingQueue类则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)。

拒绝策略-handler

ThreadPoolExecutor类的最后一个参数指定了拒绝策略。也就是当任务数量超过系统实际承载能力时,就要用到拒绝策略了。拒绝策略可以说是系统超负荷运行时的补救措施,通常由于压力太大而引起的,也就是线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列中也已经排满了,再也放不下新任务了。这时,我们就需要有一套机制合理的处理这个问题。

jdk在ThreadPoolExecutor类中定义了四种内置的拒绝策略,其均实现RejectedExecutionHandler接口。其四种拒绝策略为:

1.AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

2.CallRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

3.DiscardOldestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。

4.DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧。

分析常见线程池

1.newFixedThreadPool()

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

newFixedThreadPool() 方法的实现,它返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池。因为对固定大小的线程池而言,不存在线程数量的动态变化,因此corePoreSize和maximumPoolSize相等。同时,它使用无界队列存放无法立即执行的任务,当任务提交非常频繁的时候,该队列可能迅速膨胀,从而耗尽系统资源。

2.newSingleThreadExecutor()

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

newSingleThreadExecutor()方法返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1。它的特点在于工作线程数目被限制为1,操作一个无界的工作队列,所以他能保证了所有任务都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目。

3.newCachedThreadPool()

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue<Runnable>());

}

newCacheThreadPool()方法返回corePoolSize为0,maximumPoolSize无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列时一种直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize为0,因此空闲线程又会在指定时间内60秒内被回收。它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过60秒,则被终止并移除缓存,长时间闲置时,这种线程池,不会消耗什么资源,其内部使用SynchronousQueue作为工作队列,无界线程池,可以进行自动线程回收。

线程池源码分析

1.线程池中的位运算

在向线程池提交任务时有两个比较中要的参数会决定任务的去向,这两个参数分别是线程池的状态和线程池中的线程数。在ThreadPoolExecutor内部使用了一个AtomicInteger类型的整数ctl来表示这两个参数,代码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {

// Integer.SIZE = 32.所以 COUNT_BITS= 29

private static final int COUNT_BITS = Integer.SIZE - 3;

// 00011111 11111111 11111111 11111111 这个值可以表示线程池的最大线程容量

private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 将-1左移29位得到RUNNING状态的值

private static final int RUNNING = -1 << COUNT_BITS;

// 线程池运行状态和线程数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }

// ...

}

因为涉及多线程的操作,这里为了保证原子性,ctl参数使用了AtomicInteger类型,并且通过ctlOf方法来计算出了ctl的初始值。如果你不了解位运算大概很难理解上述代码的用意。

我们知道,int类型在Java中占用4byte的内存,一个byte占用8bit,所以Java中的int类型共占用32bit。对于这个32bit,我们可以进行高低位的拆分。做Android开发的同学应该都了解View测量流程中的MeasureSpec参数,这个参数将32bit的int拆分成了高2位和低30位,分别表示View的测量模式和测量值。而这里的ctl与MeasureSpec类似,ctl将32位的int拆分成了高3位和低29位,分别表示线程池的运行状态和线程池中的线程个数。

下面我们通过位运算来验证一下ctl是如何工作的,当然,如果你不理解这个位运算的过程对理解线程池的源码影响并不大,所以对以下验证内容不感兴趣的同学可以直接略过。

可以看到上述代码中RUNNING的值为-1左移29位,我们知道在计算机中负数是以其绝对值的补码来表示的,而补码是由反码加1得到。因此-1在计算机中存储形式为1的反码+1

1的原码:00000000 00000000 00000000 00000001

+

1的反码:11111111 11111111 11111111 11111110

---------------------------------------

-1存储:11111111 11111111 11111111 11111111

接下来对-1左移29位可以得到RUNNING的值为:

// 高三位表示线程状态,即高三位为111表示RUNNING

11100000 00000000 00000000 00000000

而AtomicInteger初始线程数量是0,因此ctlOf方法中的“|”运算如下:

RUNNING:11100000 00000000 00000000 00000000

|

线程数为0: 00000000 00000000 00000000 00000000

---------------------------------------

得到ctl: 11100000 00000000 00000000 00000000

通过RUNNING|0(线程数)即可得到ctl的初始值。同时还可以通过以下方法将ctl拆解成运行状态和线程数:

// 00011111 11111111 11111111 11111111

private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 获取线程池运行状态

private static int runStateOf(int c) { return c & ~COUNT_MASK; }

// 获取线程池中的线程数

private static int workerCountOf(int c) { return c & COUNT_MASK; }

假设此时线程池为RUNNING状态,且线程数为0,验证一下runStateOf是如何得到线程池的运行状态的:

COUNT_MASK: 00011111 11111111 11111111 11111111

~COUNT_MASK: 11100000 00000000 00000000 00000000

&

ctl: 11100000 00000000 00000000 00000000

----------------------------------------

RUNNING: 11100000 00000000 00000000 00000000

如果不理解上边的验证流程没有关系,只要知道通过runStateOf方法可以得到线程池的运行状态,通过workerCountOf可以得到线程池中的线程数即可。接下来我们进入线程池的源码的源码分析环节。

2.ThreadPoolExecutor的execute

向线程池提交任务的方法是execute方法,execute方法是ThreadPoolExecutor的核心方法,以此方法为入口来进行剖析,execute方法的代码如下:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

// 获取ctl的值

int c = ctl.get();

// 1.线程数小于corePoolSize

if (workerCountOf(c) < corePoolSize) {

// 线程池中线程数小于核心线程数,则尝试创建核心线程执行任务

if (addWorker(command, true))

return;

c = ctl.get();

}

// 2.到此处说明线程池中线程数大于核心线程数或者创建线程失败

if (isRunning(c) && workQueue.offer(command)) {

// 如果线程是运行状态并且可以使用offer将任务加入阻塞队列未满,offer是非阻塞操作。

int recheck = ctl.get();

// 重新检查线程池状态,因为上次检测后线程池状态可能发生改变,如果非运行状态就移除任务并执行拒绝策略

if (! isRunning(recheck) && remove(command))

reject(command);

// 如果是运行状态,并且线程数是0,则创建线程

else if (workerCountOf(recheck) == 0)

// 线程数是0,则创建非核心线程,且不指定首次执行任务,这里的第二个参数其实没有实际意义

addWorker(null, false);

}

// 3.阻塞队列已满,创建非核心线程执行任务

else if (!addWorker(command, false))

// 如果失败,则执行拒绝策略

reject(command);

}

execute方法中的逻辑可以分为三部分:

1.如果线程池中的线程数小于核心线程,则直接调用addWorker方法创建新线程来执行任务。

2.如果线程池中的线程数大于核心线程数,则将任务添加到阻塞队列中,接着再次检验线程池的运行状态,因为上次检测过之后线程池状态有可能发生了变化,如果线程池关闭了,那么移除任务,执行拒绝策略。如果线程依然是运行状态,但是线程池中没有线程,那么就调用addWorker方法创建线程,注意此时传入任务参数是null,即不指定执行任务,因为任务已经加入了阻塞队列。创建完线程后从阻塞队列中取出任务执行。

3.如果第2步将任务添加到阻塞队列失败了,说明阻塞队列任务已满,那么则会执行第三步,即创建非核心线程来执行任务,如果非核心线程创建失败那么就执行拒绝策略。

接下来看下execute方法中创建线程的方法addWoker,addWoker方法承担了核心线程和非核心线程的创建,通过一个boolean参数core来区分是创建核心线程还是非核心线程。先来看addWorker方法前半部分的代码:

// 返回值表示是否成功创建了线程

private boolean addWorker(Runnable firstTask, boolean core) {

// 这里做了一个retry标记,相当于goto.

retry:

for (int c = ctl.get();;) {

// Check if queue empty only if necessary.

if (runStateAtLeast(c, SHUTDOWN)

&& (runStateAtLeast(c, STOP)

|| firstTask != null

|| workQueue.isEmpty()))

return false;

for (;;) {

// 根据core来确定创建最大线程数,超过最大值则创建线程失败,注意这里的最大值可能有s三个corePoolSize、maximumPoolSize和线程池线程的最大容量

if (workerCountOf(c)

>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))

return false;

// 通过CAS来将线程数+1,如果成功则跳出循环,执行下边逻辑

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

// 线程池的状态发生了改变,退回retry重新执行

if (runStateAtLeast(c, SHUTDOWN))

continue retry;

}

}

// ...省略后半部分

return workerStarted;

}

这部分代码会通过是否创建核心线程来确定线程池中线程数的值,如果是创建核心线程,那么最大值不能超过corePoolSize,如果是创建非核心线程那么线程数不能超过maximumPoolSize,另外无论是创建核心线程还是非核心线程,最大线程数都不能超过线程池允许的最大线程数COUNT_MASK(有可能设置的maximumPoolSize大于COUNT_MASK)。如果线程数大于最大值就返回false,创建线程失败。

接下来通过CAS将线程数加1,如果成功那么就break retry结束无限循环,如果CAS失败了则就continue retry从新开始for循环,注意这里的retry不是Java的关键字,是一个可以任意命名的字符。

接下来,如果能继续向下执行则开始执行创建线程并执行任务的工作了,看下addWorker方法的后半部分代码:

private boolean addWorker(Runnable firstTask, boolean core) {

// ...省略前半部分

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

// 实例化一个Worker,内部封装了线程

w = new Worker(firstTask);

// 取出新建的线程

final Thread t = w.thread;

if (t != null) {

// 这里使用ReentrantLock加锁保证线程安全

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

int c = ctl.get();

// 拿到锁湖重新检查线程池状态,只有处于RUNNING状态或者处于SHUTDOWN并且firstTask==null时候才会创建线程

if (isRunning(c) ||

(runStateLessThan(c, STOP) && firstTask == null)) {

// 线程不是处于NEW状态,说明线程已经启动,抛出异常

if (t.getState() != Thread.State.NEW)

throw new IllegalThreadStateException();

// 将线程加入线程队列,这里的worker是一个HashSet

workers.add(w);

workerAdded = true;

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

// 开启线程执行任务

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

这部分逻辑其实比较容易理解,就是创建Worker并开启线程执行任务的过程,Worker是对线程的封装,创建的worker会被添加到ThreadPoolExecutor中的HashSet中。也就是线程池中的线程都维护在这个名为workers的HashSet中并被ThreadPoolExecutor所管理,HashSet中的线程可能处于正在工作的状态,也可能处于空闲状态,一旦达到指定的空闲时间,则会根据条件进行回收线程。

我们知道,线程调用start后就会开始执行线程的逻辑代码,执行完后线程的生命周期就结束了,那么线程池是如何保证Worker执行完任务后仍然不结束的呢?当线程空闲超时或者关闭线程池又是怎样进行线程回收的呢?这个实现逻辑其实就在Worker中。看下Worker的代码:

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable

{

// 执行任务的线程

final Thread thread;

// 初始化Worker时传进来的任务,可能为null,如果不空,则创建和立即执行这个task,对应核心线程创建的情况

Runnable firstTask;

Worker(Runnable firstTask) {

// 初始化时设置setate为-1

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

// 通过线程工程创建线程

this.thread = getThreadFactory().newThread(this);

}

// 线程的真正执行逻辑

public void run() {

runWorker(this);

}

// 判断线程是否是独占状态,如果不是意味着线程处于空闲状态

protected boolean isHeldExclusively() {

return getState() != 0;

}

// 获取锁

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

// 释放锁

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);

return true;

}

// ...

}

Worker是位于ThreadPoolExecutor中的一个内部类,它继承了AQS,使用AQS来实现了独占锁的功能,但是并没支持可重入。这里使用不可重入的特性来表示线程的执行状态,即可以通过isHeldExclusively方法来判断,如果是独占状态,说明线程正在执行任务,如果非独占状态,说明线程处于空闲状态。

另外,Worker还实现了Runnable接口,因此它的执行逻辑就是在run方法中,run方法调用的是线程池中的runWorker(this)方法。任务的执行逻辑就在runWorker方法中,它的代码如下:

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

// 取出Worker中的任务,可能为空

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

// task不为null或者阻塞队列中有任务,通过循环不断的从阻塞队列中取出任务执行

while (task != null || (task = getTask()) != null) {

w.lock();

// ...

try {

// 任务执行前的hook点

beforeExecute(wt, task);

try {

// 执行任务

task.run();

// 任务执行后的hook点

afterExecute(task, null);

} catch (Throwable ex) {

afterExecute(task, ex);

throw ex;

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

// 超时没有取到任务,则回收空闲超时的线程

processWorkerExit(w, completedAbruptly);

}

}

可以看到,runWorker的核心逻辑就是不断通过getTask方法从阻塞队列中获取任务并执行.通过这样的方式实现了线程的复用,避免了创建线程。这里要注意的是这里是一个“生产者-消费者”模式,getTask是从阻塞队列中取任务,所以如果阻塞队列中没有任务的时候就会处于阻塞状态。getTask中通过判断是否要回收线程而设置了等待超时时间,如果阻塞队列中一直没有任务,那么在等待keepAliveTime时间后会抛出异常。最终会走到上述代码的finally方法中,意味着有线程空闲时间超过了keepAliveTime时间,那么调用processWorkerExit方法移除Worker。processWorkerExit方法中没有复杂难以理解的逻辑,这里就不再贴代码了。我们重点看下getTask中是如何处理的,代码如下:

private Runnable getTask() {

boolean timedOut = false; // Did the last poll() time out?

for (;;) {

int c = ctl.get();

// ...

// Flag1. 如果配置了allowCoreThreadTimeOut==true或者线程池中的线程数大于核心线程数,则timed为true,表示开启指定线程超时后被回收

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// ...

try {

// Flag2. 取出阻塞队列中的任务,注意如果timed为true,则会调用阻塞队列的poll方法,并设置超时时间为keepAliveTime,如果超时没有取到任务则会抛出异常。

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

重点看getTask是如何处理空闲超时的逻辑的。我们知道,回收线程的条件是线程大于核心线程数或者配置了allowCoreThreadTimeOut为true,当线程空闲超时的情况下就会回收线程。上述代码在Flag1处先判断了如果线程池中的线程数大于核心线程数,或者开启了allowCoreThreadTimeOut,那么就需要开启线程空闲超时回收。所有在Flag2处,timed为true的情况下调用了阻塞队列的poll方法,并传入了超时时间为keepAliveTime,poll方法是一个阻塞方法,在没有任务时候回进行阻塞。如果在keepAliveTime时间内,没有获取到任务,那么poll方法就会返回null,结束runWorker的循环。进而执行runWorker方法中回收线程的操作。

这里需要我们理解阻塞队列poll方法的使用,poll方法接受一个时间参数,是一个阻塞操作,在给定的时间内没有获取到数据就返回null。poll方法的核心代码如下:

while (count == 0) {

if (nanos <= 0L)

return null;

nanos = notEmpty.awaitNanos(nanos);

}

3.ThreadPoolExecutor的拒绝策略

线程池的拒绝策略,它是在reject方法中实现的。实现代码也非常简单,代码如下:

final void reject(Runnable command) {

handler.rejectedExecution(command, this);

}

通过调用handler的rejectedExecution方法实现。这里其实就是运用了策略模式,handler是一个RejectedExecutionHandler类型的成员变量,RejectedExecutionHandler是一个接口,只有一个rejectedExecution方法。在实例化线程池时构造方法中传入对应的拒绝策略实例即可。前文已经提到了Java提供的几种默认实现分别为DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy以及AbortPolicy。

以AbortPolicy直接抛出异常为例,来看下代码实现:

public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

throw new RejectedExecutionException("Task " + r.toString() +

" rejected from " +

e.toString());

}

}

可以看到直接在rejectedExecution方法中抛出了RejectedExecutionException来拒绝任务。其他的几个策略实现也都比较简单,有兴趣可以自己查阅代码。

4.ThreadPoolExecutor的shutdown

调用shutdown方法后,会将线程池标记为SHUTDOWN状态,上边execute的源码可以看出,只有线程池是RUNNING状态才接受任务,因此被标记位SHUTDOWN后,再提交任务会被线程池拒绝。shutdown的代码如下:

public void shutdown() {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

//检查是否可以关闭线程

checkShutdownAccess();

// 将线程池状态置为SHUTDOWN状态

advanceRunState(SHUTDOWN);

// 尝试中断空闲线程

interruptIdleWorkers();

// 空方法,线程池关闭的hook点

onShutdown();

} finally {

mainLock.unlock();

}

tryTerminate();

}

private void interruptIdleWorkers() {

interruptIdleWorkers(false);

}

修改线程池为SHUTDOWN状态后,会调用interruptIdleWorkers去中断空闲线程线程,具体实现逻辑是在interruptIdleWorkers(boolean onlyOne)方法中,如下:

private void interruptIdleWorkers(boolean onlyOne) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

for (Worker w : workers) {

Thread t = w.thread;

// 尝试tryLock获取锁,如果拿锁成功说明线程是空闲状态

if (!t.isInterrupted() && w.tryLock()) {

try {

// 中断线程

t.interrupt();

} catch (SecurityException ignore) {

} finally {

w.unlock();

}

}

if (onlyOne)

break;

}

} finally {

mainLock.unlock();

}

}

shutdown的逻辑比较简单,里边做了两件比较重要的事情,即先将线程池状态修改为SHUTDOWN,接着遍历所有Worker,将空闲的Worker进行中断。

相关推荐

史上最全的浏览器兼容性问题和解决方案

微信ID:WEB_wysj(点击关注)◎◎◎◎◎◎◎◎◎一┳═┻︻▄(页底留言开放,欢迎来吐槽)●●●...

平面设计基础知识_平面设计基础知识实验收获与总结
平面设计基础知识_平面设计基础知识实验收获与总结

CSS构造颜色,背景与图像1.使用span更好的控制文本中局部区域的文本:文本;2.使用display属性提供区块转变:display:inline(是内联的...

2025-02-21 16:01 yuyutoo

写作排版简单三步就行-工具篇_作文排版模板

和我们工作中日常word排版内部交流不同,这篇教程介绍的写作排版主要是用于“微信公众号、头条号”网络展示。写作展现的是我的思考,排版是让写作在网格上更好地展现。在写作上花费时间是有累积复利优势的,在排...

写一个2048的游戏_2048小游戏功能实现

1.创建HTML文件1.打开一个文本编辑器,例如Notepad++、SublimeText、VisualStudioCode等。2.将以下HTML代码复制并粘贴到文本编辑器中:html...

今天你穿“短袖”了吗?青岛最高23℃!接下来几天气温更刺激……

  最近的天气暖和得让很多小伙伴们喊“热”!!!  昨天的气温到底升得有多高呢?你家有没有榜上有名?...

CSS不规则卡片,纯CSS制作优惠券样式,CSS实现锯齿样式

之前也有写过CSS优惠券样式《CSS3径向渐变实现优惠券波浪造型》,这次再来温习一遍,并且将更为详细的讲解,从布局到具体样式说明,最后定义CSS变量,自定义主题颜色。布局...

柠檬科技肖勃飞:大数据风控助力信用社会建设

...

你的自我界限够强大吗?_你的自我界限够强大吗英文

我的结果:A、该设立新的界限...

行内元素与块级元素,以及区别_行内元素和块级元素有什么区别?

行内元素与块级元素首先,CSS规范规定,每个元素都有display属性,确定该元素的类型,每个元素都有默认的display值,分别为块级(block)、行内(inline)。块级元素:(以下列举比较常...

让“成都速度”跑得潇潇洒洒,地上地下共享轨交繁华
让“成都速度”跑得潇潇洒洒,地上地下共享轨交繁华

去年的两会期间,习近平总书记在参加人大会议四川代表团审议时,对治蜀兴川提出了明确要求,指明了前行方向,并带来了“祝四川人民的生活越来越安逸”的美好祝福。又是一年...

2025-02-21 16:00 yuyutoo

今年国家综合性消防救援队伍计划招录消防员15000名

记者24日从应急管理部获悉,国家综合性消防救援队伍2023年消防员招录工作已正式启动。今年共计划招录消防员15000名,其中高校应届毕业生5000名、退役士兵5000名、社会青年5000名。本次招录的...

一起盘点最新 Chrome v133 的5大主流特性 ?

1.CSS的高级attr()方法CSSattr()函数是CSSLevel5中用于检索DOM元素的属性值并将其用于CSS属性值,类似于var()函数替换自定义属性值的方式。...

竞走团体世锦赛5月太仓举行 世界冠军杨家玉担任形象大使

style="text-align:center;"data-mce-style="text-align:...

学物理能做什么?_学物理能做什么 卢昌海

作者:曹则贤中国科学院物理研究所原标题:《物理学:ASourceofPowerforMan》在2006年中央电视台《对话》栏目的某期节目中,主持人问过我一个的问题:“学物理的人,如果日后不...

你不知道的关于这只眯眼兔的6个小秘密
你不知道的关于这只眯眼兔的6个小秘密

在你们忙着给熊本君做表情包的时候,要知道,最先在网络上引起轰动的可是这只脸上只有两条缝的兔子——兔斯基。今年,它更是迎来了自己的10岁生日。①关于德艺双馨“老艺...

2025-02-21 16:00 yuyutoo

取消回复欢迎 发表评论: