# 《手写线程池》线程池核心技术-第04节:Worker线程核心执行流程解析

作者:冰河
星球:http://m6z.cn/6aeFbs (opens new window)
博客:https://binghe.gitcode.host (opens new window)
文章汇总:https://binghe.gitcode.host/md/all/all.html (opens new window)
源码获取地址:https://t.zsxq.com/0dhvFs5oR (opens new window)

沉淀,成长,突破,帮助他人,成就自我。

  • 本章难度:★★★☆☆
  • 本章重点:简单介绍Worker线程的核心执行流程,重点掌握线程池中Worker线程的执行流程和涉及的方法调用。从全局视角掌握线程池的核心技术原理,学会融汇贯通,将线程池的编程思想灵活应用到自身实际项目中,提升实际项目的并发处理能力,以及自身的并发编程内功功底。

大家好,我是冰河~~

线程池中的Worker类从源码角度来看,继承了AQS类,实现了Runnable接口,本质上就是在线程池中执行任务的线程。在ThreadPoolExecutor类中的addWorker(Runnable, boolean)方法中,使用CAS安全的更新线程的数量之后,接下来就是创建新的Worker线程执行任务。本节,就对线程池中Worker线程的核心流程进行简单的介绍。

# 一、前言

上一节中,主要从源码角度分析了线程池的核心流程,包括:线程池正确运行的核心流程,线程池执行任务的核心流程。在介绍线程池正确运行的核心流程时,重点介绍了ThreadPoolExecutor类的重要属性和内部类。同时,在介绍线程池的核心执行流程中,重点解析了线程中的execute()方法、addWorker()方法、addWorkerFailed()方法和拒绝策略的执行流程。

# 二、本节诉求

简单介绍Worker线程的核心执行流程,重点掌握线程池中Worker线程的执行流程和涉及的方法调用。从全局视角掌握线程池的核心技术原理,学会融汇贯通,将线程池的编程思想灵活应用到自身实际项目中,提升实际项目的并发处理能力,以及自身的并发编程内功功底。

# 三、核心流程解析

Worker线程本质上就是在线程池中执行任务的线程,所有进入线程池的任务最终都会在Worker线程中被执行。接下来,就重要解析Worker线程的核心执行流程。

# 3.1 Worker线程的核心流程概述

在ThreadPoolExecutor类中,Worker线程的核心流程主要涉及到runWorker()方法、getTask()方法、beforeExecute()方法、afterExecute()方法、processWorkerExit()方法、tryTerminate()方法和terminated()方法的执行逻辑。理解了这几个方法的核心流程,也就掌握了Worker线程执行的核心流程。

# 3.2 runWorker()方法解析

Worker类中的run()方法调用了ThreadPoolExecutor类中的runWorker(Worker)方法执行任务。runWorker(Worker)方法的源码如下所示。

final void runWorker(Worker w) {
	Thread wt = Thread.currentThread();
	Runnable task = w.firstTask;
	w.firstTask = null;
	//释放锁,将state设置为0,允许中断任务
	w.unlock();
	boolean completedAbruptly = true;
	try {
		//如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环
		while (task != null || (task = getTask()) != null) {
			//如果任务不为空,则获取Worker工作线程的独占锁
			w.lock();
			//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程
			if ((runStateAtLeast(ctl.get(), STOP) ||
				 (Thread.interrupted() &&
				  runStateAtLeast(ctl.get(), STOP))) &&
				!wt.isInterrupted())
				//中断线程
				wt.interrupt();
			try {
				//执行任务前执行的逻辑
				beforeExecute(wt, task);
				Throwable thrown = null;
				try {
					//调用Runable接口的run方法执行任务
					task.run();
				} catch (RuntimeException x) {
					thrown = x; throw x;
				} catch (Error x) {
					thrown = x; throw x;
				} catch (Throwable x) {
					thrown = x; throw new Error(x);
				} finally {
					//执行任务后执行的逻辑
					afterExecute(task, thrown);
				}
			} finally {
				//任务执行完成后,将其设置为空
				task = null;
				//完成的任务数量加1
				w.completedTasks++;
				//释放工作线程获得的锁
				w.unlock();
			}
		}
		completedAbruptly = false;
	} finally {
		//执行退出Worker线程的逻辑
		processWorkerExit(w, completedAbruptly);
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

为了更好的理解runWorker(Worker)方法,对runWorker(Worker)方法的核心流程进行拆解,如下所示。

(1)获取当前线程的句柄和工作线程中的任务,并将工作线程中的任务设置为空,执行unlock方法释放锁,将state状态设置为0,此时可以中断工作线程,源码如下所示。

Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//释放锁,将state设置为0,允许中断任务的执行
w.unlock();
1
2
3
4
5

(2)在while循环中进行判断,如果任务不为空,或者从任务队列中获取的任务不为空,则执行while循环,否则,调用processWorkerExit(Worker, boolean)方法退出Worker工作线程。源码如下所示。

while (task != null || (task = getTask()) != null)
1

(3)如果满足while的循环条件,首先获取工作线程内部的独占锁,并执行一系列的逻辑判断来检测是否需要中断当前线程的执行,源码如下所示。

//如果任务不为空,则获取Worker工作线程的独占锁
w.lock();
//如果线程已经停止,或者中断线程后线程终止并且没有成功中断线程
//大家好好理解下这个逻辑
if ((runStateAtLeast(ctl.get(), STOP) ||
	 (Thread.interrupted() &&
	  runStateAtLeast(ctl.get(), STOP))) &&
	!wt.isInterrupted())
	//中断线程
	wt.interrupt();
1
2
3
4
5
6
7
8
9
10

(4)调用执行任务前执行的逻辑,源码如下所示。

//执行任务前执行的逻辑
beforeExecute(wt, task);
1
2

(5)调用Runable接口的run方法执行任务,源码如下所示。

//调用Runable接口的run方法执行任务
task.run();
1
2

(6)调用执行任务后执行的逻辑,源码如下所示。

//执行任务后执行的逻辑
afterExecute(task, thrown);
1
2

(7)将完成的任务设置为空,完成的任务数量加1并释放工作线程的锁,源码如下所示。

//任务执行完成后,将其设置为空
task = null;
//完成的任务数量加1
w.completedTasks++;
//释放工作线程获得的锁
w.unlock();
1
2
3
4
5
6

(8)退出Worker线程的执行,源码如下所示。

//执行退出Worker线程的逻辑
processWorkerExit(w, completedAbruptly);
1
2

从代码分析上可以看到,当从Worker线程中获取的任务为空时,会调用getTask()方法从任务队列中获取任务。

# 查看完整文章

加入冰河技术 (opens new window)知识星球,解锁完整技术文章、小册、视频与完整代码