【重要】Java 线程池核心内容、生产环境实践

目录

  • Java线程池的核心内容详解
    • 线程池的优势
    • 什么场景下要用到线程池呢?
    • 线程池中重要的参数【掌握】
    • 新加入一个任务,线程池如何进行处理呢?【掌握】
    • 如何将任务提交到线程池中呢?
    • 线程池是如何关闭的呢?
    • 线程池为什么设计为任务队列满了才创建新线程?
    • 线程池中线程异常后,该线程会销毁吗?
  • 关于线程池在生产环境中的使用
    • 一个项目使用一个线程池还是多个线程池?
    • 线程池在 RocketMQ 中的使用
    • 关于线程数量的设置
    • 美团技术团队针对线程池所做的优化
    • 自定义拒绝策略
    • 阿里手册中的线程池规范

Java线程池的核心内容详解

线程池的优势

首先,线程池是将多个线程进行池化操作,统一进行管理,这样做有什么好处呢?

  • 降低创建、销毁线程的开销 :线程池中维护固定数量的线程,不需要临时进行线程的创建和销毁
  • 提高响应速度 :对于新提交到线程池中的任务,直接使用线程池中的空闲线程可以直接进行处理,不需要等待创建线程
  • 节省资源:可以重复利用线程

什么场景下要用到线程池呢?

一般就是多 IO 的场景下需要用到,像 IO 任务很多,比如数据库操作、请求其他接口操作,这都属于 IO 类任务,IO 类任务的特点就是只需要线程去启动一下 IO 任务,之后就等待 IO 结果返回即可,IO 结果返回的时间是比较慢的 ,因此如果只使用单线程去执行 IO 任务的话,由于这个等待时间比较长,那么线程需要一直等待 IO 结果返回,而无法执行其他操作

因此在多 IO 场景下,可以使用线程池来加快 IO 任务的执行,开启多个线程同时去启动多个 IO 任务,可以加快 IO 任务的处理速度

线程池中重要的参数【掌握】

线程池中重要的参数如下:

  • corePoolSize :核心线程数量
  • maximumPoolSize :线程池最大线程数量 = 核心线程数+非核心线程数
  • keepAliveTime :非核心线程存活时间
  • unit:空闲线程存活时间单位(keepAliveTime单位)
  • workQueue :工作队列(任务队列),存放等待执行的任务
    • LinkedBlockingQueue:无界的阻塞队列,最大长度为 Integer.MAX_VALUE
    • ArrayBlockingQueue:基于数组的有界阻塞队列,按FIFO排序
    • SynchronousQueue:同步队列,不存储元素,对于提交的任务,如果有空闲线程,则使用空闲线程来处理;否则新建一个线程来处理任务
    • PriorityBlockingQueue:具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
  • threadFactory :线程工厂,创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等。
  • handler:拒绝策略 ,有4种
    • AbortPolicy :直接抛出异常,默认策略
    • CallerRunsPolicy:用调用者所在的线程来执行任务(主线程执行)
    • DiscardOldestPolicy:丢弃阻塞队列里最老的任务,也就是队列里靠前的任务
    • DiscardPolicy :当前任务直接丢弃

新加入一个任务,线程池如何进行处理呢?【掌握】

新加入一个任务,线程池处理流程如下:

  1. 如果核心线程数量未达到,创建核心线程执行
  2. 如果当前运行线程数量已经达到核心线程数量,查看任务队列是否已满
  3. 如果任务队列未满,将任务放到任务队列
  4. 如果任务队列已满,看最大线程数是否达到,如果未达到,就新建非核心线程处理
  5. 如果当前运行线程数量未达到最大线程数,则创建非核心线程执行
  6. 如果当前运行线程数量达到最大线程数,根据拒绝策略处理

如何将任务提交到线程池中呢?

有两种方式:executesubmit

这两种方式的区别:

  • execute
    • execute 没有返回值
    • execute 无法捕获任务过程中的异常
  • submit
    • submit 会返回一个 Future 对象,用来获取任务的执行结果
    • submit 可以通过 Future 对象来捕获任务中的异常

execute 方式如下:

ExecutorService executor = Executors.newFixedThreadPool(5);
executor.execute(new Runnable() {
    public void run() {
        // 执行具体的任务逻辑
        System.out.println("Task executed using execute method");
    }
});
executor.shutdown();

submit 方式如下:

ExecutorService executor = Executors.newFixedThreadPool(5);
Future future = executor.submit(new Callable() {
    public String call() {
        // 执行具体的任务逻辑
        return "Task executed using submit method";
    }
});

try {
    String result = future.get(); // 获取任务执行结果
    System.out.println(result);
} catch (InterruptedException e) {
    // 处理中断异常catch (ExecutionException e) {
    // 处理任务执行异常finally {
  // 关闭线程池
  executor.shutdown();
}

线程池是如何关闭的呢?

通过调用线程池的 shutdown() 方法即可关闭线程池

调用之后,会设置一个标志位表示当前线程池已经关闭,会禁止向线程池中提交新的任务

去中断所有的空闲线程并且等待正在执行的任务执行完毕(通过调用线程 interrupt() 方法),当线程池中所有任务都执行完毕之后,线程池就会被完全关闭

扩展:thread.interrupt() 方法调用后线程会立即中断吗?

不会,调用 interrupt 只是将被中断线程的中断状态设置为 true,通知被中断的线程自己处理中断,而不是立即强制的让线程直接中断(强制中断不安全)

当外部调用线程进行中断的命令时,如果该线程处于被阻塞的状态,如 Thread.sleep(),Object.wait(),BlockingQueue#put,BlockingQueue#take 等等时,那么此时调用该线程的 interrupt 方法就会抛出 InterruptedException 异常

因此,可以通过这个特点来优雅的停止线程(在 《Java多线程核心技术》 一书中说到):将 sleep() 和 interrupt() 搭配使用,来停止线程

线程池为什么设计为任务队列满了才创建新线程?

这里说一下在知乎上看到的一个问题,个人觉得提问的比较好

线程池为什么设计为队列满+核心线程数满了才创建新线程?而不是队列积压一定阈值的时候创建新的线程?

当队列积压满了之后,创建非核心线程来执行任务只是一个 兜底措施

你想如果我们自己去设计一个线程池,是不是只需要一个参数来管理线程池中的线程数量就可以了,完全没必要去创建这些非核心线程执行任务

那么线程池的设计团队可不会考虑的这么简单,它们不仅会考虑性能方面,更是会保证比较高的 可用性

因为在 Java 应用中,高并发高可用 这两块都是比较重要的东西,不仅要性能好,还要不崩溃

就比如之前滴滴故障、阿里云故障、语雀故障所带来的影响都是比较大的,对公司来讲整个可信度有所下降,对于我们个人来讲,可能有些人恰巧需要紧急使用,但是由于发生故障,不得已计划延期

所以线程池为了保证 高可用 就设计了任务队列,以及在队列满了之后再去创建非核心线程处理溢出来的任务

当然任何设计都是平衡之后的选择,如果你在公司项目需求与设计者的理念不符合,可以基于原有设计做出封装,来进行定制化操作!

线程池中线程异常后,该线程会销毁吗?

向线程池中提交任务有 execute()submit() ,两种提交方式的区别如下:

  • execute 执行任务:execute 没有返回值,无法捕获任务过程中的异常

  • submit 执行任务:submit 会返回一个 Future 对象,用来获取任务的执行结果,可以通过 Future 对象来捕获任务中的异常

那么执行过程中发生异常,线程会销毁吗?

execute 无法捕捉任务过程中的异常是因为当任务在执行时遇到异常的话,如果异常在线程执行过程中没有被捕获的话,该异常就会导致线程停止执行,并且在控制台打印异常,之后该线程会终止,线程池会创建一个新线程来替换他

submit 方式执行任务的话,当执行过程中发生异常,异常会被封装在 submit() 返回的 Future 对象中,当调用 Future.get() 时,可以捕获到 ExecutionException 异常,因此使用 submit() 发生异常不会终止线程

参考:线程池中线程异常后:销毁还是复用?

关于线程池在生产环境中的使用

这里整理了一些线程池在生产环境中使用的建议来帮助我们更好的在项目中使用线程池

一个项目使用一个线程池还是多个线程池?

一般建议是不同的业务使用不同的线程池,从而避免非核心业务对于核心业务的影响

如果所有的业务使用同一个线程池,非核心业务可能执行速度很慢,从而占用了很多线程迟迟不归还,导致核心业务在任务队列中等待,拿不到线程执行

并且还可能造成 死锁问题 ,当父子任务使用同一个线程池时,父任务如果将核心线程全部占用之后,等待子任务完成,由于核心线程没有空闲的,导致子任务进入到任务队列中等待线程资源,导致父子任务之间互相等待

线程池在 RocketMQ 中的使用

在 MQ 中使用了很多线程池,这里说一下在发送消息时使用的线程池:

1、任务队列:创建了 异步发送者线程池任务队列 使用长度为 50000 的阻塞队列

2、线程数:核心线程数最大线程数 相同,为 CPU 核数

3、存活时间:非核心线程存活时间 60s

4、线程名称:重写了线程工厂,主要是 为了线程的命名规范 ,这样在查询日志时,只要做好业务之间的隔离,就可以很容易的根据线程名称来定位到对应的业务,便于分析线上问题

private final ExecutorService defaultAsyncSenderExecutor;

private final BlockingQueue asyncSenderThreadPoolQueue;

this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue(50000);

this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
 Runtime.getRuntime().availableProcessors(),
 Runtime.getRuntime().availableProcessors(),
 1000 * 60,
 TimeUnit.MILLISECONDS,
 this.asyncSenderThreadPoolQueue,
 new ThreadFactory() {
  private AtomicInteger threadIndex = new AtomicInteger(0);

  @Override
  public Thread newThread(Runnable r) {
   return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
  }
 });

那么我们在自己的项目中使用的线程池就可以参考 MQ 中的用法,更加规范的使用线程池

至于为什么要这样设置核心线程数,一方面是参考了设置核心线程数的经验(CPU 密集型的任务令线程数等于 CPU 核心数,减少了线程之间的上下文切换,速度比较快),另一方面 RocketMQ 肯定内部经过性能测试,发现这样设置性能比较好一些

关于线程数量的设置

在项目中,一般使用线程池的场景无非就两种:

  • 及时性任务 :需要迅速完成,降低用户等待时间
  • 非及时性任务 :批量完成任务,一般是后台任务

那么对于 及时性任务 来说,需要尽可能快的完成任务,因此要 尽可能增大可执行任务的线程数量 ,来尽可能快的完成任务,不要设置任务队列 ,因为只有任务队列满了之后,才会去创建非核心线程执行

对于 非及时性任务 来说,这类任务并不面向用户,特征是任务量很大,需要批量处理,不需要很低的延迟,因此需要设置合适线程数量, 利用有限的资源去尽可能快的执行任务 ,并且设置任务队列去缓冲任务,但是尽量不要使用无界的任务队列,无界队列任务堆积过多会造成 OOM

  • 这里举一个线程池在高并发电商系统中的使用案例

这里我举一个使用线程池的真实生产环境的案例:用户消息推送

对于中大型电商系统来说,用户量一般最少都达到了千万级,那么如果举办促销活动或者优惠活动了,电商系统肯定需要给用户发送通知,可能会有多个渠道发送比如短信、邮箱等等,那么肯定是需要调用第三方平台的 API 了

调用其他平台 API,毫无疑问就会产生网络 IO,并且是 千万级别的网络 IO ,如果只靠单线程去执行,那可能等推送完之后,促销活动也已经结束了

因此,对于这种 IO 任务,并且是大体量推送的 IO 任务,就必须引入线程池来优化性能了,通过多线程来进行任务的推送(当然这里还使用了 RocketMQ 来进行解耦,引入 MQ 之后,就是使用线程池来生成大量消息推送到 MQ 中,消费者再去订阅这些消息去调用第三方平台进行推送,由于该文章主要是讲线程池的,所以这里 MQ 的部分就简单说一下)

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        0,
        permits * 2,
        60,
        TimeUnit.SECONDS,
        new SynchronousQueue(),
        NamedDaemonThreadFactory.getInstance(name)
);

这里也将线程池创建的代码给列出来,这里顺带说一下线程池核心线程的参数为什么设置为 0:

因为在消息推送这块,并不是一直要推送的,促销活动、发优惠券,在正常情况下是不会推送发送消息的,因此将核心线程数设置为 0 可以在没有推送任务的时候,将线程池中的线程都回收掉,有任务的时候,再来创建非核心线程执行任务,这样可以避免线程在没有任务时空闲,占用资源

这里注意任务队列的选用

将核心线程数设置为 0 之后,队列使用了 SynchronousQueue ,因为这个队列是不存储元素的,因此有任务来了就会创建非核心线程去执行

如果将设置了有容量的任务队列,任务进来之后会先放在队列中,并不会创建非核心线程!

美团技术团队针对线程池所做的优化

在美团内部有多次因为线程池参数设置不合理而引发故障的案例

因此可以发现在不同场景下,开发人员对参数的配置有一个大概的方向,但是具体配置多少还没有一个通用的公式 ,导致上线之后,线程池会因为 线程数设置过少 或者 任务队列设置不合理 而出现故障

因此美团技术团队设计了 动态化线程池 ,提供了对 线程池的监控 以及参数动态调整,这样在调整参数之后,通过监控可以看到整个线程池的负载情况,可以选出比较合适的参数方案

那么这里重点的优化提升就在于两点:

  • 线程池参数的动态化设置
  • 线程池监控

这里提一下在线程监控中,对线程池负载的定义

线程池的负载可以根据活跃的线程数和最大线程数的比值来反映

线程池活跃度 = activeCount/maximumPoolSize ,当活跃度升高,代表着线程池负载在逐步上升

还可以 从任务队列中等待的任务数量 或者 发生拒绝策略的次数 来反映

  • 总结一下

线程池参数的设置没有一个通用的公式,要根据实际场景出发,在设置之后,可以对线程池的性能进行测试,像对线程池进行性能测试的话,就需要对线程池做监控,来看在不同参数下线程池处理任务时的负载表现,来设置更加合理的参数

自定义拒绝策略

在线程池中可以 自己去定义拒绝策略 ,如果线程池无法处理更多的任务了,可以在自定义的拒绝策略中,将拒绝的任务 异步持久化 到磁盘中去,之后再通过一个后台线程去定时扫描这些被拒绝的任务,慢慢执行

保证严格的任务不丢失:如果线上机器突然宕机,线程池的阻塞队列中的请求怎么办?

如果宕机,重启之后,线程池阻塞队列中的任务就会全部丢失

如果想要解决这种情况的话,有这么一个 解决方案 :在将任务提交到线程池中去的时候,先把任务在数据库中存储一份,并记录任务执行的状态:未提交、已提交、已完成,执行完之后的话,将任务状态标记为 已完成,如果宕机后,导致任务丢失,就可以去数据库中扫描任务,重新提交给线程池执行

阿里手册中的线程池规范

在使用线程池的时候,需要注意一些规范,以免出现不必要的问题,可以参考阿里巴巴 Java 开发手册,如下:

线程池名称命名规范:

【重要】Java 线程池核心内容、生产环境实践
1707126835124

线程池创建规范:

【重要】Java 线程池核心内容、生产环境实践

 

本文摘自网络。

原创文章,作者:北大青鸟,如若转载,请注明出处:http://news.yy-accp.com/archives/18662