• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

【第二部分】结构化并发程序

  • executor框架
  • 执行策略
  • 线程池
  • executor的生命周期
    • shutdown
    • shutdownnow
  • 延迟任务与周期任务
  • callable与future
    • callable
    • future
    • completion-service
  • 为任务设置实现
  • 取消与关闭线程
    • 已请求取消(cancellation-request)
    • 中断(interrupt)
      • 中断策略
      • 中断响应
      • future
    • 线程异常的处理
      • 主动解决未处理的异常
      • uncaught-exception-handler
    • jvm关闭
      • shutdown-hook
  • 线程池
    • 任务与执行策略的耦合
    • 线程的饥饿死锁
    • 运行时间长的任务
    • 线程池核心参数
    • 线程工厂

executor框架

当围绕”任务执行”来设计应用程序时,第一步要找出任务的边界.

Executor虽然是个简单接口,但却是灵活且强大的异步框架的基础.Executor的实现还提供了对生命周期的支持,统计信息收集,应用程序管理机制和性能机制等.

通过使用executor,可以实现调优、管理、监视、记录日志、错误报告和其他功能.

1
2
3
4
// executor接口
public interface Executor {
void execute(Runnable command);
}

执行策略

  • 在什么(what)线程中执行任务?
  • 任务按照什么(what)顺序执行(FIFO,LIFO,优先级)?
  • 有多少个(how many)任务?
  • 在队列中有多少任务(how many)任务等待?
  • 如果系统由于过载而需要拒绝,那么应该选择哪一个(which)任务?另外如何(how)通知应用程序有任务被拒绝
  • 在执行一个任务之前,之后,应该进行哪些(what)操作?

线程池

线程池优点:

  1. 在处理多个请求时,分摊在线程创建和销毁过程中产生的开销.
  2. 当请求到达时,工作线程通常已存在.
  3. 适当调整线程池大小,可以发挥多核优势,同时防止过多线程相互竞争资源导致oom.

Executors的静态工厂方法提供的线程池:

  • newFixedThreadPool: 固定长度的线程池,每提交一个任务就创建一个线程,直到达到线程池最大数量.若某个线程由于exception而结束,那么线程池会补充一个新的.
  • newCachedThreadPool: 可缓存线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲线程,当需求增加时,可以添加新的线程,线程池规模不受限制.
  • newSingleThreadExecutor: 单线程线程池,如果这个线程由于Exception结束,那么创建两一个线程来代替.newSinleThreadExecutor能确保依照任务在队列里的顺序执行(fifo,lifo,优先级等).
  • newScheduledThreadPool: 固定长度线程池,以延时或者定时方式来执行.

newFixedThreadPool和newCachedThreadPool这两个工厂返回通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的Executor.

executor的生命周期

ExecutorService扩展了Executor接口,添加了一些生命周期管理方法,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public interface ExecutorService extends Executor {
boolean isShutdown();
boolean isTerminated();
void shutdown();

/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*/
List<Runnable> shutdownNow();
}

ExecutorService生命周期有三种状态:运行,关闭,已终止.

shutdown

平缓关闭,不再接受新任务,等待已提交的任务执行完成(包括队列中的),

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
void shutdown();

shutdownnow

尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务.

延迟任务与周期任务

Timer类负责管理延迟热为奴,然而timer存在缺陷(支持基于绝对时间的调度机制,对系统时钟变化敏感).
timer的缺陷:

  1. 对系统时钟敏感
  2. 执行所有任务只会创建一个线程,若某个任务执行时间过长,则会影响其他timerTask精确性,
  3. 会发生线程泄露,若timerTask发生异常,而导致线程终止,timer不会恢复线程的执行,二十错误地认为整个timer都被取消.

若构建自己的调度服务,那么可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadpoolExecutor提供了调度功能,DelayQueue管理一组Delayed对象,每个Delayed对象,都有一个相应的延迟时间,在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作.从DelayQueue中返回的对象将根据它们的延迟时间进行排序.

callable与future

callable

Callable,适用于存在延迟的计算(数据库查询,网络上获取资源,计算某个复杂的功能等).
Callale同样可以表示无返回值的任务:Callable<Void>

Executor执行任务通常有4个周期:创建,提交,开始和完成.在Executor中,已提交未开始的任务可以取消.执行中的任务,只有它们能响应中断时,才能取消.取消一个已经完成的任务不会有任何影响.

future

future表示一个任务的生命周期,并提供了相应的方法来表示其是否已经完成或者取消,以及获取任务的结果和取消任务等.
在future规范中,包含的隐含含义是:任务的生命周期只能向前,不能后退,就像ExecutorService的生命周期一样.当某个任务完成后,它就永远停留在”完成状态”.

get方法:如果任务已经完成,直接返回执行结果,执行中,阻塞至任务完成,如果抛出了异常,get会将该异常包装成ExecutionException并重新抛出,如果任务被取消,抛出CancellationException.
如果抛出了ExecutionException,可以通过getCause来获得被封装的初始异常.

completion-service

如果像Executor提交了一组计算事务,并且希望在计算完成后获得结果,简单粗暴的办法就是保留每个任务关联的future,然后轮询future.get().可以用CompletionService替代.

CompletionService将Executor与BlockingQueue的功能融合在一起.其功能可以视为持有一组计算的句柄.

ExecutorCompletionService的实现:在构造函数中创建一个blockingQueue保存计算完成的结果,计算完成时调用FutureTask的done方法(实际为QueueingFuture,修改了done方法,将其添加到队列里).

为任务设置实现

exec.invokeAll()

取消与关闭线程

java没有提供安全、快速、可靠的停止线程方法,它提供了interruption,一种协作机制,能够使一个线程终止另一个线测的当前工作

已请求取消(cancellation-request)

通过设置标记,来判断线程是否执行.使用volatile保护标记.

1
2
3
4
5
6
7
8
9
10
11
12
13
public class PrimeGenerator implement Runnable {
private volatile Boolean cancel = false;

public void run(){
while(!cancel){
dth...
}
}

public void cancel(){
cancel = true;
}
}

中断(interrupt)

请求取消需要检查标志位,然而,当调用了一个阻塞方法,可能很长一段时间内,该线程都不会检查标志位.

java的语义中,并没有将中断与取消关联起来,但是在任何其他场景使用中断,都是不合适的.

阻塞库方法:Thread.sleep, Object.wait等方法都会检测线程的中断,在发现时及时返回.

在响应中断时执行的操作: 清除中断状态,抛出InterruptedException,表示阻塞操作由于中断而提前结束.

jvm并不能保证阻塞方法中中断的检测速度,但是实际中响应速度还是很快的.

调用interrupt并不意味着线程立即停止目标线程正在进行的工作,而只是传递了中断的请求,线程在下一个检查点中断自己.

中断策略

所谓中断策略即指的是,当收到中断请求时,应该做哪些操作,以多快的响应来中断。任务一般不会在自己的线程中执行,而是在其他某个服务(比如:其他线程或者线程池)中执行.对于非线程所有者(其他线程或者线程池),应该保存并传递中断状态,使得真正拥有线程得代码对中断做出响应.
如:多数可阻塞库代码都是抛出InterruptedException作为中断响应。

合理的中断策略: 线程级的取消操作或服务级的取消操作
尽快退出,必要时进行清里,通知线程所有者该线程已退出.

中断响应

只有实现了线程中断策略的代码才可以忽略中断请求,在常规的任务和库代码中都不应该屏蔽中断请求.

通常,可中断的方法会在阻塞或进行重要的工作前首先检查中断,从而尽快响应中断.

在中断线程之前,要先了解其中断策略,要谨慎。

future

ExecutorService.submit方法返回一个future来描述任务,其拥有一个cancle方法,该方法带一个boolean类型的参数mayIntteruptIfRunning,返回一个boolean参数.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when {@code cancel} is called,
* this task should never run. If the task has already started,
* then the {@code mayInterruptIfRunning} parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return {@code true}. Subsequent calls to {@link #isCancelled}
* will always return {@code true} if this method returned {@code true}.
*
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return {@code false} if the task could not be cancelled,
* typically because it has already completed normally;
* {@code true} otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

线程异常的处理

这块重点是处理的思路

主动解决未处理的异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 线程池工作者线程结构

public void run() {
Throwable thrown = null;
try {
while(!isInterrupted()){
runTask(getTaskFromWorkQueue());
}
} catch (Throwable t){
thrown = t;
} finally {
threadExited(this, thrown);
}
}
uncaught-exception-handler

继承Thread.UncaughtExceptionHandler接口.

只有通过execute提交的异常交给处理器,submit提交的会将异常交给Future.get

jvm关闭

shutdown-hook

在jvm正常关闭时,jvm会调用所有shutdownHook即通过Runtime.addSutdownHook注册,但是尚未开始的线程.
jvm正常退出时,不会停止或者中断任何运行的线程.

关闭钩子可以用于实现服务或者应用的清理工作,如删除临时文件,或者清除无法由操作系统自动清除的资源.

线程池

任务与执行策略的耦合

  1. 依赖性任务
  2. 使用线程封闭机制的任务
  3. 对响应时间敏感的任务
  4. 使用ThreadLocal的任务

只有当任务是相同类型且相互独立,线程池的性能才能达到最佳。

线程的饥饿死锁

在线程池中,如果任务依赖其他任务,可能造成死锁.

在单线程的Executor中,如果一个任务将另一个任务提交到同一个Executor,并且等待这个被提交任务的结果,那么通常引发死锁.(a提交b,等待b的执行结果,而b在队列中等待a执行完毕.)

在更大的线程池中,如果所有正在执行任务的线程都由于等待其他仍处于工作队列中的任务而阻塞,那么会发生同样的问题,这就是线程饥饿死锁.

  • 每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程”饥饿”死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或者配置限制.

运行时间长的任务

如果任务阻塞时间过长,那么即使不出现死锁,线程池的响应性也会糟糕.

解决方案:限定任务等待资源的时间,不要无限制的等待.

在平台类库的可阻塞方法中,同事定义了限时版本,和不限时版.

线程池核心参数

coreSize 基本线程数
maxSize 阻塞队列满时,若max>core,创建新线程
keepAliveTime 多余线程存活时间
timeUnit
queue 任务队列
rejectdExecutingHandler 拒绝策略

线程工厂

故名思意,定制线程.
设置线程名

一、并发编程的挑战
【第一部分】并发基础
  1. 1. executor框架
  2. 2. 执行策略
  3. 3. 线程池
  4. 4. executor的生命周期
    1. 4.1. shutdown
    2. 4.2. shutdownnow
  5. 5. 延迟任务与周期任务
  6. 6. callable与future
    1. 6.1. callable
    2. 6.2. future
    3. 6.3. completion-service
  7. 7. 为任务设置实现
  8. 8. 取消与关闭线程
    1. 8.1. 已请求取消(cancellation-request)
    2. 8.2. 中断(interrupt)
      1. 8.2.1. 中断策略
      2. 8.2.2. 中断响应
      3. 8.2.3. future
    3. 8.3. 线程异常的处理
      1. 8.3.1. 主动解决未处理的异常
      2. 8.3.2. uncaught-exception-handler
    4. 8.4. jvm关闭
      1. 8.4.1. shutdown-hook
  9. 9. 线程池
    1. 9.1. 任务与执行策略的耦合
    2. 9.2. 线程的饥饿死锁
    3. 9.3. 运行时间长的任务
    4. 9.4. 线程池核心参数
    5. 9.5. 线程工厂
© 2023 haoxp
Hexo theme