Java并发编程学习九:Future

一、Callable vs Runnable

在前述关于线程创建的部分(线程基础),提到过Callable 和 Runnable,本质上来讲,两者都是定义线程执行内容的方法(这里不应该说是创建线程的方法,实际创建线程最终都是要基于Thread创建),同时Callable底层实现有依赖于Runnable。

1. Runnable的缺陷

相比于Callable, Runnable存在一些缺陷,具体如下:

a. 不能返回一个返回值

Runnable不能返回一个返回值,虽然可以可以利用其他的一些办法,比如在 Runnable 方法中写入日志文件或者修改某个共享的对象的办法,来达到保存线程执行结果的目的,但这属于"曲线救国"的做法。

b. 不能抛出checked Exception

对于Runnable而言,其内部的run方法既不能通在过方法签名上声明 throws 一个异常,也不能在方法内部 throw 一个 checked Exception。只能用 try catch 包裹起来。

public class RunThrowException {

   /**
    * 普通方法内可以 throw 异常,并在方法签名上声明 throws
    */
   public void normalMethod() throws Exception {

       throw new IOException();
   }

   Runnable runnable = new Runnable() {

       /**
        *  run方法上无法声明 throws 异常,且run方法内无法 throw 出 checked Exception,除非使用try catch进行处理
        */
       @Override
       public void run() {

           try {

               throw new IOException();
           } catch (IOException e) {

               e.printStackTrace();
           }
       }
   }
}

之所以有这个限制,是因为Runnable 接口中对run方法的定义就是返回类型为 void,且没有声明抛出任何异常。当实现并重写这个方法时,既不能改返回值类型,也不能更改对于异常抛出的描述,因为在实现方法的时候,语法规定是不允许对这些内容进行修改的。

public interface Runnable {

   public abstract void run();
}

c. Callable的改进

查看一下Callable的源码:

public interface Callable<V> {

     V call() throws Exception;
}

可以看到,call方法已经声明了 throws Exception,前面还有一个 V 泛型的返回值。实现 Callable 接口,就要实现 call 方法,这个方法的返回值是泛型 V,如果把 call 中计算得到的结果放到这个对象中,就可以利用 call 方法的返回值来获得子线程的执行结果了。

2. Callable 和 Runnable 的对比
  • 方法名:Callable 规定的执行方法是 call(),而 Runnable 规定的执行方法是 run()
  • 返回值:Callable 的任务执行后有返回值,而 Runnable 的任务执行后是没有返回值的
  • 异常:call() 方法可抛出异常,而 run() 方法是不能抛出受检查异常的
  • 功能:和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。

二、Future

1. Future的用法

Callable相比于Runnable最明显的一个特征,就是Callable有返回结果。那么,这个返回结果怎么拿到呢?这就要借助Future类了。

Future 相当于一个存储器,它存储了 Callable 的 call 方法的任务结果。除此之外,Future还有很多功能。以下是Future的内部方法

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutExceptio
}

a. get() 方法:获取结果

get方法获取结果时,根据Callable 任务的状态,会有以下5种情况:

  • 当执行 get 的时候,任务已经执行完毕了,直接返回执行结果
  • 当任务还没有结果,比如线程池中任务积压过多,当前任务还没开始;或者任务正在执行中,比如任务执行耗时过长。此时get方法会把当前的线程阻塞,直到任务完成再把结果返回回来
  • 当任务执行过程中抛出异常,get方法会就会抛出 ExecutionException 异常,而且不管执行 call 方法时里面抛出的异常类型是什么,在执行 get 方法时所获得的异常都是 ExecutionException。
  • 当任务被取消,get 方法会抛出 CancellationException
  • 当get 方法指定了延迟参数,如果 call 方法在规定时间内正常顺利完成了任务,那么 get 会正常返回;但是如果到达了指定时间依然没有完成任务,get 方法则会抛出 TimeoutException,代表超时。

以下是get的使用方法

/**
 * 描述: 演示一个 Future 的使用方法
 */
public class OneFuture {

    public static void main(String[] args) {

        ExecutorService service = Executors.newFixedThreadPool(10);
        Future<Integer> future = service.submit(new CallableTask());
        try {

            System.out.println(future.get());
        } catch (InterruptedException e) {

            e.printStackTrace();
        } catch (ExecutionException e) {

            e.printStackTrace();
        }
        service.shutdown();
    }

    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {

            Thread.sleep(3000);
            return new Random().nextInt();
        }
    }
}

b. isDone() 方法:判断是否执行完毕

这个方法如果返回 true 则代表执行完成了;如果返回 false 则代表还没完成。但是,返回true并不代表任务是成功执行的,比如说任务执行到一半抛出了异常,isDone方法也会任务任务执行完毕。

以下为使用示例:

public class GetException {

    public static void main(String[] args) {

        ExecutorService service = Executors.newFixedThreadPool(20);
        Future<Integer> future = service.submit(new CallableTask());
        try {

            // 慢慢打印出 0 ~ 4 这 5 个数字,起到延迟作用
            for (int i = 0; i < 5; i++) {

                System.out.println(i);
                Thread.sleep(500);
            }
            System.out.println(future.isDone());
            future.get();
        } catch (InterruptedException e) {

            e.printStackTrace();
        } catch (ExecutionException e) {

            e.printStackTrace();
        }
    }
    static class CallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {

            throw new IllegalArgumentException("Callable抛出异常");
        }
    }
}

执行结果如下:

0
1
2
3
4
true
java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: Callable抛出异常
...

可以看到:1. 即便任务抛出异常,isDone 方法依然会返回 true;2. 虽然call方法抛出的异常是 IllegalArgumentException,但是对于 get 而言,它抛出的异常依然是 ExecutionException;3. 虽然在任务执行一开始时就抛出了异常,但是真正要等到执行 get 的时候,才看到异常。

c. cancel() 方法:取消任务的执行

cancel方法执行时,受到Callable 任务的状态,会有三种情况:

  • 当任务还没有开始执行时,一旦调用 cancel,这个任务就会被正常取消,未来也不会被执行,那么 cancel 方法返回 true
  • 当任务已经完成,或者之前已经被取消过了,那么执行 cancel 方法则代表取消失败,返回 false
  • 当任务正在执行,cancel 方法会根据传入的mayInterruptIfRunning参数做判断。如果传入的参数是 true,执行任务的线程就会收到一个中断的信号,正在执行的任务可能会有一些处理中断的逻辑,进而停止;如果传入的是 false 则就代表不中断正在运行的任务,也就是说,本次 cancel 不会有任何效果,同时 cancel 方法会返回 false。

对于mayInterruptIfRunning参数,什么时候传入true/false呢?

  • 如果明确知道这个任务能够处理中断,传入true;否则传入false
  • 如果这个任务一旦开始运行,就希望它完全的执行完毕。在这种情况下,也应该传入 false

d. isCancelled() 方法:判断是否被取消

该方法用于判断线程是否被取消,和 cancel 方法配合使用

2. FutureTask 创建 Future

除了将Callable的实现类放入线程池会返回Future之外,还可以用 FutureTask 来获取 Future 类和任务的结果。

FutureTask实现了RunnableFuture,RunnableFuture又继承了Runnable 和 Future,具体关系如下。因此FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。

file

典型的使用方法是,把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果

/**
 * 描述: 演示 FutureTask 的用法
 */
public class FutureTaskDemo {

    public static void main(String[] args) {

        Task task = new Task();
        FutureTask<Integer> integerFutureTask = new FutureTask<>(task);
        new Thread(integerFutureTask).start();

        try {

            System.out.println("task运行结果:"+integerFutureTask.get());
        } catch (InterruptedException e) {

            e.printStackTrace();
        } catch (ExecutionException e) {

            e.printStackTrace();
        }
    }
}

class Task implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {

        System.out.println("子线程正在计算");
        int sum = 0;
        for (int i = 0; i < 100; i++) {

            sum += i;
        }
        return sum;
    }
}

3. Future的注意事项

a. for循环获取结果,get方法需要限制时间

当for 循环批量获取 Future 的结果时容易 block,在调用 get 方法时,应该使用 timeout 来限制。

如果每个任务的执行耗时不同,不设置限制时间,需要等到所有的任务执行结束才能得到最终所有的结果,此时主线程会一直卡着,影响了程序的运行效率。

采用带超时参数的 get(long timeout, TimeUnit unit) 方法,在限定的时间内没能返回结果的话,那么便会抛出一个 TimeoutException 异常,随后就可以把这个异常捕获住,或者是再往上抛出去,这样就不会一直卡着。

b. Future 的生命周期不能后退

Future 的生命周期不能后退,一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来,也不能让一个已经完成计算的 Future 再次重新执行任务。

c. Future 不产生新的线程

Future 搭配Callable并不会产生新的线程,实际上线程只有通过Thread来创建,Callable以及Runnable只是定义线程执行的任务内容,Future这里只是获取最终执行的结果。

三、CompletableFuture

想象这样一个场景,设计一个旅游平台,为用户展示多家航空公司的航班信息,由于每个航空公司都有自己的服务器,所以分别去请求它们的服务器就可以了,比如请求国航、海航、东航等。

下面分析具体怎么做:

  • 首先很容易想到,可以串行地请求各家航空公司的航班信息,但这种效率非常低下
  • 那么采用并行?采用并行可以成倍提高效率,但是需要等到所有请求结果都返回,某家公司的网站特别慢,就会导致整个获取结果的速度减慢
  • 那么最终方案就出来了,采用带超时限制的并行方案,给定超时时间,超过该时间如果还有些网站没能及时返回,就把这些请求给忽略掉

那么怎么实现呢?有以下几种方案

1. 线程池的实现
public class ThreadPoolDemo {

    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolDemo threadPoolDemo = new ThreadPoolDemo();
        System.out.println(threadPoolDemo.getPrices());
    }

    private Set<Integer> getPrices() throws InterruptedException {

        Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        threadPool.submit(new Task(123, prices));
        threadPool.submit(new Task(456, prices));
        threadPool.submit(new Task(789, prices));
        // sleep 方法来休眠 3 秒钟,之后直接返回 prices。
        Thread.sleep(3000);
        return prices;
    }

    private class Task implements Runnable {

        Integer productId;
        Set<Integer> prices;

        public Task(Integer productId, Set<Integer> prices) {

            this.productId = productId;
            this.prices = prices;
        }

        @Override
        public void run() {

            int price=0;
            try {

                // 随机的时间去模拟各个航空网站的响应时间
                Thread.sleep((long) (Math.random() * 4000));
                // 随机的价格来表示票价
                price= (int) (Math.random() * 4000);
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
            prices.add(price);
        }
    }
}

2. CountDownLatch的实现

上述实现中,如果个航空公司响应速度都特别快,无需等待三秒,那么Thread.sleep(3000)的做法反而会增加等待时间,这里用CountDownLatch进行改进

public class CountDownLatchDemo {

    ExecutorService threadPool = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws InterruptedException {

        CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();
        System.out.println(countDownLatchDemo.getPrices());
    }

    private Set<Integer> getPrices() throws InterruptedException {

        Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        // 初始化CountDownLatch 的参数为3
        CountDownLatch countDownLatch = new CountDownLatch(3);

        // 将 CountDownLatch传入线程
        threadPool.submit(new Task(123, prices, countDownLatch));
        threadPool.submit(new Task(456, prices, countDownLatch));
        threadPool.submit(new Task(789, prices, countDownLatch));

        // 如果三个任务都非常快速地执行完毕,wait 方法就会立刻返回,不需要傻等到 3 秒钟
        // 如果来不及在 3 秒钟之内执行完毕,即时放弃等待
        countDownLatch.await(3, TimeUnit.SECONDS);
        return prices;
    }

    private class Task implements Runnable {

        Integer productId;
        Set<Integer> prices;
        CountDownLatch countDownLatch;

        public Task(Integer productId, Set<Integer> prices,
                CountDownLatch countDownLatch) {

            this.productId = productId;
            this.prices = prices;
            this.countDownLatch = countDownLatch;
        }

        @Override
        public void run() {

            int price = 0;
            try {

                Thread.sleep((long) (Math.random() * 4000));
                price = (int) (Math.random() * 4000);
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
            prices.add(price);
            // 调用 countDown 方法,相当于把计数减 1
            countDownLatch.countDown();
        }
    }
}

3. CompletableFuture的实现
public class CompletableFutureDemo {

    public static void main(String[] args)
            throws Exception {

        CompletableFutureDemo completableFutureDemo = new CompletableFutureDemo();
        System.out.println(completableFutureDemo.getPrices());
    }

    private Set<Integer> getPrices() {

        Set<Integer> prices = Collections.synchronizedSet(new HashSet<Integer>());
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task(123, prices));
        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task(456, prices));
        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task(789, prices));

        CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
        try {

            allTasks.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException e) {

        } catch (ExecutionException e) {

        } catch (TimeoutException e) {

        }
        return prices;
    }

    private class Task implements Runnable {

        Integer productId;
        Set<Integer> prices;

        public Task(Integer productId, Set<Integer> prices) {

            this.productId = productId;
            this.prices = prices;
        }

        @Override
        public void run() {

            int price = 0;
            try {

                Thread.sleep((long) (Math.random() * 4000));
                price = (int) (Math.random() * 4000);
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
            prices.add(price);
        }
    }
}

这里没有采用线程池,CompletableFuture是Future的实现类,runAsync 方法会异步的去执行任务。

每个任务分别返回一个 CompletableFuture 对象,这里分别是task 1、task 2、task 3。执行 CompletableFuture 的 allOf 方法,并且把 task 1、task 2、task 3 传入。将多个 task 汇总,然后可以根据需要去获取到传入参数的这些 task 的返回结果,或者等待它们都执行完毕等。

当每一个任务都执行完毕,get 方法就可以及时正常返回,并且往下执行return prices。当遇到超时或者线程中断等问题时,会排除相应的异常,catch之后会继续执行return prices。