Java并发编程学习一:线程基础

一、线程的创建

线程是并发编程中基础的基础,只有先了解如何创建线程,才能进一步学习并发相关的知识。

常见的实现线程的方法有以下几种:

1. Runnable接口

可以通过实现Runnable接口,并重写run()方法,之后将实现Runnable接口的类传入Thread中即可创建线程。

public class RunnableThread implements Runnable {

    @Override
    public void run() {

        System.out.println('用实现Runnable接口实现线程');
    }
}

2. Thread类

继承Thread类,重写其中的run()方法创建线程:

public class ExtendsThread extends Thread {

    @Override
    public void run() {

        System.out.println('用Thread类实现线程');
    }
}

3. 线程池

通过线程池,可以指定线程数量,由线程池创建线程。

对于线程池而言,本质上是通过线程工厂创建线程的,默认采用 DefaultThreadFactory ,它会给线程池创建的线程设置一些默认值,比如:线程的名字、是否是守护线程,以及线程的优先级等。

/** 
* 描述: 用固定线程数的线程池执行10000个任务 
*/ 
public class ThreadPoolDemo {

    public static void main(String[] args) {

        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10000; i++) {

            service.execute(new Task());
        } 
    System.out.println(Thread.currentThread().getName());
    } 

    static class Task implements Runnable {

        public void run() {

            System.out.println("Thread Name: " + Thread.currentThread().getName());
        } 
    } 
}

4. 带返回值的Callable接口

实现Callable接口,并重写call()方法,之后将实现Callable接口的类传入Thread中创建线程。

这种方式与Runnable接口的方式相类似,但是Callable接口通过Future将线程执行的结果作为返回值返回。

class CallableTask implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {

        return new Random().nextInt();
    }
}

//创建线程池
ExecutorService service = Executors.newFixedThreadPool(10);
//提交任务,并用 Future提交返回结果
Future<Integer> future = service.submit(new CallableTask());

5. 其他

a. 定时器
新建一个 Timer,令其每隔一段时间之后,执行一些任务,这时它也会创建线程并执行任务。

public class Run {

    private static Timer timer=new Timer();

    public static void main(String[] args) throws ParseException
    {

        timer.schedule(new Mytask(), TimeUtil.df.get().parse("2017-09-14 09:19:30"));
    }
}

b. 匿名内部类或 lambda 表达式

/**
 *描述:匿名内部类创建线程
 */
new Thread(new Runnable() {

    @Override
    public void run() {

        System.out.println(Thread.currentThread().getName());
    }
}).start();

/**
 *描述:lambda表达式创建线程
 */
new Thread(() -> System.out.println(Thread.currentThread().getName())).start();
}

6. 到底有几种创建线程的方法?

前面介绍了5中创建线程的方法,按理来说,创建线程的方法应该是很多的,然而实际上,创建线程的方法只有一种而已

对于线程池而言,跟进DefaultThreadFactory 的源码查看具体创建线程的实现:

static class DefaultThreadFactory implements ThreadFactory {

    DefaultThreadFactory() {

        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
            Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    public Thread newThread(Runnable r) {

        Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
0);

        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

可以看到,线程池底层还是通过Thread类创建线程的,只是在创建时传入的参数更多一些。

对于Callable接口,其底层实现依赖FutureTask,而FutureTask实际上实现了Runnable接口,并进行了一系列封装后能够返回返回值。因此Callable接口还是离不开Runnable接口。

对于定时器Timer,其内部实现,本质上还是会有一个继承自 Thread 类的 TimerThread,因此还是通过Thread类创建线程的。

置于匿名内部类和lambda表达式,它们仅仅是一种语法实现,本质上还是基于Thread类

那么归根到底,其实只有Thread类和Runnable接口两种方式咯?其实这两种方式本质上也是一种。

线程的启动需要调用Thread.start()方法,而start()方法最终会调用run()方法:

@Override
public void run() {

    if (target != null) {

        target.run();
    }
}

这里有两个run()方法,一个是外层的run(),一个是内层的run()。

上述代码中的target其实就是一个 Runnable,实现Runnable并重写run()方法,实际上就是重写这里内层的run()的内部逻辑。

而实现Thread类,重写run()方法,实际上是重写上述代码的外层run()方法。

因此可以这样说,本质上,实现线程只有一种方式,而要想实现线程执行的内容,却有两种方式,也就是可以通过 实现 Runnable 接口的方式,或是继承 Thread 类重写 run() 方法的方式

7. Runnable还是Thread

上面提到,实现线程执行的内容其实只要Runnable 和 Thread两种方式,那么用哪种更好呢?

先说结论,实现 Runnable 接口比继承 Thread 类实现线程要好,原因如下:

第一,从代码的架构考虑,Runnable只定义了线程需要执行的内容,实现了与Thread的解耦,由Thread 类负责线程启动和属性设置等内容,权责分明

第二,使用继承 Thread 类方式,每次执行一次任务,都需要新建一个独立的线程,执行完任务后线程走到生命周期的尽头被销毁,如果还想执行这个任务,就必须再新建一个继承了 Thread 类的类。而 Runnable 接口可以直接传入线程池,使用一些固定的线程来完成任务,不需要每次新建销毁线程,大大降低了性能开销。

第三,Java 语言不支持双继承,一旦继承了 Thread 类,那么后续就没有办法再继承其他的类,这限制了代码未来的可拓展性。

二、正确停止线程

通常情况下,我们不会手动停止一个线程,而是允许线程运行到结束,然后让它自然停止。但是如果遇到特殊情况比如:用户突然关闭程序,或程序运行出错重启等,这种情况下,如何安全停止线程就很重要了。

对于Java 而言,最正确的停止线程的方式是使用 interrupt。但 interrupt 仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。

那么为什么 Java 不提供强制停止线程的能力呢?因为如果不了解对方正在做的工作,贸然强制停止线程就可能会造成一些安全的问题,为了避免造成问题就需要给对方一定的时间来整理收尾工作。比如:线程正在写入一个文件,这时收到终止信号,它就需要根据自身业务判断,是选择立即停止,还是将整个文件写入成功后停止,而如果选择立即停止就可能造成数据不完整,不管是中断命令发起者,还是接收者都不希望数据出现问题。

1. interrupt正确停止线程

一旦调用某个线程的 interrupt() 之后,这个线程的中断标记位就会被设置成 true。每个线程都有这样的标记位,当线程执行时,应该定期检查这个标记位,如果标记位被设置成 true,就说明有程序想终止该线程。

要想使用interrupt正确停止线程,应当在线程中做如下设计,在 while 循环体判断语句中,首先通过 Thread.currentThread().isInterrupt() 判断线程是否被中断,随后检查是否还有工作要做。&& 逻辑表示只有当两个判断条件同时满足的情况下,才会去执行下面的工作。

while (!Thread.currentThread().isInterrupted() && more work to do) {

    do more work
}

以下是interrupt的使用示例:

public class StopThread implements Runnable {

    @Override
    public void run() {

        int count = 0;
        while (!Thread.currentThread().isInterrupted() && count < 1000) {

            System.out.println("count = " + count++);
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Thread thread = new Thread(new StopThread());
        thread.start();
        Thread.sleep(5);
        thread.interrupt();
    }
}

run() 方法中,首先判断线程是否被中断,然后判断 count 值是否小于 1000。线程中打印 0~999 的数字,每打印一个数字 count 值加 1。线程会在每次循环开始之前,检查是否被中断了。接下来在 main 函数中会启动该线程,然后休眠 5 毫秒后立刻中断线程,该线程会检测到中断信号,于是在还没打印完1000个数的时候就会停下来,这种就属于通过 interrupt 正确停止线程的情况。

2. sleep 期间能否感受到中断?

现在考虑一种特殊情况,如果线程在执行任务期间有休眠需求,也就是每打印一个数字,就进入一次 sleep ,而此时将 Thread.sleep() 的休眠时间设置为 1000 秒钟

Runnable runnable = () -> {

    int num = 0;
    try {

        while (!Thread.currentThread().isInterrupted() && 
        num <= 1000) {

            System.out.println(num);
            num++;
            Thread.sleep(1000000);
        }
    } catch (InterruptedException e) {

        e.printStackTrace();
    }
};

在主线程中,休眠 5 毫秒后,通知子线程中断,此时子线程仍在执行 sleep 语句,处于休眠中。那么就需要考虑一点,在休眠中的线程是否能够感受到中断通知呢?是否需要等到休眠结束后才能中断线程呢?如果是这样,就会带来严重的问题,因为响应中断太不及时了。

public class StopDuringSleep {

    public static void main(String[] args) throws InterruptedException {

        Runnable runnable = () -> {

            int num = 0;
            try {

                while (!Thread.currentThread().isInterrupted() && num <= 1000) {

                    System.out.println(num);
                    num++;
                    Thread.sleep(1000000);
                }
            } catch (InterruptedException e) {

                e.printStackTrace();
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
        Thread.sleep(5);
        thread.interrupt();
    }
}

Java 设计者在设计之初就考虑到以上的问题,如果 sleep、wait 等可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个 InterruptedException 异常,同时清除中断信号,将中断标记位设置成 false。这样一来就不用担心长时间休眠中线程感受不到中断了,因为即便线程还在休眠,仍然能够响应中断通知,并抛出异常。

3. sleep / wait 时的解决方案

上面提到, sleep、wait 等方法可以使线程休眠,而处于休眠中的线程被中断,将会抛出一个 InterruptedException 异常,并清除中断信号,将中断标记位设置成 false。

在实际的开发者中,不同的人负责编写不同的方法,然后相互调用来实现整个业务的逻辑。那么如果我们负责编写的方法需要被别人调用,同时我们的方法内调用了 sleep 或者 wait 等能响应中断的方法时,要怎么处理?

a. 方法签名抛异常 / run() 强制 try/catch

如果只是捕捉到异常,没有进行任何处理逻辑,相当于把中断信号给隐藏了,这样做非常不合理。

调用方要不使用 try/catch 并在 catch 中正确处理异常,要不将异常声明到方法签名中。如果每层逻辑都遵守规范,便可以将中断信号层层传递到顶层,最终让 run() 方法可以捕获到异常。而对于 run() 方法而言,它本身没有抛出 checkedException 的能力,只能通过 try/catch 来处理异常。层层传递异常的逻辑保障了异常不会被遗漏,而对 run() 方法而言,就可以根据不同的业务逻辑来进行相应的处理。

void subTask2() throws InterruptedException {

    Thread.sleep(1000);
}

b. 再次中断
在catch 语句中再次中断线程。如代码所示,需要在 catch 语句块中调用 Thread.currentThread().interrupt() 函数。因为如果线程在休眠期间被中断,那么会自动清除中断信号。如果这时手动添加中断信号,中断信号依然可以被捕捉到。这样后续执行的方法依然可以检测到这里发生过中断,可以做出相应的处理,整个线程可以正常退出。

private void reInterrupt() {

    try {

        Thread.sleep(2000);
    } catch (InterruptedException e) {

        Thread.currentThread().interrupt();
        e.printStackTrace();
    }
}

4. volatile标记位停止线程(场景有限)

除了interrupt之外, volatile标记位也能起到停止线程的目的,但是这种方法有局限性,适用的场景有限。

a. 适用的场景

public class VolatileCanStop implements Runnable {

    private volatile boolean canceled = false;

    @Override
    public void run() {

        int num = 0;
        try {

            while (!canceled && num <= 1000000) {

                if (num % 10 == 0) {

                    System.out.println(num + "是10的倍数。");
                }
                num++;
                Thread.sleep(1);
            }
        } catch (InterruptedException e) {

            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {

        VolatileCanStop r = new VolatileCanStop();
        Thread thread = new Thread(r);
        thread.start();
        Thread.sleep(3000);
        r.canceled = true;
    }
}

声明一个叫作 VolatileStopThread 的类, 它实现了 Runnable 接口,然后在 run() 中进行 while 循环,在循环体中又进行了两层判断,首先判断 canceled 变量的值,canceled 变量是一个被 volatile 修饰的初始值为 false 的布尔值,当该值变为 true 时,while 跳出循环,while 的第二个判断条件是 num 值小于1000000(一百万),在while 循环体里,只要是 10 的倍数就打印出来,然后 num++。

首先启动线程,然后经过 3 秒钟的时间,把用 volatile 修饰的布尔值的标记位设置成 true,这样,正在运行的线程就会在下一次 while 循环中判断出 canceled 的值已经变成 true 了,这样就不再满足 while 的判断条件,跳出整个 while 循环,线程就停止了,这种情况是演示 volatile 修饰的标记位可以正常工作的情况

b. 不适用的场景

class Producer implements Runnable {

    public volatile boolean canceled = false;
    BlockingQueue storage;
    public Producer(BlockingQueue storage) {

        this.storage = storage;
    }

    @Override
    public void run() {

        int num = 0;
        try {

            while (num <= 100000 && !canceled) {

                if (num % 50 == 0) {

                    storage.put(num);
                    System.out.println(num + "是50的倍数,被放到仓库中了。");
                }
                num++;
            }
        } catch (InterruptedException e) {

            e.printStackTrace();
        } finally {

            System.out.println("生产者结束运行");
        }
    }
}

声明了一个生产者 Producer,通过 volatile 标记的初始值为 false 的布尔值 canceled 来停止线程。而在 run() 方法中,while 的判断语句是 num 是否小于 100000 及 canceled 是否被标记。while 循环体中判断 num 如果是 50 的倍数就放到 storage 仓库中,storage 是生产者与消费者之间进行通信的存储器,当 num 大于 100000 或被通知停止时,会跳出 while 循环并执行 finally 语句块,告诉大家“生产者结束运行”。

class Consumer {

    BlockingQueue storage;
    public Consumer(BlockingQueue storage) {

        this.storage = storage;
    }
    public boolean needMoreNums() {

        if (Math.random() > 0.97) {

            return false;
        }
        return true;
    }
}

对于消费者 Consumer,它与生产者共用同一个仓库 storage,并且在方法内通过 needMoreNums() 方法判断是否需要继续使用更多的数字,刚才生产者生产了一些 50 的倍数供消费者使用,消费者是否继续使用数字的判断条件是产生一个随机数并与 0.97 进行比较,大于 0.97 就不再继续使用数字。

public static void main(String[] args) throws InterruptedException {

        ArrayBlockingQueue storage = new ArrayBlockingQueue(8);

        Producer producer = new Producer(storage);
        Thread producerThread = new Thread(producer);
        producerThread.start();
        Thread.sleep(500);

        Consumer consumer = new Consumer(storage);
        while (consumer.needMoreNums()) {

            System.out.println(consumer.storage.take() + "被消费了");
            Thread.sleep(100);
        }
        System.out.println("消费者不需要更多数据了。");

        //一旦消费不需要更多数据了,我们应该让生产者也停下来,但是实际情况却停不下来
        producer.canceled = true;
        System.out.println(producer.canceled);
    }
}

main 函数中,首先创建了生产者/消费者共用的仓库 BlockingQueue storage,仓库容量是 8,并且建立生产者并将生产者放入线程后启动线程,启动后进行 500 毫秒的休眠,休眠时间保障生产者有足够的时间把仓库塞满,而仓库达到容量后就不会再继续往里塞,这时生产者会阻塞,500 毫秒后消费者也被创建出来,并判断是否需要使用更多的数字,然后每次消费后休眠 100 毫秒。

当消费者不再需要数据,就会将 canceled 的标记位设置为 true,理论上此时生产者会跳出 while 循环,并打印输出“生产者运行结束”。

但实际结果是,尽管已经把 canceled 设置成 true,但生产者仍然没有停止,这是因为在这种情况下,生产者在执行 storage.put(num) 时发生阻塞,在它被叫醒之前是没有办法进入下一次循环判断 canceled 的值的,所以在这种情况下用 volatile 是没有办法让生产者停下来的

总结以上两种场景,volatile标记位停止线程的方法会在存在阻塞的时候失效。这种情况下,使用interrupt 语句来中断,即使生产者处于阻塞状态,仍然能够感受到中断信号,并做响应处理。因此相比interrupt ,volatile的使用还是有局限性的。

5. 错误停止线程的方法

常见停止线程的方法有: stop(),suspend() 和 resume(),这些方法已经被 Java 直接标记为 @Deprecated。

对于stop()方法,它会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题。

对于suspend() 和 resume() 而言,它们的问题在于如果线程调用 suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被 resume() 之前,是不会被释放的。

三、线程状态切换

在Java中,线程的生命周期一共有6种状态:

  • 新建 – New
  • 可运行 – Runnable
  • 阻塞 – Blocked
  • 等待 – Waiting
  • 计时等待 – Timed Waiting
  • 终止 – Terminated

可以通过getState() 方法获取当前线程所处的状态,线程在运行期间的任何时刻只能处于一种状态。

1. 新建 – New

线程被创建但尚未启动的状态,当我们用 new Thread() 新建一个线程时,如果线程没有开始运行 start() 方法,那么此时它的状态就是 New。而一旦线程调用了 start(),它的状态就会从 New 变成 Runnable。

file

2. 可运行 – Runnable

该状态对应操作系统线程状态中的两种状态,分别是 Running 和 Ready。也就是说,Java 中处于 Runnable 状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配 CPU 资源。

如果一个正在运行的线程是 Runnable 状态,当它运行到任务的一半时,执行该线程的 CPU 被调度去做其他事情,导致该线程暂时不运行,它的状态依然不变,还是 Runnable,因为它有可能随时被调度回来继续执行任务。

file

3. 阻塞 – Blocked / 等待 – Waiting / 计时等待 – Timed Waiting

这三种状态被统称为阻塞状态

file

阻塞 – Blocked
从Runnable 状态进入 Blocked 状态只有一种可能,就是进入 synchronized 保护的代码时没有抢到 monitor 锁,无论是进入 synchronized 代码块,还是 synchronized 方法,都是一样。

当处于Blocked 的线程抢到 monitor 锁,就会从 Blocked 状态回到Runnable 状态。

file

等待 – Waiting
线程从Runnable 状态进入 Waiting 状态有三种可能性:

  • 没有设置 Timeout 参数的 Object.wait() 方法。
  • 没有设置 Timeout 参数的 Thread.join() 方法。
  • LockSupport.park() 方法。

Blocked 与 Waiting 的区别是 Blocked 在等待其他线程释放 monitor 锁,而 Waiting 则是在等待某个条件,比如 join 的线程执行完毕,或者是 notify()/notifyAll() 。进入 waiting 状态是线程主动的,而进入 blocked 状态是被动的。

file

计时等待 – Timed Waiting
计时等待与等待相似,区别仅在于有没有时间限制,Timed Waiting 会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。

线程从Runnable 状态进入 Timed Waiting 状态有四种可能性:

  • 设置了时间参数的 Thread.sleep(long millis) 方法;
  • 设置了时间参数的 Object.wait(long timeout) 方法;
  • 设置了时间参数的 Thread.join(long millis) 方法;
  • 设置了时间参数的 LockSupport.parkNanos(long nanos) 方法和 LockSupport.parkUntil(long deadline) 方法。

file

三种状态的流转

Blocked -> Runnable
Blocked 状态进入 Runnable 状态,要求线程获取 monitor 锁

file

Waiting -> Runnable
从Waiting 状态流转到其他状态则比较特殊,因为首先 Waiting 是不限时的,也就是说无论过了多长时间它都不会主动恢复。只有当执行了 LockSupport.unpark(),或者 join 的线程运行结束,或者被中断时才可以进入 Runnable 状态。

file

Waiting -> Blocked
如果其他线程调用 notify() 或 notifyAll()来唤醒它,当前Waiting的线程会直接进入 Blocked 状态。因为唤醒 Waiting 线程的线程如果调用 notify() 或 notifyAll(),要求必须首先持有该 monitor 锁,所以处于 Waiting 状态的线程被唤醒时拿不到该锁,就会进入 Blocked 状态,直到执行了 notify()/notifyAll() 的唤醒它的线程执行完毕并释放 monitor 锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从 Blocked 状态回到 Runnable 状态。

file

Timed Waiting -> Blocked
Timed Waiting 中执行 notify() 和 notifyAll() 也是一样的道理,它们会先进入 Blocked 状态,然后抢夺锁成功后,再回到 Runnable 状态。

file

Timed Waiting -> Runnable
Timed Waiting状态的线程,当超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到 Runnable 状态,而无需经历 Blocked 状态。

file

3. 终止 – Terminated

进入Terminated状态有两种可能:

  • run() 方法执行完毕,线程正常退出。
  • 出现一个没有捕获的异常,终止了 run() 方法,最终导致意外终止

file

四、wait / notify / notifyAll / sleep

线程的状态切换中,wait / notify / notifyAll / sleep等方法都起到了很重要的作用,而这些方法在实际使用中有不少注意事项。

1. wait 必须在 synchronized 保护的同步代码中

在使用 wait 方法时,必须把 wait 方法写在 synchronized 保护的 while 代码块中,并始终判断执行条件是否满足,如果满足就往下继续执行,如果不满足就执行 wait 方法,而在执行 wait 方法之前,必须先持有对象的 monitor 锁,也就是通常所说的 synchronized 锁

假如不按照以上的方法使用wait ,会出现什么问题?下面使用案例进行分析

class BlockingQueue {

    Queue<String> buffer = new LinkedList<String>();

    public void give(String data) {

        buffer.add(data);
        notify();  // Since someone may be waiting in take
    }

    public String take() throws InterruptedException {

        while (buffer.isEmpty()) {

            wait();
        }
        return buffer.remove();
    }
}

give 方法负责往 buffer 中添加数据,添加完之后执行 notify 方法来唤醒之前等待的线程,而 take 方法负责检查整个 buffer 是否为空,如果为空就进入等待,如果不为空就取出一个数据。

如果这段代码并没有受 synchronized 保护,会有以下场景:

  • 首先,消费者线程调用 take 方法并判断 buffer.isEmpty 方法是否返回 true,若为 true 代表buffer是空的,则线程希望进入等待,但是在线程调用 wait 方法之前,就被调度器暂停了,所以此时还没来得及执行 wait 方法。
  • 生产者开始运行,执行了整个 give 方法,往 buffer 中添加了数据,并执行了 notify 方法,但 notify 并没有任何效果,因为消费者线程的 wait 方法没来得及执行,所以没有线程在等待被唤醒。
  • 此时,刚才被调度器暂停的消费者线程回来继续执行 wait 方法并进入了等待。

虽然消费者判断了 buffer.isEmpty 条件,但真正执行 wait 方法时,之前的 buffer.isEmpty 的结果已经过期了。原因在于这里的“判断-执行”不是一个原子操作,它在中间被打断了,是线程不安全的

如果这时没有更多的生产者进行生产,消费者便有可能陷入无穷无尽的等待,因为它错过了刚才 give 方法内的 notify 的唤醒。

为了解决上述问题,需要用synchronized方法,确保 notify 方法永远不会在 buffer.isEmpty 和 wait 方法之间被调用,提升了程序的安全性。另外,wait 方法会释放 monitor 锁,这也要求我们必须首先进入到 synchronized 内持有这把锁。

public void give(String data) {

   synchronized (this) {

      buffer.add(data);
      notify();
  }
}

public String take() throws InterruptedException {

   synchronized (this) {

    while (buffer.isEmpty()) {

         wait();
       }
     return buffer.remove();
  }
}

另外,实际上还会发生“虚假唤醒”(spurious wakeup)的问题,线程可能在既没有被notify/notifyAll,也没有被中断或者超时的情况下被唤醒。虽然虚假唤醒发生的概率很小,但是程序依然需要保证在发生虚假唤醒的时候的正确性,所以就需要采用while循环的结构。即便被虚假唤醒了,也会再次检查while里面的条件,如果不满足条件,就会继续wait,也就消除了虚假唤醒的风险。

while (condition does not hold)
    obj.wait();
2. wait/notify/notifyAll 被定义在 Object 类中,而 sleep 定义在 Thread 类中
  • Java 中每个对象都有一把称之为 monitor 监视器的锁,由于每个对象都可以上锁,这就要求在对象头中有一个用来保存锁信息的位置。这个锁是对象级别的,而非线程级别的,wait/notify/notifyAll 也都是锁级别的操作,它们的锁属于对象,所以把它们定义在 Object 类中是最合适
  • 如果把 wait/notify/notifyAll 方法定义在 Thread 类中,会带来很大的局限性,比如一个线程可能持有多把锁,以便实现相互配合的复杂逻辑,假设此时 wait 方法定义在 Thread 类中,如何实现让一个线程持有多把锁呢?又如何明确线程等待的是哪把锁呢?既然是让当前线程去等待某个对象的锁,自然应该通过操作对象来实现,而不是操作线程
3. wait/notify vs sleep

这两种方式的相同点在于:

  • 都可以让线程阻塞
  • 都可以响应 interrupt 中断:在等待的过程中如果收到中断信号,都可以进行响应,并抛出 InterruptedException 异常

这两种方式的区别在于:

  • wait 方法必须在 synchronized 保护的代码中使用,而 sleep 方法并没有这个要求
  • 在同步代码中执行 sleep 方法时,并不会释放 monitor 锁,但执行 wait 方法时会主动释放 monitor 锁
  • sleep 方法中会要求必须定义一个时间,时间到期后会主动恢复,而对于没有参数的 wait 方法而言,意味着永久等待,直到被中断或被唤醒才能恢复,它并不会主动恢复
  • wait/notify 是 Object 类的方法,而 sleep 是 Thread 类的方法

五、生产者消费者模式

生产者消费者模式是程序设计中非常常见的一种设计模式,被广泛运用在解耦、消息队列等场景。把生产商品的一方称为生产者,把消费商品的一方称为消费者,有时生产者的生产速度特别快,但消费者的消费速度跟不上,俗称“产能过剩”,有时是多个生产者对应多个消费者。这时在生产者和消费者之间就需要一个中介来进行调度,于是便诞生了生产者消费者模式。

消费者模式通常需要在两者之间增加一个阻塞队列作为媒介,生产者在生产数据后将数据存放在阻塞队列中,消费者获取阻塞队列中的数据。而当阻塞队列已满或是已空都可能会产生线程阻塞

那么什么时候阻塞线程需要被唤醒?第一种情况是当消费者看到阻塞队列为空时,开始进入等待,这时生产者一旦往队列中放入数据,就会通知所有的消费者,唤醒阻塞的消费者线程。另一种情况是如果生产者发现队列已经满了,也会被阻塞,而一旦消费者获取数据之后就相当于队列空了一个位置,这时消费者就会通知所有正在阻塞的生产者进行生产。

那么如何实现生产者消费者模式呢?这里介绍三种方法:

1. BlockingQueue 实现生产者消费者模式
public static void main(String[] args) {

    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);

    Runnable producer = () -> {

       while (true) {

             queue.put(new Object());
        }
    };

    new Thread(producer).start();
    new Thread(producer).start();

    Runnable consumer = () -> {

          while (true) {

               queue.take();
        }
    };

     new Thread(consumer).start();
     new Thread(consumer).start();
}

首先,创建了一个 ArrayBlockingQueue 类型的 BlockingQueue,命名为 queue 并将它的容量设置为 10;其次,创建一个简单的生产者,while(true) 循环体中的queue.put() 负责往队列添加数据;然后,创建两个生产者线程并启动;同样消费者也非常简单,while(true) 循环体中的 queue.take() 负责消费数据,同时创建两个消费者线程并启动。为了简介设计,代码里省略了 try/catch 检测。

BlockingQueue实现生产者消费者模式看上去很简单,但实际上 ArrayBlockingQueue 已经在背后完成了很多工作,比如队列满了就去阻塞生产者线程,队列有空就去唤醒生产者线程等。

2. Condition 实现生产者消费者模式
public class MyBlockingQueue {

   private Queue queue;
   private int max = 16;
   private ReentrantLock lock = new ReentrantLock();
   private Condition notEmpty = lock.newCondition();
   private Condition notFull = lock.newCondition();

   public MyBlockingQueue (int size) {

       this.max = size;
       queue = new LinkedList();
   }

   public void put(Object o) throws InterruptedException {

       lock.lock();
       try {

           while (queue.size() == max) {

               notFull.await();
           }
           queue.add(o);
           notEmpty.signalAll();
       } finally {

           lock.unlock();
       }
   }

   public Object take() throws InterruptedException {

       lock.lock();
       try {

           while (queue.size() == 0) {

               notEmpty.await();
           }
           Object item = queue.remove();
           notFull.signalAll();
           return item;
       } finally {

           lock.unlock();
       }
   }
}

利用Condition自己实现BlockingQueue:

首先,定义了一个队列变量 queue 并设置最大容量为 16;其次,定义了一个 ReentrantLock 类型的 Lock 锁,并在 Lock 锁的基础上创建两个 Condition,一个是 notEmpty,另一个是 notFull,分别代表队列没有空和没有满的条件;最后,声明了 put 和 take 这两个核心方法。

生产者消费者模式通常是面对多线程的场景,因此需要一定的同步措施保障线程安全。

对于put 方法,先将 Lock 锁上,然后,在 while 的条件里检测 queue 是不是已经满了,如果已经满了,则调用 notFull 的 await() 阻塞生产者线程并释放 Lock,如果没有满,则往队列放入数据并利用 notEmpty.signalAll() 通知正在等待的所有消费者并唤醒它们。最后在 finally 中利用 lock.unlock() 方法解锁,把 unlock 方法放在 finally 中是一个基本原则,否则可能会产生无法释放锁的情况。

对于take方法,实际上是与 put 方法相互对应的,同样是通过 while 检查队列是否为空,如果为空,消费者开始等待,如果不为空则从队列中获取数据并通知生产者队列有空余位置,最后在 finally 中解锁。

上述代码中,生产者和消费者为什么要用while循环检查队列情况,而不用if判断呢?以消费者为例,生产者消费者往往是多线程的,如果将while( queue.size() == 0 ) 改为 if( queue.size() == 0 ),第一个消费者线程获取数据时,发现队列为空,便进入等待状态;因为第一个线程在等待时会释放 Lock 锁,所以第二个消费者可以进入并执行 if( queue.size() == 0 ),也发现队列为空,于是第二个线程也进入等待;而此时,如果生产者生产了一个数据,便会唤醒两个消费者线程,而两个线程中只有一个线程可以拿到锁,并执行 queue.remove 操作,另外一个线程因为没有拿到锁而卡在被唤醒的地方,而第一个线程执行完操作后会在 finally 中通过 unlock 解锁,而此时第二个线程便可以拿到被第一个线程释放的锁,继续执行操作,也会去调用 queue.remove 操作,然而这个时候队列已经为空了,所以会抛出 NoSuchElementException 异常。而如果用 while 做检查,当第一个消费者被唤醒得到锁并移除数据之后,第二个线程在执行 remove 前仍会进行 while 检查,发现此时依然满足 queue.size() == 0 的条件,就会继续执行 await 方法,避免了获取的数据为 null 或抛出异常的情况。

3. wait/notify 实现生产者消费者模式

利用wait/notify自己实现BlockingQueue:

class MyBlockingQueue {

   private int maxSize;
   private LinkedList<Object> storage;

   public MyBlockingQueue(int size) {

       this.maxSize = size;
       storage = new LinkedList<>();
   }

   public synchronized void put() throws InterruptedException {

       while (storage.size() == maxSize) {

           wait();
       }
       storage.add(new Object());
       notifyAll();
   }

   public synchronized void take() throws InterruptedException {

       while (storage.size() == 0) {

           wait();
       }
       System.out.println(storage.remove());
       notifyAll();
   }
}

仍然是take 与 put 方法。对于put 方法,put 方法被 synchronized 保护,while 检查队列是否为满,如果不满就往里放入数据并通过 notifyAll() 唤醒其他线程。同样,take 方法也被 synchronized 修饰,while 检查队列是否为空,如果不为空就获取数据并唤醒其他线程。

基于Condition和wait/notify实现BlockingQueue之后,以下为生产者消费者代码:

public class Test {

   public static void main(String[] args) {

       MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
       Producer producer = new Producer(myBlockingQueue);
       Consumer consumer = new Consumer(myBlockingQueue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
}

class Producer implements Runnable {

   private MyBlockingQueue storage;

   public Producer(MyBlockingQueue storage) {

       this.storage = storage;
   }

   @Override
   public void run() {

       for (int i = 0; i < 100; i++) {

           try {

               storage.put();
           } catch (InterruptedException e) {

               e.printStackTrace();
           }
       }
   }
}

class Consumer implements Runnable {

   private MyBlockingQueue storage;

   public Consumer(MyBlockingQueue storage) {

       this.storage = storage;
   }

   @Override
   public void run() {

       for (int i = 0; i < 100; i++) {

           try {

               storage.take();
           } catch (InterruptedException e) {

               e.printStackTrace();
           }
       }
   }
}