Skip to content

从Java构建线程的方式到线程池ThreadPoolExecutor源码剖析

大纲

  • Java 构建线程的方式
  • 线程池的7个参数
  • 线程池的执行流程
  • 线程池属性标识
  • 线程池的 execute 方法执行
  • Worker 的封装

从Java构建线程的方式到线程池ThreadPoolExecutor源码剖析

一、Java 构建线程的方式(常识)

  • 继承 Thread
  • 实现 Runnable
  • 实现 Callable
  • 线程池方式(Java提供了构建线程池的方式)
    • Java提供了 Executors 可以去创建(规范中不允许使用这种方式创建线程池,这种方式对线程的控制粒度比较低)
    • 推荐手动创建线程池

二、线程池的7个参数(常识)

public ThreadPoolExecutor(int corePoolSize,   //核心线程池
                          int maximumPoolSize,   //最大线程数
                          long keepAliveTime,    //最大空闲时间
                          TimeUnit unit,         //时间单位
                          BlockingQueue<Runnable> workQueue,  //阻塞队列
                          ThreadFactory threadFactory,        //线程工厂
                          RejectedExecutionHandler handler) {   //拒绝策略

三、线程池的执行流程(常识)

线程池执行流程

image.png

为什么要先进阻塞再去尝试创建非核心线程:

饭店(线程池) - 厨子(线程) - 人多先排队(阻塞队列)- 招厨子(创建最大线程数) - 今日客满(拒绝策略)

四、线程池属性标识

4.1 线程池属性

//ctl 是一个int 类型的数值,表达了两个意思,1:声明当前线程池的状态, 2: 声明线程池中的线程数
//高3位是:线程池状态
//低29位是: 线程池中的线程个数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));  
private static final int COUNT_BITS = Integer.SIZE - 3;  //29,方便后面做位运算
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;   //通过位运算得出最大容量
  
// runState is stored in the high-order bits  
//线程池状态
private static final int RUNNING    = -1 << COUNT_BITS;  //111 代表线程池为RUNNING,代表正常接收任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;  //000 代表线程池为`SHUTDOWN`状态,不接收新任务,但是内部还会处理阻塞队列中的任务,正在进行的任务也正常处理
private static final int STOP       =  1 << COUNT_BITS;  //001  代表线程池为`STOP`状态,不接收新任务,也不去处理阻塞队列中的任务,同时会中断正在执行的任务
private static final int TIDYING    =  2 << COUNT_BITS;  //010 代表线程池为`TIDYING`状态,过渡的状态,代表当前线程池即将Game Over
private static final int TERMINATED =  3 << COUNT_BITS;  //011 代表线程池为`TERMINATED`,要执行terminated(),真的凉凉了
  
// Packing and unpacking ctl  
private static int runStateOf(int c)     { return c & ~CAPACITY; }  //得到线程池的状态
private static int workerCountOf(int c)  { return c & CAPACITY; }   //得到当前线程池的线程数量

ThreadPoolExecutor类使用了一些高级的位操作来高效地管理线程池的状态和工作线程的数量。

核心变量解释

  • AtomicInteger ctl
    • 实际它是使用了 AtomicInteger ctl 这个变量来进行存储线程池的状态和线程数。原子操作保证了线程安全,即在多线程环境下,对这个变量的修改是原子性的,避免了竞态条件。
    • 高3位是:线程池状态
    • 低29位是: 线程池中的线程个数
  • COUNT_BITS
    • 用于计算和存储线程数量的位数。它是Integer.SIZE(Java中整数的位数,通常是32位)减去3。
    • 29,方便后面做位运算;3位用于表示线程池状态
  • CAPACITY
    • 表示的最大线程数。它通过将1左移COUNT_BITS位然后减1来计算。
    • 类似 100000000 - 1 = 011111111111 这种,是常见的位运算的一种表示方式

线程池状态

线程池的状态被存储在ctl的高位。这些状态包括:

  • RUNNING
    • 111
    • 线程池可以接受新任务,并且也可以处理排队的任务。
  • SHUTDOWN
    • 000
    • 不接受新任务,但是可以处理排队的任务。
  • STOP
    • 001
    • 不接受新任务,不处理排队的任务,并且中断正在进行的任务。
  • TIDYING
    • 010
    • 所有任务都已终止,workerCount(活动线程数)为零,线程池正在转换到状态TERMINATED
  • TERMINATED
    • 011
    • terminated()方法已经完成

位操作方法

  • runStateOf(int c): 提取ctl中的状态部分。
  • workerCountOf(int c): 提取ctl中的工作线程数部分。
  • ctlOf(int rs, int wc): 将运行状态和工作线程数组合成一个ctl

4.2 线程池状态变化

image.png

五、线程池的 execute 方法执行

在Java的ThreadPoolExecutor类中,execute方法是用于提交任务的关键方法。

它决定如何将一个新的Runnable任务添加到线程池中。

execute 方法

public void execute(Runnable command) {
    // 检查提交的任务不是null
    if (command == null)
        throw new NullPointerException();

    // 获取当前线程池状态和工作线程数
    int c = ctl.get();

    // 如果当前工作线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 尝试添加一个新工作线程来执行这个任务
        if (addWorker(command, true))
            return; // 如果成功,直接返回
        // 如果添加工作线程失败,重新获取线程池状态
        c = ctl.get();
    }

    // 如果线程池处于运行状态,且任务能被添加到队列中
    if (isRunning(c) && workQueue.offer(command)) {
        // 重新检查线程池状态
        int recheck = ctl.get();
        // 如果线程池不再运行,并且能从队列中移除任务,则拒绝任务
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 如果没有活动的工作线程,则添加一个新的工作线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果任务不能被添加到队列,尝试创建一个新工作线程来执行这个任务
    else if (!addWorker(command, false))
        reject(command); // 如果创建失败,则拒绝任务
}

实现步骤:

  1. 参数检查: 首先检查传入的Runnable对象不是null
  2. 工作线程数量检查: 如果当前工作线程的数量小于核心线程数corePoolSize,尝试直接创建一个新的工作线程来执行任务。
  3. 任务队列处理: 如果当前线程数已经达到或超过核心线程数,或者新工作线程的创建失败,则尝试将任务加入到等待队列中。
  4. 状态重新检查: 在成功将任务加入队列后,需要再次检查线程池的状态,确保线程池仍在运行。如果线程池状态改变(例如,被关闭了),则尝试移除刚加入的任务,并执行拒绝策略。
  5. 无活动线程处理: 如果任务被成功加入队列,但没有活动的工作线程可以处理队列中的任务,这时会尝试创建一个新的工作线程。
  6. 拒绝策略: 如果无法将任务加入队列,且无法创建新的工作线程,最后的选项是拒绝任务。

addWorker 方法

在Java的ThreadPoolExecutor类中,addWorker方法是用于向线程池中添加新的工作线程的关键方法。

它在处理新任务或者需要增加线程池中的线程数量时被调用。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        // 获取当前线程池的控制状态
        int c = ctl.get();
        // 获取运行状态
        int rs = runStateOf(c);

        // 检查线程池的状态是否允许添加新的工作线程
        // 仅在以下情况返回false:
        // 1. 线程池状态为SHUTDOWN以上(不包括SHUTDOWN)且不满足以下所有条件:
        //    - 状态为SHUTDOWN
        //    - firstTask为null
        //    - 工作队列不为空
        if (rs >= SHUTDOWN &&
            !(rs == SHUTDOWN &&
              firstTask == null &&
              !workQueue.isEmpty()))
            return false;

        // 无限循环,尝试增加工作线程的数量
        for (;;) {
            int wc = workerCountOf(c);
            // 检查当前工作线程数是否超过了最大容量或设置的阈值
            // 如果超过容量或者超过corePoolSize/maximumPoolSize,则返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 使用CAS操作增加工作线程计数
            // 如果CAS成功,跳出retry标签,继续向下执行
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 如果CAS失败,重新读取ctl
            c = ctl.get(); 
            // 如果运行状态发生变化,重新开始外层循环
            if (runStateOf(c) != rs)
                continue retry;
            // 如果CAS失败但运行状态没有变化,继续尝试内层循环的CAS操作
        }
    }

    // 添加工作线程的过程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建新的工作线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 如果成功创建线程
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 在持有锁的情况下再次检查线程池状态
                int rs = runStateOf(ctl.get());

                // 如果线程池状态允许添加工作线程,则将其添加到工作集合中
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 检查线程是否已经启动(预防异常情况)
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 更新记录的最大池大小
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
            // 如果成功添加工作线程,则启动该线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程未能成功启动,处理失败情况
        if (!workerStarted)
            addWorkerFailed(w);
    }
    // 返回工作线程是否成功启动
    return workerStarted;
}

实现步骤:

  1. 状态检查: 首先检查线程池的状态,确定是否可以添加新的工作线程。如果线程池正在关闭,并且条件不允许添加工作线程(例如,任务队列为空),则直接返回false
  2. 增加工作线程数: 使用一个无限循环,通过CAS(Compare-And-Swap)操作尝试增加工作线程计数。如果CAS操作成功,跳出循环;如果失败,重新尝试。
  3. 创建和启动工作线程: 创建一个新的Worker对象,并尝试启动其线程。这个过程涉及获取一个全局锁以保证线程安全。
  4. 线程池状态再次检查: 在锁内部再次检查线程池的状态,以确保在获取锁的过程中状态没有改变。
  5. 添加到工作集合: 如果一切正常,将新的Worker添加到工作线程集合中,并更新记录的最大池大小。
  6. 启动线程: 尝试启动线程。如果启动成功,返回true;否则,在finally块中处理启动失败的情况。

addWorker方法的实现体现了线程池如何有效地管理线程的创建和添加。

使用CAS操作保证了线程安全,而双重检查(在方法开始和持有锁时)确保了即使在高并发的情况下也能正确地管理线程池的状态。

六、Worker 的封装

在Java的ThreadPoolExecutor类中,Worker是一个关键的内部类,它封装了线程池中的工作线程的行为和属性。

Worker类继承自AbstractQueuedSynchronizer,是一个用于构建锁和其他同步组件的框架

ThreadPoolExecutor内部类Worker的构造函数

Worker(Runnable firstTask) {
    // 设置Worker的状态为-1,暂时阻止线程中断
    // 这是为了防止在Worker真正开始运行前被中断
    setState(-1);

    // 将传入的任务(可能为null)设置为Worker的第一个任务
    // 这个任务将是Worker创建后执行的第一个任务
    this.firstTask = firstTask;

    // 使用线程池的线程工厂创建一个新线程,并将当前Worker作为任务传递给这个新线程
    // 这里,Worker自身实际上是一个Runnable,因为它实现了Runnable接口
    this.thread = getThreadFactory().newThread(this);
}

代码分析:

  1. 暂时禁用中断: setState(-1)设置Worker的状态,这个状态用于控制线程的中断。设置为-1意味着在runWorker方法真正开始执行之前,线程不应被中断。这是一个预防措施,以确保Worker在开始执行其任务前不会被意外中断。
  2. 设置第一个任务: this.firstTask = firstTask将传入的任务赋值给Worker的firstTask属性。这个任务是Worker将要执行的第一个任务。如果这个值是null,Worker将从线程池的任务队列中获取任务。
  3. 创建新线程: this.thread = getThreadFactory().newThread(this)调用线程工厂来创建一个新线程,并将当前Worker作为运行任务传递。由于Worker实现了Runnable接口,它可以被线程直接执行。线程工厂是线程池的一个组成部分,用于定制线程创建过程(例如设置线程名称、优先级等)。

runWorker 方法

定义了线程池中的工作线程(Worker)如何执行任务

final void runWorker(Worker w) {
    // 获取当前执行这个方法的线程
    Thread wt = Thread.currentThread();
    // 从Worker获取第一个任务
    Runnable task = w.firstTask;
    // 设置Worker的第一个任务为null
    w.firstTask = null;
    // 释放Worker上的锁,允许中断
    w.unlock(); 

    // 标记是否异常完成任务
    boolean completedAbruptly = true;
    try {
        // 当有任务执行或者能从任务队列中获取到任务时,继续循环
        while (task != null || (task = getTask()) != null) {
            // 锁定Worker,以开始执行任务
            w.lock();
            // 如果线程池正在停止,确保线程被中断;
            // 如果不是,则确保线程不被中断。这需要在第二种情况下重新检查以处理shutdownNow竞争状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 在执行任务前的钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 运行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 在执行任务后的钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务执行完毕,清理工作
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        // 如果正常退出循环,设置completedAbruptly为false
        completedAbruptly = false;
    } finally {
        // 处理Worker退出
        processWorkerExit(w, completedAbruptly);
    }
}

执行步骤:

  1. 初始化: 首先获取当前线程和Worker中的第一个任务。解锁Worker以允许线程中断。
  2. 任务执行循环: 方法进入一个循环,不断执行任务。如果Worker的第一个任务为空,则尝试从线程池的任务队列中获取新的任务。
  3. 中断管理: 在每次任务执行前,检查线程池的状态,如果需要,根据线程池的状态来决定是否中断当前线程。
  4. 任务执行: 实际执行任务,并处理任何可能抛出的异常。同时,执行钩子方法beforeExecuteafterExecute,这些方法可以用于在任务执行前后做一些准备和清理工作。
  5. 任务完成后处理: 更新完成任务的计数,清理变量,解锁Worker。
  6. 异常处理与退出: 如果任务执行过程中发生异常导致线程意外结束,completedAbruptly标记会保持为true。在最后的finally块中,调用processWorkerExit来处理Worker的退出,这可能包括替换这个Worker或者根据当前线程池状态进行其他处理。

参考