30分钟带你了解阻塞队列所有内容,再也不怕面试官刁难你了!(上)
30分钟带你了解阻塞队列所有内容,再也不怕面试官刁难你了!(上)
目录
- 1、概述
- 2、BlockingQueue 源码解析
- 3、ArrayBlockingQueue 源码解析
- 4、LinkedBlockingQueue 源码解析
- 5、PriorityBlockingQueue 源码解析
呕心沥血,耗费一周的时间来看源码,这么优秀的博主你难道不关注一下?
1、概述
今天在整理线程池的相关内容时,发现许多面试官对于阻塞队列这块内容问的还是挺挺深入的。事实上也是,公司招人的时候,总不会去招那些只会用,但是不知道为什么的人吧。而且阻塞队列是线程池的核心内容,因此我们将这块内容给搞懂了,这样才能在开发中游刃有余,在面试时镇定自若。
然而在找寻相关博客的时候,发现除了大部分博客千篇一律的copy之外,一些原创博客出现一些内容不全、不明原理、甚至内容重复的现象,这真的是对我们求知若渴的孩子的打击。因此,我从 IDEA 中的类图功能一个个找过去,给大家解读源码,所以,这里,以我为准。
先给大家看一下BlockingQueue
的相关类的图。(???为什么是BlockingQueue
?兄弟快去看看线程池构造函数的参数!)
2、BlockingQueue 源码解析
BlockingQueue
是一个接口,它继承了Queue
接口。然后下面是它的方法:
// 这里我们看到 BlockingQueue 是一个泛型接口,但是我们在线程池中用的是 Runnable 接口也就是线程可执行动作。这里了解一下
public interface BlockingQueue<E> extends Queue<E> {
/**
* Q:好的,问题来了,我们看到下面三个方法都是添加元素,那么他们有什么不同嘛?
* A:有的,我们看上面,BlockingQueue 实现了 Queue 接口,Queue 接口又实现了 Collection 接口。其实这三个方法分别属于不同接口中的定义。
* add 方法来源于 Collection 接口,并且在 BlockingQueue 中定义当大小不足时,会抛出 IllegalStateException;
* offer 方法来源于 Queue 接口,并且在 BlockingQueue 中定义当大小不足时,会返回 false 而不是抛错;
* put 方法是 BlockingQueue 接口自己定义的,并且在 BlockingQueue 中定义当大小不足时会挂起,直到有足够的大小。允许被打断,打断会抛出 InterruptedException。
*/
// 添加元素
boolean add(E e);
// 添加元素
boolean offer(E e);
// 添加元素
void put(E e) throws InterruptedException;
// 添加元素,并且有一个时间。超过时间就放弃
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 把第一个元素取走并删除,如果没有的话就挂起直到有,打断会抛出 InterruptedException
E take() throws InterruptedException;
// 也是拿元素,超时就放弃,打断会抛出 InterruptedException
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 查询剩余容量
int remainingCapacity();
// 删除元素。这里为什么不用 E 而用 Object 呢?我觉得有两点吧:1、限定了元素为非基础类型;2、在元素比较时需要用到 equals 方法,在注释中有用到
boolean remove(Object o);
// 查询元素,也是用 Object
public boolean contains(Object o);
// 将当前所有可用元素放到 C 中,需要子类自己实现。原注释提出,drainTo 方法比迭代调用 poll 元素要好,因为迭代过程出现异常可能会导致元素的丢失
int drainTo(Collection<? super E> c);
// 将指定大小个元素放到 C 中
int drainTo(Collection<? super E> c, int maxElements);
}
3、ArrayBlockingQueue 源码解析
3-1、ArrayBlockingQueue 概述
ArrayBlockingQueue
是一个有界的BlockingQueue
,内部存储使用数组。ArrayBlockingQueue
提供先进先出的机制,提供公平锁和非公平锁的获取(那么有同学可能要问了,既然保证了先进先出那不就肯定是公平的嘛?别急,这个问题我们在下面源码中进行回答)。
3-2、ArrayBlockingQueue 源码
我会根据使用逻辑将源码顺序进行调换,不要以出现顺序当做源码中的真实位置。
3-2-1、成员变量
首先是一些成员变量:
// 存储元素的数组。可以看到是 final 修饰的,意味着一旦 ArrayBlockingQueue 创建之后,大小不能再改变了
final Object[] items;
// 下一个获取元素的索引值。为什么要这样子呢?因为 ArrayBlockingQueue 规定了获取肯定是从存储最久的元素开始的,如果是链表的话好办,直接改节点就行了,但是数组的话我们就必须用头尾指针表示当前头在哪里,尾在哪里,否则每次拿元素都得进行数组的重构,特别浪费时间。其实这两个索引值是相当于用数组来完成链表的表示的妥协方法
int takeIndex;
// 同上
int putIndex;
// 用于记录当前元素个数
int count;
/*
* 下面是一些并发控制组件
*/
// 一个可重入锁来保证并发过程中 ArrayBlockingQueue 的正确性。final 保证了这个锁的安全性
final ReentrantLock lock;
// 一个 condition
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
// 一个内部类迭代器
transient Itrs itrs = null;
3-2-2、构造方法
构造方法:
// 调用下面的构造方法,默认使用非公平锁
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 懒加载。这边就可以看出上面提出的问题了。既然 ArrayBlockingQueue 保证了先进先出那不就肯定是公平的嘛,为什么要说提供了公平和非公平呢?我们虽然保证了阻塞队列中获取线程池中线程资源的时候是公平的,但是我们没有保证入阻塞队列的时候是公平的,我们还要保证获取 ArrayBlockingQueue 的可重入锁的等待队列的公平性(想要知道这个,AQS 相关内容了解一下)
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
// 在懒加载的同时还把 c 中的内容放到这个阻塞队列中来
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
3-2-3、数组辅助方法
然后是三个数组操作的辅助方法:
// 返回 n-1 的值。当 n==0 时,返回阻塞队列的长度-1
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}
// 返回指定下标的元素
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}
// 检查元素是否为 null
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
3-2-4、队列方法
接下来讲队列方法->进队、出队方法的实现。由于后面的代码会用到,但是后面代码太多,因此提前讲。
// 入队。使用了 condition 是为了满足之前 BlockingQueue 接口中提出的”队列已满就挂起直到有空闲空间“的功能,这里因为加了一个元素,因此将 notEmpty(即等待取出)的那些线程唤醒
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
// 增加元素后将尾指针的位置进行合理修改
if (++putIndex == items.length)
putIndex = 0;
count++;
// 唤醒那些准备出队的线程
notEmpty.signal();
}
// 出队。使用 condition 的原因跟上面相同。和上面形成互补
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 元素出队后将头指针的位置进行合理修改
if (++takeIndex == items.length)
takeIndex = 0;
count--;
// 在迭代器中将元素出队
if (itrs != null)
itrs.elementDequeued();
// 唤醒那些准备入队的线程
notFull.signal();
return x;
}
// 删除指定下标的元素(只有持有锁的线程才能这么做)
void removeAt(final int removeIndex) {
final Object[] items = this.items;
// 如果下标值和头指针相等,直接执行出队逻辑
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 下标值和头指针的值不同,那么需要将后面的元素往前搬
} else {
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
// 在迭代器中将元素出队
if (itrs != null)
itrs.removedAt(removeIndex);
}
// 唤醒那些准备入队的线程
notFull.signal();
}
3-2-5、接口方法实现
然后是 BlockingQueue
、Queue
和Collection
接口中方法的实现:
// 添加元素。add 方法没有加锁?为什么?因为 super.add 调用的是 offer 方法。。。
public boolean add(E e) {
return super.add(e);
}
// 添加元素。加锁之后判断队列是否满了,满了话返回false,否则执行入队方法
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// 添加元素。不同的是,这边会根据情况挂起
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 用 lock.lockInterruptibly() 而不用 lock,表明该方法允许被其他线程中断
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
// 添加元素。超时则放弃
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
// 这个是设定等待时间,超时就会中断
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
// 获取元素,没有元素则返回 null,有则执行出队方法
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
// 获取元素,没有则等待。直到被入队方法唤醒
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
// 获取元素,没有则等待,超时则中断
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
// 这边也是获取元素,但是元素不出队,因此不删除,所以使用 itemAt 而不是 dequeue
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
// 返回大小。这边也 lock 了,严谨!
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
// 返回数组剩余空间
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
// 删除元素
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
// 这边使用 equals 方法进行判定,所以上面使用 Object 而不是泛型
if (o.equals(items[i])) {
// 上面获取了锁,因此可以调用 removeAt 方法
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
// 检查元素是否存在
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 看的出来,其实是遍历
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
// 这边返回 Object数组。但不是直接返回数组,而是调用 System.arraycopy 进行复制
public Object[] toArray() {
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
a = new Object[count];
int n = items.length - takeIndex;
// 因为有头尾指针的关系,因此需要判断。可以看的出来,这边复制之后的依然是有序的!
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else {
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
} finally {
lock.unlock();
}
return a;
}
// 这边返回泛型数组,也是使用 System.arraycopy 进行复制,不同的是这边使用了反射来进行泛型数组的初始化
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
if (len < count)
// 不像 Object 那么方便,需要使用反射来进行数组的初始化
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else {
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
if (len > count)
a[count] = null;
} finally {
lock.unlock();
}
return a;
}
// toString 方法,不多说
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k == 0)
return "[]";
final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}
// 清空队列。因为有加锁,因此可以认为是原子操作
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
// 循环将数组清空。因为原数组是 final 的,因此不能直接 new 一个这么简单
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
// 顺便将迭代器中的也清空
if (itrs != null)
itrs.queueIsEmpty();
// 将阻塞队列中想要添加元素的线程全部唤醒
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}
// 调用下面 drainTo 方法
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
// 使用锁来保证元素转移的时候不会出问题
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// 如果出错,那么搬过去的内容也不转回来
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}
3-2-6、内部类
接下来是两个内部类,这两个内部类提供了更多复杂的方法,但是跟我们阻塞队列的主逻辑(BlockingQueue)没有关系,因此这边搁置不谈,大家有兴趣可以自己去看,以后有机会我再补上来。
ArrayBlockingQueue 总结
上面我们看完了 ArrayBlockingQueue
的源码,相信大家都已经看过了。在1.8中,ArrayBlockingQueue
一共也只有 1444 行源码,去除注释和无关核心的内容不谈也就寥寥几百行,所以看源码 jdk 源码真的不难,只要肯坚持。看到这边大家也忘得差不多了,这边再总结一下:
- ArrayBlockingQueue 是一个有界队列,创建之后大小不能修改;
- ArrayBlockingQueue 使用了头尾指针来构建队列,而不是链表的方式;
- ArrayBlockingQueue 提供公平和非公平的机制,来让大家去向线程池中的资源,这种机制取决于队列的 FIFO 和可重入锁的公平机制的配合;
- 内部使用了 condition 来唤醒或等待。
4、LinkedBlockingQueue 源码解析
4-1、LinkedBlockingQueue 概述
LinkedBlockingQueue
是一个无界的阻塞队列,内部采用Node
链表来实现队列,实现了FIFO的特性。不同于ArrayBlockingQueue
,LinkedBlockingQueue
只提供了非公平的抢锁机制,因此入队先后是不公平的(这点在代码中可以体现)。
4-2、LinkedBlockingQueue 源码
4-2-1、Node 类
使用静态内部类Node
来作为存储结构:
// 可以看到节点只有'元素'和'next'两个成员,因此形成的链表是单向的
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
4-2-2、成员变量
接下来是一些成员变量(这里为了统一称呼我把 final 类型的也成为’变量’)
// final 修饰的容量,因此可以看出容量是固定的。那就得看初始值是啥了
private final int capacity;
// final 和原子类型修饰的元素个数。使用原子类型来保证多线程操作的安全性。注意:使用 final 并不意味着值不能修改,只是该引用指向的对象地址不能修改而已,真正的值是 AtomicInteger 里的`private volatile int value;`,可以通过`setValue()`方法修改
private final AtomicInteger count = new AtomicInteger();
// 头结点
transient Node<E> head;
// 尾结点
private transient Node<E> last;
/**
* 注意这里,LinkedBlockingQueue 将入队锁出队锁分离,提高了队列的操作速度,因此能够提高并发量
*/
// 出队锁,使用饿汉,并用 final 修饰保证锁不被替换
private final ReentrantLock takeLock = new ReentrantLock();
// 非空条件对象,用来挂起和唤醒获取元素的线程
private final Condition notEmpty = takeLock.newCondition();
// 出队锁
private final ReentrantLock putLock = new ReentrantLock();
// 没满条件对象,用来挂起和唤醒添加元素的线程
private final Condition notFull = putLock.newCondition();
4-2-3、构造方法
LinkedBlockingQueue
的构造函数跟ArrayBlockingQueue
差不多吧,只有一些细节地方需要注意
// 调用下面的构造方法,注意默认的队列长度设置为 Integer.MAX_VALUE,因此 LinkedBlockingQueue 默认是无界队列,如果这么多的话容易导致 OOM,因此建议自己手动设置一个值,设置成有界队列。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 这边判断了 capacity 参数范围并赋值,并且构建了哨兵节点,让头尾指针都指向了该哨兵节点
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 这边赋值的时候顺便将 c 的内容入队了
public LinkedBlockingQueue(Collection<? extends E> c) {
// 这边容量设置成了 Integer.MAX_VALUE
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
// 入队获取入队锁
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
// 执行入队操作
enqueue(new Node<E>(e));
// 已入队元素个数 ++
++n;
}
// 将加进来的值赋值给了 count 里的 value
// 这边这么写应该是为了提高效率,如果直接使用原子类型的 CAS 进行递增,还要直接跟内存打交道,显然降低了并发的效率
// 但是有个问题,如果上面出错了,count 的值是不会赋值的,后面如果进行判断的时候会出问题的。我相信源码编写者肯定想到了这个问题,后面应该不会直接利用 count 进行判断吧。1.8里有这个bug,pick一下!!!
count.set(n);
} finally {
putLock.unlock();
}
}
4-2-4、condition 方法
实现了两个方法,封装了 condition 的signal
方法
// 唤醒等待获取元素的线程。只能被 offer、put 方法调用。需要获取出队锁
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
// 唤醒等待添加元素的线程。只能被 take、poll 方法调用。需要获取入队锁
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
4-2-5、队列(链表)方法
实现了队列的入队、出队方法。需要用到头指针和尾指针
// 入队。从放到末尾
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 出队。出队的概念比较麻烦,下面代码可以通过辅助画一张图来理解
private E dequeue() {
// head 节点是哨兵节点,因此出队永远从 head.next 出队
Node<E> h = head;
// 将要出队的元素赋值给 first
Node<E> first = h.next;
// 将原来的哨兵节点的 next 指向自己,让 GC回收(这里为什么不直接将其指向 next.next,然后回收出队的节点呢?)
h.next = h; // help GC
// 后面三行是将要出队的元素赋值给 x,并且将这个节点设置成哨兵节点
head = first;
E x = first.item;
first.item = null;
// 返回 x
return x;
}
4-2-6、锁方法
这边对锁方法进行了封装,提供了全锁和全解锁两个方法
// 同时获取出队锁和入队锁,防止读写操作的进行(我觉得可能是用于 drainTo 方法里吧)
void fullyLock() {
putLock.lock();
takeLock.lock();
}
// 同时释放出对锁、入队锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
4-2-7、接口方法实现
然后是 BlockingQueue
、Queue
和Collection
接口中方法的实现:
// 返回当前元素个数(由于之前构造函数那边的bug,这边是不是其实还要进行处理。。。至少确认一遍吧)
public int size() {
return count.get();
}
// 返回剩余空闲容量
public int remainingCapacity() {
return capacity - count.get();
}
// 入队操作,如果满了,那就挂起
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 队列满了就挂起
while (count.get() == capacity) {
notFull.await();
}
// 执行入队操作
enqueue(node);
// 使用原子操作保证线程安全,因为这边只获取了入队锁,没有获出队锁。此时 c 是 count ++ 之前的值(为什么?看一下原子操作)。
c = count.getAndIncrement();
// 如果队列没满就唤醒那些等着入队的操作
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 这边表明了已经有且一个元素入队了,唤醒等待获取的线程。
// 但是,这边是不是也有问题?这边没有获得出队锁,万一其他线程已经读取了,实际上队列是空的???其实么有!哈哈,我们看其他出队被挂起的地方都是在 take() 方法中,而被唤醒之后,那个线程是在一个 while 中的!!!此时他又要去判断此时元素个数,是原子操作因此显然没有并发问题,所以他又会 wait!!!呵,骗我到这里来,100块也不给我???
if (c == 0)
signalNotEmpty();
}
// 入队。如果满了就等待一段时间,时间超过就放弃操作
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
// 入队,满了就返回 false,不挂起,就是刚。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
// 没满,获的入队锁
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
// 出队,队列为空就挂起
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 出队,队列为空就等一段时间,超时就放弃
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 出队,队列为空就返回false
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
// 这边只是返回第一个节点的元素,不进行队列的修改
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
// 把节点 p 的链接清除,其实就是删掉 p 节点
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}
// 删除元素 o。
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
// 利用 for 循环找到元素 o 的节点
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
// 找到节点后调用 unlink 方法删除节点 p 和他的链接
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}
// 检查是否存在元素 o。
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
// 遍历 + 比较
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}
// 将元素迁移到 c 中。调用了下面的方法
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
// 将元素迁移到 c 中,最多迁移 maxElements 个
public int drainTo(Collection<? super E> c, int ) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
// 迁移之后需要删掉队列中的节点
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}
4-2-8、其他内部类和方法
后面是 toString
、toArray
方法、序列化反序列化方法以及迭代器内部类和拆分器内部类。由于这些内容和阻塞队列核心不怎么相关,就搁置不讲了,以后有机会再补。
4-3、LinkedBlockingQueue 总结
看完了源码,源码有点长,过了一遍也容易忘,这边再来总结一下:
LinkedBlockingQueue
是一个默认无界(推荐调用有界的构造方法进行构造,避免出现 OOM 的情况)的阻塞队列,一旦创建大小不能修改;LinkedBlockingQueue
内部使用Node
存储元素实现队列,单链表,有头尾指针(头指针专注出队、尾指针专注出队);LinkedBlockingQueue
将入队锁和出队锁分离,极大地提高了并发的效率。但是由于这两个锁都是 final 修饰的并且使用饿汉加载,因此默认是使用ReentrantLock
的非公平策略,因此虽然保证了阻塞队列的先进先出,但是入队的过程是非公平的;- 内部使用 Condition 机制是进程进行休眠和唤醒;
- 构造函数中有个小bug(面试的时候可以提出来,说不定可以加分。而且在修改节点以后判断长度唤醒线程那里也可以进行优化,避免唤醒线程又让其进行睡眠浪费资源)
5、PriorityBlockingQueue 源码解析
5-1、PriorityBlockingQueue 概述
PriorityBlockingQueue
是一个基于数组+堆的默认无界阻塞队列,并且内部通过堆的结构实现了优先队列的特性。具体如何实现,还看源码。
5-2、PriorityBlockingQueue 源码
5-2-1、成员属性
首先是PriorityBlockingQueue
的成员属性:
// 默认初始化容量,指的是入队元素的个数
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//最大数组长度指的是最大的元素容量,为了避免虚拟机可能会保留一些空间,所以用了 Integer.MAX_VALUE - 8
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 存储数组,虽然说是堆但是实际上是用数组存储的,这跟堆的性质有关
private transient Object[] queue;
// 元素个数
private transient int size;
// 比较器
private transient Comparator<? super E> comparator;
// 可重入锁用于线程安全,final 修饰因此不可改变。在构造方法中没有给我们选公平锁或者非公平锁,因此默认是非公平锁。因此可得入队是不公平的
private final ReentrantLock lock;
// condition 用于睡眠或唤醒线程。这里只有一个非空的条件,因此可以猜测出该队列是可以无限扩容的(内存允许)
private final Condition notEmpty;
// 自旋锁的状态,使用CAS去获取值。用于扩容
private transient volatile int allocationSpinLock;
// 一个优先队列,只用于序列化和反序列化
private PriorityQueue<E> q;
5-2-2、构造方法
然后是4个构造方法:
// 构造默认初始大小的阻塞队列
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
// 构造指定初始大小的阻塞队列
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
// 构造指定初始大小的阻塞队列,并传入比较器,初始化重入锁、condition等
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
// 创建一个阻塞队列,并将所给的集合中的元素放到队列中,并把大小设为所给集合的大小。
// 在此过程中没有锁的获得,因此可能会因为并发出错
public PriorityBlockingQueue(Collection<? extends E> c) {
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
// 是否要将整个堆重新排
boolean heapify = true; // true if not known to be in heap order
// 是否要检查元素是不是为 null
boolean screen = true; // true if must screen for nulls
// 如果本身就是一个有序的set
if (c instanceof SortedSet<?>) {
SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
// 把 c 的比较器传给该阻塞队列
this.comparator = (Comparator<? super E>) ss.comparator();
// 由于 set 有序,因此不需要整个堆再排序
heapify = false;
}
// 如果本身就是一个优先阻塞队列
else if (c instanceof PriorityBlockingQueue<?>) {
PriorityBlockingQueue<? extends E> pq =
(PriorityBlockingQueue<? extends E>) c;
// 把 c 的比较器传给该阻塞队列
this.comparator = (Comparator<? super E>) pq.comparator();
// 不需要检查元素是否为 null
screen = false;
// 如果 class 完全相等,那就不需要整个堆再排。(为什么子类还需要再排呢?为了防止子类中重写的排序方法与正常的排序矛盾?)
if (pq.getClass() == PriorityBlockingQueue.class) // exact match
heapify = false;
}
Object[] a = c.toArray();
// 将 c 中元素个数值赋给 n,后面再赋给 size
int n = a.length;
// 如果 c.toArray() 返回的格式有问题,那就使用 Arrays.copy 进行拷贝
if (a.getClass() != Object[].class)
a = Arrays.copyOf(a, n, Object[].class);
// 验证元素是否为 null,'与'后面的判断条件的条件很有意思,可以思考一下为什么
if (screen && (n == 1 || this.comparator != null)) {
for (int i = 0; i < n; ++i)
if (a[i] == null)
throw new NullPointerException();
}
// 赋值
this.queue = a;
this.size = n;
// 堆重排
if (heapify)
heapify();
}
5-2-3、扩容机制
下面是PriorityBlockingQueue
的扩容机制:
private void tryGrow(Object[] array, int oldCap) {
// 先释放锁,在分配好空间后再获得锁进行复制
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 此时扩容自旋锁为 0,表明当前没有其他线程在进行扩容,那么继续执行扩容机制
if (allocationSpinLock == 0 &&
// 这个 allocationSpinLockOffset 是类静态变量,后面才加入的。由于使用的是 sun 公司的 unsafe,因此源码没法看,大概就是查看这个字段是不是期望的 0 然后改成1(返回true),否则返回 false
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 如果原容量 < 64,那么新容量变成 2 * old + 2,否则变成 1.5 * old。可以看得出来,当原容量越小,增长越快
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 如果新的容量大于最大容量
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
// 如果扩容时增加一个都导致容量大于最大容量,那就抛错,没法扩了
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
// 能扩,那就直接扩到最大
newCap = MAX_ARRAY_SIZE;
}
// 如果扩容了,并且数组没改变(这里是什么意思呢?首先,这个扩容机制是在 offer 中用的,传的 array 就是原 queue 的地址。这里没改变说明没有其他线程进行扩容,只有扩容了才会把地址进行修改),那就创建新容量大小的数组
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 将扩容锁变成释放状态(虽然这边是 volatile,但是还是会有并发安全的问题啊。。。作者咋想的)
allocationSpinLock = 0;
}
}
// 说明上面的循环没进,说明有线程在扩容,那么放弃 CPU
if (newArray == null) // back off if another thread is allocating
Thread.yield();
// 获得锁并复制旧元素
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
5-2-4、队列方法
这边虽然说是队列操作,但其实由于内部变成了堆的关系,已经不是传统意义上的入队出队的,所以虽然有dequeue
方法,但是enqueue
变成了堆的’上浮’和’下沉’。
/**
* 下面的方法都是基于已经获取锁的情况下才能调用的。上浮操作用于增加元素后堆重排,下沉操作用于元素出队以后的堆重排
*/
// 出队操作
private E dequeue() {
// 获得最后一个元素的下标
int n = size - 1;
// 如果没有元素,就返回 null
if (n < 0)
return null;
else {
Object[] array = queue;
// 获取堆顶元素
E result = (E) array[0];
// 获取堆底的元素,用于堆的重排
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 从顶部开始向下比较。如果比较器为空,那么使用默认的方法(obj.compareTo)进行比较
siftDownComparable(0, x, array, n);
else
// 从顶部开始向下比较。如果比较器不为空,那么使用比较器的比较方法(cmp.compareTo)进行比较
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
// 使用默认的比较器,从堆底部开始向上进行比较。可以看的出来,当子节点 > 父节点时会停止(这里的 > 看比较器是怎么比的),说明堆默认是小根堆???
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
// 这段代码完全可以直接调用下面的方法啊。。。
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
// 当元素比父元素大,停止'上浮'
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
// 使用给定的比较器,从堆底部开始向上进行比较。
private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
Comparator<? super T> cmp) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
// 当元素比父元素大,停止'上浮'
if (cmp.compare(x, (T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = x;
}
// 使用默认的比较器,从下表为 k 的节点作为父节点开始向下比较
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
// 找到下标为 k 的节点的左右子节点,并将其中的最小值与堆底的节点进行比较
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right];
// 如果 x 的值小于 k 的最小子节点,那么就跳出循环,把 x 赋值给下标 k 的位置(一般在整个堆有序的情况下,只有在进行叶子节点的父节点的位置才会发生。因为堆是有序的,默认 x 是大于上面的节点的)。否则将子节点中的更小值'上浮',并从子节点开始继续向下比较。(常威,还说你不是小根堆???!!!)
if (key.compareTo((T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
// 使用默认的比较器,从下表为 k 的节点作为父节点开始向下比较。逻辑跟上面的差不多,只是将比较方法使用比较器的 compare 方法替换
private static <T> void siftDownUsingComparator(int k, T x, Object[] array,
int n,
Comparator<? super T> cmp) {
if (n > 0) {
int half = n >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = array[child];
int right = child + 1;
if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
c = array[child = right];
if (cmp.compare(x, (T) c) <= 0)
break;
array[k] = c;
k = child;
}
array[k] = x;
}
}
// 整个堆都进行'下沉'操作,使整个堆变得有序
private void heapify() {
Object[] array = queue;
int n = size;
int half = (n >>> 1) - 1;
Comparator<? super E> cmp = comparator;
if (cmp == null) {
for (int i = half; i >= 0; i--)
siftDownComparable(i, (E) array[i], array, n);
}
else {
for (int i = half; i >= 0; i--)
siftDownUsingComparator(i, (E) array[i], array, n, cmp);
}
}
5-2-5、接口方法实现
下面是接口方法的实现:
// 增加元素,调用 offer 方法
public boolean add(E e) {
return offer(e);
}
// 增加元素的底层方法,其他的方法都是调用它
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 获取锁
lock.lock();
int n, cap;
Object[] array;
// 如果元素个数大小 >= 数组大小(???这个不应该是严格相等的嘛???惊了!!!),执行扩容操作
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
// 使用默认的比较器进行'上浮'
siftUpComparable(n, e, array);
else
// 使用队列中已给的比较器进行'上浮'
siftUpUsingComparator(n, e, array, cmp);
// 元素个数 +1
size = n + 1;
// 唤醒那些因为队列空了而挂起的线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
// 增加元素,调用 offer 方法
public void put(E e) {
offer(e); // never need to block
}
// 增加元素,调用 offer 方法。可以看到这个给的时间限制根本没用!!!因为这个阻塞队列是真正无界的!永远不会满!因此增加元素时不会因为满了而挂起,只要锁被释放了就会去增加
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
// 出队元素。执行出队策略。如果队列空了显然就返回 null 了也不会报错。。。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
// 出队元素。执行出队策略。如果队列空了就挂起,除非被打断
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
// 出队元素。执行出队策略。如果队列空了就挂起,直到超时或被打断
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
// 直接获取堆顶元素而不修改数组
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
// 返回比较器,如果没有那就是初始值 null
public Comparator<? super E> comparator() {
return comparator;
}
// 返回队列元素个数
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
// 返回剩余容量。。。要知道虽然返回了 Integer.MAX_VALUE 但其实不止,只要内存够,无限的好哇!
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
// 获取元素 o 的下标,使用得是 equals 方法而不是 ==,因此是指同一个元素对象。(没有锁,会出问题的吧!)
private int indexOf(Object o) {
if (o != null) {
Object[] array = queue;
int n = size;
for (int i = 0; i < n; i++)
if (o.equals(array[i]))
return i;
}
return -1;
}
// 删除指定下标的元素
private void removeAt(int i) {
Object[] array = queue;
int n = size - 1;
// 如果是最后一个元素,那就直接删了即可
if (n == i) // removed last element
array[i] = null;
// 否则要进行堆重排操作
else {
// 拿出最后一个元素来进行重排
E moved = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
// 从位置 i 下沉进行重排(为什么???此时不应该是有序的嘛?)
if (cmp == null)
siftDownComparable(i, moved, array, n);
else
siftDownUsingComparator(i, moved, array, n, cmp);
// 重排后如果原来删除位置的元素 == 堆底元素,从 i 上浮(???更看不懂了?为什么?)
if (array[i] == moved) {
if (cmp == null)
siftUpComparable(i, moved, array);
else
siftUpUsingComparator(i, moved, array, cmp);
}
}
size = n;
}
// 删除元素 o,注意这边是 indexOf() 方法,用的 equals 方法,跟下面有点不同
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(o);
if (i == -1)
return false;
removeAt(i);
return true;
} finally {
lock.unlock();
}
}
// 删除元素 o,比较的是地址
void removeEQ(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] array = queue;
for (int i = 0, n = size; i < n; i++) {
if (o == array[i]) {
removeAt(i);
break;
}
}
} finally {
lock.unlock();
}
}
// 查找是否有元素 o
public boolean contains(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return indexOf(o) != -1;
} finally {
lock.unlock();
}
}
// 复制了一个数组
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return Arrays.copyOf(queue, size);
} finally {
lock.unlock();
}
}
// toString 方法
public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = size;
if (n == 0)
return "[]";
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = 0; i < n; ++i) {
Object e = queue[i];
sb.append(e == this ? "(this Collection)" : e);
if (i != n - 1)
sb.append(',').append(' ');
}
return sb.append(']').toString();
} finally {
lock.unlock();
}
}
// 将元素搬到 c 中,调用了下面的方法
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}
// 将元素搬到 c 中,执行将重复出队策略。。。(我靠,那每次出队还得下沉。。。为啥不直接把数组给复制过去算了???反正有序啊。。。看不懂)
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(size, maxElements);
for (int i = 0; i < n; i++) {
c.add((E) queue[0]); // In this order, in case add() throws.
dequeue();
}
return n;
} finally {
lock.unlock();
}
}
// 清楚所有元素,可以看到数组长度没有重置,还是原来的大小
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] array = queue;
int n = size;
size = 0;
for (int i = 0; i < n; i++)
array[i] = null;
} finally {
lock.unlock();
}
}
// 把元素放到 a 中。如果 a 不够大,那么直接返回阻塞队列存储数组的 copy
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = size;
if (a.length < n)
// Make a new array of a's runtime type, but my contents:
return (T[]) Arrays.copyOf(queue, size, a.getClass());
System.arraycopy(queue, 0, a, 0, n);
if (a.length > n)
a[n] = null;
return a;
} finally {
lock.unlock();
}
}
5-2-6、其他内部类和方法
后面是序列化反序列化方法以及迭代器内部类和拆分器内部类。由于这些内容和阻塞队列核心不怎么相关,就搁置不讲了,以后有机会再补。
5-2-7、unsafe 机制扩充
// 补充了静态代码块来帮助扩容的字段
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = PriorityBlockingQueue.class;
// 通过反射将 allocationSpinLock 字段赋值给 allocationSpinLockOffset(???)
allocationSpinLockOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("allocationSpinLock"));
} catch (Exception e) {
throw new Error(e);
}
}
5-1、PriorityBlockingQueue 概述
看到这个阻塞队列,说实话三观有点崩坏。有时候程序员出问题,并不一定是程序员水平不到位,也有可能是源码出问题了喂!也有可能是网上的博客说的根本就是照搬人家的东西啊!也有可能官方说明就不对啊!!!下面来对这个阻塞队列进行总结:
PriorityBlockingQueue
是一个无界(真正无界!)的阻塞队列,自带扩容机制,因此容易 OOM ?;PriorityBlockingQueue
内部使用数组存储元素实现小根堆(当然了,我们自己使用比较器,强行把比较结果调换也能实现大根堆),因此优先值越小的代表优先级越高,先执行。。。跟我们普通人理解的不一样啊喂orz;PriorityBlockingQueue
是使用ReentrantLock
的非公平策略,但是由于堆的构造,所以入队公不公平他不care;- 内部使用 Condition 机制是进程进行休眠和唤醒;
- 整个内部对于高并发的处理不够严谨,包括构造函数中没有用锁,可能会出现线程A构造玩数组还在将集合 c 中的元素搬过来的时候,线程B已经开始使用了,这样元素的个数也不对了;
声明
由于篇幅太长,准备将这篇分为上中下三部分,敬请期待。。。