一、阻塞队列
1. 简介
阻塞队列,即BlockingQueue,它是一个接口,继承自Queue 接口,是队列的一种。Queue 和 BlockingQueue 都是在 Java 5 中加入的。
public interface BlockingQueue<E> extends Queue<E>{
...}
阻塞队列是线程安全的,典型的应用场景是在生产者/消费者模式中,用于存储数据,保证再多线程下的正确运行。
除了BlockingQueue,Queue接口的实现类和子类还有很多,如下图所示:
上述实现类和子类中,除了Deque都是线程安全的,而这些线程安全的队列可以分为阻塞队列和非阻塞队列两大类。
阻塞队列就是BlockingQueue 接口的实现类,主要有6种:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue 和 LinkedTransferQueue。
非阻塞队列就是ConcurrentLinkedQueue,这个类不会让线程阻塞,利用 CAS 保证了线程安全。
而Deque 是一个双端队列,从头和尾都能添加和删除元素;而普通的 Queue 只能从一端进入,另一端出去。
2. BlockingQueue的常见方法
BlockingQueue中和添加、删除相关的方法有8个,它们的区别仅在于特殊情况:当队列满了无法添加元素,或者是队列空了无法移除元素时,不同组的方法对于这种特殊情况会有不同的处理方式:
- 抛出异常:add、remove、element
- 返回结果但不抛出异常:offer、poll、peek
- 阻塞:put、take
a. add、remove、element方法
这组方法在处理特殊情况时,会抛出异常
add方法用于添加元素,如果队列满了,就会抛出异常来提示队列已满
private static void addTest() {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
blockingQueue.add(1);
blockingQueue.add(1);
blockingQueue.add(1);
}
// 运行结果
Exception in thread "main" java.lang.IllegalStateException:Queue full
remove 方法用于删除元素,如果队列为空,抛出异常
private static void removeTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
blockingQueue.add(1);
blockingQueue.add(1);
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
}
// 运行结果
Exception in thread "main" java.util.NoSuchElementException
element 方法用于返回队列头结点,但并不删除。和remove 方法一样,如果队列为空,抛出异常
private static void elementTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
blockingQueue.element();
}
// 运行结果
Exception in thread "main" java.util.NoSuchElementException
b. offer、poll、peek方法
这组方法在处理特殊情况时,会返回一个提示,而不会抛出异常。
offer 方法用来插入一个元素,并用返回值来提示插入是否成功。如果添加成功会返回 true,而如果队列已经满了,返回false
private static void offerTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(1));
System.out.println(blockingQueue.offer(1));
}
// 运行结果
true
true
false
poll 方法用于移除并返回队列的头节点,如果当队列里面是空的,没有任何东西可以移除的时候,便会返回 null。正因为如此,不允许往队列中插入 null 值,否则没有办法区分返回的 null 是一个提示还是一个真正的元素
private static void pollTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3);
blockingQueue.offer(1);
blockingQueue.offer(2);
blockingQueue.offer(3);
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
}
// 运行结果
1
2
3
null
peek 方法用于返回队列的头元素但并不删除。如果队列里面是空的,会返回 null 作为提示。
private static void peekTest() {
ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(2);
System.out.println(blockingQueue.peek());
}
// 运行结果
null
另外,offer 和 poll 都有带超时时间的重载方法。
offer(E e, long timeout, TimeUnit unit)
以offer为例,它有三个参数,分别是元素、超时时长和时间单位。插入成功会返回 true;如果队列满了导致插入不成功,则会等待指定的超时时间,如果时间到了依然没有插入成功,就会返回 false。
c. put、take方法
这一组方法在处理特殊情况时,会采用阻塞等待的策略,这也是阻塞队列名字的由来
put方法用于插入元素。如果队列已满,既不会立刻返回 false 也不会抛出异常,而是让插入的线程陷入阻塞状态,直到队列里有了空闲空间,此时队列就会让之前的线程解除阻塞状态,并把刚才那个元素添加进去。
take 方法用于获取并移除队列的头结点,当队列为空,则阻塞线程,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。
3. 常见的阻塞队列
a. ArrayBlockingQueue
一种有界队列,底层基于数组实现,利用 ReentrantLock 实现线程安全。
构造函数中可以指定队列容量,一旦指定后续不可以扩容。同时可以指定是否公平。
ArrayBlockingQueue(int capacity, boolean fair)
b. LinkedBlockingQueue
一种近似无界的队列(实际最大容量是整型的最大值 Integer.MAX_VALUE),内部基于链表实现。
c. SynchronousQueue
这种队列的容量为0,不能存储元素,每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。它的作用就是直接传递。
由于SynchronousQueue的特性,它的一些方法返回值很独特
// peek方法直接返回null
public E peek() {
return null;
}
// size方法直接返回0
public int size() {
return 0;
}
// isEmpty方法直接返回true
public boolean isEmpty() {
return true;
}
d. PriorityBlockingQueue
这种队列可以自定义内部元素的排列顺序,也是一个(可以看做)无界的阻塞队列。通过通过实现compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。
PriorityBlockingQueue的take方法会阻塞,但是由于无界,put方法永远不会阻塞。
e. DelayQueue
这种队列具有“延迟”的功能,可以指定任务延迟多久之后执行。
同时,它也是一个无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,因此可以比较和排序。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
实现Delayed接口需要实现getDelay方法,该方法返回的是“还剩下多长的延迟时间才会被执行”。
DelayQueue中的元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。其内部复用了PriorityBlockingQueue的逻辑进行排序。
二、阻塞队列和非阻塞队列的线程安全原理
无论是阻塞队列还是非阻塞队列,都是可以保证线程安全的。
1. 阻塞队列的线程安全
以ArrayBlockingQueue 的源码为例,以下是该类的重要属性:
final Object[] items;
int takeIndex;
int putIndex;
int count;
// 与线程安全有关
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
Object 类型的数组用于存储元素;takeIndex 和 putIndex用来标明下一次读取和写入位置的;count 用来计数,它所记录的就是队列中的元素个数。而剩下的三个属性,一个是 ReentrantLock,另外两个Condition 分别是由 ReentrantLock 产生,这三个属性是实现线程安全最核心的工具。
以put方法为例:
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length){
notFull.await();
}
enqueue(e);
} finally {
lock.unlock();
}
}
该方法内部逻辑是:
- 首先用checkNotNull 方法去检查插入的元素是不是 null
- 不为null时,用 ReentrantLock 上锁,并且上锁方法是 lock.lockInterruptibly()。这意味着在尝试获取锁但还没拿到锁的期间可以响应中断
- 接着是try finally 代码块,finally 中会去解锁,try中的while 循环会会检查当前队列是不是已经满了。如果队列已满,便会进行等待,直到有空余的时候跳出循环,调用 enqueue 方法让元素进入队列,最后用 unlock 方法解锁。
这就是ArrayBlockingQueue 中put方法的线程安全策略。其实在[线程基础][Link 1]中用Condition 实现生产者消费者模式,本质上就是简易版的BlockingQueue。
类似的,LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等也是利用了 ReentrantLock 来保证线程安全,只不过细节有差异,比如 LinkedBlockingQueue 的内部有两把锁,分别锁住队列的头和尾,比共用同一把锁的效率更高,不过总体思想都是类似的。
2. 非阻塞队列的线程安全
以ConcurrentLinkedQueue为例,查看offer方法的源码:
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
该方法整体是一个大的for循环,而且是明显的死循环。代码中的 p.casNext 方法,正是利用了 CAS 来操作的,而且这个死循环去配合 CAS 也就是典型的乐观锁的思想。
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
p.casNext 方法内部,运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作,而 compareAndSwapObject 是一个 native 方法,最终会利用 CPU 的 CAS 指令保证其不可中断。
三、阻塞队列的选择
阻塞队列很重要的一个应用场景就是线程池的,常见的线程池有5种,每种线程池的阻塞队列选择不同,具体的情况在[线程池][Link 2]中已经进行了较为细致的阐述。
在其他的应用场景中,选择合适的阻塞队列可以从以下几点考虑:
- 功能:如,是否需要阻塞队列排序,如优先级排序、延迟执行等
- 容量:是否有存储的要求,还是只需要“直接传递”
- 能否扩容:是否需要队列能够动态扩容
- 内存结构:不同阻塞队列的底层时间不同,如果对性能有要求可以从内存的结构角度去考虑
- 性能:比如 LinkedBlockingQueue 由于拥有两把锁,并发性能更好;SynchronousQueue只需要“直接传递”,而不需要存储,性能更好
[nbsp]: https://pottercoding.cn/wp-content/uploads/cloud/images/2024/4/5/2017/1712319457015.png
[nbsp 1]: https://pottercoding.cn/wp-content/uploads/cloud/images/2024/4/5/2017/1712319464429.png
[Link 1]: https://blog.csdn.net/weixin_41402069/article/details/125976626?spm=1001.2014.3001.5501
[Link 2]: https://blog.csdn.net/weixin_41402069/article/details/125995052?spm=1001.2014.3001.5501