玩转Java线程池:
一、Executor的结构
在Java中线程是一个工作单元,也是执行机制。JDK1.5的JUC的出现,工作单元就和工作机制分离。创建线程的两个接口Runnable和Callable,执行则是有Executor来完成。如下图所示:
Java中的线程启动是本地操作系统也会创建一个线程与之对应,当Java线程终止时,这个本地操作系统的线程也会被系统回收。学过操作系统的应该知道,线程是需要CPU时间来完成自己的任务的。
上面的Executor就是一个用户级别的调度器,负责调度java线程,将用户提交的任务与Java线程结合。底层的操作系统则调度这些线程分配CPU。
Executor类的源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public interface Executor {
/**
* 在将来的某个时间执行给定的命令。
* 根据Executor实现的判断,该命令可以在新线程、池线程或调用线程中执行。
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
|
Executor类中只有一个execute方法,参数是传入一个实现了Runnable接口的类,这是很基本的一个用法。Executor类中包含的信息的不多,主要还是要看其关键的实现类之一ExecutorService接口,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
|
整个Executor框架体系中,最关键的两个接口就是Executor和ExecutorService。这两个接口定义了标准,同时支持JDK1.5之前的Runnable和JDK1.5开始的Callable,支持异步计算并返回结果。主要的类图如下:
Executor是最顶层的接口,它制定了任务提交和任务的执行分离的标准。ThreadPoolExecutor是线程池的核心实现类,用来执行任务的。ScheduledExecutorService是执行定时任务的接口,其实现类ScheduledThreadPoolExecutor可以在给定的延迟时间后执行任务,或定期执行任务,比Timer更加的强大和灵活。
关于Java线程的类图:
对于Runnable估计不用多说了实现该接口创建线程的方法之一。Callable是JDK1.5后出现的,属于java.util.concurrent包下的, 作用是跟Runnable相似,但是Callable接口是有返回值并且会抛出异常。Future计算结果的接口,定义了一些对结果进行操作的方法,所以说FutureTask就是操作返回结果的,使用的Callable时,线程池就会返回FutureTask。Runnable和Callable的实现类都可以提交到Executor的实现类ThreadPoolExecutor或ScheduledExecutorService中运行。
它们之间的调用依赖关系:
二、JDK中Executors提供的线程池
Executors中定义的Executor 、 ExecutorService 、 ScheduledExecutorService 、 ThreadFactory和Callable类的工厂和实用方法。 此类支持以下类型的方法:
- 创建和返回ExecutorService设置了常用的配置设置。
- 使用常用配置设置创建和返回ScheduledExecutorService方法。
- 创建和返回“包装的”ExecutorService 的方法,通过使特定于实现的方法不可访问来禁用重新配置。
- 创建并返回将新创建的线程设置为已知状态的ThreadFactory方法。
- 从其他类似闭包的形式创建和返回Callable方法,因此它们可以用于需要Callable执行方法中。
1、FixedThreadPool
创建固定线程数的线程池,corePoolSize等于maximumPoolSize。适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
1
2
3
4
5
6
7
8
9
10
11
|
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
|
2、SingleThreadExecutor
创建只有一个线程的线程池,corePoolSize和maximumPoolSize都等于1。适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
|
3、CachedThreadPool
CachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
1
2
3
4
5
6
7
8
9
10
11
|
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
|