Java 线程池的一些概念
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
我们首先需要了解一些线程的概念
核心线程 (corePool): 线程池最终执行任务的角色是线程,核心线程是在线程中有新任务提交的时候,首先检查核心线程数,如果核心线程都在工作,而且数量也已经达到最大核心线程数,则不会继续新建核心线程,而会将任务放入等待队列。
等待队列(workQueue): 如果核心线程都已经在工作,线程池持续添加任务,就会放到等待队列中,直到核心线程执行完成后,再去等待队列中拉取任务继续执行。
非核心线程: 由 最大线程数 - 核心线程数
得到,如果等待队列满了,那么就会创建新的线程来执行。
线程活跃保持时间(keepAliveTime): 非核心线程如果空闲下来,超过这个时间内还未执行任务,则线程结束。
拒绝策略(RejectedExecutionHandler): 当等待队列已满,线程数也达到最大线程数时,线程池会根据饱和策略来执行后续操作,默认的策略是抛弃要加入的任务。
JDK 为我们提供了什么
创建线程池的方法
newFixedThreadPool
返回固定长度的线程池,线程池中的线程数量是固定的
newCachedThreadPool
该方法返回一个根据实际情况来进行调整线程数量的线程池,空余线程存活时间是60s
newSingleThreadExecutor
该方法返回一个只有一个线程的线程池
等待队列的类型
SynchronousQueue
同步队列 本身没有容量的,如果入队的时候没有空余线程则直接执行拒绝策略
ArrayBlockingQueue
基于数组来进行实现的,初始化时必须指定容量参数,如果线程池的线程数量没有超过 maximumPoolSize ,则创建新的线程执行任务,如果超过了 maximumPoolSize 则执行拒绝策略。
LinkedBlockingQueue
它内部是基于链表的形式,默认队列的长度是Integer.MAX_VALUE
,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽。OOM
PriorityBlockingQueue
它是一个无界队列,该队列可以根据 Comparator
顺序先后执行。OOM
ThreadPoolExecutor 内的一些方法
二进制运算
可以看到Worker是一个实现了AQS的锁,它是一个不可重入的独占锁,并且他也实现了Runnable
接口,实现了run
方法,在构造函数中将AQS的state
设置为-1
,为了避免线程还没有进入runWorker
方法前,就调用了shutdown
或shutdownNow
方法,会被中断,设置为-1则不会被中断。后面我们看到run
方法,它调用的是ThreadPoolExecutor
的runWorker
方法,我们这里回想一下,在addWorker
方法中,添加worker
到HashSet<Worker>
中后,他会将workerAdded
设置为true,代表添加worker
成功。
整体的逻辑是先进行创建线程,线程将Worker
设置为执行程序,并将线程塞到Worker
中,然后再addWorker中将Worker中的线程取出来,进行启动操作,启动后他会调用Worker中的run方法,然后run方法中将调用ThreadPoolExecutor的runWorker,然后runWorker又会调用Worker中的任务firstTask,这个fistTask是要真正执行的任务,也是用户自己实现的代码逻辑。
我们看到如果Worker是第一次被启动,它会从Worker中获取firstTask任务来执行,然后执行成功后,它会getTask()来从队列中获取任务,这个地方比较有意思,它是分情况进行获取任务的,我们都直到BlockingQueue中提供了几种从队列中获取的方法,这个getTask中使用了两种方式,第一种是使用poll进行获取队列中的信息,它采用的是过一点时间如果队列中仍没有任务时直接返回null,然后还有一个就是take方法,take方法是如果队列中没有任务则将当前线程进行阻塞,等待队列中有任务后,会通知等待的队列线程进行消费任务
工作线程调用getTask从队列中进行获取任务。
如果指定了allowCoreThreadTimeOut或线程池线程数量大于corePoolSize则进行清除空闲多余的线程,调用阻塞队列的poll方法,在指定时间内如果没有获取到任务直接返回false。
如果线程池中线程池数量小于corePoolSize或者allowCoreThreadTimeOut为false默认值,则进行阻塞线程从队列中获取任务,直到队列有任务唤醒线程。
首先对当前线程进行权限检测,查看是否设置了安全管理器,如果设置了则要看当前调用shutdown的线程有没有权限都关闭线程的权限,如果有权限还要看是否有中断工作现成的权限,如果没有权限则抛出SecurityException
或NullPointException
异常。
设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
如果线程没有设置中断标识并且线程没有运行则设置中断标识
尝试修改线程池状态为TERMINATED
当线程池的状态>=SHUTDOWN,直接返回
如果线程池状态为RUNNING,设置线程池状态为SHUTDOWN,设置成功则返回
我们看到当我们调用shutdown方法的时候,只是将空闲的线程给设置了中断标识,也就是活跃正在执行任务的线程并没有设置中断标识,直到将任务全部执行完后才会逐步清理线程操作
shutdownNow`方法将线程池状态设置为`STOP`,而`shutdown`则将状态修改为`SHUTDOWN
shutdownNow
方法将工作任务进行中断操作,也就是说如果工作线程在工作也会被中断,而shutdown
则是先尝试获取锁如果获得锁成功则进行中断标志设置,也就是中断操作,如果没有获取到锁则等待进行完成后自动退出。
shutdownNow
方法返回未完成的任务列表。
拒绝策略
AbortPoliay
该策略会直接抛出异常,阻止系统正常工作
CallerRunsPolicy
只要线程池没有关闭线程池状态是RUNNING状态,该略略直接调用线程中运行当前被丢弃的任务
DiscardOledestPolicy
该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务
DiscardPolicy
该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务
-
主线程进行线程池的调用,线程池执行execute方法
-
线程池通过
addWorker
进行创建线程,并将线程放入到线程池中,这里我们看到第二步是将线程添加到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据corePoolSize和maximumPoolSize设置的大小来进行区分,因为超过corePoolSize的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断,当前线程池数量大于corePoolSize,或者指定了allowCoreThreadTimeOut
为true,则他等待一定时间后会返回,不会一直等待 -
当线程池的数量达到corePoolSize时,线程池首先会将任务添加到队列中
-
当队列中任务也达到了队列设置的最大值时,它会创建新的线程,注意的是此时的线程数量已经超过了corePoolSize,但是没有达到maximumPoolSize最大值。
-
当线程池的线程数量达到了maximumPoolSize,则会相应拒绝策略。
CountDownLatch aqs
在使用CountDownLatch的时候,是先创建CountDownLatch对象,然后在每次执行完一个任务后,就执行一次countDown()方法。直到通过getCount()获取到的值为0时才算执行完,如果count值不为0可通过await()方法让主线程进行等待,知道所有任务都执行完成,count的值被设为0。
CountDownLatch中内部类Sync的releaseShared()方法,是使用的AQS的releaseShared()方法。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {// 尝试释放资源
doReleaseShared();// 释放资源成功后,唤醒节点。
return true;
}
return false;
}
demo