Java 阻塞队列 BlockingQueue

(1) 阻塞队列是什么

 阻塞队列(BlockingQueue)是一个支持阻塞操作的线程安全的队列。使得数据由队列的一端输入,从另外一端输出;

 BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

阻塞队列方法概要

方法\处理方式 抛异常 返回特殊值 一直阻塞(死等) 超时退出(等一会儿)
添加元素 add(e) offer(e) put(e) offer(e, time, unit)
删除元素 remove() poll() take() poll(time, unit)
检查元素 element() peek()

(2) 阻塞队列应用

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

/**
 * 生产者
 */
class Producer implements Runnable {
    private final BlockingQueue queue;

    Producer(BlockingQueue q) {
        queue = q;
    }

    public void run() {
        try {
            while (true) {
                // 往队列添加元素
                queue.put(produce());
            }
        } catch (InterruptedException ex) {
            //...handle ...
        }
    }

    Object produce() {
        //    ...
    }
}

/**
 * 消费者 
 */
class Consumer implements Runnable {
    private final BlockingQueue queue;

    Consumer(BlockingQueue q) {
        queue = q;
    }

    public void run() {
        try {
            while (true) {
                // 从队列取元素
                consume(queue.take());
            }
        } catch (InterruptedException ex) {
            //...handle ...
        }
    }

    void consume(Object x) {
        //...
    }
}

class Setup {

    public static void main(String[] args) {
        BlockingQueue q = new LinkedBlockingQueue();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

(3) 阻塞队列的原理

package java.util.concurrent;

/**
 * 一个队列,它还支持在检索元素时等待队列变为非空,并在存储元素时等待队列中的空间变为可用的操作。
 * 
 * BlockingQueue 方法有四种形式,用不同的方式处理不能立即满足,但可能在未来某个时间点满足的操作:
 * 第一种抛出异常,
 * 第二种返回特殊值(null 或 false,取决于操作),
 * 第三个无限期地阻塞当前线程,直到操作成功,
 * 第四个阻塞只有给定的最大时间限制才放弃。
 * 
 * BlockingQueue 不接受空元素。在尝试添加、放置或提供 null 时,实现会抛出 NullPointerException。
 * 
 * BlockingQueue 可能是容量受限的。在任何给定时间,它都可能有一个剩余容量,超出该容量就不能在不阻塞的情况下放置其他元素。
 * 
 * BlockingQueue 实现是线程安全的。所有排队方法都使用内部锁或其他形式的并发控制以原子方式实现其效果。
 * 但是,批量收集操作 addAll、containAll、retainAll 和 removeAll 不一定以原子方式执行,除非在实现中另外指定。
 * 
 * BlockingQueue 本质上不支持任何类型的“关闭”或“关闭”操作来指示不再添加项目。此类功能的需求和使用往往取决于实现。
 * 例如,一种常见的策略是生产者插入特殊的流尾或有毒对象,当消费者采用时会相应地解释这些对象。
 */ 
public interface BlockingQueue<E> extends Queue<E> {

}

(3.1) 添加元素

添加元素有4种方法 add(e)offer(e)put(e)offer(e, time, unit)
常用的是offer(e)  

(3.1.1) add(e)

/**
 * 如果可以在不违反容量限制的情况下立即将指定的元素插入此队列,则在成功时返回 {@code true} 
 * 在当前没有可用空间时抛出IllegalStateException。
 * 使用容量受限队列时,通常最好使用 #offer(Object) 。
 *
 * @param e 要添加的元素
 * @return true  
 * @throws IllegalStateException 达到容量限制(队列已满)抛出此异常
 * @throws ClassCastException 如果指定元素的类阻止它被添加到这个队列
 * @throws NullPointerException 元素为null
 * @throws IllegalArgumentException 如果指定的某些属性元素阻止它被添加到这个队列
 */
boolean add(E e);

 

(3.1.2) offer(e)

/**
 * 如果可以在不违反容量限制的情况下立即将指定的元素插入此队列(队列未满),则在成功时返回true,
 * 如果当前没有可用空间(队列满了)则返回false。
 *
 * @param e 要添加的元素
 * @return 添加成功返回true  添加失败返回false
 * @throws ClassCastException 如果指定元素的类阻止它被添加到这个队列
 * @throws NullPointerException 元素为null
 * @throws IllegalArgumentException 如果指定的某些属性元素阻止它被添加到这个队列
 */
boolean offer(E e);

(3.1.3) put(e)

/**
 * 将指定元素插入此队列,队列已满时等待可用空间。
 *
 * @param e 要添加的元素
 * @throws InterruptedException 在等待时被中断
 * @throws ClassCastException 如果指定元素的类阻止它被添加到这个队列
 * @throws NullPointerException 元素为null
 * @throws IllegalArgumentException 如果指定的某些属性元素阻止它被添加到这个队列
 */
void put(E e) throws InterruptedException;

(3.1.4) offer(e, time, unit)

/**
 * 将指定的元素插入此队列,如有必要,等待指定时间(等一会)以获得可用空间。
 *
 * @param e 要添加的元素
 * @param timeout 放弃以前的等待时间
 * @param unit 时间单位
 * @return 成功返回true,失败返回false
 * @throws InterruptedException 在等待时被中断
 * @throws ClassCastException 如果指定元素的类阻止它被添加到这个队列
 * @throws NullPointerException 元素为null
 * @throws IllegalArgumentException 如果指定的某些属性元素阻止它被添加到这个队列
 */
boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;

(3.2) 移除元素


(3.2.1) remove(o)

/**
 * 从此队列中移除指定元素的单个实例(如果存在)。
 * 更正式地说,如果此队列包含一个或多个此类元素,则删除元素e 使得o.equals(e)。如果此队列包含指定元素(或等效地,如果此队列因调用而更改),则返回 {@code true}。
 *
 * @param o 要从此队列中删除的元素(如果存在)
 * @return true 移除成功返回true
 * @throws ClassCastException 如果指定元素的类与此队列不兼容
 * @throws NullPointerException 元素为空
 */
boolean remove(Object o);

(3.2.2) poll()

poll()方法继承自Queue

/**
 * 检索并删除此队列的头部,如果此队列为空,则返回null
 *
 * @return 此队列的头部元素,如果此队列为空返回null
 */
E poll();

(3.2.3) take()

/**
 * 检索并删除此队列的头部,必要时等待直到元素可用。
 *
 * @return 此队列的头部元素
 * @throws InterruptedException 如果在等待时被打断
 */
E take() throws InterruptedException;

(3.2.4) poll(timeout, unit)

/**
 * Retrieves and removes the head of this queue, waiting up to the
 * specified wait time if necessary for an element to become available.
 *
 * @param timeout 等待多长时间才放弃
 * @param unit 时间单位
 * @return 此队列的头部,等待时间用尽后返回null
 * @throws InterruptedException 如果在等待时被打断
 */
E poll(long timeout, TimeUnit unit)
    throws InterruptedException;

(3.3) 其它

/**
 * 返回此队列在理想情况下(在没有内存或资源限制的情况下)可以无阻塞地接受的附加元素的数量,
 * 如果没有内在限制,则返回 Integer.MAX_VALUE
 *
 * 请注意,您不能总是通过检查剩余容量来判断插入元素的尝试是否会成功,因为可能是另一个线程即将插入或删除元素。
 *
 * @return 剩余容量
 */
int remainingCapacity();

/**
 * 如果此队列包含指定元素,则返回true。
 * 更正式地说,返回true当且仅当此队列包含至少一个元素e使得o.equals(e)。
 *
 * @param o 要检查此队列中包含的对象
 * @return true 如果此队列包含指定元素
 * @throws ClassCastException 如果指定元素的类与此队列不兼容
 * @throws NullPointerException 如果元素为空
 */
boolean contains(Object o);

/**
 * 从此队列中删除所有可用元素并将它们添加到给定集合中。
 * 此操作可能比重复轮询此队列更有效。
 * 尝试将元素添加到集合c时遇到的失败可能会导致在抛出相关异常时元素不在任何一个或两个集合中。尝试将队列排空到自身会导致IllegalArgumentException。
 * 此外,如果指定的集合在操作过程中被修改,则此操作的行为是未定义的。
 *
 * @param c 将元素转移到的集合
 * @return 转移的元素数量
 * @throws UnsupportedOperationException 如果指定集合不支持添加元素
 * @throws ClassCastException 如果此队列的元素的类阻止将其添加到指定的集合
 * @throws NullPointerException 如果指定的集合为空
 * @throws IllegalArgumentException 如果指定的集合是这个队列,或者这个队列元素的某些属性阻止它被添加到指定的集合
 */
int drainTo(Collection<? super E> c);

/**
 * 从此队列中最多移除给定数量的可用元素,并将它们添加到给定的集合中。
 * 尝试将元素添加到集合c时遇到的失败可能会导致在抛出相关异常时元素不在任何一个或两个集合中。尝试将队列排空到自身会导致IllegalArgumentException。
 * 此外,如果指定的集合在操作过程中被修改,则此操作的行为是未定义的。
 * 
 * @param c 将元素转移到的集合
 * @param maxElements 要传输的最大元素数
 * @return 转移的元素数量
 * @throws UnsupportedOperationException 如果指定集合不支持添加元素
 * @throws ClassCastException 如果此队列的元素的类阻止将其添加到指定的集合
 * @throws NullPointerException 如果指定的集合为空
 * @throws IllegalArgumentException 如果指定的集合是这个队列,或者这个队列元素的某些属性阻止它被添加到指定的集合
 */
int drainTo(Collection<? super E> c, int maxElements);

(4) JDK里的阻塞队列

JDK里提供了多种阻塞队列
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue
SynchronousQueue 并发同步阻塞队列
LinkedTransferQueue
LinkedBlockingDeque


(5) 有界数组阻塞队列-ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。
此队列按照先进先出的原则对元素进行排序。

思考一下,如果要自己用数组实现一个先进先出的阻塞队列,自己怎么实现?
是不是添加元素时添加到数据尾部,删除元素时也在头部删除?
如果直接在头部删除元素,需要移动后面的所有元素,时间复杂度比较高,有没有优化的方案?

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** 队列中的元素  数组 */
    @SuppressWarnings("serial") // 有条件可序列化 Conditionally serializable
    final Object[] items;

    /** 下一次移除元素操作的元素索引  移除元素操作有 take, poll, peek or remove */
    int takeIndex;

    /** 下一次添加元素操作的元素索引,添加元素的操作有 put, offer, or add */
    int putIndex;

    /** 队列中元素的个数 */
    int count;

    /** 主锁保护所有访问 */
    final ReentrantLock lock;

    /** 移除元素的等待条件  不为空 */
    @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
    private final Condition notEmpty;

    /** 添加元素的等待条件  队列不满 */
    @SuppressWarnings("serial")  // Classes implementing Condition may be serializable.
    private final Condition notFull;

    /**
     * 当前活动迭代器的共享状态,如果已知没有,则为 null。允许队列操作更新迭代器状态。
     */
    transient Itrs itrs;
}

(5.1) 构造方法

/**
 * 使用给定(固定)容量和默认访问策略(非公平队列)创建ArrayBlockingQueue。
 *
 * @param capacity  队列容量
 * @throws IllegalArgumentException  capacity < 1会抛出异常
 */
public ArrayBlockingQueue(int capacity) {
    // 默认是非公平队列
    this(capacity, false);
}

/**
 * 使用给定(固定)容量和指定访问策略(公平/非公平)创建ArrayBlockingQueue。
 *
 * @param 队列容量
 * @param fair  如果是true,访问队列的线程阻塞在插入或者移除方法时,访问顺序按照先进先出顺序处理。
 *              如果为false,则未指定访问顺序。
 *        
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    // 指定容量    
    this.items = new Object[capacity];
    // 通过ReentrantLock来保证公平还是非公平
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

/**
 * 创建一个具有给定(固定)容量、指定访问策略(公平/非公平)并最初包含给定集合元素的ArrayBlockingQueue,按集合迭代器的遍历顺序添加。 
 *
 * @param capacity 队列容量
 * @param fair  如果是true,访问队列的线程阻塞在插入或者移除方法时,访问顺序按照先进先出顺序处理。
 *              如果为false,则未指定访问顺序。
 * @param c 最初包含的元素集合
 * @throws IllegalArgumentException capacity < c.size() || capacity < 1 
 * @throws NullPointerException 如果指定的集合或其任何元素为 null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    // 初始化队列
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        // 队列数组
        final Object[] items = this.items;
        int i = 0;
        try {
            // 把集合c里的元素添加到队列
            for (E e : c)
                items[i++] = Objects.requireNonNull(e); // 这儿要求元素不为null
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        // 更新count
        count = i;
        // 更新index  
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

(5.2) 添加元素

(5.2.1) add(e)

/**
 * (队列未满)插入(尾部)成功返回true,
 * (队列满)插入失败返回IllegalStateException     
 *
 * @param e 要添加的元素
 * @return true  (as specified by {@link Collection#add})
 * @throws IllegalStateException 如果队列满了
 * @throws NullPointerException 如果元素为null
 */
public boolean add(E e) {
    return super.add(e);
}

(5.2.2) offer(e)

/**
 * (队列未满)插入(尾部)成功返回true,
 * (队列满)插入失败返回false
 *
 * @throws NullPointerException 如果元素为null
 */
public boolean offer(E e) {
    // 要求元素不为空
    Objects.requireNonNull(e); 
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        if (count == items.length)
            // 队列已满,返回false
            return false;
        else {
            // 添加到队列尾部
            enqueue(e);
            return true;
        }
    } finally {
        // 解锁
        lock.unlock();
    }
}

(5.2.3) put(e)

/**
 * 将指定元素插入此队列的尾部,如果队列已满,则等待(队列)可用空间。  
 *
 * @throws InterruptedException 其它线程调用等待线程的Thread.interrupt方法来中断等待线程的等待时
 * @throws NullPointerException 元素为空
 */
public void put(E e) throws InterruptedException {
    // 要求元素不为空
    Objects.requireNonNull(e);
    final ReentrantLock lock = this.lock;
    // 加锁,优先响应中断 // 允许在等待时由其它线程调用等待线程的Thread.interrupt方法来中断等待线程的等待而直接返回,这时不用获取锁,而会抛出一个InterruptedException
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 队列已满,等待
            notFull.await();
        // 添加到队列尾部    
        enqueue(e);
    } finally {
        // 解锁
        lock.unlock();
    }
}

(5.2.4) offer(e, timeout, unit)

/**
 * 将指定的元素插入此队列的尾部,
 * 如果队列已满,则等待指定的时间(等一会儿),如果在等待的时间内队列空间可用,插入返回true,否则超时返回false 
 *
 * @throws InterruptedException 其它线程调用等待线程的Thread.interrupt方法来中断等待线程的等待时
 * @throws NullPointerException 元素为null
 */
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    // 要求元素不为null
    Objects.requireNonNull(e);
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 加锁  优先响应中断
    lock.lockInterruptibly();
    try {
        while (count == items.length) {
            if (nanos <= 0L)
                // 返回失败
                return false;
            // 更新等待时间    
            nanos = notFull.awaitNanos(nanos);
        }
        // 添加到队列尾部
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}

(5.2.5) enqueue(e)

/**
 * 在当前放置位置插入元素、前进和发出信号。
 * 仅在持有锁时调用。  
 */
private void enqueue(E e) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    // 队列数组
    final Object[] items = this.items;
    // 把元素添加到数组尾部
    items[putIndex] = e;
    // 这儿有个置0操作,可以循环写  避免了删除操作
    if (++putIndex == items.length) putIndex = 0;
    // 更新数组元素个数
    count++;
    // 发信号 唤醒删除操作
    notEmpty.signal();
}

(5.3) 删除元素

(5.3.1) remove()


/**
 * Deletes item at array index removeIndex.
 * Utility for remove(Object) and iterator.remove.
 * Call only when holding lock.
 */
void removeAt(final int removeIndex) {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[removeIndex] != null;
    // assert removeIndex >= 0 && removeIndex < items.length;
    final Object[] items = this.items;
    if (removeIndex == takeIndex) {
        // removing front item; just advance
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {
        // an "interior" remove

        // slide over all others up through putIndex.
        for (int i = removeIndex, putIndex = this.putIndex;;) {
            int pred = i;
            if (++i == items.length) i = 0;
            if (i == putIndex) {
                items[pred] = null;
                this.putIndex = pred;
                break;
            }
            items[pred] = items[i];
        }
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    notFull.signal();
}

(5.3.2) poll()

public E poll() {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 队列为空 返回null   不为空移除元素并返回
        return (count == 0) ? null : dequeue();
    } finally {
        // 解锁
        lock.unlock();
    }
}

(5.3.3) take()

/**
 * 移除元素
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    // 加锁 可响应中断
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 只要队列为空就一直等  死等 
            notEmpty.await();
        // 移除的元素并返回
        return dequeue();
    } finally {
        lock.unlock();
    }
}

(5.3.4) poll(timeout, unit)

/**
 * 移除元素
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    // 要等待的时间
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    // 加锁 可响应中断
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0L)
                // 等待时间<0 返回
                return null;
            // 等待时间>0 继续等待
            // 更新等待时间    
            nanos = notEmpty.awaitNanos(nanos);
        }
        // 移除的元素并返回
        return dequeue();
    } finally {
        // 解锁
        lock.unlock();
    }
}

(5.3.5) dequeue()

/**
 * 移除元素
 * 在当前采取的位置、前进和信号中提取元素。
 * 仅在持有锁时调用。 
 *
 * @return 返回被移除的元素 
 */
private E dequeue() {
    // assert lock.isHeldByCurrentThread();
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    // 队列元素 
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    // 要移除的原因
    E e = (E) items[takeIndex];
    // 置为空
    items[takeIndex] = null;
    // 置0  循环操作  避免了删除要移动元素的问题
    if (++takeIndex == items.length) takeIndex = 0;
    // 更新队列数组元素个数
    count--;
    // 
    if (itrs != null)
        itrs.elementDequeued();
    // 发信号  唤醒添加操作
    notFull.signal();
    return e;
}

(5.4) 其它

// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
/**
 * 剩余容量
 * 返回此队列可以理想地(在没有内存或资源限制的情况下)无阻塞地接受的附加元素的数量。
 * 这始终等于此队列的初始容量减去此队列的当前size。
 *
 * 请注意,您不能总是通过检查剩余容量来判断插入元素的尝试是否会成功,因为可能是另一个线程即将插入或删除元素。
 */
public int remainingCapacity() {
    final ReentrantLock lock = this.lock;
    // 加锁
    lock.lock();
    try {
        // 数组长度 - 已有元素个数 = 剩余容量
        return items.length - count;
    } finally {
        // 解锁
        lock.unlock();
    }
}

参考资料

[1] Java并发系列 — 阻塞队列(BlockingQueue)