# 《手写线程池》线程池核心技术-第04节:Worker线程核心执行流程解析
作者:冰河
星球:http://m6z.cn/6aeFbs (opens new window)
博客:https://binghe.gitcode.host (opens new window)
文章汇总:https://binghe.gitcode.host/md/all/all.html (opens new window)
源码获取地址:https://t.zsxq.com/0dhvFs5oR (opens new window)
沉淀,成长,突破,帮助他人,成就自我。
- 本章难度:★★★☆☆
- 本章重点:简单介绍Worker线程的核心执行流程,重点掌握线程池中Worker线程的执行流程和涉及的方法调用。从全局视角掌握线程池的核心技术原理,学会融汇贯通,将线程池的编程思想灵活应用到自身实际项目中,提升实际项目的并发处理能力,以及自身的并发编程内功功底。
大家好,我是冰河~~
线程池中的Worker类从源码角度来看,继承了AQS类,实现了Runnable接口,本质上就是在线程池中执行任务的线程。在ThreadPoolExecutor类中的addWorker(Runnable, boolean)方法中,使用CAS安全的更新线程的数量之后,接下来就是创建新的Worker线程执行任务。本节,就对线程池中Worker线程的核心流程进行简单的介绍。
# 一、前言
上一节中,主要从源码角度分析了线程池的核心流程,包括:线程池正确运行的核心流程,线程池执行任务的核心流程。在介绍线程池正确运行的核心流程时,重点介绍了ThreadPoolExecutor类的重要属性和内部类。同时,在介绍线程池的核心执行流程中,重点解析了线程中的execute()方法、addWorker()方法、addWorkerFailed()方法和拒绝策略的执行流程。
# 二、本节诉求
简单介绍Worker线程的核心执行流程,重点掌握线程池中Worker线程的执行流程和涉及的方法调用。从全局视角掌握线程池的核心技术原理,学会融汇贯通,将线程池的编程思想灵活应用到自身实际项目中,提升实际项目的并发处理能力,以及自身的并发编程内功功底。
# 三、核心流程解析
Worker线程本质上就是在线程池中执行任务的线程,所有进入线程池的任务最终都会在Worker线程中被执行。接下来,就重要解析Worker线程的核心执行流程。
# 3.1 Worker线程的核心流程概述
在ThreadPoolExecutor类中,Worker线程的核心流程主要涉及到runWorker()方法、getTask()方法、beforeExecute()方法、afterExecute()方法、processWorkerExit()方法、tryTerminate()方法和terminated()方法的执行逻辑。理解了这几个方法的核心流程,也就掌握了Worker线程执行的核心流程。
# 3.2 runWorker()方法解析
Worker类中的run()方法调用了ThreadPoolExecutor类中的runWorker(Worker)方法执行任务。runWorker(Worker)方法的源码如下所示。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//释放锁,将state设置为0,允许中断任务
w.unlock();
boolean completedAbruptly = true;
try {
//如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环
while (task != null || (task = getTask()) != null) {
//如果任务不为空,则获取Worker工作线程的独占锁
w.lock();
//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断线程
wt.interrupt();
try {
//执行任务前执行的逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
//调用Runable接口的run方法执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后执行的逻辑
afterExecute(task, thrown);
}
} finally {
//任务执行完成后,将其设置为空
task = null;
//完成的任务数量加1
w.completedTasks++;
//释放工作线程获得的锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
//执行退出Worker线程的逻辑
processWorkerExit(w, completedAbruptly);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
为了更好的理解runWorker(Worker)方法,对runWorker(Worker)方法的核心流程进行拆解,如下所示。
(1)获取当前线程的句柄和工作线程中的任务,并将工作线程中的任务设置为空,执行unlock方法释放锁,将state状态设置为0,此时可以中断工作线程,源码如下所示。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//释放锁,将state设置为0,允许中断任务的执行
w.unlock();
2
3
4
5
(2)在while循环中进行判断,如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环,否则,调用processWorkerExit(Worker, boolean)方法退出Worker工作线程。源码如下所示。
while (task != null || (task = getTask()) != null)
(3)如果满足while的循环条件,首先获取工作线程内部的独占锁,并执行一系列的逻辑判断来检测是否需要中断当前线程的执行,源码如下所示。
//如果任务不为空,则获取Worker工作线程的独占锁
w.lock();
//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程
//大家好好理解下这个逻辑
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
//中断线程
wt.interrupt();
2
3
4
5
6
7
8
9
10
(4)调用执行任务前执行的逻辑,源码如下所示。
//执行任务前执行的逻辑
beforeExecute(wt, task);
2
(5)调用Runable接口的run方法执行任务,源码如下所示。
//调用Runable接口的run方法执行任务
task.run();
2
(6)调用执行任务后执行的逻辑,源码如下所示。
//执行任务后执行的逻辑
afterExecute(task, thrown);
2
(7)将完成的任务设置为空,完成的任务数量加1并释放工作线程的锁,源码如下所示。
//任务执行完成后,将其设置为空
task = null;
//完成的任务数量加1
w.completedTasks++;
//释放工作线程获得的锁
w.unlock();
2
3
4
5
6
(8)退出Worker线程的执行,源码如下所示。
//执行退出Worker线程的逻辑
processWorkerExit(w, completedAbruptly);
2
从代码分析上可以看到,当从Worker线程中获取的任务为空时,会调用getTask()方法从任务队列中获取任务。
# 查看完整文章
加入冰河技术 (opens new window)知识星球,解锁完整技术文章、小册、视频与完整代码