JDK8下的BlockingQueue源码解析
简要
BlockingQueue是一个阻塞队列,阻塞队列能够阻塞当前试图从队列中获取元素的线程,而非阻塞队列不会。因此在面对类似生产者-消费者的模型时,使用非阻塞队列就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了,它会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞知道阻塞队列中有了元素,当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒)。这样提供了极大的方便性。
BlockingQueue的实现类
ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量带下。并且可以指定公平性与非公平性,默认情况下为非公平性,即不保证等待时间最长的线程最优先能够访问队列。
LinkedBlockingQueue:基于链表实现的一个阻塞队列,在创建LinkedBlockingQueue对象时如果不指定容量大小,则默认大小为Integer.MAX_VALUE。
DelayQueue:基于PriorityBlockingQueue实现的延迟队列,是一个无界的阻塞队列,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。因此向队列中插入时永远不会阻塞,获取时才有可能被阻塞。
DelayedWorkQueue:该队列为ScheduledThreadPoolExecutor中的静态内部类,ScheduledThreadPoolExecutor便是通过该队列使得队列中的元素按一定顺序排列从而时延迟任务和周期性任务得以顺利执行。
PriorityBlockingQueue:无界阻塞队列,它会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。
SynchronousQueue:同步阻塞队列,队列大小为1,一个元素要放到该队列中必须有一个线程在等待获取元素。
TransferQueue:定义了另一种阻塞情况:生产者会一直阻塞直到所添加到队列的元素被某一个消费者所消费,而BlockingQueue只需将元素添加到队列中后生产者便会停止被阻塞。
LinkedTransferQueue:实现TransferQueue接口。
BlockingDeque:双向阻塞队列的接口。
LinkedBlockingDeque:实现BlockingDeque接口。
方法详细信息
非阻塞队列常用方法
在非阻塞队列中常用的操作队列的方法主要是下面几种:
add(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则会抛出异常;
remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常;
offer(E e):将元素e插入到队列末尾,如果插入成功,则返回true;如果插入失败(即队列已满),则返回false;
poll():移除并获取队首元素,若成功,则返回队首元素;否则返回null;
peek():获取队首元素,但不移除。若成功,则返回队首元素;否则返回null
对于非阻塞队列,一般情况下建议使用offer、poll和peek三个方法,不建议使用add和remove方法。原因看上面的描述很明显了:使用offer、poll和peek三个方法可以通过返回值判断操作成功与否,而使用add和remove方法需要捕获异常才能判断操作是否成功。另外需要注意非阻塞队列的这些方法都没有进行同步处理。
阻塞队列常用方法
阻塞队列也实现了Queue,因此也具有上述方法并且都进行了同步处理。除此之外还有4个很有用的方法:
put(E e):向队尾存入元素,如果队列满,则等待;
take():从队首取元素,如果队列为空,则等待;
offer(E e,long timeout, TimeUnit unit):向队尾存入元素,如果队列满,则等待一定的时间,当时间期限达到时,如果还没有插入成功,则返回false;否则返回true;
poll(long timeout, TimeUnit unit):从队首取元素,如果队列空,则等待一定的时间,当时间期限达到时,如果取不到,则返回null;否则返回取得的元素;
源码实现
ArrayBlockingQueue
构造方法
1 | // 创建一个指定容量的队列对象 |
变量属性
1 | /** The queued items 存数据的数组*/ |
add()
1 | public boolean add(E e) { |
offer(E e)
1 | public boolean offer(E e) { |
offer(E e, long timeout, TimeUnit unit)
1 | public boolean offer(E e, long timeout, TimeUnit unit) |
enqueue()——元素入队列,这个方法调用前都是要上锁的
1 | private void enqueue(E x) { |
put()
1 | public void put(E e) throws InterruptedException { |
poll()
1 | public E poll() { |
poll(long timeout, TimeUnit unit)
1 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
dequeue()——元素出队列,注意调用这个方法时需要先上锁
1 | private E dequeue() { |
take()
1 | public E take() throws InterruptedException { |
可以发现,ArrayBlockingQueue在取出和添加元素的时候,notEmpty和notFull这两个变量在实现同步的时候起着很重要的作用,添加元素的时候notEmpty.signal();
发出非空信号,唤醒等待取数据的方法;取出元素的时候notFull.signal();
发出未满信号,唤醒因容量已满而等待添加的方法。此外,notEmpty和notFull这两个变量还可用作让方法等待指定时间notEmpty.awaitNanos(nanos)。
LinkedBlockingQueue
构造函数
1 | // 未指定容量的时候,默认容量为Integer.MAX_VALUE |
变量属性
1 | /** Current number of elements 元素个数*/ |
put()
1 | public void put(E e) throws InterruptedException { |
enqueue()
/**
1 | * 创建一个节点,并加入链表尾部 |
signalNotEmpty()
1 | private void signalNotEmpty() { |
offer(E e, long timeout, TimeUnit unit)
1 | public boolean offer(E e, long timeout, TimeUnit unit) |
take()——如果队列空了,一直阻塞,直到队列不为空或者线程被中断
1 | public E take() throws InterruptedException { |
dequeue()——表面上看,只是将头节点的next指针指向了要删除的x1.next,事实上这样我觉的就完全可以,但是jdk实际上是将原来的head节点删除了,而上边看到的这个head节点,正是刚刚出队的x1节点,只是其值被置空了。
1 | /** |
poll(long timeout, TimeUnit unit)
1 | public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
ArrayBlockingQueue与LinkedBlockingQueue对比
- ArrayBlockingQueue:
- 一个对象数组+一把锁+两个条件
- 入队与出队都用同一把锁
- 在只有入队高并发或出队高并发的情况下,因为操作数组,且不需要扩容,性能很高
- 采用了数组,必须指定大小,即容量有限
- LinkedBlockingQueue:
- 一个单向链表+两把锁+两个条件
- 两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争。
- 在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多
- 采用了链表,最大容量为整数最大值,可看做容量无限