一、Callable vs Runnable
在前述关于线程创建的部分(线程基础),提到过Callable 和 Runnable,本质上来讲,两者都是定义线程执行内容的方法(这里不应该说是创建线程的方法,实际创建线程最终都是要基于Thread创建),同时Callable底层实现有依赖于Runnable。
1. Runnable的缺陷
相比于Callable, Runnable存在一些缺陷,具体如下:
a. 不能返回一个返回值
Runnable不能返回一个返回值,虽然可以可以利用其他的一些办法,比如在 Runnable 方法中写入日志文件或者修改某个共享的对象的办法,来达到保存线程执行结果的目的,但这属于"曲线救国"的做法。
b. 不能抛出checked Exception
对于Runnable而言,其内部的run方法既不能通在过方法签名上声明 throws 一个异常,也不能在方法内部 throw 一个 checked Exception。只能用 try catch 包裹起来。
public class RunThrowException {
/**
* 普通方法内可以 throw 异常,并在方法签名上声明 throws
*/
public void normalMethod() throws Exception {
throw new IOException();
}
Runnable runnable = new Runnable() {
/**
* run方法上无法声明 throws 异常,且run方法内无法 throw 出 checked Exception,除非使用try catch进行处理
*/
@Override
public void run() {
try {
throw new IOException();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
之所以有这个限制,是因为Runnable 接口中对run方法的定义就是返回类型为 void,且没有声明抛出任何异常。当实现并重写这个方法时,既不能改返回值类型,也不能更改对于异常抛出的描述,因为在实现方法的时候,语法规定是不允许对这些内容进行修改的。
public interface Runnable {
public abstract void run();
}
c. Callable的改进
查看一下Callable的源码:
public interface Callable<V> {
V call() throws Exception;
}
可以看到,call方法已经声明了 throws Exception,前面还有一个 V 泛型的返回值。实现 Callable 接口,就要实现 call 方法,这个方法的返回值是泛型 V,如果把 call 中计算得到的结果放到这个对象中,就可以利用 call 方法的返回值来获得子线程的执行结果了。
2. Callable 和 Runnable 的对比
- 方法名:Callable 规定的执行方法是 call(),而 Runnable 规定的执行方法是 run()
- 返回值:Callable 的任务执行后有返回值,而 Runnable 的任务执行后是没有返回值的
- 异常:call() 方法可抛出异常,而 run() 方法是不能抛出受检查异常的
- 功能:和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。
二、Future
1. Future的用法
Callable相比于Runnable最明显的一个特征,就是Callable有返回结果。那么,这个返回结果怎么拿到呢?这就要借助Future类了。
Future 相当于一个存储器,它存储了 Callable 的 call 方法的任务结果。除此之外,Future还有很多功能。以下是Future的内部方法
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutExceptio
}
a. get() 方法:获取结果
get方法获取结果时,根据Callable 任务的状态,会有以下5种情况:
- 当执行 get 的时候,任务已经执行完毕了,直接返回执行结果
- 当任务还没有结果,比如线程池中任务积压过多,当前任务还没开始;或者任务正在执行中,比如任务执行耗时过长。此时get方法会把当前的线程阻塞,直到任务完成再把结果返回回来
- 当任务执行过程中抛出异常,get方法会就会抛出 ExecutionException 异常,而且不管执行 call 方法时里面抛出的异常类型是什么,在执行 get 方法时所获得的异常都是 ExecutionException。
- 当任务被取消,get 方法会抛出 CancellationException
- 当get 方法指定了延迟参数,如果 call 方法在规定时间内正常顺利完成了任务,那么 get 会正常返回;但是如果到达了指定时间依然没有完成任务,get 方法则会抛出 TimeoutException,代表超时。
以下是get的使用方法
/**
* 描述: 演示一个 Future 的使用方法
*/
public class OneFuture {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(10);
Future<Integer> future = service.submit(new CallableTask());
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
service.shutdown();
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(3000);
return new Random().nextInt();
}
}
}
b. isDone() 方法:判断是否执行完毕
这个方法如果返回 true 则代表执行完成了;如果返回 false 则代表还没完成。但是,返回true并不代表任务是成功执行的,比如说任务执行到一半抛出了异常,isDone方法也会任务任务执行完毕。
以下为使用示例:
public class GetException {
public static void main(String[] args) {
ExecutorService service = Executors.newFixedThreadPool(20);
Future<Integer> future = service.submit(new CallableTask());
try {
// 慢慢打印出 0 ~ 4 这 5 个数字,起到延迟作用
for (int i = 0; i < 5; i++) {
System.out.println(i);
Thread.sleep(500);
}
System.out.println(future.isDone());
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
static class CallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
throw new IllegalArgumentException("Callable抛出异常");
}
}
}
执行结果如下:
0
1
2
3
4
true
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
...
可以看到:1. 即便任务抛出异常,isDone 方法依然会返回 true;2. 虽然call方法抛出的异常是 IllegalArgumentException,但是对于 get 而言,它抛出的异常依然是 ExecutionException;3. 虽然在任务执行一开始时就抛出了异常,但是真正要等到执行 get 的时候,才看到异常。
c. cancel() 方法:取消任务的执行
cancel方法执行时,受到Callable 任务的状态,会有三种情况:
- 当任务还没有开始执行时,一旦调用 cancel,这个任务就会被正常取消,未来也不会被执行,那么 cancel 方法返回 true
- 当任务已经完成,或者之前已经被取消过了,那么执行 cancel 方法则代表取消失败,返回 false
- 当任务正在执行,cancel 方法会根据传入的mayInterruptIfRunning参数做判断。如果传入的参数是 true,执行任务的线程就会收到一个中断的信号,正在执行的任务可能会有一些处理中断的逻辑,进而停止;如果传入的是 false 则就代表不中断正在运行的任务,也就是说,本次 cancel 不会有任何效果,同时 cancel 方法会返回 false。
对于mayInterruptIfRunning参数,什么时候传入true/false呢?
- 如果明确知道这个任务能够处理中断,传入true;否则传入false
- 如果这个任务一旦开始运行,就希望它完全的执行完毕。在这种情况下,也应该传入 false
d. isCancelled() 方法:判断是否被取消
该方法用于判断线程是否被取消,和 cancel 方法配合使用
2. FutureTask 创建 Future
除了将Callable的实现类放入线程池会返回Future之外,还可以用 FutureTask 来获取 Future 类和任务的结果。
FutureTask实现了RunnableFuture,RunnableFuture又继承了Runnable 和 Future,具体关系如下。因此FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。
典型的使用方法是,把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果
/**
* 描述: 演示 FutureTask 的用法
*/
public class FutureTaskDemo {
public static void main(String[] args) {
Task task = new Task();
FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
new Thread(integerFutureTask).start();
try {
System.out.println("task运行结果:"+integerFutureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
class Task implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("子线程正在计算");
int sum = 0;
for (int i = 0; i < 100; i++) {
sum += i;
}
return sum;
}
}
3. Future的注意事项
a. for循环获取结果,get方法需要限制时间
当for 循环批量获取 Future 的结果时容易 block,在调用 get 方法时,应该使用 timeout 来限制。
如果每个任务的执行耗时不同,不设置限制时间,需要等到所有的任务执行结束才能得到最终所有的结果,此时主线程会一直卡着,影响了程序的运行效率。
采用带超时参数的 get(long timeout, TimeUnit unit) 方法,在限定的时间内没能返回结果的话,那么便会抛出一个 TimeoutException 异常,随后就可以把这个异常捕获住,或者是再往上抛出去,这样就不会一直卡着。
b. Future 的生命周期不能后退
Future 的生命周期不能后退,一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来,也不能让一个已经完成计算的 Future 再次重新执行任务。
c. Future 不产生新的线程
Future 搭配Callable并不会产生新的线程,实际上线程只有通过Thread来创建,Callable以及Runnable只是定义线程执行的任务内容,Future这里只是获取最终执行的结果。
三、CompletableFuture
想象这样一个场景,设计一个旅游平台,为用户展示多家航空公司的航班信息,由于每个航空公司都有自己的服务器,所以分别去请求它们的服务器就可以了,比如请求国航、海航、东航等。
下面分析具体怎么做:
- 首先很容易想到,可以串行地请求各家航空公司的航班信息,但这种效率非常低下
- 那么采用并行?采用并行可以成倍提高效率,但是需要等到所有请求结果都返回,某家公司的网站特别慢,就会导致整个获取结果的速度减慢
- 那么最终方案就出来了,采用带超时限制的并行方案,给定超时时间,超过该时间如果还有些网站没能及时返回,就把这些请求给忽略掉
那么怎么实现呢?有以下几种方案
1. 线程池的实现
public class ThreadPoolDemo {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws InterruptedException {
ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();
System.out.println(threadPoolDemo.getPrices());
}
private Set<Integer> getPrices() throws InterruptedException {
Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
threadPool.submit(new Task(123, prices));
threadPool.submit(new Task(456, prices));
threadPool.submit(new Task(789, prices));
// sleep 方法来休眠 3 秒钟,之后直接返回 prices。
Thread.sleep(3000);
return prices;
}
private class Task implements Runnable {
Integer productId;
Set<Integer> prices;
public Task(Integer productId, Set<Integer> prices) {
this.productId = productId;
this.prices = prices;
}
@Override
public void run() {
int price=0;
try {
// 随机的时间去模拟各个航空网站的响应时间
Thread.sleep((long) (Math.random() * 4000));
// 随机的价格来表示票价
price= (int) (Math.random() * 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
prices.add(price);
}
}
}
2. CountDownLatch的实现
上述实现中,如果个航空公司响应速度都特别快,无需等待三秒,那么Thread.sleep(3000)的做法反而会增加等待时间,这里用CountDownLatch进行改进
public class CountDownLatchDemo {
ExecutorService threadPool = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws InterruptedException {
CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();
System.out.println(countDownLatchDemo.getPrices());
}
private Set<Integer> getPrices() throws InterruptedException {
Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
// 初始化CountDownLatch 的参数为3
CountDownLatch countDownLatch = new CountDownLatch(3);
// 将 CountDownLatch传入线程
threadPool.submit(new Task(123, prices, countDownLatch));
threadPool.submit(new Task(456, prices, countDownLatch));
threadPool.submit(new Task(789, prices, countDownLatch));
// 如果三个任务都非常快速地执行完毕,wait 方法就会立刻返回,不需要傻等到 3 秒钟
// 如果来不及在 3 秒钟之内执行完毕,即时放弃等待
countDownLatch.await(3, TimeUnit.SECONDS);
return prices;
}
private class Task implements Runnable {
Integer productId;
Set<Integer> prices;
CountDownLatch countDownLatch;
public Task(Integer productId, Set<Integer> prices,
CountDownLatch countDownLatch) {
this.productId = productId;
this.prices = prices;
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
int price = 0;
try {
Thread.sleep((long) (Math.random() * 4000));
price = (int) (Math.random() * 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
prices.add(price);
// 调用 countDown 方法,相当于把计数减 1
countDownLatch.countDown();
}
}
}
3. CompletableFuture的实现
public class CompletableFutureDemo {
public static void main(String[] args)
throws Exception {
CompletableFutureDemo completableFutureDemo = new CompletableFutureDemo();
System.out.println(completableFutureDemo.getPrices());
}
private Set<Integer> getPrices() {
Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(123, prices));
CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(456, prices));
CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(789, prices));
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
try {
allTasks.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} catch (TimeoutException e) {
}
return prices;
}
private class Task implements Runnable {
Integer productId;
Set<Integer> prices;
public Task(Integer productId, Set<Integer> prices) {
this.productId = productId;
this.prices = prices;
}
@Override
public void run() {
int price = 0;
try {
Thread.sleep((long) (Math.random() * 4000));
price = (int) (Math.random() * 4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
prices.add(price);
}
}
}
这里没有采用线程池,CompletableFuture是Future的实现类,runAsync 方法会异步的去执行任务。
每个任务分别返回一个 CompletableFuture 对象,这里分别是task 1、task 2、task 3。执行 CompletableFuture 的 allOf 方法,并且把 task 1、task 2、task 3 传入。将多个 task 汇总,然后可以根据需要去获取到传入参数的这些 task 的返回结果,或者等待它们都执行完毕等。
当每一个任务都执行完毕,get 方法就可以及时正常返回,并且往下执行return prices。当遇到超时或者线程中断等问题时,会排除相应的异常,catch之后会继续执行return prices。