线程池ThreadPoolExecutor分析

线程池生命周期及状态

RUNNING: 接收新任务,也处理队列任务

SHUTDOWN: 不接收新任务,但是处理队列任务

STOP: 不接收新任务,不处理队列任务并且中断正在进行的任务

TIDYING: 所有任务都终止,workerCount为零,线程状态转为TIDYING,将运行terminated()方法

TERMINATED: terminated() 方法执行完毕

任务执行流程

execute(Runnable command) –> addWorker(Runnable firstTask,boolean core) –>runWorker(Worker w)

  • 线程池的工作线程由Worker类实现,通过ReentrantLock锁,把worker实例插入到HashSet中,并启动Worker中的线程。

  • 而Worker类的构造方法实现可以看出:threadFactory创建线程thread时,将worker实例本身this作为参数传入,执行start()时,本质是调用Worker的run()方法,run() 又调用了外部的runWorker()方法。

  • firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源

几个问题

  1. 线程池是什么时候创建线程的?

  2. 任务runnable task是先放到core到maxThread之间的线程,还是先放到队列?

  3. 队列中的任务是什么时候取出来的?

  4. 什么时候会触发reject策略?

  5. core到maxThread之间的线程什么时候会die?

  6. task抛出异常,线程池中这个work thread还能运行其他任务吗?

线程池执行流程

创建线程池都会调用核心构造方法ThreadPoolExecutor

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

corePoolSize:核心线程最大数量,通俗点来讲就是,线程池中常驻线程的最大数量
maximumPoolSize:线程池中运行最大线程数(包括核心线程和非核心线程)
keepAliveTime:线程池中空闲线程(仅适用于非核心线程)所能存活的最长时间
unit:存活时间单位,与keepAliveTime搭配使用
workQueue:存放任务的阻塞队列
handler:线程池饱和策略

在new ThreadPoolExecutor()创建线程池时,Thread对象并没有初始化. 这里仅仅指定了初始参数;

###线程是在什么时候创建的呢?
当执行execute时,更具当前条件(核心线程数与队列情况)判断,创建work线程。
###执行execute

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//当前线程数小于corePoolSize, 则创建新的核心worker对象
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//如果当前线程数大于corePoolSize, 并偿试放入队列 workQueue.offer(command) , 放入成功后等待线程池调度
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//偿试放入队列 workQueue.offer(command) 失败, 增加一个非core的线程
reject(command);
}

第一个if, 判断如果当前线程数小于corePoolSize, 则创建新的核心worker对象(Worker中指向Thread对象,保持引用,保证不会被GC回收)

第二个if, 判断如果当前线程数大于corePoolSize, 并偿试放入队列 workQueue.offer(command) , 放入成功后等待线程池调度【见后面的getTask()】
第三个if, 偿试放入队列 workQueue.offer(command) 失败, 增加一个非core的线程


###增加任务addWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//此处new work()创建线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//线程池this的worker容器,保持对线程的引用
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//处理结束后,启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

###new work()创建线程
work构造方法

1
2
3
4
5
6
7
8
9
      Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);//创建线程
}

public void run() {
runWorker(this);//线程创建后,运行
}

runWorker 启动线程

线程启动后,又做了那些事情呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
  final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//通过getTask()获取任务
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//执行结束后的清理工作
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//如果抛出异常后执行以下方法
processWorkerExit(w, completedAbruptly);
}
}

没抛异常时,会一直在while(task !=null || (task = getTask())!=null)中执行;
如果有异常时,再看一下processWorkerExit()

processWorkerExit 异常处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

从此可以看出有异常时 旧的worker会被删除(GC回收),再创建新的Worker, 即有异常时 旧worker不可能再执行新的任务

##结论

###执行流程判断
当提交一个新任务,线程池的处理流程如下:

  • 判断线程池中核心线程数是否已达阈值corePoolSize,若否,则创建一个新核心线程执行任务
  • 若核心线程数已达阈值corePoolSize,判断阻塞队列workQueue是否已满,若未满,则将新任务添加进阻塞队列
  • 若满,再判断,线程池中线程数是否达到阈值maximumPoolSize,若否,则新建一个非核心线程执行任务。若达到阈值,则执行线程池饱和策略。

线程池饱和策略分为一下几种:

  • AbortPolicy:直接抛出一个异常,默认策略
  • DiscardPolicy: 直接丢弃任务
  • DiscardOldestPolicy:抛弃下一个将要被执行的任务(最旧任务)
  • CallerRunsPolicy:主线程中执行任务

流程:


QA

Q. 线程池是什么时候创建线程的?
A.任务提交的时候

Q.任务runnable task是先放到core到maxThread之间的线程,还是先放到队列?
A.先放队列!!!

Q. 队列中的任务是什么时候取出来的?
A. worker中 runWorker() 一个任务完成后,会取下一个任务

Q. 什么时候会触发reject策略?
A.队列满并且maxthread也满了, 还有新任务,默认策略是reject

Q. core到maxThread之间的线程什么时候会die?
A. 没有任务时,或者抛异常时。core线程也会die的,core到maxThread之间的线程有可能会晋升到core线程区间,core max只是个计数,线程并不是创建后就固定在一个区间了

Q. task抛出异常,线程池中这个work thread还能运行其他任务吗?
A. 不能。 但是会创建新的线程, 新线程可以运行其他task。

参阅:


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!