首页 技术 正文
技术 2022年11月15日
0 收藏 695 点赞 2,764 浏览 3559 个字

线程池 ThreadPoolExecutor 类的源码解析:

1:数据结构的分析:

private final BlockingQueue<Runnable> workQueue;  //用于存储未被线程池处理的任务

private final ReentrantLock mainLock = new ReentrantLock(); //维护一个lock来保证线程安全

private final HashSet<Worker> workers = new HashSet<Worker>();

private final Condition termination = mainLock.newCondition(); //通过Condition来进行线程之间的通信

private volatile ThreadFactory threadFactory;  //维护一个线程工厂,用于生成线程

private volatile RejectedExecutionHandler handler; //拒绝任务的句柄对象

private volatile int corePoolSize;  //核心池大小

private volatile int maximumPoolSize;  //最大池的大小

2:构造方法:将用户自定义的参数赋值给成员变量,并且使用默认的线程工厂,默认的任务拒绝

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors.defaultThreadFactory(), defaultHandler);

}

3:下面看看  execute 执行方法的原理:

public void execute(Runnable command) {

if (command == null)   //判空操作

throw new NullPointerException();

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true))

return;

c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command))

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

else if (!addWorker(command, false))

reject(command);

}

下面主要分析下addWorker这个方法:

//以2种场景为例进行分析:

//1第一次添加任务到线程池中  2 第6次添加  corePoolsize=5

private boolean addWorker(Runnable firstTask, boolean core) { // firstTask就是Runnable任务 core=true

retry:

for (;;) {  //相当于while循环

int c = ctl.get(); //初始换后状态位Running  线程池中任务数为0

int rs = runStateOf(c); //获取线程池的状态 这里是RUNNING

// Check if queue empty only if necessary.

if (rs >= SHUTDOWN &&

! (rs == SHUTDOWN &&

firstTask == null &&

! workQueue.isEmpty()))

return false;

for (;;) { //相当于while循环

int wc = workerCountOf(c); //获取worker数量 wc=0

//判断线程池中线程是否超出了限制,若超出了则返回false

if (wc >= CAPACITY ||

wc >= (core ? corePoolSize : maximumPoolSize))

return false;

//CAS 将c的值+1  操作失败退出循环

if (compareAndIncrementWorkerCount(c))

break retry;

c = ctl.get();  // Re-read ctl

if (runStateOf(c) != rs)

continue retry;

// else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

//将传进来的Runnable 任务构建成Worker对象

w = new Worker(firstTask);

//获取worker对应的线程

final Thread t = w.thread;

if (t != null) {

//获取锁

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int rs = runStateOf(ctl.get());//获取线程池状态 这里是RUNNING

//判断线程池状态

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

//将工作任务添加到workers集合中

workers.add(w);

int s = workers.size();

if (s > largestPoolSize)  //初始化时largestPoolSize=0

largestPoolSize = s; //赋值  largestPoolSize=1

workerAdded = true;  //这里true表明添加成功的标识,后面执行该线程

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();  //线程开始执行

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

下面看看构建 Worker对象的逻辑:

Worker(Runnable firstTask) {

setState(-1); // 设置AQS state=-1

this.firstTask = firstTask; //任务赋值给firstTask全局变量

this.thread = getThreadFactory().newThread(this); //从线程工程创建新的线程

}

当第6次添加的时候

// command Runnable 任务

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

int c = ctl.get();  //获取线程池的状态 这里是Running

if (workerCountOf(c) < corePoolSize) {  //这里c的任务数是5 corePoolSize=5

if (addWorker(command, true))

return;

c = ctl.get();

}

// 判断线程池的状态是不是Ruuning  将任务添加到队列中

if (isRunning(c) && workQueue.offer(command)) {//进入这个逻辑

int recheck = ctl.get(); //再次获取ctl对象

if (! isRunning(recheck) && remove(command)) //线程池状态不是Running 或者任务被移除则局拒绝任务

reject(command);

else if (workerCountOf(recheck) == 0)

addWorker(null, false); //添加空任务到worker中

}

else if (!addWorker(command, false))

reject(command);

}

到这里线程池的分析已经结束了;

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,104
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,580
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,428
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,200
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,835
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,918