(1) 阻塞队列是什么
阻塞队列(BlockingQueue)是一个支持阻塞操作的线程安全的队列。使得数据由队列的一端输入,从另外一端输出;
BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
阻塞队列方法概要
方法\处理方式 |
抛异常 |
返回特殊值 |
一直阻塞(死等) |
超时退出(等一会儿) |
添加元素 |
add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
删除元素 |
remove() |
poll() |
take() |
poll(time, unit) |
检查元素 |
element() |
peek() |
|
|
(2) 阻塞队列应用
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) {
queue = q;
}
public void run() {
try {
while (true) {
queue.put(produce());
}
} catch (InterruptedException ex) {
}
}
Object produce() {
}
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) {
queue = q;
}
public void run() {
try {
while (true) {
consume(queue.take());
}
} catch (InterruptedException ex) {
}
}
void consume(Object x) {
}
}
class Setup {
public static void main(String[] args) {
BlockingQueue q = new LinkedBlockingQueue();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
(3) 阻塞队列的原理
package java.util.concurrent;
public interface BlockingQueue<E> extends Queue<E> {
}
(3.1) 添加元素
添加元素有4种方法 add(e)
,offer(e)
,put(e)
,offer(e, time, unit)
常用的是offer(e)
(3.1.1) add(e)
boolean add(E e);
(3.1.2) offer(e)
boolean offer(E e);
(3.1.3) put(e)
void put(E e) throws InterruptedException;
(3.1.4) offer(e, time, unit)
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
(3.2) 移除元素
(3.2.1) remove(o)
boolean remove(Object o);
(3.2.2) poll()
poll()方法继承自Queue
E poll();
(3.2.3) take()
E take() throws InterruptedException;
(3.2.4) poll(timeout, unit)
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
(3.3) 其它
int remainingCapacity();
boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
(4) JDK里的阻塞队列
JDK里提供了多种阻塞队列
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue
SynchronousQueue 并发同步阻塞队列
LinkedTransferQueue
LinkedBlockingDeque
(5) 有界数组阻塞队列-ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。
此队列按照先进先出的原则对元素进行排序。
思考一下,如果要自己用数组实现一个先进先出的阻塞队列,自己怎么实现?
是不是添加元素时添加到数据尾部,删除元素时也在头部删除?
如果直接在头部删除元素,需要移动后面的所有元素,时间复杂度比较高,有没有优化的方案?
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
@SuppressWarnings("serial")
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
@SuppressWarnings("serial")
private final Condition notEmpty;
@SuppressWarnings("serial")
private final Condition notFull;
transient Itrs itrs;
}
(5.1) 构造方法
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Object[] items = this.items;
int i = 0;
try {
for (E e : c)
items[i++] = Objects.requireNonNull(e);
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
(5.2) 添加元素
(5.2.1) add(e)
public boolean add(E e) {
return super.add(e);
}
(5.2.2) offer(e)
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
(5.2.3) put(e)
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
(5.2.4) offer(e, timeout, unit)
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
(5.2.5) enqueue(e)
private void enqueue(E e) {
final Object[] items = this.items;
items[putIndex] = e;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
(5.3) 删除元素
(5.3.1) remove()
void removeAt(final int removeIndex) {
final Object[] items = this.items;
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
for (int i = removeIndex, putIndex = this.putIndex;;) {
int pred = i;
if (++i == items.length) i = 0;
if (i == putIndex) {
items[pred] = null;
this.putIndex = pred;
break;
}
items[pred] = items[i];
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
(5.3.2) poll()
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
(5.3.3) take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
(5.3.4) poll(timeout, unit)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
(5.3.5) dequeue()
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E e = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return e;
}
(5.4) 其它
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
参考资料
[1] Java并发系列 — 阻塞队列(BlockingQueue)