线程池详解

优势

  1. 线程和任务分离,提升线程重用性;
  2. 控制线程并发数量,降低服务器压力,统一管理所有线程;
  3. 提升系统响应速度,假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了T1和T3的时间;

只要有并发的地方、任务数量大或小、每个任务执行时间长或短的都可以使用线程池
只不过在使用线程池的时候,注意一下设置合理的线程池大小即可

线程池类的结构

线程池存在于Java的并发包J.U.C中,线程池可以根据项目灵活控制并发的数目,避免频繁的创建和销毁线程,达到线程对象的重用。

img

1、 接口Executor

接口Executor中,只有一个方法,为execute()

2、 接口ExecutorService,继承自Executor

几个重要的方法:

(1) 关闭线程池的方法,有两种

一个ExecutorService(J.U.C)可以关闭,这将导致它拒绝新的任务。 ExecutorService的两种关闭线程池的方式,shutdownshutdownNow方法:

① shutdown():拒收新的任务,立马关闭正在执行的任务,可能会引起报错,需要异常捕获

② shutdownNow():拒收新的任务,等待任务执行完毕,要确保任务里不会有永久等待阻塞的逻辑,否则会导致线程关闭不了

③ 不是马上关闭,要想等待线程池关闭,还需要调用waitFermination来阻塞等待

④ 还有一些业务场景下,需要知道线程池中的任务是否全部执行完成,当我们关闭线程池之后,可以用isTerminated来判断所有的线程是否执行完成,千万不要用isShutdown,它只是返回你是否调用过shutdown的结果

(2) submit()方法

submit()方法在ExecutorService中,ExecutorService接口继承Executor接口,方法submit延伸的方法Executor.execute(Runnable)通过创建并返回一个Future可用于取消执行和/或等待完成。submit()与execute()的一个区别是submit()有返回值,并且能够处理异常,在task里会抛出checked或者unchecked exception, 而又希望外面的调用者能够感知这些exception并作出及时的处理,用 submit,通过捕获Future.get抛出的异常

3、 Executors(J.U.C)

提供了6个静态方法,分别创建6种不同的线程池,六大静态方法 内部都是直接或间接调用ThreadPoolExecutor类的构造方法创建线程池对象,这六个静态方法本身是没有技术含量的。

Executors(类) Executors静态方法 实现类
newCachedThreadPool ThreadPoolExecutor
newFixedThreadPool ThreadPoolExecutor
newSingleThreadExecutor ThreadPoolExecutor
newScheduledThreadPool ScheduledThreadPoolExecutor
newSingleThreadScheduledExecutor ScheduledThreadPoolExecutor
newWorkStealingPool ForkJoinPool
Executor(接口):只有一个方法execute()

核心参数

无论是创建何种类型线程池(FixedThreadPoolCachedThreadPool…),均会调用ThreadPoolExecutor构造函数,下面详细解读各个参数的作用

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
  • corePoolSize:核心线程最大数量,通俗点来讲就是,线程池中常驻线程的最大数量
  • maximumPoolSize:线程池中运行最大线程数(包括核心线程和非核心线程)
  • keepAliveTime:线程池中空闲线程(仅适用于非核心线程)所能存活的最长时间
  • unit:存活时间单位,与keepAliveTime搭配使用
  • workQueue:存放任务的阻塞队列
  • threadFactory:线程工厂,用来创建一个新线程时使用的工厂,可以用来设定线程名,是否为daemon线程等
  • handler:线程池饱和策略

执行流程

​ (1) 刚开始运行时,线程池是空的

(2) 一个任务进来,检查池中的线程数量,是否达到corePoolSize,如果没有达到,则创建线程,执行任务

(3) 任务执行完成之后,线程不会销毁,而是阻塞的等待下一个任务

(4) 又进来一个任务,不是直接使用阻塞的线程,而是检查线程池中的线程数大小,是否达到corePoolSize,如果没有达到,则继续创建新的线程,来执行新的任务,如此往复,  直到线程池中的线程数达到corePoolSize,此时停止创建新的线程

(5) 此时,又来新的任务,会选择线程池中阻塞等待的线程来执行任务,有一个任务进来,唤醒一个线程来执行这个任务,处理完之后,再次阻塞,尝试在workQueue上获取下一  个任务,如果线程池中没有可唤醒的线程,则任务进入workQueue,排队等待

(6) 如果队列是无界队列,比如LinkedBlockingQueue,默认最大容量为Integer.MAX,接近于无界,可用无限制的接收任务,如果队列是有界队列,比如ArrayBlockingQueue,可限定队列大小,当线程池中的线程来不及处理,然后,所有的任务都进入队列,队列的任务数也达到限定大小,此时,再来新的任务,就会入队失败,然后,就会再次尝试在线程池里创建线程,直到线程数达到maximumPoolSize,停止创建线程

(7)此时,队列满了,新的任务无法入队,创建的线程数也达到了maximumPoolSize,无法再创建新的线程,此时,就会reject掉,使用拒绝策略RejectedExecutionHandler,不让继续提交任务,默认的是AbortPolicy策略,拒绝,并抛出异常

(8) 超出corePoolSize数创建的那部分线程,是跟空闲时间keepAliveTime相关的,如果超过keepAliveTime时间还获取不到任务,线程会被销毁,自动释放掉

饱和策略

线程池饱和策略分为一下几种:

  1. AbortPolicy:直接抛出一个异常,默认策略
  2. DiscardPolicy: 直接丢弃任务
  3. DiscardOldestPolicy:抛弃下一个将要被执行的任务(最旧任务)
  4. CallerRunsPolicy:主线程中执行任务

img

img

工作队列

  • ArrayBlockingQueue:使用数组实现的有界阻塞队列,特性先进先出,新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。
  • LinkedBlockingQueue:基于链表的无界阻塞队列(其实最大容量为Interger.MAX),特性先进先出,可以设置其容量,默认为Interger.MAX_VALUE,特性先进先出,由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
  • PriorityBlockingQueue:使用平衡二叉树,实现的具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
  • DelayQueue:无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有过期元素才会出队列。队列头元素是最块要过期的元素。
  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

几种典型的线程池

SingleThreadExecutor(单线程线程池)

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

创建单个线程。它适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。SingleThreadExecutorcorePoolSizemaximumPoolSize被设置为1,使用无界队列LinkedBlockingQueue作为线程池的工作队列。

img

  • 当线程池中没有线程时,会创建一个新线程来执行任务。
  • 当前线程池中有一个线程后,将新任务加入LinkedBlockingQueue
  • 线程执行完第一个任务后,会在一个无限循环中反复从LinkedBlockingQueue 获取任务来执行。

**使用场景:**适用于串行执行任务场景

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

corePoolSize等于maximumPoolSize,所以线程池中只有核心线程,使用无界阻塞队列LinkedBlockingQueue作为工作队列

FixedThreadPool是一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。

img

  • 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  • 在线程数目达到corePoolSize后,将新任务放到LinkedBlockingQueue阻塞队列中。
  • 线程执行完(1)中任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

使用场景:适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

CachedThreadPool(弹性缓存线程池)

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

核心线程数为0,总线程数量阈值为Integer.MAX_VALUE,即可以创建无限的非核心线程

执行流程

  • 先执行SynchronousQueueoffer方法提交任务,并查询线程池中是否有空闲线程来执行SynchronousQueuepoll方法来移除任务。如果有,则配对成功,将任务交给这个空闲线程
  • 否则,配对失败,创建新的线程去处理任务
  • 当线程池中的线程空闲时,会执行SynchronousQueuepoll方法等待执行SynchronousQueue中新提交的任务。若等待超过60s,空闲线程就会终止

img

img

使用场景执行大量短生命周期任务。因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。

ScheduledThreadPoolExecutor(定时器线程池)

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

线程总数阈值为Integer.MAX_VALUE,工作队列使用DelayedWorkQueue,非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。

两种方式提交任务:

scheduleAtFixedRate: 按照固定速率周期执行

scheduleWithFixedDelay:上个任务延迟固定时间后执行

使用无界队列的线程池会导致内存飙升吗?

会的,newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长,会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。

注意:要用ScheduledExecutorService去创建ScheduledThreadpool,如果用Executor去引用,就只能调用Executor接口中定义的方法;如果用ExecutorService接口去引用,就只能调用ExecutorService接口中定义的方法,无法使用ScheduledExecutorService接口中新增的方法,那么也就失去了这种线程池的意义

线程池的使用

  • 第一种方式,构建一个线程池

    ExecutorService threadPool = Executors.newFixedThreadPool(10);

  • 第二种方式,使用ThreadPoolExecutor构建一个线程池

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class test {
        public static void main(String args[]) {
            ExecutorService executorService = new ThreadPoolExecutor(5,10,
                    10,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(5));
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("开始执行线程池中的任务");
                }
            });
        }
    }
    

    如果只是简单的想要改变线程名称的前缀的话可以自定义ThreadFactory来实现,在Executors.new…中有一个ThreadFactory的参数,如果没有指定则用的是DefaultThreadFactory。

  • 第三种方式,使用工具来创建线程池,Apache的guava中ThreadFactoryBuilder()来创建线程池,不仅可以避免OOM问题,还可以自定义线程名称,方便出错时溯源

其他问题

为什么不建议使用Executors创建线程,而使用ThreadPoolExecutor实现类来创建线程?

Executors中FixedThreadPool使用的是LinkedBlockingQueue队列,近乎于无界,队列大小默认为Integer.MAX_VALUE,几乎可以无限制的放任务到队列中,线程池中数量是固定的,当线程池中线程数量达到corePoolSize,不会再创建新的线程,所有任务都会入队到workQueue中,线程从workQueue中获取任务,但这个队列几乎永远不会满,只要队列不满,就不会再去创建新的线程,就跟maximumPoolSize和keepAliveTime没有关系,此时,如果线程池中的线程处理任务的时间特别长,导致无法处理新的任务,队列中的任务就会不断的积压,这个过程,会导致机器的内存使用不停的飙升,极端情况下会导致JVM OOM,系统就挂了。

总结:Executors中FixedThreadPool指定使用无界队列LinkedBlockingQueue会导致内存溢出,所以,最好使用ThreadPoolExecutor自定义线程池

换一种问法:线程池中,无界队列导致的内存飙升问题,同上

线程池如何调优

(1)首先,根据不同的需求选择线程池,如果需要单线程顺序执行,使用SingleThreadExecutor,如果已知并发压力,使用FixedThreadPool,固定线程数的大小,执行时间小的任务,可以使用CachedThreadPool,创建可缓存的线程池,可以无限扩大线程池,可以灵活回收空闲线程,最多可容纳几万个线程,线程空余60s会被回收,需要后台执行周期任务的,可以使用ScheduledThreadPool,可以延时启动和定时启动线程池,

(2)如何确认线程池的最大线程数目,分CPU密集型和IO密集型,如果是CPU密集型或计算密集型,因为CPU的利用率高,核心线程数可设置为n(核数)+1,如果是IO密集型,CPU利用率不高,可多给几个线程数,来进行工作,核心线程数可设置为2n(核数)

实际场景的使用分析

线程池适合单系统的大量的异步任务处理,比如发送短信、保存日志等。

1、几个真实的场景中如何选择线程池?

​ (1)高并发、任务执行时间短,此类任务可用充分利用CPU,尽可能的减少上下文切换,线程池的线程数可用设置为CPU核数+1

​ (2)并发不高、任务执行时间长

此种类型的任务分两种情况:

① IO密集型的任务,业务长时间集中在IO操作上,因为IO操作并不占用 CPU,所以尽可能的不要让所有的CPU闲下来,可用加大线程池中的线程数目,让CPU处理更多的业务,如设置线程池的线程数为2 * CPU核数

​ ② 计算密集型的任务,业务长时间集中在计算操作上,和(1)一样,线程数可设置为CPU核数+1,减少一下线程数,以便减少线程的上下文切换

​ (3)并发高、业务执行时间长,这种类型的任务就不单单要关注线程池了,而是要从整体架构上来考虑,看能否使用中间件对任务进行拆分和解耦,部分数据做缓存处理,以及增加服务器等

2、线程池参数设置的一些分析

​ (1)几个参数:

​ tasks:每秒的任务数,假设为500~1000

​ taskcost:每个任务花费的时间,假设为0.1s

responsetime:系统允许容忍的最大响应时间,假设为1s

​ (2)做几个计算:

​ ① corePoolSize:每秒需要多少个线程处理

​ threadcount = tasks/(1/taskcount) = (500~1000)*0.1 = 50~100

​ 线程数应该设置为大于50个,根据8020原则,如果80%的每秒任务数 小于800,那么corePoolSize设置为80即可

​ ② queueCapacity = (coreSizePool/taskcost)responsetime = 80/0.11 = 80

​ ③ 注意阻塞队列的大小,LinkedBlockingQueue的大小为Integer.MAX_VALUE,接近于无界,会导致内存溢出,因为当任务徒增 时,都会进入队列中,不能开新的线程来执行

​ ④ maxPoolSize = (max(tasks) - queueCapacity)/(1/taskcount)=(最大 任务数-队列容量)/每个线程每秒处理能力 = 最大线程数,计算可得,最大线程数maxPoolSize = (1000-80)/10 = 92

​ ⑤ rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理

​ ⑥ keepAliveTime和allowCoreThreadTimeout:采用默认通常能满足

3、几个具体场景的分析(8核CPU为例)

​ (1) 任务数多但资源占用不大,电商平台的消息推送或短信通知,该场景需要被处理的消息对象内容简单占用资源非常少,通常为百字节量级,但在高并发访问下,可能瞬间产生大量的任务数,而此类任务的处理通常效率非常高,因此处理的重点在于控制并发线程数,不要以为大量的线程启用及线程的上下文频繁切换而导致内存使用率过高,CPU的内核态使用率过高等不良情况发生,通常可以在创建线程池时设置较长的任务队列,并以CPU内核数2-4倍(经验值)的值设置核心线程与扩展线程数,合理固定的线程数使得CPU的使用率更加平滑,如:

BlockingQueue queue = new ArrayBlockingQueue<>(4096);
ThreadPoolExecutor executor = newThreadPoolExecutor(16, 16, 0, TimeUnit.SECONDS, queue);

​ (2) 任务数不多但资源占用大,非社交流媒体的使用场景下,该情况多发生于文件流、长文本对象或批量数据加工的处理,如日志收集、图片流压缩或批量订单处理等场景,而此类场景下的单个资源处理,往往会发生较大的资源消耗,因此为使系统达到较强处理能力,同时又可以控制任务资源对内存过大的使用,通常可以在创建线程池时适当加大扩展线程数量,同时设置相对较小的任务队列长度,如此,当遇到任务数突增的情况,可以有更多的并发线程来应对,此外需要合理设置扩展线程空闲回收的等待时长以节省不必要的开销,如:

BlockingQueue queue = new ArrayBlockingQueue<>(512);
ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 64, 30, TimeUnit.SECONDS, queue);

​ (3) 极端场景的情况,如遇到任务资源较大,任务数较多,同时处理效率不高的场景,首先需要考虑任务的产生发起需要限流,理论上讲为保障系统的可用性及稳定运行,任务的发起能力应当略小于任务的处理能力,其次,对于类似场景可以采用以时间换取空间的思想,充分利用系统计算资源,当遇到任务处理能力不足的情况,任务发起方的作业将被阻塞,从而充分保护系统的资源开销边界,但可能会导致CPU核心态的使用率高,如:

BlockingQueue queue = new SynchronousQueue<>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(64, 64, 0, TimeUnit.SECONDS, queue);

CPU密集型、IO密集型

CPU密集型(CPU-bound)

CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。

在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。例如一个计算圆周率至小数点一千位以下的程序,在执行的过程当中绝大部份时间用在三角函数和开根号的计算,便是属于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。

IO密集型(I/O bound)

IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。

I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。

CPU密集型 vs IO密集型

我们可以把任务分为计算密集型和IO密集型。

计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。

第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。

IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

总之,计算密集型程序适合C语言多线程,I/O密集型适合脚本语言开发的多线程。