Java 线程池的一些概念
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
我们首先需要了解一些线程的概念
核心线程 (corePool): 线程池最终执行任务的角色是线程,核心线程是在线程中有新任务提交的时候,首先检查核心线程数,如果核心线程都在工作,而且数量也已经达到最大核心线程数,则不会继续新建核心线程,而会将任务放入等待队列。
等待队列(workQueue): 如果核心线程都已经在工作,线程池持续添加任务,就会放到等待队列中,直到核心线程执行完成后,再去等待队列中拉取任务继续执行。
非核心线程: 由 最大线程数 - 核心线程数
得到,如果等待队列满了,那么就会创建新的线程来执行。
线程活跃保持时间(keepAliveTime): 非核心线程如果空闲下来,超过这个时间内还未执行任务,则线程结束。
拒绝策略(RejectedExecutionHandler): 当等待队列已满,线程数也达到最大线程数时,线程池会根据饱和策略来执行后续操作,默认的策略是抛弃要加入的任务。
JDK 为我们提供了什么
创建线程池的方法
newFixedThreadPool
返回固定长度的线程池,线程池中的线程数量是固定的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool
该方法返回一个根据实际情况来进行调整线程数量的线程池,空余线程存活时间是60s
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor
该方法返回一个只有一个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
等待队列的类型
SynchronousQueue
同步队列 本身没有容量的,如果入队的时候没有空余线程则直接执行拒绝策略
ArrayBlockingQueue
基于数组来进行实现的,初始化时必须指定容量参数,如果线程池的线程数量没有超过 maximumPoolSize ,则创建新的线程执行任务,如果超过了 maximumPoolSize 则执行拒绝策略。
LinkedBlockingQueue
它内部是基于链表的形式,默认队列的长度是Integer.MAX_VALUE
,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽。OOM
PriorityBlockingQueue
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
它是一个无界队列,该队列可以根据 Comparator
顺序先后执行。OOM
ThreadPoolExecutor 内的一些方法
//高3位代表线程池的状态,低29位代表的是线程池的数量
//默认是RUNNING状态,线程池的数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数位数,表示的Integer中除去最高的3位之后剩下的位数表示线程池的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的线程的最大数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池的状态
//负数补码 计算机内运算都是补码运算
// 1 = 00000000000000000000000000001
// -1 反码 = 11111111111111111111111111110
// -1 反码 补码 = 11111111111111111111111111111
// -1= 11111111111111111111111111111
// -1 << 29
//11100000000000000000000000000000
//接受新任务并且处理阻塞队列里面任务
private static final int RUNNING = -1 << COUNT_BITS;
//00000000000000000000000000000000
//拒绝新任务但是处理阻塞队列的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//00100000000000000000000000000000
//拒接新任务并且抛弃阻塞队列里面的任务,同时会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS;
//01000000000000000000000000000000
//所有任务都执行完(包括阻塞队列中的任务)后当线程池活动线程数为0,将要调用terminated方法。
private static final int TIDYING = 2 << COUNT_BITS;
//01100000000000000000000000000000
//终止状态,terminated方法调用完成以后的状态
private static final int TERMINATED = 3 << COUNT_BITS;
二进制运算
正码
1 | 0000 0000 0000 0000 0000 0000 0000 0001
-1 | 1000 0000 0000 0000 0000 0000 0000 0001
反码
1 | 0000 0000 0000 0000 0000 0000 0000 0001
-1 | 1111 1111 1111 1111 1111 1111 1111 1110
补码
1 | 0000 0000 0000 0000 0000 0000 0000 0001
-1 | 1111 1111 1111 1111 1111 1111 1111 1111
public void execute(Runnable command) {
// 判断提交的任务是不是为空,如果为空则抛出NullPointException异常
if (command == null)
throw new NullPointerException();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的数量小于corePoolSize,则进行添加线程执行任务
if (workerCountOf(c) < corePoolSize) {
// 添加线程修改线程数量并且将command作为第一个任务进行处理
if (addWorker(command, true))
return;
// 获取最新的状态
c = ctl.get();
}
// 如果线程池的状态是RUNNING,将命令添加到队列中
if (isRunning(c) && workQueue.offer(command)) {
// 二次检查线程池状态和线程数量
int recheck = ctl.get();
// 线程不是RUNNING状态,从队列中移除当前任务,并且执行拒绝策略。
// 这里说明一点,只有RUNNING状态的线程池才会接受新的任务,其余状态全部拒绝。
if (! isRunning(recheck) && remove(command))
reject(command);
//如果线程池的线程数量为空时,代表线程池是空的,添加一个新的线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列是满的,或者是SynchronousQueue队列时,则直接添加新的线程执行任务,如果添加失败则进行拒绝
//可能线程池的线程数量大于maximumPoolSize则采取拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获取线程池的状态和线程池线程的数量
int c = ctl.get();
// 单独获取线程池的状态
int rs = runStateOf(c);
// 检查队列是否只在必要时为空
// 当线程池状态是STOP、TIDYING、TERMINATED时,这些状态的时候不需要进行线程的添加和启动操作,因为如果是上面的状态,其实线程池的线程正在进行销毁操作,意味着线程调用了shutdownNow等方法。
// 如果线程池状态为SHUTDOWN并且第一个任务不为空时,不接受新的任务,直接返回false,也就是说SHUTDOWN的状态,不会接受新任务,只会针对队列中未完成的任务进行操作。
// 当线线程池状态为SHUTDOWN并且队列为空时,直接返回不进行任务添加。
if (rs >= SHUTDOWN && // 线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED
! (rs == SHUTDOWN && // 可以看做是rs!=SHUTDOWN,线程池状态为STOP、TIDYING、TERMINATED
firstTask == null && // 可以看做firstTask!=null,并且rs=SHUTDOWN
! workQueue.isEmpty())) // 可以看做rs=SHUTDOWN,并且workQueue.isEmpty()队列为空
return false;
//循环CAS增加线程池中线程的个数
for (;;) {
//获取线程池中线程个数
int wc = workerCountOf(c);
//如果线程池线程数量超过最大线程池数量,则直接返回
if (wc >= CAPACITY ||
//如果指定使用corePoolSize作为限制则使用corePoolSize,反之使用maximumPoolSize,最为工作线程最大线程线程数量,如果工作线程大于相应的线程数量则直接返回。
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//CAS增加线程池中线程的数量
if (compareAndIncrementWorkerCount(c))
//跳出增加线程池数量。
break retry;
//如果修改失败,则重新获取线程池的状态和线程数量
c = ctl.get(); // Re-read ctl
//如果最新的线程池状态和原有线程状态不一样时,则跳转到外层retry中,否则在内层循环重新进行CAS
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//工作线程是否开始启动标志
boolean workerStarted = false;
//工作线程添加到线程池成功与否标志
boolean workerAdded = false;
Worker w = null;
try {
//创建一个Worker对象
w = new Worker(firstTask);
//获取worker中的线程,这里线程是通过ThreadFactory线程工厂创建出来的,
final Thread t = w.thread;
//判断线程是否为空
if (t != null) {
//添加独占锁,为添加worker进行同步操作,防止其他线程同时进行execute方法。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池的状态
int rs = runStateOf(ctl.get());
//如果线程池状态为RUNNING或者是线程池状态为SHUTDOWN并且第一个任务为空时,当线程池状态为SHUTDOWN时,是不允许添加新任务的,所以他会从队列中获取任务。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加worker到集合中
workers.add(w);
int s = workers.size();
//跟踪最大的线程池数量
if (s > largestPoolSize)
largestPoolSize = s;
//添加worker成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加worker成功就启动任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//如果没有启动,w不为空就已出worker,并且线程池数量进行减少。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 这里发现它是实现了AQS,是一个不可重入的独占锁模式
// 并且它还集成了Runable接口,实现了run方法。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
/** 执行任务的线程,通过ThreadFactory创建 */
final Thread thread;
/** 初始化第一个任务*/
Runnable firstTask;
/** 每个线程完成任务的数量 */
volatile long completedTasks;
/**
* 首先现将state值设置为-1,因为在AQS中state=0代表的是锁没有被占用,而且在线程池中shutdown方法会判断能否争抢到锁,如果可以获得锁则对线程进行中断操作,如果调用了shutdownNow它会判断state>=0会被中断。
* firstTask第一个任务,如果为空则会从队列中获取任务,后面runWorker中。
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** 委托调用外部的runWorker方法 */
public void run() {
runWorker(this);
}
//是否独占锁
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
//这里就是上面shutdownNow中调用的线程中断的方法,getState()>=0
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
可以看到Worker是一个实现了AQS的锁,它是一个不可重入的独占锁,并且他也实现了Runnable
接口,实现了run
方法,在构造函数中将AQS的state
设置为-1
,为了避免线程还没有进入runWorker
方法前,就调用了shutdown
或shutdownNow
方法,会被中断,设置为-1则不会被中断。后面我们看到run
方法,它调用的是ThreadPoolExecutor
的runWorker
方法,我们这里回想一下,在addWorker
方法中,添加worker
到HashSet<Worker>
中后,他会将workerAdded
设置为true,代表添加worker
成功。
整体的逻辑是先进行创建线程,线程将Worker
设置为执行程序,并将线程塞到Worker
中,然后再addWorker中将Worker中的线程取出来,进行启动操作,启动后他会调用Worker中的run方法,然后run方法中将调用ThreadPoolExecutor的runWorker,然后runWorker又会调用Worker中的任务firstTask,这个fistTask是要真正执行的任务,也是用户自己实现的代码逻辑。
final void runWorker(Worker w) {
//调用者也就是Worker中的线程
Thread wt = Thread.currentThread();
//获取Worker中的第一个任务
Runnable task = w.firstTask;
//将Worker中的任务清除代表执行了第一个任务了,后面如果再有任务就从队列中获取。
w.firstTask = null;
//这里还记的我们在new Worker的时候将AQS的state状态设置为-1,这里先进行解锁操作,将state设置为0
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环进行获取任务,如果第一个任务不为空,或者是如果第一个任务为空,从任务队列中获取任务,如果有任务则返回获取的任务信息,如果没有任务可以获取则进行阻塞,阻塞也分两种第一种是阻塞直到任务队列中有内容,第二种是阻塞队列一定时间之后还是没有任务就直接返回null。
while (task != null || (task = getTask()) != null) {
//先获取worker的独占锁,防止其他线程调用了shutdown方法。
w.lock();
// 如果线程池正在停止,确保线程是被中断的,如果没有则确保线程不被中断操作。
if ((runStateAtLeast(ctl.get(), STOP) || //如果线程池状态为STOP、TIDYING、TERMINATED直接拒绝任务中断当前线程
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务之前做一些操作,可进行自定义
beforeExecute(wt, task);
Throwable thrown = null;
try {
//运行任务在这里喽。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务之后做一些操作,可进行自定义
afterExecute(task, thrown);
}
} finally {
//将任务清空为了下次任务获取
task = null;
//统计当前Worker完成了多少任务
w.completedTasks++;
//独占锁释放
w.unlock();
}
}
completedAbruptly = false;
} finally {
//处理Worker的退出操作,执行清理工作。
processWorkerExit(w, completedAbruptly);
}
}
我们看到如果Worker是第一次被启动,它会从Worker中获取firstTask任务来执行,然后执行成功后,它会getTask()来从队列中获取任务,这个地方比较有意思,它是分情况进行获取任务的,我们都直到BlockingQueue中提供了几种从队列中获取的方法,这个getTask中使用了两种方式,第一种是使用poll进行获取队列中的信息,它采用的是过一点时间如果队列中仍没有任务时直接返回null,然后还有一个就是take方法,take方法是如果队列中没有任务则将当前线程进行阻塞,等待队列中有任务后,会通知等待的队列线程进行消费任务
private Runnable getTask() {
boolean timedOut = false; //poll获取超时
for (;;) {
//获取线程池的状态和线程数量
int c = ctl.get();
//获取线程池的状态
int rs = runStateOf(c);
//线程池状态大于等于SHUTDOWN
//1.线程池如果是大于STOP的话减少工作线程池数量
//2.如果线程池状态为SHUTDOW并且队列为空时,代表队列任务已经执行完,返回null,线程数量减少1
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取线程池数量。
int wc = workerCountOf(c);
//如果allowCoreThreadTimeOut为true,则空闲线程在一定时间未获得任务会清除
//或者如果线程数量大于corePoolSize的时候会进行清除空闲线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1.如果线程池数量大于最大的线程池数量或者对(空余线程进行清除操作并且poll超时了,意思是队列中没有内容了,导致poll间隔一段时间后没有获取内容超时了。
//2.如果线程池的数量大于1或者是队列已经是空的
//总之意思就是当线程池的线程池数量大于corePoolSize,或指定了allowCoreThreadTimeOut为true,当队列中没有数据或者线程池数量大于1的情况下,尝试对线程池的数量进行减少操作,然后返回null,用于上一个方法进行清除操作。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果timed代表的是清除空闲线程的意思
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待一段时间如果没有获取到返回null。
workQueue.take(); //阻塞当前线程
//如果队列中获取到内容则返回
if (r != null)
return r;
//如果没有获取到超时了则设置timeOut状态
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
工作线程调用getTask从队列中进行获取任务。
如果指定了allowCoreThreadTimeOut或线程池线程数量大于corePoolSize则进行清除空闲多余的线程,调用阻塞队列的poll方法,在指定时间内如果没有获取到任务直接返回false。
如果线程池中线程池数量小于corePoolSize或者allowCoreThreadTimeOut为false默认值,则进行阻塞线程从队列中获取任务,直到队列有任务唤醒线程。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // 如果突然完成则调整线程数量
decrementWorkerCount(); // 减少线程数量1
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //获取锁,同时只有一个线程获得锁
try {
completedTaskCount += w.completedTasks; //统计整个线程池完成的数量
workers.remove(w); //将完成任务的worker从HashSet中移除
} finally {
mainLock.unlock(); //释放锁
}
//尝试设置线程池状态为TERMINATED
//1.如果线程池状态为SHUTDOWN并且线程池线程数量与工作队列为空时,修改状态。
//2.如果线程池状态为STOP并且线程池线程数量为空时,修改状态。
tryTerminate();
// 获取线程池的状态和线程池的数量
int c = ctl.get();
// 如果线程池的状态小于STOP,也就是SHUTDOWN或RUNNING状态
if (runStateLessThan(c, STOP)) {
//如果不是突然完成,也就是正常结束
if (!completedAbruptly) {
//如果指定allowCoreThreadTimeOut=true(默认false)则代表线程池中有空余线程时需要进行清理操作,否则线程池中的线程应该保持corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//这里判断如果线程池中队列为空并且线程数量最小为0时,将最小值调整为1,因为队列中还有任务没有完成需要增加队列,所以这里增加了一个线程。
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//如果当前线程数效益核心个数,就增加一个Worker
addWorker(null, false);
}
final void tryTerminate() {
for (;;) {
// 获取线程池的状态和线程池的数量组合状态
int c = ctl.get();
//这里单独下面进行分析,这里说明两个问题,需要反向来想这个问题。
//1.如果线程池状态STOP则不进入if语句
//2.如果线程池状态为SHUTDOWN并且工作队列为空时,不进入if语句
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果线程池数量不为空时,进行中断操作。
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//修改状态为TIDYING,并且将线程池的数量进行清空
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//执行一些逻辑,默认是空的
terminated();
} finally {
//修改状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用awaitTermination方法的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
public void shutdown() {
//获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
advanceRunState(SHUTDOWN);
//如果线程没有设置中断标识并且线程没有运行则设置中断标识
interruptIdleWorkers();
//空的可以实现的内容
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
}
首先对当前线程进行权限检测,查看是否设置了安全管理器,如果设置了则要看当前调用shutdown的线程有没有权限都关闭线程的权限,如果有权限还要看是否有中断工作现成的权限,如果没有权限则抛出SecurityException
或NullPointException
异常。
设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
如果线程没有设置中断标识并且线程没有运行则设置中断标识
尝试修改线程池状态为TERMINATED
private void advanceRunState(int targetState) {
for (;;) {
//获取线程池状态和线程池的线程数量
int c = ctl.get();
if (runStateAtLeast(c, targetState) || //如果线程池的状态>=SHUTDOWN
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) //设置线程池状态为SHUTDOWN
//返回
break;
}
}
当线程池的状态>=SHUTDOWN,直接返回
如果线程池状态为RUNNING,设置线程池状态为SHUTDOWN,设置成功则返回
private void interruptIdleWorkers(boolean onlyOne) {
//获取全局锁,同时只能有一个线程能够调用shutdown方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历工作线程
for (Worker w : workers) {
Thread t = w.thread;
//如果当前线程没有设置中断标志并且可以获取Worker自己的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
//设置中断标志
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//执行一次,清理空闲线程。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
我们看到当我们调用shutdown方法的时候,只是将空闲的线程给设置了中断标识,也就是活跃正在执行任务的线程并没有设置中断标识,直到将任务全部执行完后才会逐步清理线程操作
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//权限检查
checkShutdownAccess();
//设置线程池状态为STOP,如果状态已经是大于等于STOP则直接返回
advanceRunState(STOP);
//这里是和SHUTDOWN区别的地方,这里是强制进行中断操作
interruptWorkers();
//将为完成任务复制到list集合中
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试修改线程池状态为TERMINATED
tryTerminate();
return tasks;
}
shutdownNow`方法将线程池状态设置为`STOP`,而`shutdown`则将状态修改为`SHUTDOWN
shutdownNow
方法将工作任务进行中断操作,也就是说如果工作线程在工作也会被中断,而shutdown
则是先尝试获取锁如果获得锁成功则进行中断标志设置,也就是中断操作,如果没有获取到锁则等待进行完成后自动退出。
shutdownNow
方法返回未完成的任务列表。
拒绝策略
AbortPoliay
该策略会直接抛出异常,阻止系统正常工作
CallerRunsPolicy
只要线程池没有关闭线程池状态是RUNNING状态,该略略直接调用线程中运行当前被丢弃的任务
DiscardOledestPolicy
该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务
DiscardPolicy
该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务
-
主线程进行线程池的调用,线程池执行execute方法
-
线程池通过
addWorker
进行创建线程,并将线程放入到线程池中,这里我们看到第二步是将线程添加到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据corePoolSize和maximumPoolSize设置的大小来进行区分,因为超过corePoolSize的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断,当前线程池数量大于corePoolSize,或者指定了allowCoreThreadTimeOut
为true,则他等待一定时间后会返回,不会一直等待 -
当线程池的数量达到corePoolSize时,线程池首先会将任务添加到队列中
-
当队列中任务也达到了队列设置的最大值时,它会创建新的线程,注意的是此时的线程数量已经超过了corePoolSize,但是没有达到maximumPoolSize最大值。
-
当线程池的线程数量达到了maximumPoolSize,则会相应拒绝策略。
CountDownLatch aqs
在使用CountDownLatch的时候,是先创建CountDownLatch对象,然后在每次执行完一个任务后,就执行一次countDown()方法。直到通过getCount()获取到的值为0时才算执行完,如果count值不为0可通过await()方法让主线程进行等待,知道所有任务都执行完成,count的值被设为0。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);// 等待可中断的获取共享资源的方法
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted()) // 如果线程已经中断,直接抛出异常结束。
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)// 尝试获取共享资源,获取失败后,自旋入队列
doAcquireSharedInterruptibly(arg);// 可中断的入队列过程
}
CountDownLatch中内部类Sync的releaseShared()方法,是使用的AQS的releaseShared()方法。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {// 尝试释放资源
doReleaseShared();// 释放资源成功后,唤醒节点。
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0) // 若state为0,说明已经不需要释放资源了,直接返回false。
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))// 真正的释放资源,是通过CAS的方式将state的值减1。
return nextc == 0;
}
}
demo
package com.zhiqi.thread;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author czq
*/
public class ThreadDemoTest {
@Test
public void fixedDemo() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
int finalItem = i;
executorService.execute(() -> {
try {
this.biz(finalItem);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
}
@Test
public void cachedDemo() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
int finalItem = i;
executorService.execute(() -> {
try {
this.biz(finalItem);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
}
@Test
public void singleDemo() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
int finalItem = i;
executorService.execute(() -> {
try {
this.biz(finalItem);
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
executorService.shutdown();
}
public void biz(int item) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + "--" + item);
Thread.sleep(1000);
}
}