Java中的基础阻塞队列

摘要
  1. 分析ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue、DelayQueue五种队列;

  2. Executors创建的3种类型的ThreadPoolExecutor(FixedThreadPool、SingleThreadExecutor、CachedThreadPool)的executor()方法说明;

  3. Executors创建的2种类型的ScheduledThreadPoolExecutor(ScheduledThreadPoolExecutor、SingleThreadScheduledExecutor)的分析。

interface Queue extends Collection,Queue接口提供的方法

方法/处理方式 抛出异常 返回特殊值
插入 boolean add(E e) boolean offer(E e)
移除 E remove() E poll()
检查 E element() E peek()

interface BlockingQueue extends Queue,BlockingQueue接口提供的方法

方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入 boolean add(E e) boolean offer(E e) void put(E e) boolean offer(E e, long timeout, TimeUnit unit)
移除 E remove() E poll() E take() throws InterruptedException E poll(long timeout, TimeUnit unit)
检查 E element() E peek()

ArrayBlockingQueue分析!

ArrayBlockingQueue的注释大意

ArrayBlockingQueue是一个大小固定的BlockingQueue,底层是由数组维护,队列按照FIFO(先进先出)原则。

ArrayBlockingQueue一旦创建,大小不可变。

ArrayBlockingQueue的注意要点

  1. ArrayBlockingQueue的offer()方法(add()方法相当于调用offer()方法),poll()方法调用lock.lock()获取锁,不响应中断,因为这两个方法不是阻塞的;

  2. take()和put()方法都调用lock.lockInterruptibly();方法获取锁,该方法支持线程中断响应,如果其他线程中断当前线程那么当前线程就会抛出InterruptedException;如果此时锁被其他线程锁占用,那么当前线程处于WAITING状态(详见AbstractQueuedSynchronizer分析)。

LinkedBlockingQueue分析

LinkedBlockingQueue的注释大意

LinkedBlockingQueue底层由链表维护,队列按照FIFO(先进先出)原则。
构造LinkedBlockingQueue最好传参数capacity(容量),防止过度扩张,默认为Integer.MAX_VALUE。因为Integer.MAX_VALUE数值很大,所以可以称为”无界”队列(这里的无界并不是真正意义上的无界)。
链接节点的插入如果不超过队列容量,都是在每次插入时动态创建的(创建指创建Node)。

LinkedBlockingQueue比ArrayBlockingQueue具有更高的吞吐量,但在大多数并发应用程序中的预测性能较差。

LinkedBlockingQueue中一些参数的解释

LinkedBlockingQueue的注意要点

  1. ArrayBlockingQueue底层保存元素数量count使用的是一个普通的int类型变量。其原因是在ArrayBlockingQueue底层对于元素的入队列和出队列使用的是同一个lock对象。而数量的修改都是在处于线程获取锁的情况下进行操作,因此不会有线程安全问题。

  2. LinkedBlockingQueue的入队列和出队列使用的是两个不同的lock对象,因此无论是在入队列还是出队列,都会涉及对元素数量的并发修改,因此这里使用了一个原子操作类来解决对同一个变量进行并发修改的线程安全问题。

  3. LinkedBlockingQueue的获取锁的方式和ArrayBlockingQueue类似,只是使用两个不同的lock对象而已。

LinkedBlockingQueue的静态工厂方法

使用LinkedBlockingQueue的好处:
因为线程大小固定的线程池,其线程的数量是不具备伸缩性的,当任务非常繁忙的时候,就势必会导致所有的线程都处于工作状态,导致队列满的情况发生,从而导致任务无法提交而抛出RejectedExecutionException;
而使用”无界”队列由于其良好的存储容量的伸缩性,可以很好的去缓冲任务繁忙情况下场景,即使任务非常多,也可以进行动态扩容,当任务被处理完成之后,队列中的节点也会被随之被GC回收,非常灵活。

FixedThreadPool的execute()说明


FixedThreadPool的execute()执行流程
  1. 如果当前运行的线程数少于corePoolSize,则会创建新线程来执行任务;

  2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue;

  3. 线程执行完步骤A中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

使用Executors.newFixedThreadPool相关的方法,LinkedBlockingQueue的capacity为Integer.MAX_VALUE,所以会对线程池有以下影响:
  1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize;

  2. 由于A影响,使用”无界”队列时maximumPoolSize将是一个无效参数;

  3. 由于A、B影响,使用”无界”队列时keepAliveTime将是一个无效参数;

  4. 由于使用”无界队列”,运行中的FixedThreadPool(未执行shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionException.rejectedExecution方法)。

SingleThreadExecutor的execute()说明


SingleThreadExecutor的execute()执行流程
  1. 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则会创建一个新线程来执行任务;

  2. 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue;

  3. 线程执行完步骤A中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。

SingleThreadExecutor使用”无界”的工作队列对线程池带来的影响与使用Executors.newFixedThreadPool相同。

ArrayBlockingQueue和LinkedBlockingQueue的比较

  1. ArrayBlockingQueue由于其底层基于数组,并且在创建时指定存储的大小,在完成后就会立即在内存分配固定大小容量的数组元素,因此其存储通常有限;
    而LinkedBlockingQueue可以由用户指定最大存储容量,也可以无需指定,如果不指定则最大存储容量将是Integer.MAX_VALUE,由于其节点的创建都是动态创建,并且在节点出队列后可以被GC所回收,因此其具有灵活的伸缩性。
    但是由于ArrayBlockingQueue的有界性,因此其能够更好的对于性能进行预测,
    而LinkedBlockingQueue由于没有限制大小,当任务非常多的时候,不停地向队列中存储,就有可能导致内存溢出的情况发生。

  2. ArrayBlockingQueue中在入队列和出队列操作过程中,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行,而LinkedBlockingQueue的读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,故LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。

PriorityBlockingQueue分析

PriorityBlockingQueue的注释大意

PriorityBlockingQueue底层由数组维护,是基于PriorityQueue实现的阻塞队列。构造PriorityBlockingQueue最好传参数capacity(容量),防止过度扩张,默认为11。

但因为PriorityBlockingQueue可以扩容,可以被无限扩容到Integer.MAX_VALUE - 8,所以可以看做是一个”无界”阻塞队列。注意资源耗竭问题(会产生OOM)。

默认的对于优先级相同的元素没有排序规律,但可以自己通过构造方法传入指定的Comparator排序方式。

PriorityBlockingQueue的注意要点

  1. PriorityBlockingQueue底层对于元素的入队列和出队列使用的是同一个lock对象。

  2. 因为PriorityBlockingQueue是”无界”阻塞队列,所以put()不需要阻塞,直接调用offer()方法。因为,put方法在队列满时阻塞,take方法在队列空时阻塞,由于PriorityBlockingQueue是”无界”阻塞队列,所以不需要阻塞。

  3. PriorityBlockingQueue和ArrayBlockingQueue都是由数组维护。二者区别是,PriorityBlockingQueue支持扩容,ArrayBlockingQueue并不支持。由于LinkedBlockingQueue由链表维护,没有扩容的概念,只是会有容量限制。PriorityBlockingQueue的扩容使用CAS自旋锁。

  4. PriorityBlockingQueue的offer()方法(add()、put()方法相当于调用offer()方法),poll()方法调用lock.lock()获取锁,不响应中断,因为这两个方法不是阻塞的;take()方法都调用lock.lockInterruptibly();方法获取锁,注意:该方法支持线程中断响应。

  5. PriorityBlockingQueue的插入移除参考二叉堆中的最小堆,因为数字越小优先级越高,数字越大优先级越低。所以数字最小的是在队头。

SynchronousQueue分析

SynchronousQueue的注释大意

每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
SynchronousQueue的支持公平策略和非公平策略,所以底层可能两种数据结构:队列(实现公平策略)和栈(实现非公平策略),队列与栈都是通过链表来实现的。

SynchronousQueue的静态工厂方法

SynchronousQueue的execute()说明


可以看到CacheThreadPool的corePoolSize被设置为0,即corePool为空,maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是”无界”的,这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度,CacheThreadPool会不断创建新的线程,极端情况下,CacheThreadPool会因为创建过多线程而耗尽CPU和内存资源;keepAliveTime设置为60L,意味着CacheThreadPool中的空闲线程等待新任务的最长时间为60s,空闲线程超过60s将被终止。

SynchronousQueue的execute()执行流程
  1. 首先执行SynchronousQueue.offer(E o)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(long timeout, TimeUnit unit),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤2;

  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程,将没有线程执行SynchronousQueue.poll(long timeout, TimeUnit unit)。这种情况下步骤1将失败。此时CacheThreadPool会创建一个新线程执行任务,execute()方法执行完成;

  3. 在步骤2中创建的线程将任务执行完成后,会执行SynchronousQueue.poll(long timeout, TimeUnit unit)。这个poll操作会让线程最多在SynchronousQueue中等待60s。如果60s内主线程就提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于60s的空闲线程会被终止,因此长时间保持空闲的CacheThreadPool不会使用任何资源。

DelayQueue分析

DelayQueue的注释大意

DelayQueue底层由数组维护,是基于PriorityQueue实现的阻塞队列,并且是真正意义上的无界队列,因为这里没有capacity的概念。
队列里只允许放入可以”延期”的元素,队列中列头的元素是最先”到期”的元素。如果队列中没有任何元素”到期”,尽管队列中有元素,也不能从队列头获取到任何元素。

DelayQueue的注意要点

  1. DelayQueue底层对于元素的入队列和出队列使用的是同一个lock对象。
    因为DelayQueue是无界阻塞队列,所以put()不需要阻塞,直接调用offer()方法。

  2. DelayQueue的offer()方法(add()、put()方法相当于调用offer()方法),poll()方法调用lock.lock()获取锁,不响应中断,因为这两个方法不是阻塞的;
    take()方法都调用lock.lockInterruptibly();方法获取锁,注意:该方法支持线程中断响应。

DelayQueue的静态工厂方法

ScheduledThreadPoolExecutor的execute()方法说明





ScheduledThreadPoolExecutor使用自己实现的DelayedWorkQueue。它的实现和DelayQueue、PriorityQueue类似,也是通过二叉堆中的最小堆的实现方法,将任务通过封装的ScheduledFutureTask的time参数(即到期时间)排序放入队列中。

参考

  1. ArrayBlockingQueue源码分析
  2. LinkedBlockingQueue源码分析
  3. LinkedBlockingQueue的dequeue()方法中Help GC注释的解释
  4. 《Java并发编程的艺术》