Java 线程池的一些概念

    63

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这 样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明:Executors 返回的线程池对象的弊端如下:
FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
CachedThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

我们首先需要了解一些线程的概念

核心线程 (corePool): 线程池最终执行任务的角色是线程,核心线程是在线程中有新任务提交的时候,首先检查核心线程数,如果核心线程都在工作,而且数量也已经达到最大核心线程数,则不会继续新建核心线程,而会将任务放入等待队列。

等待队列(workQueue): 如果核心线程都已经在工作,线程池持续添加任务,就会放到等待队列中,直到核心线程执行完成后,再去等待队列中拉取任务继续执行。

非核心线程:最大线程数 - 核心线程数 得到,如果等待队列满了,那么就会创建新的线程来执行。

线程活跃保持时间(keepAliveTime): 非核心线程如果空闲下来,超过这个时间内还未执行任务,则线程结束。

拒绝策略(RejectedExecutionHandler): 当等待队列已满,线程数也达到最大线程数时,线程池会根据饱和策略来执行后续操作,默认的策略是抛弃要加入的任务。

image-20210128113539648

JDK 为我们提供了什么

创建线程池的方法

newFixedThreadPool

返回固定长度的线程池,线程池中的线程数量是固定的

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}
newCachedThreadPool

该方法返回一个根据实际情况来进行调整线程数量的线程池,空余线程存活时间是60s

public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor

该方法返回一个只有一个线程的线程池

public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

等待队列的类型

SynchronousQueue

同步队列 本身没有容量的,如果入队的时候没有空余线程则直接执行拒绝策略

ArrayBlockingQueue

基于数组来进行实现的,初始化时必须指定容量参数,如果线程池的线程数量没有超过 maximumPoolSize ,则创建新的线程执行任务,如果超过了 maximumPoolSize 则执行拒绝策略。

LinkedBlockingQueue

它内部是基于链表的形式,默认队列的长度是Integer.MAX_VALUE ,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽。OOM

PriorityBlockingQueue
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
  if (initialCapacity < 1)
    throw new IllegalArgumentException();
  this.lock = new ReentrantLock();
  this.notEmpty = lock.newCondition();
  this.comparator = comparator;
  this.queue = new Object[initialCapacity];
}

它是一个无界队列,该队列可以根据 Comparator 顺序先后执行。OOM

ThreadPoolExecutor 内的一些方法
//高3位代表线程池的状态,低29位代表的是线程池的数量
//默认是RUNNING状态,线程池的数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程个数位数,表示的Integer中除去最高的3位之后剩下的位数表示线程池的个数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的线程的最大数量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//线程池的状态
//负数补码 计算机内运算都是补码运算
// 1 = 00000000000000000000000000001
// -1 反码 = 11111111111111111111111111110
// -1 反码 补码 = 11111111111111111111111111111
// -1= 11111111111111111111111111111
// -1 << 29

//11100000000000000000000000000000
//接受新任务并且处理阻塞队列里面任务
private static final int RUNNING    = -1 << COUNT_BITS;
//00000000000000000000000000000000
//拒绝新任务但是处理阻塞队列的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//00100000000000000000000000000000
//拒接新任务并且抛弃阻塞队列里面的任务,同时会中断正在处理的任务
private static final int STOP       =  1 << COUNT_BITS;
//01000000000000000000000000000000
//所有任务都执行完(包括阻塞队列中的任务)后当线程池活动线程数为0,将要调用terminated方法。
private static final int TIDYING    =  2 << COUNT_BITS;
//01100000000000000000000000000000
//终止状态,terminated方法调用完成以后的状态
private static final int TERMINATED =  3 << COUNT_BITS;

二进制运算

正码
1  | 0000 0000 0000 0000 0000 0000 0000 0001
-1 | 1000 0000 0000 0000 0000 0000 0000 0001

反码
1  | 0000 0000 0000 0000 0000 0000 0000 0001
-1 | 1111 1111 1111 1111 1111 1111 1111 1110
  
补码
1  | 0000 0000 0000 0000 0000 0000 0000 0001
-1 | 1111 1111 1111 1111 1111 1111 1111 1111
public void execute(Runnable command) {
  // 判断提交的任务是不是为空,如果为空则抛出NullPointException异常
  if (command == null)
    throw new NullPointerException();
  // 获取线程池的状态和线程池的数量
  int c = ctl.get();
  // 如果线程池的数量小于corePoolSize,则进行添加线程执行任务
  if (workerCountOf(c) < corePoolSize) {
    // 添加线程修改线程数量并且将command作为第一个任务进行处理
    if (addWorker(command, true))
      return;
    // 获取最新的状态
    c = ctl.get();
  }
  // 如果线程池的状态是RUNNING,将命令添加到队列中
  if (isRunning(c) && workQueue.offer(command)) {
    // 二次检查线程池状态和线程数量
    int recheck = ctl.get();
    // 线程不是RUNNING状态,从队列中移除当前任务,并且执行拒绝策略。
    // 这里说明一点,只有RUNNING状态的线程池才会接受新的任务,其余状态全部拒绝。
    if (! isRunning(recheck) && remove(command))
      reject(command);
    //如果线程池的线程数量为空时,代表线程池是空的,添加一个新的线程。
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  //如果队列是满的,或者是SynchronousQueue队列时,则直接添加新的线程执行任务,如果添加失败则进行拒绝
  //可能线程池的线程数量大于maximumPoolSize则采取拒绝策略。
  else if (!addWorker(command, false))
    reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    // 获取线程池的状态和线程池线程的数量
    int c = ctl.get();
    // 单独获取线程池的状态
    int rs = runStateOf(c);

    // 检查队列是否只在必要时为空
    // 当线程池状态是STOP、TIDYING、TERMINATED时,这些状态的时候不需要进行线程的添加和启动操作,因为如果是上面的状态,其实线程池的线程正在进行销毁操作,意味着线程调用了shutdownNow等方法。
   	// 如果线程池状态为SHUTDOWN并且第一个任务不为空时,不接受新的任务,直接返回false,也就是说SHUTDOWN的状态,不会接受新任务,只会针对队列中未完成的任务进行操作。
    // 当线线程池状态为SHUTDOWN并且队列为空时,直接返回不进行任务添加。
    if (rs >= SHUTDOWN &&		// 线程池的状态是SHUTDOWN、STOP、TIDYING、TERMINATED
        ! (rs == SHUTDOWN &&	// 可以看做是rs!=SHUTDOWN,线程池状态为STOP、TIDYING、TERMINATED
           firstTask == null &&	// 可以看做firstTask!=null,并且rs=SHUTDOWN
           ! workQueue.isEmpty()))	// 可以看做rs=SHUTDOWN,并且workQueue.isEmpty()队列为空
      return false;
    //循环CAS增加线程池中线程的个数
    for (;;) {
      //获取线程池中线程个数
      int wc = workerCountOf(c);
      //如果线程池线程数量超过最大线程池数量,则直接返回
      if (wc >= CAPACITY ||
          //如果指定使用corePoolSize作为限制则使用corePoolSize,反之使用maximumPoolSize,最为工作线程最大线程线程数量,如果工作线程大于相应的线程数量则直接返回。
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      //CAS增加线程池中线程的数量
      if (compareAndIncrementWorkerCount(c))
        //跳出增加线程池数量。
        break retry;
      //如果修改失败,则重新获取线程池的状态和线程数量
      c = ctl.get();  // Re-read ctl
      //如果最新的线程池状态和原有线程状态不一样时,则跳转到外层retry中,否则在内层循环重新进行CAS
      if (runStateOf(c) != rs)
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  //工作线程是否开始启动标志
  boolean workerStarted = false;
  //工作线程添加到线程池成功与否标志
  boolean workerAdded = false;
  Worker w = null;
  try {
    //创建一个Worker对象
    w = new Worker(firstTask);
    //获取worker中的线程,这里线程是通过ThreadFactory线程工厂创建出来的,
    final Thread t = w.thread;
    //判断线程是否为空
    if (t != null) {
      //添加独占锁,为添加worker进行同步操作,防止其他线程同时进行execute方法。
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        //获取线程池的状态
        int rs = runStateOf(ctl.get());
        //如果线程池状态为RUNNING或者是线程池状态为SHUTDOWN并且第一个任务为空时,当线程池状态为SHUTDOWN时,是不允许添加新任务的,所以他会从队列中获取任务。
        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
          //添加worker到集合中
          workers.add(w);
          int s = workers.size();
          //跟踪最大的线程池数量
          if (s > largestPoolSize)
            largestPoolSize = s;
          //添加worker成功
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      //如果添加worker成功就启动任务
      if (workerAdded) {
        t.start();
        workerStarted = true;
      }
    }
  } finally {
    //如果没有启动,w不为空就已出worker,并且线程池数量进行减少。
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}
// 这里发现它是实现了AQS,是一个不可重入的独占锁模式
// 并且它还集成了Runable接口,实现了run方法。
private final class Worker
  extends AbstractQueuedSynchronizer
  implements Runnable
{
  private static final long serialVersionUID = 6138294804551838833L;

  /** 执行任务的线程,通过ThreadFactory创建 */
  final Thread thread;
  /** 初始化第一个任务*/
  Runnable firstTask;
  /** 每个线程完成任务的数量 */
  volatile long completedTasks;

  /**
     * 首先现将state值设置为-1,因为在AQS中state=0代表的是锁没有被占用,而且在线程池中shutdown方法会判断能否争抢到锁,如果可以获得锁则对线程进行中断操作,如果调用了shutdownNow它会判断state>=0会被中断。
     * firstTask第一个任务,如果为空则会从队列中获取任务,后面runWorker中。
     */
  Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
  }

  /** 委托调用外部的runWorker方法 */
  public void run() {
    runWorker(this);
  }

  //是否独占锁
  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }
  //这里就是上面shutdownNow中调用的线程中断的方法,getState()>=0
  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}

可以看到Worker是一个实现了AQS的锁,它是一个不可重入的独占锁,并且他也实现了Runnable接口,实现了run方法,在构造函数中将AQS的state设置为-1,为了避免线程还没有进入runWorker方法前,就调用了shutdownshutdownNow方法,会被中断,设置为-1则不会被中断。后面我们看到run方法,它调用的是ThreadPoolExecutorrunWorker方法,我们这里回想一下,在addWorker方法中,添加workerHashSet<Worker>中后,他会将workerAdded设置为true,代表添加worker成功。

整体的逻辑是先进行创建线程,线程将Worker设置为执行程序,并将线程塞到Worker中,然后再addWorker中将Worker中的线程取出来,进行启动操作,启动后他会调用Worker中的run方法,然后run方法中将调用ThreadPoolExecutor的runWorker,然后runWorker又会调用Worker中的任务firstTask,这个fistTask是要真正执行的任务,也是用户自己实现的代码逻辑。

final void runWorker(Worker w) {
  //调用者也就是Worker中的线程
  Thread wt = Thread.currentThread();
  //获取Worker中的第一个任务
  Runnable task = w.firstTask;
  //将Worker中的任务清除代表执行了第一个任务了,后面如果再有任务就从队列中获取。
  w.firstTask = null;
  //这里还记的我们在new Worker的时候将AQS的state状态设置为-1,这里先进行解锁操作,将state设置为0
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    //循环进行获取任务,如果第一个任务不为空,或者是如果第一个任务为空,从任务队列中获取任务,如果有任务则返回获取的任务信息,如果没有任务可以获取则进行阻塞,阻塞也分两种第一种是阻塞直到任务队列中有内容,第二种是阻塞队列一定时间之后还是没有任务就直接返回null。
    while (task != null || (task = getTask()) != null) {
      //先获取worker的独占锁,防止其他线程调用了shutdown方法。
      w.lock();
      // 如果线程池正在停止,确保线程是被中断的,如果没有则确保线程不被中断操作。
      if ((runStateAtLeast(ctl.get(), STOP) || //如果线程池状态为STOP、TIDYING、TERMINATED直接拒绝任务中断当前线程
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        //执行任务之前做一些操作,可进行自定义
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          //运行任务在这里喽。
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          //执行任务之后做一些操作,可进行自定义
          afterExecute(task, thrown);
        }
      } finally {
        //将任务清空为了下次任务获取
        task = null;
        //统计当前Worker完成了多少任务
        w.completedTasks++;
        //独占锁释放
        w.unlock();
      }
    }
    completedAbruptly = false;
  } finally {
    //处理Worker的退出操作,执行清理工作。
    processWorkerExit(w, completedAbruptly);
  }
}

我们看到如果Worker是第一次被启动,它会从Worker中获取firstTask任务来执行,然后执行成功后,它会getTask()来从队列中获取任务,这个地方比较有意思,它是分情况进行获取任务的,我们都直到BlockingQueue中提供了几种从队列中获取的方法,这个getTask中使用了两种方式,第一种是使用poll进行获取队列中的信息,它采用的是过一点时间如果队列中仍没有任务时直接返回null,然后还有一个就是take方法,take方法是如果队列中没有任务则将当前线程进行阻塞,等待队列中有任务后,会通知等待的队列线程进行消费任务

private Runnable getTask() {
  boolean timedOut = false; //poll获取超时

  for (;;) {
    //获取线程池的状态和线程数量
    int c = ctl.get();
    //获取线程池的状态
    int rs = runStateOf(c);

    //线程池状态大于等于SHUTDOWN
    //1.线程池如果是大于STOP的话减少工作线程池数量
    //2.如果线程池状态为SHUTDOW并且队列为空时,代表队列任务已经执行完,返回null,线程数量减少1
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }
    //获取线程池数量。
    int wc = workerCountOf(c);

    //如果allowCoreThreadTimeOut为true,则空闲线程在一定时间未获得任务会清除
    //或者如果线程数量大于corePoolSize的时候会进行清除空闲线程
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    //1.如果线程池数量大于最大的线程池数量或者对(空余线程进行清除操作并且poll超时了,意思是队列中没有内容了,导致poll间隔一段时间后没有获取内容超时了。
    //2.如果线程池的数量大于1或者是队列已经是空的
    //总之意思就是当线程池的线程池数量大于corePoolSize,或指定了allowCoreThreadTimeOut为true,当队列中没有数据或者线程池数量大于1的情况下,尝试对线程池的数量进行减少操作,然后返回null,用于上一个方法进行清除操作。
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      //如果timed代表的是清除空闲线程的意思
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :	//等待一段时间如果没有获取到返回null。
      workQueue.take();					//阻塞当前线程
      //如果队列中获取到内容则返回
      if (r != null)
        return r;
      //如果没有获取到超时了则设置timeOut状态
      timedOut = true;
    } catch (InterruptedException retry) {
      timedOut = false;
    }
  }
}

工作线程调用getTask从队列中进行获取任务。

如果指定了allowCoreThreadTimeOut或线程池线程数量大于corePoolSize则进行清除空闲多余的线程,调用阻塞队列的poll方法,在指定时间内如果没有获取到任务直接返回false。

如果线程池中线程池数量小于corePoolSize或者allowCoreThreadTimeOut为false默认值,则进行阻塞线程从队列中获取任务,直到队列有任务唤醒线程。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
  if (completedAbruptly) 				// 如果突然完成则调整线程数量
    decrementWorkerCount();		// 减少线程数量1

  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();														//获取锁,同时只有一个线程获得锁
  try {
    completedTaskCount += w.completedTasks;	//统计整个线程池完成的数量
    workers.remove(w);											//将完成任务的worker从HashSet中移除
  } finally {
    mainLock.unlock();											//释放锁
  }
  //尝试设置线程池状态为TERMINATED
  //1.如果线程池状态为SHUTDOWN并且线程池线程数量与工作队列为空时,修改状态。
  //2.如果线程池状态为STOP并且线程池线程数量为空时,修改状态。
  tryTerminate();								

  // 获取线程池的状态和线程池的数量
  int c = ctl.get();
  // 如果线程池的状态小于STOP,也就是SHUTDOWN或RUNNING状态
  if (runStateLessThan(c, STOP)) {
    //如果不是突然完成,也就是正常结束
    if (!completedAbruptly) {
      //如果指定allowCoreThreadTimeOut=true(默认false)则代表线程池中有空余线程时需要进行清理操作,否则线程池中的线程应该保持corePoolSize
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      //这里判断如果线程池中队列为空并且线程数量最小为0时,将最小值调整为1,因为队列中还有任务没有完成需要增加队列,所以这里增加了一个线程。
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      if (workerCountOf(c) >= min)
        return; // replacement not needed
    }
    //如果当前线程数效益核心个数,就增加一个Worker
    addWorker(null, false);
  }
final void tryTerminate() {
  for (;;) {
    // 获取线程池的状态和线程池的数量组合状态
    int c = ctl.get();
    //这里单独下面进行分析,这里说明两个问题,需要反向来想这个问题。
    //1.如果线程池状态STOP则不进入if语句
    //2.如果线程池状态为SHUTDOWN并且工作队列为空时,不进入if语句
    if (isRunning(c) ||
        runStateAtLeast(c, TIDYING) ||
        (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
      return;
    //如果线程池数量不为空时,进行中断操作。
    if (workerCountOf(c) != 0) { // Eligible to terminate
      interruptIdleWorkers(ONLY_ONE);
      return;
    }

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
      //修改状态为TIDYING,并且将线程池的数量进行清空
      if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
        try {
          //执行一些逻辑,默认是空的
          terminated();
        } finally {
          //修改状态为TERMINATED
          ctl.set(ctlOf(TERMINATED, 0));
          //唤醒调用awaitTermination方法的线程
          termination.signalAll();
        }
        return;
      }
    } finally {
      mainLock.unlock();
    }
    // else retry on failed CAS
  }
public void shutdown() {
  //获取全局锁
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    //权限检查
    checkShutdownAccess();
    //设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回
    advanceRunState(SHUTDOWN);
    //如果线程没有设置中断标识并且线程没有运行则设置中断标识
    interruptIdleWorkers();
    //空的可以实现的内容
    onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  //尝试修改线程池状态为TERMINATED
  tryTerminate();
}

首先对当前线程进行权限检测,查看是否设置了安全管理器,如果设置了则要看当前调用shutdown的线程有没有权限都关闭线程的权限,如果有权限还要看是否有中断工作现成的权限,如果没有权限则抛出SecurityExceptionNullPointException异常。

设置线程池状态为SHUTDOWN,如果状态已经是大于等于SHUTDOWN则直接返回

如果线程没有设置中断标识并且线程没有运行则设置中断标识

尝试修改线程池状态为TERMINATED

private void advanceRunState(int targetState) {
  for (;;) {
    //获取线程池状态和线程池的线程数量
    int c = ctl.get();
    if (runStateAtLeast(c, targetState) ||		//如果线程池的状态>=SHUTDOWN
        ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))	//设置线程池状态为SHUTDOWN
      //返回
      break;										
  }
}

当线程池的状态>=SHUTDOWN,直接返回
如果线程池状态为RUNNING,设置线程池状态为SHUTDOWN,设置成功则返回

private void interruptIdleWorkers(boolean onlyOne) {
  //获取全局锁,同时只能有一个线程能够调用shutdown方法
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    //遍历工作线程
    for (Worker w : workers) {
      Thread t = w.thread;
      //如果当前线程没有设置中断标志并且可以获取Worker自己的锁
      if (!t.isInterrupted() && w.tryLock()) {
        try {
          //设置中断标志
          t.interrupt();
        } catch (SecurityException ignore) {
        } finally {
          w.unlock();
        }
      }
      //执行一次,清理空闲线程。
      if (onlyOne)
        break;
    }
  } finally {
    mainLock.unlock();
  }
}

我们看到当我们调用shutdown方法的时候,只是将空闲的线程给设置了中断标识,也就是活跃正在执行任务的线程并没有设置中断标识,直到将任务全部执行完后才会逐步清理线程操作

public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    //权限检查
    checkShutdownAccess();
    //设置线程池状态为STOP,如果状态已经是大于等于STOP则直接返回
    advanceRunState(STOP);
    //这里是和SHUTDOWN区别的地方,这里是强制进行中断操作
    interruptWorkers();
    //将为完成任务复制到list集合中
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  //尝试修改线程池状态为TERMINATED
  tryTerminate();
  return tasks;
}
shutdownNow`方法将线程池状态设置为`STOP`,而`shutdown`则将状态修改为`SHUTDOWN

shutdownNow方法将工作任务进行中断操作,也就是说如果工作线程在工作也会被中断,而shutdown则是先尝试获取锁如果获得锁成功则进行中断标志设置,也就是中断操作,如果没有获取到锁则等待进行完成后自动退出。

shutdownNow方法返回未完成的任务列表。

拒绝策略

AbortPoliay

该策略会直接抛出异常,阻止系统正常工作

CallerRunsPolicy

只要线程池没有关闭线程池状态是RUNNING状态,该略略直接调用线程中运行当前被丢弃的任务

DiscardOledestPolicy

该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务

DiscardPolicy

该策略将丢弃最老的一个请求,也就是即将被执行的第一个任务,并尝试再次提交任务

image-20210128141532464

  1. 主线程进行线程池的调用,线程池执行execute方法

  2. 线程池通过addWorker进行创建线程,并将线程放入到线程池中,这里我们看到第二步是将线程添加到核心线程中,其实线程池内部不分核心线程和非核心线程,只是根据corePoolSize和maximumPoolSize设置的大小来进行区分,因为超过corePoolSize的线程会被回收,至于回收那些线程,是根据线程获取任务的时候进行判断,当前线程池数量大于corePoolSize,或者指定了allowCoreThreadTimeOut为true,则他等待一定时间后会返回,不会一直等待

  3. 当线程池的数量达到corePoolSize时,线程池首先会将任务添加到队列中

  4. 当队列中任务也达到了队列设置的最大值时,它会创建新的线程,注意的是此时的线程数量已经超过了corePoolSize,但是没有达到maximumPoolSize最大值。

  5. 当线程池的线程数量达到了maximumPoolSize,则会相应拒绝策略。

CountDownLatch aqs

在使用CountDownLatch的时候,是先创建CountDownLatch对象,然后在每次执行完一个任务后,就执行一次countDown()方法。直到通过getCount()获取到的值为0时才算执行完,如果count值不为0可通过await()方法让主线程进行等待,知道所有任务都执行完成,count的值被设为0。

public CountDownLatch(int count) {
  if (count < 0) throw new IllegalArgumentException("count < 0");
  this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer {
  private static final long serialVersionUID = 4982264981922014374L;
  Sync(int count) {
    setState(count);
  }
}
public void await() throws InterruptedException {
  sync.acquireSharedInterruptibly(1);// 等待可中断的获取共享资源的方法
}
public final void acquireSharedInterruptibly(int arg)
  throws InterruptedException {
  if (Thread.interrupted()) // 如果线程已经中断,直接抛出异常结束。
    throw new InterruptedException();
  if (tryAcquireShared(arg) < 0)// 尝试获取共享资源,获取失败后,自旋入队列
    doAcquireSharedInterruptibly(arg);// 可中断的入队列过程
}

CountDownLatch中内部类Sync的releaseShared()方法,是使用的AQS的releaseShared()方法。

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {// 尝试释放资源
        doReleaseShared();// 释放资源成功后,唤醒节点。
        return true;
    }
    return false;
}
protected boolean tryReleaseShared(int releases) {
  // Decrement count; signal when transition to zero
  for (;;) {
    int c = getState();
    if (c == 0) // 若state为0,说明已经不需要释放资源了,直接返回false。
      return false;
    int nextc = c-1;
    if (compareAndSetState(c, nextc))// 真正的释放资源,是通过CAS的方式将state的值减1。
      return nextc == 0;
  }
}

demo

package com.zhiqi.thread;

import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author czq
 */
public class ThreadDemoTest {

    @Test
    public void fixedDemo() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(100);
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            int finalItem = i;
            executorService.execute(() -> {
                try {
                    this.biz(finalItem);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
    }

    @Test
    public void cachedDemo() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(100);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            int finalItem = i;
            executorService.execute(() -> {
                try {
                    this.biz(finalItem);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
    }

    @Test
    public void singleDemo() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(100);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            int finalItem = i;
            executorService.execute(() -> {
                try {
                    this.biz(finalItem);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        executorService.shutdown();
    }

    public void biz(int item) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + "--" + item);
        Thread.sleep(1000);
    }
}
消息盒子

# 暂无消息 #

只显示最新10条未读和已读信息