Skip to content

并发编程之 Queue 和 Deque

Queue(队列)简述

队列(queue)是一种常用的数据结构,可以将队列看做是一种特殊的线性表,该结构遵循的先进先出(FIFO)原则。Java 中,LinkedList 实现了 Deque 接口,Deque 接口实现了 Queue 接口, 因此 LinkedList 进行插入、删除操作效率较高。

Queue 常用方法:

  • boolean add(E e); 将指定的元素插入此队列(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用的空间,则抛出 IllegalStateException。
  • boolean offer(E e); 将指定的元素插入此队列(如果立即可行且不会违反容量限制),当使用有容量限制的队列时,此方法通常要优于 add(E),后者可能无法插入元素,而只是抛出一个异常。
  • E remove(); 获取并移除此队列的头。
  • E poll(); 获取并移除此队列的头,如果此队列为空,则返回 null。
  • E element(); 获取,但是不移除此队列的头。
  • E peek(); 获取但不移除此队列的头;如果此队列为空,则返回 null。

队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行 删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。 在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。 因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

什么是阻塞队列

  • 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
  • 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序整体处理数据的速度。

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。 在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。

为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。生产者和消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

有界无界?

有限队列就是长度有限,满了以后生产者会阻塞,无界队列就是里面能放无数的东西而不会因为队列长度限制被阻塞,当然空间限制来源于系统资源的限制,如果处理不及时,导致队列越来越大越来越大,超出一定的限制致使内存超限,操作系统或者 JVM 帮你解决烦恼,直接把你 OOM kill 省事了。

无界也会阻塞,为何?因为阻塞不仅仅体现在生产者放入元素时会阻塞,消费者拿取元素时,如果没有元素,同样也会阻塞。

Deque(双端队列)简述

双向队列(Deque)是 Queue 的一个子接口,双向队列是指该队列两端的元素既能入队(offer)也能出队(poll), 如果将 Deque 限制为只能从一端入队和出队,则可实现栈的数据结构。对于栈而言,有入栈(push)和出栈(pop),遵循先进后出原则。名称 deque 是“double ended queue(双端队列)”的缩写。

常用队列 java 类

LinkedList

实现了 Deque 接口。双向链表

java
LinkedList<?> list = new LinkedList();

SynchronousQueue

SynchronousQueue 是一个比较特别的队列,由于在线程池方面有所应用。它没有容量,是无缓冲等待队列,是一个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。拥有公平(FIFO)和非公平(LIFO)策略。

它的特别之处在于它内部没有容器。使用 SynchronousQueue 阻塞队列一般要求 maximumPoolSizes 为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。

一个生产线程,当它生产产品(即 put 的时候),如果当前没有人想要消费产品(即当前没有线程执行 take),此生产线程必须阻塞,等待一个消费线程调用 take 操作,take 操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传递),这样的一个过程称为一次配对过程(当然也可以先 take 后 put, 原理是一样的)。

即当一个线程在把元素放入 SynchronousQueue 时,它会先阻塞,知道有其他线程过来拿它的时候(take),才会继续后面的执行。

java
ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), r -> new Thread(r, "ThreadTest"));

LinkedBlockingQueue

是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

LinkedBlockingQueue 是一个无界缓存等待队列。当前执行的线程数量达到 corePoolSize 的数量时,剩余的元素会在阻塞队列里等待。(所以在使用此阻塞队列时 maximumPoolSizes 就相当于无效了),每个线程完全独立于其他线程。生产者和消费者使用独立的锁来控制数据的同步,即在高并发的情况下可以并行操作队列中的数据。

注:这个队列需要注意的是,虽然通常称其为一个无界队列,但是可以人为指定队列大小,而且由于其用于记录队列大小的参数是 int 类型字段,所以通常意义上的无界其实就是队列长度为 Integer.MAX_VALUE,且在不指定队列大小的情况下也会默认队列大小为 Integer.MAX_VALUE。

java
ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(Integer.MAX_VALUE), r -> new Thread(r, "ThreadTest"));

ArrayBlockingQueue

是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。初始化时有参数可以设置

ArrayBlockingQueue 是一个有界缓存等待队列,可以指定缓存队列的大小,当正在执行的线程数等于 corePoolSize 时,多余的元素缓存在 ArrayBlockingQueue 队列中等待有空闲的线程时继续执行,当 ArrayBlockingQueue 已满时,加入 ArrayBlockingQueue 失败,会开启新的线程去执行,当线程数已经达到最大的 maximumPoolSizes 时,再有新的元素尝试加入 ArrayBlockingQueue 时会执行拒绝策略。

java
ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime.getRuntime().availableProcessors() * 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(32), r -> new Thread(r, "ThreadTest"));

PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素 采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序 规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素 进行排序。需要注意的是不能保证同优先级元素的顺序。

DelayQueue

是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。 队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。 DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景。 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。还有订单到期,限时支付等等

LinkedBlockingQueue 和 ArrayBlockingQueue 的区别

相同:

  1. 都实现了 BlockingQueue 接口,都是可阻塞的队列;
  2. 内部都是使用 ReentrantLock 和 Condition 来保证生产和消费的同步。当队列为空,消费者线程被阻塞;当队列装满,生产者线程被阻塞;使用 Condition 的方法来同步和通信:await() 和 signal()

不同:

  1. 锁机制不同。LinkedBlockingQueue 中的锁是分离的,生产者的锁为 putLock,消费者的锁为 takeLock。而 ArrayBlockingQueue 生产者和消费者使用的是同一把锁;
  2. 底层实现机制也不同。LinkedBlockingQueue 内部维护的是一个链表结构。而 ArrayBlockingQueue 内部维护了一个数组。
  3. 构造函数不同。LinkedBlockingQueue 有默认的容量大小为:Integer.MAX_VALUE,可以认为是无界的缓存等待队列,当然也可以传入指定的容量大小。而 ArrayBlockingQueue 在初始化的时候,必须传入一个容量大小的值,是一个有界的缓存等待队列。
  4. 执行 clear()方法不同。LinkedBlockingQueue 执行 clear 方法时,会加上两把锁(跟第 1 条对应)。
  5. 统计元素的个数不同。LinkedBlockingQueue 中使用了一个 AtomicInteger 对象来统计元素的个数。而 ArrayBlockingQueu e 则使用 int 类型来统计元素。

Array 实现和 Linked 实现的区别

  1. 队列中锁的实现不同 ArrayBlockingQueue 实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁; LinkedBlockingQueue 实现的队列中的锁是分离的,即生产用的是 putLock, 消费是 takeLock
  2. 在生产或消费时操作不同 ArrayBlockingQueue 实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的; LinkedBlockingQueue 实现的队列中在生产和消费的时候,需要把枚举对象转换为 Node<E>进行插入或移除,会影响性能
  3. 队列大小初始化方式不同 ArrayBlockingQueue 实现的队列中必须指定队列的大小; LinkedBlockingQueue 实现的队列中可以不指定队列的大小,但是默认是 Integer.MAX_VALUE

常用阻塞队列

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。 以上的阻塞队列都实现了 BlockingQueue 接口,也都是线程安全的。