线程池源码初理解

阅读: 评论:0

线程池源码初理解

线程池源码初理解

线程池

线程池提交的两种方式:submit(提交runnable或callable),execute(提交runnable,提交callable需要实现futuretask)

线程池的状态:

ctl前三位是状态信息,后29位是当前线程池线程数量

running(111)<shutdown(000)<stop(001)<tidying(010)<terminated(011)

execute()

1.如果当前线程数,少于corePoolSize,则addWorker

2.如果当前线程数大于或者等于corePoolSize,就加入阻塞队列

3.加入成功(成功说明当前是running),并且再次检查当前状态为running的话,就检查工作线程,工作线程为空则执行addworker。

4.加入成功,但是当前状态不为running的话,就执行remove,并且执行拒绝策略

5.加入阻塞队列失败,就执行非核心线程的addworker,失败则执行拒绝策略

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ();if (workerCountOf(c) < corePoolSize) {//判断当前线程与核心线程数//小于的话worker 则addworkerif (addWorker(command, true))return;c = ();}//大于或者等于则增加进入阻塞队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ();//再次检查是否为running(可能给其他线程修改),非running就remove,成功的话执行拒绝策略if (! isRunning(recheck) && remove(command))reject(command);//加入成功,并且是running状态,检查工作线程是否为空else if (workerCountOf(recheck) == 0)//空的话则执行addworkeraddWorker(null, false);}//加入阻塞队列失败,则为非核心线程addworkerelse if (!addWorker(command, false))//失败的话执行拒绝策略reject(command);}

addWorker()

1.判断是否可以添加线程

a.处于running状态可以添加

b.处于shutdown状态,但是firsttask为null并且阻塞队列不为空

2.添加线程,利用cas添加

3.真正的添加线程逻辑,看注释

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ();int rs = runStateOf(c);// Check if queue empty only if necessary.//判断是否可以添加线程//1.running状态可以添加线程//2.处于shutdown状态,但是firstTask为null且阻塞队列不为空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//添加线程数量for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//利用cas添加,失败的话要么就是有竞争,要么就是状态给修改了。//竞争的话就再来一遍自旋,状态给修改就去外面再判断一下if (compareAndIncrementWorkerCount(c))break retry;c = ();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;//用线程工厂创建的threadif (t != null) {//加锁 线程池全局锁final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = ());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//running或者shutdown&firstTask等于null//workers为线程池 一个全局变量workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//如果线程池增加成功,则运行线程if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)//处理添加进入线程池失败的逻辑,将一开始自旋里面的cas加一操作减一,并且将worker清理出线程池addWorkerFailed(w);}return workerStarted;
}

调用完addWorker后会执行,worker的run方法

runWorker()

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//task为当前worker里面的firstTask或者从阻塞队列里面拿的taskwhile (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt//检查当前线程池状态是否>=stop,是的话则执行中断//或者一开始不大于等于stop,但是后面线程池给外部线程给shutdown或者shutdownNow了,中断标志为true了//执行||后面的逻辑,再次将次线程设置为中断if (((), STOP) ||(Thread.interrupted() &&(), STOP))) &&!wt.isInterrupted())wt.interrupt();//如果到这里都不符合中断逻辑,则执行task的run方法try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {//执行完run,就解锁并且重新拿新的tasktask = null;wpletedTasks++;w.unlock();}}completedAbruptly = false;} finally {//1pletedAbruptly为false,说明全部task执行完毕,执行一种退出逻辑//2pletedAbruptly为默认值true,说明有异常,执行另一种退出逻辑processWorkerExit(w, completedAbruptly);}
}

getTak()

返回null的情况:1.允许回收核心线程或者线程数量大于核心线程数量,有机会返回null(boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;)

机会就是当前worker大于一个或者当前worker只有一个并且阻塞队列里面为空,返回null

2.判断线程池状态,线程池大于shutdown状态返回null

或者线程池为shutdown状态,并且阻塞队列为空返回null

其他情况则返回runnable

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 为true则执行poll(允许超时),为false则执行take(阻塞)

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

processWorkerExit()

这个方法完成的就是线程退出逻辑,有两种情况

**1.**如果当前线程为异常退出状态,则将worker数量减一,并且重新addworker一次

**2.**如果当前线程为正常退出状态,则记录当前worker完成的tasks数量并且从workers里面退出(需要加锁完成),并且如果此时线程池状态为running或者shutdown状态,则判断当前线程池线程(worker)数量是否小于最小值min,小于的话则addworker,否则就return

min取决于是否允许回收核心线程,允许则为1,不允许则为核心线程数

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += ve(w);} finally {mainLock.unlock();}tryTerminate();int c = ();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);}
}

tryTerminate()

final void tryTerminate() {for (;;) {int c = ();//running,tidying,termination状态,直接return或者 shutdown状态但是阻塞队列不会空,直接returnif (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;//stop或者阻塞队列为空的shutdown执行到这里 判断线程数量是否为0,不为0则中断当前线程if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}//如果为0,则说明线程池此时只剩执行到这的最后一个线程了,此线程负责将状态设为termination,并通知调用了waitTermanation的线程。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctlpareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

本文发布于:2024-02-01 23:46:10,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170680750039958.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:线程   源码
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23