丨核桃牛奶 阅读(22) 评论(0)

一、线程池状态

线程池的几种状态  RUNNING SHUTDOWN STOP TIDYING TERMINATED

几种状态之间的流转关系如下图

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;  //111
private static final int SHUTDOWN   =  0 << COUNT_BITS;  //000
private static final int STOP       =  1 << COUNT_BITS;  //001
private static final int TIDYING    =  2 << COUNT_BITS;  //010
private static final int TERMINATED =  3 << COUNT_BITS;  //011

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

重点关注AtomicInteger ctl 共32位,其中高3位表示"线程池状态",低29位代表"线程池中的任务数量"

各个状态详细说明如下:

*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed

二、构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数说明

Ps 线程池中有两大容器,分别是pool(存放线程) 和 queue(存放任务)

corePoolSize

   核心线程的数量,线程池初始化后,每接到一个任务就会创建一个线程来执行任务,直到当前的线程数目到达corePoolSize,此时新的任务将会进入queue中,只有当queue满了之后,maximunPoolSize才发挥作用

核心线程被保存在pool中,即使线程处于闲置状态也不会被回收,除非allowCoreThreadTimeOut被设置,从名字可以看出这是用来控制核心线程是否可以超时被回收的一个参数。

Ps可以理解为工厂的长工

maximumPoolSize

     pool中所允许的最大线程数。线程池的queue满了之后,如果还有新的任务到来,此时如果线程数目小于maximumPoolSize,则会新建线程来执行任务。

Ps 可以理解为工厂的短工 最大值=maximumPoolSize-corePoolSize

keepAliveTime

    线程空闲的时间,默认情况该参数只针对"短工"有效(短工空闲太久就要被辞退),只有当配置allowCoreThreadTimeOut时该参数才对"长工"生效

unit

    keepAliveTime的单位

workQueue

    上文提到的queue,用来保存等待执行的任务的阻塞队列

ThreadFactory

    线程工厂,可以用户自己配置,默认的ThreadFactory 1.给线程命名 2.将线程设置为非守护线程 3.优先级设置为NORM

handler

    拒绝策略:当线程数=maximumPoolSize 且 queue已满 这时候新提交的任务会被拒绝

    1.AbortPolicy:直接抛出异常,默认策略;

    2.CallerRunsPolicy:用调用者所在的线程来执行任务;

    3.DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

    4.DiscardPolicy:直接丢弃任务;

三、常用的几种线程池

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

此时corePoolSize = maximumPoolSize , 这意味着不再有任何"短工",当线程池中的线程数量达到corePoolSize后,新来的任务被放到queue中,当queue满了之后,线程池将不再接收任何任务,

如果再继续提交任务,则会直接执行拒绝策略。这里执行的是默认的拒绝策略,直接抛出异常。

这里的queue使用的是LinkedBlockingQueue,queue的容量是MAX_VALUE,所以构造函数中的maximumPoolSize与keepAliveTime几乎无效

而ThreadPoolExectutor中的allowCoreThreadTimeOut也未被设置,所以默认为false

SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

相当于newFixedThreadPool(1)

CachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

任务一旦提交,首先会进入queue,此时的queue是synchronousQueue(不存储任何元素),所以会直接创建新的"短工"(线程),当这些线程闲置超过60s之后会被终止。

CachedThreadPool 会有一个问题 可能会创建大量的线程 可能导致耗尽CPU 和 内存。所以 一定要控制并发量

四、任务执行

线程池有两种执行任务的方式,execute() & submit()

execute()为例子,分析内部的执行原理

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

整个代码分三个步骤:

a)如果当前线程数目小于corePoolSize,则直接调用addWorker创建新线程来执行任务,如果addWorker失败,再次读取ctl,并执行步骤b

b)如果线程池处于RUNNING状态且当前任务进入queue成功,此时会对当前的ctl进行二次检查,

           如果在这个过程中线程池不处于RUNNING状态,那么从queue中移除任务并执行拒绝策略

           反之,如果二次检查通过,如果当前线程数目为0,则会创建新的工作线程,否则queue中的任务将无法执行。Ps等下分析addWorker(null, false)

c)入队失败,此时调用addWorker(command,false)创建非核心线程,若创建失败执行拒绝策略。Ps显然addWorker方法需要再次对线程池状态进行判断,不然b中如果非RUNNING状态,也会执行到c

 

addWorker()  Ps有意思的一段code

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

a)读取当前线程池状态

           如果rs>=SHUTDOWN(线程池处于SHUTDOWN,STOP,TIDYING,TERMINATED状态),这个时候addWorker是要失败的,除了一种情况,当前线程池处于SHUTDOWN状态,且firstTask=null且queue不为空,我们知道这种情况,线程池是要执行完queue中的任务的,所以这种情况是可以增加worker的。

b) 内层循环,如果当前线程的数量满足要求(核心线程数<corePoolSize 非核心线程<maximumPoolSize),那么通过CAS使woker自增,成功跳出双重循环,否则继续自旋

c) 获取锁,再次判断线程状态,如果rs<SHUTDOWN 或者rs=SHUTDOWN且当前入参的firstTask=null 才可以继续向下执行,将woker添加到wokers(Set)中,然后释放锁并启动线程;否则直接释放锁。Ps这里加锁的目的一是为了wokers添加元素,二是为了更新largestPoolSize

Worker

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
	/**省略部分代码*/

Worker继承自AQS且实现了Runnable接口 Ps这里state设置为1的目的是这时候禁止中断,直到线程开始运行才可以中断

线程启动的时候,实际上是调用了runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;//突然完成
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

a)方法一开始就调用unlock方法,此时会将state置为0(代表解锁状态,此时允许线程中断)interruptWorkers()方法只有在state>=0才能执行

b)有趣的循环,首先执行传递进来的任务,如果当前任务执行完毕,会调用getTask方法去queue中提取任务执行

c)当前有任务执行时,首先获取锁置state=1

          如果当前线程池状态>=STOP 且当前线程未被中断,那么中断当前线程

          如果当前线程池状态<STOP 且当前线程被中断了 再次检查当前线程池状态 如果>=STOP 那么中断当前线程

d) 终于可以执行任务了,这里有两个模板方法,ThreadPoolExecutor中给了空实现

e)最后置task=null 并将当前Woker的工作量++ 最后释放锁 接下来继续循环获取queue中的任务

 

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?

        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

a) 首先根据线程池的状态进行下一步的决策,如果当前线程池状态>=STOP or =SHUTDOWN且queue为空 那么woker数量--  并返回null

b)接下来判断是否需要超时机制,如果allowCoreThreadTimeOut 或者 当前线程数大于corePoolSize(说明有短工) 那么需要有超时控制

c)如果当前线程的数量处于正常状态,<=maximumPoolSize 且timeout timed其中有一个是false 那么跳出内层循环 直接去从queue中获取任务 并return

d) 如果从queue中取值结果为null  那么timeout会被置true  再次执行内层循环 简单说就是queue中无任务 且 有超时机制 那么workcount要-- (这里ctl-- 与之等价)

五、后处理

在runwoker方法中 finally代码块中调用了 processWorkerExit方法 用来对Worker进行后处理

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

a)如果是被意外终止(用户异常),那么将worker的数量--

b)加锁去更新共享参数completedTaskCount 加上当前worker处理的任务数量 并从set中移除当前woker

c) 调用tryTerminate方法(下文有源码),如果当前状态either (SHUTDOWN and pool and queue empty) or (STOP and pool empty) 那么直接return

d)如果当前woker数!=0 那么可以去中断,那么调用interruptIdleWorkers(ONLY_ONE) 去终止Worker

e)如果当前worker = 0 那么去终止线程池 首先将ctl置为TIDYING&0 接下来调用terminated()终止线程池 ,最后将ctl置为TERMINATED&0

tryTerminate()

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //1.running 2.tidying or terminated 3.shutdown and queue is not empty
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        //wokercount > 0  有资格去终止
        if (workerCountOf(c) != 0) { // Eligible to terminate
            //terminate one worker
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //set ctl -> ctlof(TIDYING,0)
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
        			//set ctl -> ctlof(TERMINATED,0)
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

六、线程池的终止

shutDown()

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            //SHUTDOWN
            advanceRunState(SHUTDOWN);
            // 中断空闲的线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

  

shutDownNow()

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // STOP
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            //取出queue中的任务 最后return
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }