十九、Java并发 Java wait() 和 notify() 方法

大家有没有发现,其实 「 一文秒懂 」 系列讲述的都是多线程并发开发的问题。这个话题太大了,估计没有上百篇文章都解释不清楚。

本文,我们来讲解下 Java 并发中的基础的基础,核心的核心,Java 并发编程中的最基本的机制之一 – 「 线程同步 」

为了方便你理解并发编程中的各种概念和术语,我们首先会来一阵扫盲,讨论一些基本的并发相关术语和方法。接着,我们将开发一个简单的应用程序,并在合格应用程序里处理并发问题,以方便大家理解和巩固 wait()notify()

Java 中的线程同步 ( Thread Synchronization )

在并发编程中,在多线程环境下,多个线程可能会尝试修改同一资源。如果线程管理不当,这显然会导致一致性问题。

Java 中的哨兵块 ( guarded block )

Java 中,可以用来协调多个线程操作的一个工具是 「 哨兵块 」。这个哨兵块会在恢复执行前检查特定条件。

基于这种哨兵检查的思想,Java 在所有类的基类 Object 中提供了两个方法

方法 说明
Object.wait() 暂停一个线程
Object.notify() 唤醒一个线程

是不是有点难以理解,别担心,看下面这个图,这个图描绘了线程的的生命周期。

 

虽然从上图中可以看出,有多个方法可以控制一个线程的生命周期,但本章节,我们只讨论 notify() 方法和 wait() 方法

wait() 方法

对照上图,简单的说,当我们调用 wait() 时会强制当前线程等待,直到某个其它线程在同一个对象上调用 notify()notifyAll() 方法。

因此,当前线程必须拥有对象的监视器。根据 Java docs 的说法,这可能发生在

  • 我们已经为给定对象执行了同步实例方法
  • 我们已经在给定对象上执行了 synchronized 块的主体
  • 通过为 Class 类型的对象执行同步静态方法

请注意,一次只有一个活动线程可以拥有对象的监视器。

除了无参数 wait() 方法外,Java 还重载了另一个 wait() 方法

wait() 方法

wait() 方法导致当前线程无限期地等待,直到另一个线程调用此对象的 notify()notifyAll() 方法

wait(long timeout) 方法

使用此方法,我们可以指定一个超时,在此之后将自动唤醒线程。

当然了,我们可以在到达超时之前使用 notify()notifyAll() 提前唤醒线程。

请注意,调用 wait(0) 与调用 wait() 相同

wait(long timeout, int nanos)

这是与wait(long timeout) 提供相同功能的签名,唯一的区别是我们可以提供更高的精度。

该方法计算超时之间的方式为:

总超时时间(以纳秒为单位)= 1_000_000 * 超时 + nanos

notify() 或 notifyAll() 方法

notify()notifyAll() 方法用于唤醒等待访问此对象监视器的线程。

它们以不同的方式通知等待线程。

notify() 方法

对于在此对象的监视器上等待的所有线程(通过使用任何一个重载 wait() 方法 ),notify() 通知将会随机唤醒任何一个线程。

也就是说,我们并不能确切知道唤醒了哪个线程,这取决于实现。

因为notify() 提供了唤醒一个随机线程的机制,因此它可用于实现线程执行类似任务的互斥锁定。

但在大多数情况下,使用 notifyAll() 会是一个更可行的方案。

notifyAll() 方法

notifyAll() 方法用于唤醒正在此对象的监视器上等待的所有线程。唤醒的线程将以常规的方式完成 – 就像任何其他线程一样。

但,有一点要注意的是,对于任意一个线程,但在我们允许其继续执行之前,请始终快速检查继续执行该线程所需的条件。因为在某些情况下线程被唤醒而没有收到通知(这个场景将在后面的例子中讨论 )

发送者 – 接收者同步问题

线程同步的问题,我们已经有了个大概的了解,接下来,我们看一个简单的 Sender-Receiver ( 发送者 – 接收者 ) 应用程序,这个应用程序将利用wait()notify() 方法建立它们之间的同步。

  • 发送者应该向接收者发送数据包
  • 在发送方完成发送之前,接收方无法处理数据包
  • 同样,发送方不得尝试发送另一个数据包,除非接收方已处理过上一个数据包

我们首先创建一个 Data 类,用于包含将从 Sender 发送到 Receiver 的数据包,同时,我们将使用 wait()notifyAll() 来设置它们之间的同步。

public class Data {
    private String packet;
    // True if receiver should wait
    // False if sender should wait
    private boolean transfer = true;
    public synchronized void send(String packet) {
        while (!transfer) {
            try { 
                wait();
            } catch (InterruptedException e)  {
                Thread.currentThread().interrupt(); 
                Log.error("Thread interrupted", e); 
            }
        }
        transfer = false;
        this.packet = packet;
        notifyAll();
    }
    public synchronized String receive() {
        while (transfer) {
            try {
                wait();
            } catch (InterruptedException e)  {
                Thread.currentThread().interrupt(); 
                Log.error("Thread interrupted", e); 
            }
        }
        transfer = true;
        notifyAll();
        return packet;
    }

范例有点小长,我们一步一步分析下代码

1、 私有属性packet用于表示通过网络传输的数据;
2、 布尔类型的私有属性transfer用于Sender和Receiver之间的同步;

 *  如果此变量为 true,则 Receiver 应等待 Sender 发送消息
 *  如果它是 false ,那么 Sender 应该等待 Receiver 接收消息

3、 Sender使用send()方法将数据发送给Receiver:;

 *  如果 transfer 为 false ,我们将在此线程上调用 wait()
 *  但如果它为 true ,我们需要切换状态,设置我们的消息并调用 notifyAll() 来唤醒其他线程以指定发生了重大事件,然后这些线程它们自己可以自查是否可以继续执行。

4、 同样的,Receiver将使用receive()方法接收数据;

 *  如果 Sender 将传输设置为 false,那么继续,否则将在此线程上调用 wait()
 *  满足条件时,我们切换状态,通知所有等待的线程唤醒并返回 Receiver 的数据包

为什么在 while 循环中包含 wait()

由于notify()notifyAll() 随机唤醒正在此对象监视器上等待的线程,因此满足条件并不总是很重要。有时可能会发生线程被唤醒,但实际上并没有满足条件。

当然了,跟进一步说,我们还可以定义一个检查来避免虚假唤醒 – 线程可以从等待中醒来而不会收到通知。

我们为什么需要同步 send() 和 receive() 方法

我们将这些方法放在 synchronized 方法是为了提供内部锁。

如果调用 wait() 方法的线程不拥有固有锁,则会抛出错误。

现在,是时候创建 Sender 和 Receiver 并在两者上实现 Runnable 接口,以便它们的实例可以由线程执行。

我们先来看看 Sender 将如何工作

public class Sender implements Runnable {
    private Data data;
    // standard constructors
    public void run() {
        String packets[] = {
          "First packet",
          "Second packet",
          "Third packet",
          "Fourth packet",
          "End"
        };
        for (String packet : packets) {
            data.send(packet);
            // Thread.sleep() to mimic heavy server-side processing
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
            } catch (InterruptedException e)  {
                Thread.currentThread().interrupt(); 
                Log.error("Thread interrupted", e); 
            }
        }
    }

对于这个 Sender :

  • 我们正在创建一些随机数据包,这些数据包将通过网络以 packet[] 数组的形式发送
  • 对于每个数据包,我们只是调用 send() 而不做其它动作
  • 然后我们用随机时间间隔调用 Thread.sleep() 来模仿繁重的服务器端处理

接下来,我们来看看如何实现 Receiver

public class Receiver implements Runnable {
    private Data load;
    // standard constructors
    public void run() {
        for(String receivedMessage = load.receive();
          !"End".equals(receivedMessage);
          receivedMessage = load.receive()) {
            System.out.println(receivedMessage);
            // ...
            try {
                Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); 
                Log.error("Thread interrupted", e); 
            }
        }
    }

上面这段代码很简单,只是在循环中调用 load.receive() ,直到我们得到最后一个 “End” 数据包。

最后,我们就可以写一个 main() 方法来运行它们了

public static void main(String[] args) {
    Data data = new Data();
    Thread sender = new Thread(new Sender(data));
    Thread receiver = new Thread(new Receiver(data));
    sender.start();
    receiver.start();

运行范例,输出结果如下

First packet
Second packet
Third packet
Fourth packet

完美!

我们在这里 – 我们以正确的顺序接收所有数据包,并成功建立了发送方和接收方之间的正确通信。