Java concurrency线程池之线程池原理(二)_动力节点Java学院整理

2025-05-29 0 57

线程池示例

在分析线程池之前,先看一个简单的线程池示例。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32
import java.util.concurrent.Executors;

import java.util.concurrent.ExecutorService;

public class ThreadPoolDemo1 {

public static void main(String[] args) {

// 创建一个可重用固定线程数的线程池

ExecutorService pool = Executors.newFixedThreadPool(2);

// 创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口

Thread ta = new MyThread();

Thread tb = new MyThread();

Thread tc = new MyThread();

Thread td = new MyThread();

Thread te = new MyThread();

// 将线程放入池中进行执行

pool.execute(ta);

pool.execute(tb);

pool.execute(tc);

pool.execute(td);

pool.execute(te);

// 关闭线程池

pool.shutdown();

}

}

class MyThread extends Thread {

@Override

public void run() {

System.out.println(Thread.currentThread().getName()+ " is running.");

}

}

运行结果:

?

1

2

3

4

5
pool-1-thread-1 is running.

pool-1-thread-2 is running.

pool-1-thread-1 is running.

pool-1-thread-2 is running.

pool-1-thread-1 is running.

示例中,包括了线程池的创建,将任务添加到线程池中,关闭线程池这3个主要的步骤。稍后,我们会从这3个方面来分析ThreadPoolExecutor。

线程池源码分析

(一) 创建“线程池

下面以newFixedThreadPool()介绍线程池的创建过程。

1. newFixedThreadPool()

newFixedThreadPool()在Executors.java中定义,源码如下:

?

1

2

3

4

5
public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

说明:newFixedThreadPool(int nThreads)的作用是创建一个线程池线程池的容量是nThreads。
newFixedThreadPool()在调用ThreadPoolExecutor()时,会传递一个LinkedBlockingQueue()对象,而LinkedBlockingQueue是单向链表实现的阻塞队列。在线程池中,就是通过该阻塞队列来实现"当线程池中任务数量超过允许的任务数量时,部分任务会阻塞等待"。
关于LinkedBlockingQueue的实现细节,读者可以参考"Java多线程系列–“JUC集合”08之 LinkedBlockingQueue"。

2. ThreadPoolExecutor()

ThreadPoolExecutor()在ThreadPoolExecutor.java中定义,源码如下:

?

1

2

3

4

5

6

7

8
public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors.defaultThreadFactory(), defaultHandler);

}

说明:该函数实际上是调用ThreadPoolExecutor的另外一个构造函数。该函数的源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26
public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler) {

if (corePoolSize < 0 ||

maximumPoolSize <= 0 ||

maximumPoolSize < corePoolSize ||

keepAliveTime < 0)

throw new IllegalArgumentException();

if (workQueue == null || threadFactory == null || handler == null)

throw new NullPointerException();

// 核心池大小

this.corePoolSize = corePoolSize;

// 最大池大小

this.maximumPoolSize = maximumPoolSize;

// 线程池的等待队列

this.workQueue = workQueue;

this.keepAliveTime = unit.toNanos(keepAliveTime);

// 线程工厂对象

this.threadFactory = threadFactory;

// 拒绝策略的句柄

this.handler = handler;

}

说明:在ThreadPoolExecutor()的构造函数中,进行的是初始化工作。
corePoolSize, maximumPoolSize, unit, keepAliveTime和workQueue这些变量的值是已知的,它们都是通过newFixedThreadPool()传递而来。下面看看threadFactory和handler对象。

2.1 ThreadFactory

线程池中的ThreadFactory是一个线程工厂,线程池创建线程都是通过线程工厂对象(threadFactory)来完成的。
上面所说的threadFactory对象,是通过 Executors.defaultThreadFactory()返回的。Executors.java中的defaultThreadFactory()源码如下:

?

1

2

3
public static ThreadFactory defaultThreadFactory() {

return new DefaultThreadFactory();

}

defaultThreadFactory()返回DefaultThreadFactory对象。Executors.java中的DefaultThreadFactory()源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30
static class DefaultThreadFactory implements ThreadFactory {

private static final AtomicInteger poolNumber = new AtomicInteger(1);

private final ThreadGroup group;

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null) ? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = "pool-" +

poolNumber.getAndIncrement() +

"-thread-";

}

// 提供创建线程的API。

public Thread newThread(Runnable r) {

// 线程对应的任务是Runnable对象r

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),

0);

// 设为“非守护线程”

if (t.isDaemon())

t.setDaemon(false);

// 将优先级设为“Thread.NORM_PRIORITY”

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

说明:ThreadFactory的作用就是提供创建线程的功能的线程工厂。
它是通过newThread()提供创建线程功能的,下面简单说说newThread()。newThread()创建的线程对应的任务是Runnable对象,它创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。

2.2 RejectedExecutionHandler

handler是ThreadPoolExecutor中拒绝策略的处理句柄。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。
线程池默认会采用的是defaultHandler策略,即AbortPolicy策略。在AbortPolicy策略中,线程池拒绝任务时会抛出异常!
defaultHandler的定义如下:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
AbortPolicy的源码如下:

?

1

2

3

4

5

6

7

8

9

10
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

// 抛出异常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

throw new RejectedExecutionException("Task " + r.toString() +

" rejected from " +

e.toString());

}

}

(二) 添加任务到“线程池

1. execute()

execute()定义在ThreadPoolExecutor.java中,源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29
public void execute(Runnable command) {

// 如果任务为null,则抛出异常。

if (command == null)

throw new NullPointerException();

// 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息

int c = ctl.get();

// 当线程池中的任务数量 < "核心池大小"时,即线程池中少于corePoolSize个任务。

// 则通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

// 当线程池中的任务数量 >= "核心池大小"时,

// 而且,"线程池处于允许状态"时,则尝试将任务添加到阻塞队列中。

if (isRunning(c) && workQueue.offer(command)) {

// 再次确认“线程池状态”,若线程池异常终止了,则删除任务;然后通过reject()执行相应的拒绝策略的内容。

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

// 否则,如果"线程池中任务数量"为0,则通过addWorker(null, false)尝试新建一个线程,新建线程对应的任务为null。

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

// 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。

// 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。

else if (!addWorker(command, false))

reject(command);

}

说明:execute()的作用是将任务添加到线程池中执行。它会分为3种情况进行处理:
情况1 — 如果"线程池中任务数量" < "核心池大小"时,即线程池中少于corePoolSize个任务;此时就新建一个线程,并将该任务添加到线程中进行执行。
情况2 — 如果"线程池中任务数量" >= "核心池大小",并且"线程池是允许状态";此时,则将任务添加到阻塞队列中阻塞等待。在该情况下,会再次确认"线程池的状态",如果"第2次读到的线程池状态"和"第1次读到的线程池状态"不同,则从阻塞队列中删除该任务。
情况3 — 非以上两种情况。在这种情况下,尝试新建一个线程,并将该任务添加到线程中进行执行。如果执行失败,则通过reject()拒绝该任务。

2. addWorker()

addWorker()的源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81
private boolean addWorker(Runnable firstTask, boolean core) {

retry:

// 更新"线程池状态和计数"标记,即更新ctl。

for (;;) {

// 获取ctl对应的int值。该int值保存了"线程池中任务的数量"和"线程池状态"信息

int c = ctl.get();

// 获取线程池状态。

int rs = runStateOf(c);

// 有效性检查

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) {

// 获取线程池中任务的数量。

int wc = workerCountOf(c);

// 如果"线程池中任务的数量"超过限制,则返回false。

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

// 通过CAS函数将c的值+1。操作失败的话,则退出循环。

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get(); // Re-read ctl

// 检查"线程池状态",如果与之前的状态不同,则从retry重新开始。

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

// 添加任务到线程池,并启动任务所在的线程。

try {

final ReentrantLock mainLock = this.mainLock;

// 新建Worker,并且指定firstTask为Worker的第一个任务。

w = new Worker(firstTask);

// 获取Worker对应的线程。

final Thread t = w.thread;

if (t != null) {

// 获取锁

mainLock.lock();

try {

int c = ctl.get();

int rs = runStateOf(c);

// 再次确认"线程池状态"

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

// 将Worker对象(w)添加到"线程池的Worker集合(workers)"中

workers.add(w);

// 更新largestPoolSize

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

// 释放锁

mainLock.unlock();

}

// 如果"成功将任务添加到线程池"中,则启动任务所在的线程。

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

// 返回任务是否启动。

return workerStarted;

}

说明:
addWorker(Runnable firstTask, boolean core) 的作用是将任务(firstTask)添加到线程池中,并启动该任务。
core为true的话,则以corePoolSize为界限,若"线程池中已有任务数量>=corePoolSize",则返回false;core为false的话,则以maximumPoolSize为界限,若"线程池中已有任务数量>=maximumPoolSize",则返回false。
addWorker()会先通过for循环不断尝试更新ctl状态,ctl记录了"线程池中任务数量和线程池状态"。
更新成功之后,再通过try模块来将任务添加到线程池中,并启动任务所在的线程。

从addWorker()中,我们能清晰的发现:线程池在添加任务时,会创建任务对应的Worker对象;而一个Workder对象包含一个Thread对象。(01) 通过将Worker对象添加到"线程的workers集合"中,从而实现将任务添加到线程池中。 (02) 通过启动Worker对应的Thread线程,则执行该任务。

3. submit()

补充说明一点,submit()实际上也是通过调用execute()实现的,源码如下:

?

1

2

3

4

5

6
public Future<?> submit(Runnable task) {

if (task == null) throw new NullPointerException();

RunnableFuture<Void> ftask = newTaskFor(task, null);

execute(ftask);

return ftask;

}

(三) 关闭“线程池

shutdown()的源码如下:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20
public void shutdown() {

final ReentrantLock mainLock = this.mainLock;

// 获取锁

mainLock.lock();

try {

// 检查终止线程池的“线程”是否有权限。

checkShutdownAccess();

// 设置线程池的状态为关闭状态。

advanceRunState(SHUTDOWN);

// 中断线程池中空闲的线程。

interruptIdleWorkers();

// 钩子函数,在ThreadPoolExecutor中没有任何动作。

onShutdown(); // hook for ScheduledThreadPoolExecutor

} finally {

// 释放锁

mainLock.unlock();

}

// 尝试终止线程池

tryTerminate();

}

说明:shutdown()的作用是关闭线程池

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持快网idc。

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 Java concurrency线程池之线程池原理(二)_动力节点Java学院整理 https://www.kuaiidc.com/115936.html

相关文章

发表评论
暂无评论