JDK8下的BlockingQueue源码解析

JDK8下的BlockingQueue源码解析

简要

BlockingQueue是一个阻塞队列,阻塞队列能够阻塞当前试图从队列中获取元素的线程,而非阻塞队列不会。因此在面对类似生产者-消费者的模型时,使用非阻塞队列就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞知道阻塞队列中有了元素,当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。

BlockingQueue的实现类BlockingQueue实现类

ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量带下。并且可以指定公平性与非公平性,默认情况下为非公平性,即不保证等待时间最长的线程最优先能够访问队列。

LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。

DelayQueue:基于PriorityBlockingQueue实现的延迟队列,是一个无界的阻塞队列,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。因此向队列中插入时永远不会阻塞,获取时才有可能被阻塞。

DelayedWorkQueue:该队列为ScheduledThreadPoolExecutor中的静态内部类,ScheduledThreadPoolExecutor便是通过该队列使得队列中的元素按一定顺序排列从而时延迟任务和周期性任务得以顺利执行。

PriorityBlockingQueue:无界阻塞队列,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。

SynchronousQueue:同步阻塞队列,队列大小为1,一个元素要放到该队列中必须有一个线程在等待获取元素。

TransferQueue:定义了另一种阻塞情况:生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费,而BlockingQueue只需将元素添加到队列中后生产者便会停止被阻塞。

LinkedTransferQueue:实现TransferQueue接口。

BlockingDeque:双向阻塞队列的接口。

LinkedBlockingDeque:实现BlockingDeque接口。

方法详细信息

非阻塞队列常用方法

在非阻塞队列中常用的操作队列的方法主要是下面几种:
add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;

remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;

offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;

poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;

peek():获取队首元素,但不移除。若成功,则返回队首元素;否则返回null

对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。原因看上面的描述很明显了:使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法需要捕获异常才能判断操作是否成功。另外需要注意非阻塞队列的这些方法都没有进行同步处理。

阻塞队列常用方法

阻塞队列也实现了Queue,因此也具有上述方法并且都进行了同步处理。除此之外还有4个很有用的方法:
put(E e):向队尾存入元素,如果队列满,则等待;

take():从队首取元素,如果队列为空,则等待;

offer(E e,long timeout, TimeUnit unit):向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;

poll(long timeout, TimeUnit unit):从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取不到,则返回null;否则返回取得的元素;

源码实现

ArrayBlockingQueue

构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 创建一个指定容量的队列对象
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* fair为true就表示创建一个公平的队列,即所有等待的消费者或者是生产者按照顺序来访问这个队列
* 为false就表示不保证这种排序
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 创建一个指定数据的队列对象
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
// 可重入锁
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {// 遍历集合
checkNotNull(e);// 非空检查
items[i++] = e;// 存入数组中
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
变量属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/** The queued items 存数据的数组*/
final Object[] items;

/** items index for next take, poll, peek or remove 拿数据的索引*/
int takeIndex;

/** items index for next put, offer, or add 放数据的索引*/
int putIndex;

/** Number of elements in the queue 队列元素个数*/
int count;

/** Main lock guarding all access 可重入锁*/
final ReentrantLock lock;

/** Condition for waiting takes 队列不为空的条件*/
private final Condition notEmpty;

/** Condition for waiting puts 队列没有满的条件*/
private final Condition notFull;
add()
1
2
3
4
5
6
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
offer(E e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean offer(E e) {
checkNotNull(e);// 非空检查
final ReentrantLock lock = this.lock;
lock.lock();// 上锁
try {
if (count == items.length)// 容量已满
return false;
else {
enqueue(e);// 放入元素
return true;
}
} finally {
lock.unlock();
}
}
offer(E e, long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);//带有超时等待的阻塞方法
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
enqueue()——元素入队列,这个方法调用前都是要上锁的
1
2
3
4
5
6
7
8
9
10
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();//有一个元素加入成功,那肯定队列不为空
}
put()
1
2
3
4
5
6
7
8
9
10
11
12
public void put(E e) throws InterruptedException {
checkNotNull(e);// 非空检查
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//和lock()方法的区别是让它在阻塞时也可抛出异常跳出
try {
while (count == items.length)
notFull.await();//这里就是阻塞了,要注意。如果运行到这里,那么它会释放上面的锁,一直等到notify
enqueue(e);
} finally {
lock.unlock();
}
}
poll()
1
2
3
4
5
6
7
8
9
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();// 上锁
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
poll(long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);//超时等待
}
return dequeue();
} finally {
lock.unlock();
}
}
dequeue()——元素出队列,注意调用这个方法时需要先上锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//有一个元素取出成功,那肯定队列不满
return x;
}
take()
1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();//队列为空,阻塞方法
return dequeue();
} finally {
lock.unlock();
}
}

可以发现,ArrayBlockingQueue在取出和添加元素的时候,notEmpty和notFull这两个变量在实现同步的时候起着很重要的作用,添加元素的时候notEmpty.signal(); 发出非空信号,唤醒等待取数据的方法;取出元素的时候notFull.signal(); 发出未满信号,唤醒因容量已满而等待添加的方法。此外,notEmpty和notFull这两个变量还可用作让方法等待指定时间notEmpty.awaitNanos(nanos)。

LinkedBlockingQueue

构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 未指定容量的时候,默认容量为Integer.MAX_VALUE
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
变量属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/** Current number of elements 元素个数*/
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
* 头结点
*/
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
* 尾节点
*/
private transient Node<E> last;

/** Lock held by take, poll, etc 出队的锁*/
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes 出队的条件对象*/
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc 入队的锁*/
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts 入队的条件对象*/
private final Condition notFull = putLock.newCondition();
put()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
// 以当前元素新建一个节点
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 上锁,和lock()方法的区别是让它在阻塞时也可抛出异常跳出
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//如果队列已满,那么将该线程加入到Condition的等待队列中
while (count.get() == capacity) {
notFull.await();
}
// 节点入队列
enqueue(node);
// 得到插入之前队列的元素个数
c = count.getAndIncrement();
// 容量未满,发出未满信号
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c==0表示插入之前容量为0,则在插入后需要发出非空信号
if (c == 0)
signalNotEmpty();
}
enqueue()

/**

1
2
3
4
5
6
7
8
9
 * 创建一个节点,并加入链表尾部
* @param x
*/
private void enqueue(E x) {
/*
* 封装新节点,并赋给当前的最后一个节点的下一个节点,然后在将这个节点设为最后一个节点
*/
last = last.next = new Node<E>(x);
}
signalNotEmpty()
1
2
3
4
5
6
7
8
9
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer(E e, long timeout, TimeUnit unit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)//超时
return false;
/*
* 进行等待: 在这个过程中可能发生三件事:
* 1、被唤醒-->继续当前这个while循环
* 2、超时-->继续当前这个while循环
* 3、被中断-->抛出中断异常InterruptedException
*/
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

take()——如果队列空了,一直阻塞,直到队列不为空或者线程被中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {// 队列没有元素
/*
* 加入等待队列, 一直等待条件notEmpty(即被其他线程唤醒)
* (唤醒其实就是,有线程将一个元素入队了,然后调用notEmpty.signal()唤醒其他等待这个条件的线程,同时队列也不空了)
*/
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

dequeue()——表面上看,只是将头节点的next指针指向了要删除的x1.next,事实上这样我觉的就完全可以,但是jdk实际上是将原来的head节点删除了,而上边看到的这个head节点,正是刚刚出队的x1节点,只是其值被置空了。

1
2
3
4
5
6
7
8
9
10
11
12
 /**
* 从队列头部移除一个节点
*/
private E dequeue() {
Node<E> h = head;//获取头节点:x==null
Node<E> first = h.next;//将头节点的下一个节点赋值给first
h.next = h; // 将当前将要出队的节点置null(为了使其做head节点做准备)
head = first;//将当前将要出队的节点作为了头节点
E x = first.item;//获取出队节点的值
first.item = null;//将出队节点的值置空
return x;
}

poll(long timeout, TimeUnit unit)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {// 队列没有元素
if (nanos <= 0)// 已经超时
return null;
/*
* 进行等待:
* 在这个过程中可能发生三件事:
* 1、被唤醒-->继续当前这个while循环
* 2、超时-->继续当前这个while循环
* 3、被中断-->抛出异常
*/
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();// 出队
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

ArrayBlockingQueue与LinkedBlockingQueue对比

  • ArrayBlockingQueue:
    • 一个对象数组+一把锁+两个条件
    • 入队与出队都用同一把锁
    • 在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
    • 采用了数组,必须指定大小,即容量有限
  • LinkedBlockingQueue:
    • 一个单向链表+两把锁+两个条件
    • 两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。
    • 在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
    • 采用了链表,最大容量为整数最大值,可看做容量无限