Java并发编程学习六:阻塞队列

一、阻塞队列

1. 简介

阻塞队列,即BlockingQueue,它是一个接口,继承自Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。

public interface BlockingQueue<E> extends Queue<E>{

     ...}

阻塞队列是线程安全的,典型的应用场景是在生产者/消费者模式中,用于存储数据,保证再多线程下的正确运行。

file

除了BlockingQueue,Queue接口的实现类和子类还有很多,如下图所示:

file

上述实现类和子类中,除了Deque都是线程安全的,而这些线程安全的队列可以分为阻塞队列和非阻塞队列两大类。

阻塞队列就是BlockingQueue 接口的实现类,主要有6种:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue。

非阻塞队列就是ConcurrentLinkedQueue,这个类不会让线程阻塞,利用 CAS 保证了线程安全。

而Deque 是一个双端队列,从头和尾都能添加和删除元素;而普通的 Queue 只能从一端进入,另一端出去。

2. BlockingQueue的常见方法

BlockingQueue中和添加、删除相关的方法有8个,它们的区别仅在于特殊情况:当队列满了无法添加元素,或者是队列空了无法移除元素时,不同组的方法对于这种特殊情况会有不同的处理方式:

  • 抛出异常:add、remove、element
  • 返回结果但不抛出异常:offer、poll、peek
  • 阻塞:put、take

a. add、remove、element方法

这组方法在处理特殊情况时,会抛出异常

add方法用于添加元素,如果队列满了,就会抛出异常来提示队列已满

private static void addTest() {

    BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
    blockingQueue.add(1);
    blockingQueue.add(1);
    blockingQueue.add(1);
}

// 运行结果
Exception in thread "main" java.lang.IllegalStateException:Queue full

remove 方法用于删除元素,如果队列为空,抛出异常

private static void removeTest() {

    ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
    blockingQueue.add(1);
    blockingQueue.add(1);
    blockingQueue.remove();
    blockingQueue.remove();
    blockingQueue.remove();
}

// 运行结果
Exception in thread "main" java.util.NoSuchElementException

element 方法用于返回队列头结点,但并不删除。和remove 方法一样,如果队列为空,抛出异常

private static void elementTest() {

    ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
    blockingQueue.element();
}

// 运行结果
Exception in thread "main" java.util.NoSuchElementException

b. offer、poll、peek方法

这组方法在处理特殊情况时,会返回一个提示,而不会抛出异常。

offer 方法用来插入一个元素,并用返回值来提示插入是否成功。如果添加成功会返回 true,而如果队列已经满了,返回false

private static void offerTest() {

    ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(1));
    System.out.println(blockingQueue.offer(1));
}

// 运行结果
true
true
false

poll 方法用于移除并返回队列的头节点,如果当队列里面是空的,没有任何东西可以移除的时候,便会返回 null。正因为如此,不允许往队列中插入 null 值,否则没有办法区分返回的 null 是一个提示还是一个真正的元素

private static void pollTest() {

    ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3);
    blockingQueue.offer(1);
    blockingQueue.offer(2);
    blockingQueue.offer(3);
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
    System.out.println(blockingQueue.poll());
}

// 运行结果
1
2
3
null

peek 方法用于返回队列的头元素但并不删除。如果队列里面是空的,会返回 null 作为提示。

private static void peekTest() {

    ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
    System.out.println(blockingQueue.peek());
}

// 运行结果
null

另外,offer 和 poll 都有带超时时间的重载方法。

offer(E e, long timeout, TimeUnit unit)

以offer为例,它有三个参数,分别是元素、超时时长和时间单位。插入成功会返回 true;如果队列满了导致插入不成功,则会等待指定的超时时间,如果时间到了依然没有插入成功,就会返回 false。

c. put、take方法
这一组方法在处理特殊情况时,会采用阻塞等待的策略,这也是阻塞队列名字的由来

put方法用于插入元素。如果队列已满,既不会立刻返回 false 也不会抛出异常,而是让插入的线程陷入阻塞状态,直到队列里有了空闲空间,此时队列就会让之前的线程解除阻塞状态,并把刚才那个元素添加进去。

take 方法用于获取并移除队列的头结点,当队列为空,则阻塞线程,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。

3. 常见的阻塞队列

a. ArrayBlockingQueue

一种有界队列,底层基于数组实现,利用 ReentrantLock 实现线程安全。

构造函数中可以指定队列容量,一旦指定后续不可以扩容。同时可以指定是否公平。

ArrayBlockingQueue(int capacity, boolean fair)

b. LinkedBlockingQueue

一种近似无界的队列(实际最大容量是整型的最大值 Integer.MAX_VALUE),内部基于链表实现。

c. SynchronousQueue

这种队列的容量为0,不能存储元素,每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。它的作用就是直接传递。

由于SynchronousQueue的特性,它的一些方法返回值很独特

// peek方法直接返回null
public E peek() {

    return null;
}

// size方法直接返回0
public int size() {

    return 0;
}

// isEmpty方法直接返回true
public boolean isEmpty() {

    return true;
}

d. PriorityBlockingQueue

这种队列可以自定义内部元素的排列顺序,也是一个(可以看做)无界的阻塞队列。通过通过实现compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。

PriorityBlockingQueue的take方法会阻塞,但是由于无界,put方法永远不会阻塞。

e. DelayQueue

这种队列具有“延迟”的功能,可以指定任务延迟多久之后执行。

同时,它也是一个无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,因此可以比较和排序。

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}

实现Delayed接口需要实现getDelay方法,该方法返回的是“还剩下多长的延迟时间才会被执行”。

DelayQueue中的元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。其内部复用了PriorityBlockingQueue的逻辑进行排序。

二、阻塞队列和非阻塞队列的线程安全原理

无论是阻塞队列还是非阻塞队列,都是可以保证线程安全的。

1. 阻塞队列的线程安全

以ArrayBlockingQueue 的源码为例,以下是该类的重要属性:

final Object[] items;
int takeIndex;
int putIndex;
int count;

// 与线程安全有关
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

Object 类型的数组用于存储元素;takeIndex 和 putIndex用来标明下一次读取和写入位置的;count 用来计数,它所记录的就是队列中的元素个数。而剩下的三个属性,一个是 ReentrantLock,另外两个Condition 分别是由 ReentrantLock 产生,这三个属性是实现线程安全最核心的工具。

以put方法为例:

public void put(E e) throws InterruptedException {

    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {

        while (count == items.length){

            notFull.await();
        }
        enqueue(e);
    } finally {

        lock.unlock();
    }
}

该方法内部逻辑是:

  • 首先用checkNotNull 方法去检查插入的元素是不是 null
  • 不为null时,用 ReentrantLock 上锁,并且上锁方法是 lock.lockInterruptibly()。这意味着在尝试获取锁但还没拿到锁的期间可以响应中断
  • 接着是try finally 代码块,finally 中会去解锁,try中的while 循环会会检查当前队列是不是已经满了。如果队列已满,便会进行等待,直到有空余的时候跳出循环,调用 enqueue 方法让元素进入队列,最后用 unlock 方法解锁。

这就是ArrayBlockingQueue 中put方法的线程安全策略。其实在[线程基础][Link 1]中用Condition 实现生产者消费者模式,本质上就是简易版的BlockingQueue。

类似的,LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等也是利用了 ReentrantLock 来保证线程安全,只不过细节有差异,比如 LinkedBlockingQueue 的内部有两把锁,分别锁住队列的头和尾,比共用同一把锁的效率更高,不过总体思想都是类似的。

2. 非阻塞队列的线程安全

以ConcurrentLinkedQueue为例,查看offer方法的源码:

public boolean offer(E e) {

    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {

        Node<E> q = p.next;
        if (q == null) {

            // p is last node
            if (p.casNext(null, newNode)) {

                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

该方法整体是一个大的for循环,而且是明显的死循环。代码中的 p.casNext 方法,正是利用了 CAS 来操作的,而且这个死循环去配合 CAS 也就是典型的乐观锁的思想。

boolean casNext(Node<E> cmp, Node<E> val) {

    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

p.casNext 方法内部,运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作,而 compareAndSwapObject 是一个 native 方法,最终会利用 CPU 的 CAS 指令保证其不可中断。

三、阻塞队列的选择

阻塞队列很重要的一个应用场景就是线程池的,常见的线程池有5种,每种线程池的阻塞队列选择不同,具体的情况在[线程池][Link 2]中已经进行了较为细致的阐述。

在其他的应用场景中,选择合适的阻塞队列可以从以下几点考虑:

  • 功能:如,是否需要阻塞队列排序,如优先级排序、延迟执行等
  • 容量:是否有存储的要求,还是只需要“直接传递”
  • 能否扩容:是否需要队列能够动态扩容
  • 内存结构:不同阻塞队列的底层时间不同,如果对性能有要求可以从内存的结构角度去考虑
  • 性能:比如 LinkedBlockingQueue 由于拥有两把锁,并发性能更好;SynchronousQueue只需要“直接传递”,而不需要存储,性能更好
    [nbsp]: https://pottercoding.cn/wp-content/uploads/cloud/images/2024/4/5/2017/1712319457015.png
    [nbsp 1]: https://pottercoding.cn/wp-content/uploads/cloud/images/2024/4/5/2017/1712319464429.png
    [Link 1]: https://blog.csdn.net/weixin_41402069/article/details/125976626?spm=1001.2014.3001.5501
    [Link 2]: https://blog.csdn.net/weixin_41402069/article/details/125995052?spm=1001.2014.3001.5501