Java并发系列 - Java Concurrency Utilities 之 Executor 框架

引言

现在大部分的应用都需要并发,并且这些并发应用变得越来越复杂,因此Java最初给我提供的并发手段(比如:synchronized volatile wait() notify())等越来越难以满足我们开发并发应用的需求。如果使用这些原始的并发手段,程序员需要更高的编程能力,增加开发时间,导致我们浪费很多精力在并发应用的正确性上,而不能只专注于我们自己手上的业务。

JSR 166: Concurrency Utilities 框架设计的目的就是减小程序员写并发应用程序的难度,让我们拿出更多的时间去关注我们的业务,同时也降低了开发者写并发应用的门槛。因此,我准备在这一系列文章中为大家介绍这些工具,让大家可以更轻松地去写并发程序。

java并发编程

原始并发手段的缺点

Java之所以引入新的工具来帮助我们开发者写应用程序,那么一定是先前的手段有一些缺点。下面,让我们来看看都有一些什么样的缺点。

一、原始并发手段对开发者的水平要求更高,不容易写出正确的并发程序。不恰当的使用这些手段会导致deadlock、thread starvation、race conditions,并且在多线程应用中,一旦出现错误不容易还原错误情况,从而检测和debug程序。

二、现在很多应用都会强调程序的性能问题,尤其是访问量很大的应用,因此性能是我们非常关心的一件事情。而用传统的synchronized 去协调不同线程对共享资源的访问会导致性能问题。

三、正如我在引言中所说,一些高级的特点(比如:semaphores)并没有被提供,如果我们自己去实现这样的功能,会浪费我们大量的时间,并且很有可能出现错误。因此,我相信Doug Lea写的这些功能不会出现问题,并且写的会比我们自己写的要好吧。

Executor 框架的作用

在说明Executor框架的好处之前,我们必须先说明传统手段的不好之处。请看下面这段代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class MyServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888);
while (true) {
Socket socket = serverSocket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
doSomething(socket);
}
};
new Thread(task).start();
}
}
private static void doSomething(Socket s) {
//TODO
}
}

上面的代码是我写的一个简单的服务器应用,主线程不断地等待进来的请求,当请求到达时,它会新建一个线程去处理请求执行任务。上面的代码有如下几个缺点:

  • 任务提交与任务执行紧密地耦合在一起
  • 由于主线程为每个新来的请求都会创建一个新的线程,如果有大量的请求一定会消耗掉所有的系统内存资源,最后导致应用终止。

如果想解决上面的问题,我们必须自己写一些代码才能解决上面的问题,比如可以用装有固定数量的线程池。庆幸的是在Java 5以后,Executor框架已经帮我们解决了上面的问题,因此我们可以不用自己去写代码实现了。

理解Executor框架

Executor框架是基于Executor接口的,下面是我引用官方文档对Executor接口的描述。

一个可以执行已经提交地Runnable任务的对象。这个接口提供了一种方式去解耦任务地提交和每个任务地执行机制,这个机制也包括线程地创建和调度方式。

这个接口只包含下面一个方法。

void execute(Runnable command)

你提交一个Runnable任务通过上面的方法,如果executor由于任何原因不能执行提交的任务,这个方法将抛出RejectedExecutionException. Executor本身的功能是非常有限的。比如:你不能关闭掉Executor或者判断出一个异步任务是否已经完成。同样地,你也不能取消一个正在运行的任务。出于这样地原因,Executor框架提供了一个功能更加丰富地ExecutorService接口,它继承了Executor接口。

ExecutorService接口的几个重要API

  • awaitTermination:这个方法会使程序阻塞,满足下面3个条件中的任意1个,将会打破阻塞状态:
    1. 调用shutdown方法以后,所有提交的任务执行完毕
    2. 超出指定的时间
    3. 当前线程被打断
  • isShutdown:如果executor已经关闭则返回true
  • shutdown:有序地关闭ExecutorService,它会依然去执行先前提交的任务,但是不接受提交任务。这个方法不会等待所有的任务执行完成,用awaitTermination方法可以。
  • submit(Callable task):提交一个具有返回值的任务去执行,并返回一个Future代表任务的结果
  • submit(Runnable task, T result):提交一个Runnable任务去执行,并返回一个Future代表那个任务。如果成功地执行了这个提交的任务,那么Futureget方法将返回参数指定的结果。

如果大家去看看官方文档,会发现submit方法都会返回一个Future<V>接口,它代表异步计算的结果,这个结果之所以叫做Future,是因为它通常是不能马上得到的,而是在未来的某个时间才会得到这样的结果。通过这个Future接口,可以取消一个任务、获取返回地结果、判断任务是被完成了还是取消了。

上面的submit方法中有一个参数对象为Callable<V>接口,它与Runnable接口很相似,它只提供了一个call方法去表述一个任务的执行。但它们之间有个显著的不同就是Callable<V>接口有个返回值,而Runnable接口没有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceDemo {
private static ExecutorService service = Executors.newFixedThreadPool(2);
public static void main(String[] args) throws InterruptedException {
service.submit(new TaskOne());
service.submit(new TaskOne());
service.submit(new TaskTwo());
service.submit(new TaskThree());
service.submit(new TaskFour());
service.shutdown();
// 由于已经调用shutdown方法,那么awaitTermination方法会阻塞,直到所有提交的任务被执行完,或者超出了给定的时间
// 由于我指定的时间为5秒,而每个任务线程睡了10秒,那么它一定会超时,大家自己修改一下时间,多测试一下,看看不同的效果
service.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("end");
}
static class TaskOne implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("TaskOne");
Thread.sleep(10000); // sleep for 10 seconds
return "happy";
}
}
static class TaskTwo implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("TaskTwo");
Thread.sleep(10000);
return "new";
}
}
static class TaskThree implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("TaskThree");
Thread.sleep(10000);
return "year";
}
}
static class TaskFour implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("TaskFour");
Thread.sleep(10000);
return "2017";
}
}
}

Executors工具类

在上面的小节中,我已经介绍了ExecutorService接口有很多丰富的功能,那么我们如何获取一个ExecutorService对象呢?Executor框架给我们提供了Executors工具类,它可以做到这点。Executors给我们提供了几个工厂方法获取不同种类的executors,下面是3个具体地例子:

  1. newCachedThreadPool():对于这个线程池来说,当需要线程时,它就会创建一个线程,但是,它会重新使用先前被创建过的线程池,减少线程反复创建地开销。如果一个线程60秒内都没有被使用过,那么它会被终止并从缓存中移出。这个线程池通常会提高执行很多短命异步任务程序的性能。
  2. newSingleThreadExecutor():创建只有一个工作线程的executor来操作没有边界地队列 – 任务被加到队列中然后顺序地执行,一次只会执行一个任务。如果在关闭executor之前,这个仅有的线程在执行任务的时候被终止,那么会创建一个新的线程用来去执行后面的任务。
  3. newFixedThreadPool(int nThreads):创建一个具有固定线程数的线程池来操作没有边界地队列。至多nThreads个线程激活处理任务,多余的任务会一直等待有空闲的线程来执行它们,如果这个线程池中的任何一个线程在执行任务的时候挂了,那么这个线程池会新创建线程来替代挂了的线程,继续执行后续的任务。在这个线程池关闭之前,池中的线程会一直存在。

Executor框架还给我们提供了ScheduledExecutorService接口,这里我就不介绍了,大家去看官方API就会明白了。

用Executor框架重写上面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MyServer {
static Executor pool = Executors.newFixedThreadPool(5);
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(8888);
while (true) {
Socket socket = serverSocket.accept();
Runnable task = new Runnable() {
@Override
public void run() {
doSomething(socket);
}
};
pool.execute(task);
}
}
private static void doSomething(Socket s) {
//TODO
}
}

如果你已经看懂了我上面介绍的Executor框架,那么上面的代码也没有什么难度,这里我就不解释了。下面我在给出一个例子,这个例子中的异步任务为读取指定URL的网页。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
public class ReadWebPage
{
public static void main(final String[] args)
{
if (args.length != 1)
{
System.err.println("usage: java ReadWebPage url");
return;
}
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<List<String>> callable;
callable = new Callable<List<String>>()
{
@Override
public List<String> call()
throws IOException, MalformedURLException
{
List<String> lines = new ArrayList<>();
URL url = new URL(args[0]);
HttpURLConnection con;
con = (HttpURLConnection) url.openConnection();
InputStreamReader isr;
isr = new InputStreamReader(con.getInputStream());
BufferedReader br;
br = new BufferedReader(isr);
String line;
while ((line = br.readLine()) != null)
lines.add(line);
return lines;
}
};
Future<List<String>> future = executor.submit(callable);
try
{
List<String> lines = future.get(5, TimeUnit.SECONDS);
for (String line: lines)
System.out.println(line);
}
catch (ExecutionException ee)
{
System.err.println("Callable through exception: "+ee.getMessage());
}
catch (InterruptedException | TimeoutException eite)
{
System.err.println("URL not responding");
}
executor.shutdown();
}
}

在上面的代码中,一旦把异步任务(Callable对象)提交到executor中,线程池中的线程就会去执行这个任务,Futureget方法会一直等待获取结果,如果5秒钟之内还没有获取到异步任务的结果,就会抛出异常。不管是否出现异常,在应用退出之前,executor必须被关闭。如果executor没有被关闭,应用将不会退出,因为non-daemon线程池中的线程仍然在执行。

参考资料

Java concurrency without the pain

shutdown and awaitTermination which first call have any difference