Core Code Analysis of the Java Thread Pool Framework

Preface

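In the Java thread pool framework, a task is submitted as a Runnable, and the class that actually runs it is ThreadPoolExecutor.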

The analysis below is based on JDK 1.7.

Lifecycle

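ThreadPoolExecutor packs its run state into the high 3 bits of an int and its worker count into the low 29 bits, so the worker count is bounded by CAPACITY. The pool moves through five states:
RUNNING: accepts new tasks and processes queued tasks.
SHUTDOWN: accepts no new tasks but still processes queued tasks.
STOP: accepts no new tasks, does not process queued tasks, and interrupts tasks in progress.
TIDYING: all tasks have terminated and the worker count is zero; the thread that makes this transition runs the terminated() hook.
TERMINATED: terminated() has completed.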

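State transition diagram

RUNNING -> SHUTDOWN: on a call to shutdown().
(RUNNING or SHUTDOWN) -> STOP: on a call to shutdownNow().
SHUTDOWN -> TIDYING: when both the queue and the pool are empty.
STOP -> TIDYING: when the pool is empty.
TIDYING -> TERMINATED: when the terminated() hook has completed.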

ThreadPoolExecutor keeps the run state and the worker count together in a single AtomicInteger:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
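The bit packing behind ctl can be tried out in isolation. The sketch below copies the constant and helper names used in the JDK 1.7 source; the class name CtlSketch is only an illustrative assumption and not part of the JDK.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch; CtlSketch is a hypothetical name, not a JDK class.
class CtlSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29 bits for the worker count
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // 2^29 - 1

    // The run state lives in the high 3 bits.
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; } // keep the high 3 bits
    static int workerCountOf(int c)  { return c & CAPACITY; }  // keep the low 29 bits
    static int ctlOf(int rs, int wc) { return rs | wc; }       // combine both fields

    public static void main(String[] args) {
        AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        ctl.incrementAndGet(); // adding a worker leaves the state bits untouched
        int c = ctl.get();
        System.out.println(runStateOf(c) == RUNNING); // true
        System.out.println(workerCountOf(c));         // 1
    }
}
```

Because RUNNING is the only negative state value, the states are ordered numerically, which lets the pool compare states with a plain integer comparison.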

Thread pool model

Core parameters

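corePoolSize: the number of core threads. Core threads stay alive while idle unless allowCoreThreadTimeOut is set to true.
maximumPoolSize: the maximum number of threads; it can never exceed CAPACITY.
keepAliveTime: how long an idle thread waits for work before it exits. It applies to threads above corePoolSize, and to every thread when allowCoreThreadTimeOut is true.
workQueue: the blocking queue that holds tasks waiting to run.
RejectedExecutionHandler: the policy invoked when a task can be neither queued nor handed to a new thread.
CAPACITY is 2^29 - 1.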
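As a rough illustration of how these parameters map onto the constructor, the sketch below builds a pool by hand. The concrete values (2, 4, 30 seconds, a queue of 10) and the class name PoolConfigSketch are arbitrary assumptions chosen for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; PoolConfigSketch and its values are hypothetical.
class PoolConfigSketch {
    static ThreadPoolExecutor newPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30L, TimeUnit.SECONDS,                  // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(10),   // workQueue (bounded)
                new ThreadPoolExecutor.AbortPolicy());  // RejectedExecutionHandler
        // Optional: let idle core threads time out as well (needs keepAliveTime > 0).
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        System.out.println("core=" + pool.getCorePoolSize()
                + " max=" + pool.getMaximumPoolSize()
                + " keepAlive=" + pool.getKeepAliveTime(TimeUnit.SECONDS) + "s");
        pool.shutdown();
    }
}
```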

Four models

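The Executors class provides factory methods for four common pool models:
CachedThreadPool (Executors.newCachedThreadPool): corePoolSize is 0, maximumPoolSize is Integer.MAX_VALUE, and tasks are handed off through a SynchronousQueue, so threads are created on demand and reclaimed when idle.
FixedThreadPool (Executors.newFixedThreadPool): corePoolSize equals maximumPoolSize, backed by an unbounded LinkedBlockingQueue.
SingleThreadPool (Executors.newSingleThreadExecutor): a single worker thread with an unbounded queue, so tasks run one at a time in submission order.
ScheduledThreadPool (Executors.newScheduledThreadPool): a ScheduledThreadPoolExecutor for delayed and periodic tasks.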
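One way to see what the factory methods configure is to inspect the returned pools. This is only a sketch: FactorySketch and describe are hypothetical names, and the cast relies on the concrete classes the factories return, which is an implementation detail rather than a documented contract.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative sketch; FactorySketch and describe are hypothetical names.
class FactorySketch {
    // Reports the core parameters behind an ExecutorService when they are visible.
    static String describe(ExecutorService es) {
        if (es instanceof ThreadPoolExecutor) {
            ThreadPoolExecutor p = (ThreadPoolExecutor) es;
            return "core=" + p.getCorePoolSize()
                    + " max=" + p.getMaximumPoolSize()
                    + " queue=" + p.getQueue().getClass().getSimpleName();
        }
        // Assumption: the single-thread factory returns a wrapper that hides the pool.
        return "wrapped (" + es.getClass().getSimpleName() + ")";
    }

    public static void main(String[] args) {
        ExecutorService[] pools = {
            Executors.newCachedThreadPool(),
            Executors.newFixedThreadPool(3),
            Executors.newSingleThreadExecutor(),
            Executors.newScheduledThreadPool(2)
        };
        for (ExecutorService es : pools) {
            System.out.println(describe(es));
            es.shutdown();
        }
    }
}
```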

Executing a task: execute

Core logic:

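1. If the worker count is below corePoolSize, call addWorker(command, true) to start a new core thread with the command as its first task.
2. If the worker count has reached corePoolSize and the pool is RUNNING, put the command on the work queue, then recheck the pool state: if the pool has stopped running, remove the command and reject it; if no workers are left, start one.
3. If the command cannot be queued, call addWorker(command, false) to start a non-core thread. If that fails too, reject the command.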

From the analysis above, the pool's operation can be summarized in four phases:

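1. poolSize < corePoolSize: every submitted task starts a new thread.
2. poolSize == corePoolSize and the queue is not full: submitted tasks are placed on the queue.
3. poolSize == corePoolSize, the queue is full, and poolSize < maxPoolSize: submitted tasks start additional threads.
4. poolSize == maxPoolSize and the queue is full: submitted tasks are handed to the rejection policy.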
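The four phases can be observed with a deliberately tiny pool. The sketch below assumes corePoolSize 1, maximumPoolSize 2, and a queue of capacity 1, and submits four tasks that all block until released; PhasesSketch and trace are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; PhasesSketch and trace are hypothetical names.
class PhasesSketch {
    // Records "poolSize/queueSize" after each accepted task, or "rejected".
    static String trace() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = new Runnable() {
            public void run() {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        StringBuilder sb = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    pool.execute(blocker);
                    sb.append(pool.getPoolSize()).append('/')
                      .append(pool.getQueue().size()).append(' ');
                } catch (RejectedExecutionException e) {
                    sb.append("rejected"); // default AbortPolicy throws
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // 1/0 1/1 2/1 rejected
    }
}
```

The first task starts the core thread, the second is queued, the third starts a non-core thread because the queue is full, and the fourth is rejected.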

Rejection policies

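A rejected task is passed to the pool's RejectedExecutionHandler. ThreadPoolExecutor ships with four implementations:
AbortPolicy: throws RejectedExecutionException. This is the default.
CallerRunsPolicy: runs the task in the thread that called execute, unless the pool has been shut down.
DiscardPolicy: silently drops the task.
DiscardOldestPolicy: drops the oldest task in the queue, then retries execute.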
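A small experiment shows CallerRunsPolicy at work. The setup is an assumption made for the demonstration: a pool with a single thread and a SynchronousQueue, so that a second task has nowhere to go while the first one is still running. CallerRunsSketch and overflowRunner are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative sketch; CallerRunsSketch and overflowRunner are hypothetical names.
class CallerRunsSketch {
    // Returns the thread that ended up running the overflow task.
    static Thread overflowRunner() {
        final CountDownLatch release = new CountDownLatch(1);
        final AtomicReference<Thread> runner = new AtomicReference<Thread>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            // First task occupies the only worker until released.
            pool.execute(new Runnable() {
                public void run() {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            // Second task cannot be queued or given a thread, so it is rejected
            // and CallerRunsPolicy runs it right here in the submitting thread.
            pool.execute(new Runnable() {
                public void run() {
                    runner.set(Thread.currentThread());
                }
            });
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return runner.get();
    }

    public static void main(String[] args) {
        System.out.println(overflowRunner() == Thread.currentThread()); // true
    }
}
```

Because the submitting thread is busy running the task, it cannot submit more work in the meantime, which acts as a simple form of back-pressure.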

Workers in the thread pool

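Worker extends AbstractQueuedSynchronizer and implements Runnable. Each Worker wraps one thread, and its run method delegates to runWorker(Worker w). All live workers are kept in the workers set, which is accessed only while holding mainLock: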
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();

Core function: runWorker

Worker's run method simply calls runWorker(this). A simplified version of runWorker:
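final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true; // stays true if a task throws
    try {
        while (task != null || (task = getTask()) != null) { // first task, then the queue
            w.lock();
            Throwable thrown = null;
            try {
                beforeExecute(wt, task);
                task.run();
            } catch (RuntimeException | Error x) {
                thrown = x;
                throw x;
            } finally {
                afterExecute(task, thrown);
                task = null; // otherwise the same task would be run again
                w.unlock();
            }
        }
        completedAbruptly = false; // getTask() returned null: normal exit
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}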
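getTask() fetches the next task from the queue. beforeExecute(wt, task) and afterExecute(task, thrown) are hook methods that do nothing by default; a subclass of ThreadPoolExecutor can override them to run code around every task.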
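A minimal sketch of the two hooks in use, assuming a subclass that only counts how many tasks started and finished. HookSketch, CountingPool, and runTasks are hypothetical names introduced for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch; HookSketch, CountingPool and runTasks are hypothetical names.
class HookSketch {
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger started = new AtomicInteger();
        final AtomicInteger finished = new AtomicInteger();

        CountingPool(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            started.incrementAndGet();  // runs in the worker thread, before the task
            super.beforeExecute(t, r);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            finished.incrementAndGet(); // runs in the worker thread, after the task
        }
    }

    // Runs n empty tasks and returns {started, finished}.
    static int[] runTasks(int n) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(new Runnable() {
                public void run() { }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(5);
        System.out.println(counts[0] + " started, " + counts[1] + " finished");
    }
}
```

The counters are AtomicInteger because the hooks run concurrently in different worker threads.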

Fetching a task: getTask

The pool's internal task queue is a blocking queue; the concrete implementation is passed in at construction time.

private final BlockingQueue<Runnable> workQueue;
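When getTask() returns null, the calling worker exits. It returns null in any of these cases:
The pool is in the STOP state or later, or it is in SHUTDOWN and the queue is empty.
The worker count exceeds maximumPoolSize.
The worker timed out while waiting for a task and is allowed to exit.
Whether a wait can time out is decided by the timed flag: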
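// wc is the current worker count
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// Timed workers wait at most keepAliveTime; the others block until a task arrives.
Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();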

Waiting for a task can time out in two cases:

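allowCoreThreadTimeOut(true) has been called, so even core threads may time out.
wc > corePoolSize, so the worker is a surplus thread above the core size.
In both cases the waiting itself is done by the BlockingQueue: poll gives up after keepAliveTime, whereas take blocks until a task arrives.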
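The effect of the timed flag on core threads can be checked with a short experiment. The timings are assumptions chosen for the demonstration (a 50 ms keep-alive and a 500 ms idle period), so the result depends on the idle worker getting a chance to run within that window. CoreTimeoutSketch and poolSizeAfterIdle are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch; CoreTimeoutSketch and poolSizeAfterIdle are hypothetical names.
class CoreTimeoutSketch {
    // Runs one task, lets the pool sit idle, and returns the remaining pool size.
    static int poolSizeAfterIdle(boolean allowCoreTimeout) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(allowCoreTimeout);
        try {
            pool.execute(new Runnable() {
                public void run() { }
            });
            Thread.sleep(500); // idle for roughly ten keep-alive periods
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("core timeout on:  " + poolSizeAfterIdle(true));
        System.out.println("core timeout off: " + poolSizeAfterIdle(false));
    }
}
```

With the flag on, the idle core thread waits in poll, times out, and exits; with the flag off, it stays blocked in take and the pool size remains 1.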

Summary

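ThreadPoolExecutor is the core of the thread pool framework, and the pools created by the Executors factory methods described above are built on it. ThreadPoolExecutor is also open to customization: a subclass can override beforeExecute and afterExecute, and a rejection policy such as CallerRunsPolicy can be chosen to suit the workload.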