Java线程池ThreadPoolExecutor源码分析

摘要

Java线程池的处理流程 & ThreadPoolExecutor构造方法的参数意义 & 源码深入分析

线程池的处理流程

线程池在执行excute方法时,主要有以下四种情况

说明如下:

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁);
  2. 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue;
  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁);
  4. 如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor类,先看构造方法:

1
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

corePoolSize:核心池的大小。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于corePoolSize时就不再创建;如果调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从名字可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程并启动。

maximumPoolSize:线程池最大线程数,它表示在线程池中最多能创建多少个线程。如果使用了无界的任务队列这个参数就没有什么效果

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:天、小时、分钟、秒、毫秒、微妙、纳秒。

workQueue:任务队列,用于保存等待执行的任务的阻塞队列。线程池的排队策略与BlockingQueue有关。

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO原则对元素进行排序;
  • LinkedBlockingQueue:是一个基于链表结构的有界阻塞队列,吞吐量通常要高于ArrayBlockingQueue(主要因为ArrayBlockingQueue进队出队使用一个锁,但LinkedBlockingQueuey中有两个锁)。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SychronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

threadFactory:线程工厂,主要用来创建线程;

handler:表示当拒绝处理任务时的策略,有以下四种取值:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

注意:

线程池这个核心线程数的用处就是来判断当前这个闲置线程是否应该回收,那么什么是闲置线程呢?

一个线程执行完了一个任务后,会去阻塞队列里面取新的任务,在取到任务之前它就是一个闲置的线程,取任务的方法有两个,一个是一直阻塞直到取出任务,另一个是一定时间内阻塞直到取出任务或者超时,如果超时这个线程就会被回收,我们知道核心线程一般不会被回收。
线程在取任务的时候,线程池会比较当前的有效线程数和允许的核心线程数,如果小于当前的核心线程数则使用第一个方法取任务,也就是没有超时回收,如果大于核心线程数,则使用第二个,一旦超时就回收,所以,并没有绝对的核心线程,只要这个线程出于闲置状态就有被回收的可能。
还有一种情况是设置了线程池允许核心线程超时回收,那么无论线程数有多少,统统会使用第二个方法取任务。

向线程池提交任务

可以使用两个方法向线程池提交任务,分别是execute()和submit()。

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个Future对象可以判断任务是否执行成功,并且可以通过Future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

合理地配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

  1. 任务的性质:CPU密集型任务(压缩和解压缩,这种需要CPU不停的计算的任务),IO密集型任务和混合型任务。
  2. 任务的优先级:高,中和低。
  3. 任务的执行时间:长,中和短。
  4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

任务性质不同的任务可以用不同规模的线程池分开处理:

CPU密集型任务配置尽可能少的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理:

它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务:

可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖其他资源的任务:

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

线程池实现原理

线程池状态

参数ctl

这个变量是整个类的核心,AtomicInteger保证了对这个变量的操作是原子的,通过巧妙的操作,ThreadPoolExecutor用这一个变量保存了两个内容:

  1. 所有有效线程的数量
  2. 各个线程的状态(runState)

低29位存线程数,高3位存runState。

围绕ctl变量的一些操作

线程池状态

  • 当创建线程池后,初始时,线程池处于RUNNING状态;
  • 如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;
  • 如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;
  • 所有工作线程被销毁,任务缓存队列清空,线程池状态过渡到TIDYING(整理)状态,将执行terminated()方法;
  • terminated()方法执行完毕,线程池被设置为TERMINATED(终止)状态。

任务的执行

每个变量的作用都已经标明出来了,这里要重点解释一下corePoolSize、maximumPoolSize、largestPoolSize三个变量。举个简单的例子:

假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;
当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;
如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。

这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是说corePoolSize就是线程池大小,maximumPoolSize是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

任务从提交到最终执行完毕经历的过程

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以只需要研究execute()方法的实现原理即可。

addWorker()方法-创建线程


Worker是什么?

Worker是ThreadPoolExecutor的一个内部类,实现了Runnable接口。

上一节创建线程成功后调用t.start(),而这个线程又是Worker的成员变量,在构造方法中赋值。所以我们直接查看Worker的run()方法,Worker的run方法调用了runWorker(),这个方法里面就是取出任务执行的逻辑。

  • 第一次启动会执行初始化传进来的任务firstTask;
  • 然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。

这里可以知道当一个worker执行任务前或者执行完任务,到取出下一个任务期间,都是闲置状态可以被打断。

getTask()方法

processWorkerExit()方法:线程退出会执行这个方法做一些清理工作

tryTerminate()方法:终止线程池。这个方法在任何可能导致线程池终止的动作后执行:比如减少wokerCount或SHUTDOWN状态下从队列中移除任务

shutdown和shutdownNow

shutdown方法会将runState置为SHUTDOWN,会终止所有空闲的线程。

shutdownNow方法将runState置为STOP。和shutdown方法的区别,这个方法会终止所有的线程。

主要区别在于shutdown调用的是interruptIdleWorkers这个方法,而shutdownNow实际调用的是Worker类的interruptIfStarted方法。

只要调用了这两个关闭方法中的任意一个,isShutDown()方法就返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminated()方法会返回true。

如果任务不一定要执行完,可以调用shutdownNow()方法关闭线程池。

interruptIdleWorkers和interruptIfStarted

参考

  1. volatile关键字解析
  2. ThreadPoolExecutor源码学习笔记
  3. JAVA线程池的分析和使用
  4. Java中的线程池——ThreadPoolExecutor的原理
  5. Java并发编程:线程池的使用