Concurrent container classes: List, Set, Queue

List

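CopyOnWriteArrayList: a copy-on-write container

Compared with ArrayList, its advantage is thread safety. It has two drawbacks:

1. Extra memory: every write copies the entire backing array and modifies the copy, so two copies of the data coexist until the old one is garbage-collected.

2. Weak consistency: a reader that is already working on the old array (for example, an iterator created before the write) keeps seeing the old contents, so other threads do not necessarily observe a write immediately.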

add
public boolean add(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    } finally {
        lock.unlock();
    }
}

get
private E get(Object[] a, int index) {
    return (E) a[index];
}

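The add method takes the lock at the very start, which is what makes writes thread-safe.

Why copy the whole array? Because readers never take the lock: a write builds a new array and publishes it in a single step (setArray), so a reader sees either the old array or the new one and never a half-written state. The price is that readers may briefly see stale data.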
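
The snapshot behaviour described above can be observed with a small sketch. The class and method names here (CowSnapshotDemo, iterateWhileWriting) are made up for illustration; only CopyOnWriteArrayList and its iterator come from the JDK.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CowSnapshotDemo {
    // Returns the elements seen by an iterator that was created before a write.
    static List<String> iterateWhileWriting() {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("a");
        list.add("b");

        Iterator<String> it = list.iterator(); // bound to the array as it is now
        list.add("c");                         // the write publishes a new array

        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());               // still walks the old array
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileWriting()); // prints [a, b]
    }
}
```

The iterator never throws ConcurrentModificationException, but it also never sees "c": that is the stale-read trade-off in miniature.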

set

💡 The key difference between a Set and a List: a Set contains no duplicates

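| Implementation | Principle | Characteristics |
| --- | --- | --- |
| HashSet | Based on HashMap | Not thread-safe |
| CopyOnWriteArraySet | Based on CopyOnWriteArrayList | Thread-safe |
| ConcurrentSkipListSet | Based on ConcurrentSkipListMap | Thread-safe, sorted, fast lookup |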

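A guess: how does a Set guarantee uniqueness? Does it compare the new element against the existing ones on insertion and insert only when there is no match?

Done naively, that would inevitably be slow.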

HashSet

public class HashSet<E>
    extends AbstractSet<E>
    implements Set<E>, Cloneable, java.io.Serializable
{
    static final long serialVersionUID = -5024744406713321676L;

    private transient HashMap<E,Object> map;

    // Dummy value to associate with an Object in the backing Map
    private static final Object PRESENT = new Object();

    /**
     * Constructs a new, empty set; the backing <tt>HashMap</tt> instance has
     * default initial capacity (16) and load factor (0.75).
     */
    public HashSet() {
        map = new HashMap<>();
    }
}

add
public boolean add(E e) {
    return map.put(e, PRESENT)==null;
}

As shown, HashSet stores each element as a key of the map and uses the dummy PRESENT object as the value. Because HashMap keys cannot repeat, the elements of a HashSet cannot repeat either.
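
To make the idea concrete, here is a minimal sketch of a set built on a HashMap in the same way. TinySet is a hypothetical class written for this note, not the JDK implementation; it only mirrors the PRESENT trick from the source above.

```java
import java.util.HashMap;

class TinySet<E> {
    // Shared dummy value; only the keys of the map matter.
    private static final Object PRESENT = new Object();
    private final HashMap<E, Object> map = new HashMap<>();

    // put returns the previous value, so null means the key was new.
    boolean add(E e) {
        return map.put(e, PRESENT) == null;
    }

    boolean contains(E e) {
        return map.containsKey(e);
    }

    int size() {
        return map.size();
    }

    public static void main(String[] args) {
        TinySet<String> set = new TinySet<>();
        System.out.println(set.add("x")); // true: first insertion
        System.out.println(set.add("x")); // false: key already present
        System.out.println(set.size());   // 1
    }
}
```

Uniqueness comes from a hash lookup rather than from comparing against every stored element, which is why the naive "compare, then insert" cost does not apply here.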

CopyOnWriteArraySet

How does a list-backed set avoid duplicates?

add
/**
 * Adds the specified element to this set if it is not already present.
 * More formally, adds the specified element {@code e} to this set if
 * the set contains no element {@code e2} such that
 * <tt>(e==null&nbsp;?&nbsp;e2==null&nbsp;:&nbsp;e.equals(e2))</tt>.
 * If this set already contains the element, the call leaves the set
 * unchanged and returns {@code false}.
 *
 * @param e element to be added to this set
 * @return {@code true} if this set did not already contain the specified
 *         element
 */
public boolean add(E e) {
    return al.addIfAbsent(e);
}

public boolean addIfAbsent(E e) {
    Object[] snapshot = getArray(); // snapshot: the array as it is right now, acting like a version marker
    return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false : // is the element already present?
        addIfAbsent(e, snapshot);
}

/**
 * A version of addIfAbsent using the strong hint that given
 * recent snapshot does not contain e.
 */
private boolean addIfAbsent(E e, Object[] snapshot) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // acquire the lock
    try {
        Object[] current = getArray();
        int len = current.length;
        if (snapshot != current) { // re-check: the array changed after the snapshot was taken
            // Optimize for lost race to another addXXX operation
            int common = Math.min(snapshot.length, len);
            for (int i = 0; i < common; i++)
                if (current[i] != snapshot[i] && eq(e, current[i]))
                    return false;
            if (indexOf(e, current, common, len) >= 0)
                return false;
        }
        Object[] newElements = Arrays.copyOf(current, len + 1);
        newElements[len] = e;
        setArray(newElements); // publish the new array
        return true;
    } finally {
        lock.unlock();
    }
}
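
As a usage-level check of the addIfAbsent logic above, the following sketch lets several threads add the same values to one CopyOnWriteArraySet. CowSetDemo and concurrentAdds are illustrative names, and the thread and value counts are arbitrary.

```java
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CowSetDemo {
    // Every thread adds the values 0..distinctValues-1; returns the final set size.
    static int concurrentAdds(int threads, int distinctValues) {
        CopyOnWriteArraySet<Integer> set = new CopyOnWriteArraySet<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < distinctValues; i++) {
                    set.add(i); // duplicates are rejected under the lock
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(concurrentAdds(4, 100)); // prints 100
    }
}
```

However the threads interleave, each value ends up in the set exactly once, because the second check happens while holding the lock.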

ConcurrentSkipListSet

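The map-based implementation is more efficient than the list-based one: a list-backed set has to compare a new element against the existing elements one by one, whereas the skip list keeps its elements ordered and can locate one in O(log n) expected time.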
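
A short sketch of what "sorted and unique" means in practice for ConcurrentSkipListSet. SkipListSetDemo and sortedDistinct are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;

class SkipListSetDemo {
    // Inserts the values in the given order and returns the set's iteration order.
    static List<Integer> sortedDistinct(int... values) {
        ConcurrentSkipListSet<Integer> set = new ConcurrentSkipListSet<>();
        for (int v : values) {
            set.add(v); // duplicates are ignored, position is found by comparison
        }
        return new ArrayList<>(set); // iteration follows natural ordering
    }

    public static void main(String[] args) {
        System.out.println(sortedDistinct(5, 1, 3, 1, 5)); // prints [1, 3, 5]
    }
}
```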

Queue

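Use cases: thread pools, database connection pools, message queues

In RocketMQ, messages are first held in memory in a queue and then written to disk (flushed). This way messages are not lost when the machine restarts, which is how persistence is achieved.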

API

ConcurrentLinkedQueue

offer
/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    checkNotNull(e); // null check
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) { // start from the current tail
        Node<E> q = p.next;
        if (q == null) { // found the last node of the queue
            // p is last node
            if (p.casNext(null, newNode)) { // link newNode as p.next
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    // move tail to newNode
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            // if tail was not updated in the meantime, advance p to q
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

poll
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) { // start from the current head
            E item = p.item;

            if (item != null && p.casItem(item, null)) { // clear the item via CAS
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    // promote the next node to be the new head
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

size
/**
 * Returns the number of elements in this queue.  If this queue
 * contains more than {@code Integer.MAX_VALUE} elements, returns
 * {@code Integer.MAX_VALUE}.
 *
 * <p>Beware that, unlike in most collections, this method is
 * <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current
 * number of elements requires an O(n) traversal.
 * Additionally, if elements are added or removed during execution
 * of this method, the returned result may be inaccurate.  Thus,
 * this method is typically not very useful in concurrent
 * applications.
 *
 * @return the number of elements in this queue
 */
public int size() {
    int count = 0;
    for (Node<E> p = first(); p != null; p = succ(p))
        if (p.item != null)
            // Collection.size() spec says to max out
            if (++count == Integer.MAX_VALUE)
                break;
    return count;
}

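The size method of ConcurrentLinkedQueue traverses the entire queue.

Why not simply do size++ in offer?

Because a plain size++ is not atomic and therefore not thread-safe, while ConcurrentLinkedQueue is built on lock-free programming (CAS). Maintaining such a counter inside offer does not fit: it would undermine the thread-safe, lock-free design of the queue.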
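
For reference, this is what an atomic counter looks like when several threads increment it. It is only a sketch of the alternative to a plain size++ (CounterDemo and atomicCount are made-up names), not something ConcurrentLinkedQueue does internally.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CounterDemo {
    // Each thread performs perThread increments; returns the final counter value.
    static int atomicCount(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.incrementAndGet(); // CAS-based, no update is lost
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(atomicCount(4, 10_000)); // prints 40000
    }
}
```

With a plain int field and counter++ in place of incrementAndGet, the result could come out lower than 40000, because the read, add and write steps of ++ can interleave between threads.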

💡 Because size traverses the whole queue, prefer isEmpty when you only need to know whether the queue is empty.
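
A minimal sketch of the recommended pattern, using isEmpty instead of size() == 0 in a drain loop. ClqDemo and drain are illustrative names; the example is single-threaded so the result is deterministic.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class ClqDemo {
    // Offers two elements, then drains the queue in FIFO order.
    static String drain() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("a");
        queue.offer("b");

        StringBuilder out = new StringBuilder();
        while (!queue.isEmpty()) {   // only inspects the head, no full traversal
            String item = queue.poll();
            if (item != null) {      // poll returns null if another thread emptied the queue first
                out.append(item);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(drain()); // prints ab
    }
}
```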

In a database connection pool, a caller has to block and wait when no connection is available. ConcurrentLinkedQueue does not offer this capability.

ArrayBlockingQueue: thread-safe, blocking

put: blocking enqueue
/**
 * Inserts the specified element at the tail of this queue, waiting
 * for space to become available if the queue is full.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // acquire the lock
    try {
        while (count == items.length) // queue is full: block by waiting on the notFull Condition
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

💡 offer does not block: if the queue is full, it returns false immediately.
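
The non-blocking behaviour of offer is easy to see on a queue with a tiny capacity. OfferDemo and fillBeyondCapacity are names invented for this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;

class OfferDemo {
    // Offers three elements to a queue that can hold only two.
    static boolean[] fillBeyondCapacity() {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        return new boolean[] {
            queue.offer(1), // accepted
            queue.offer(2), // accepted, queue is now full
            queue.offer(3)  // rejected without waiting
        };
    }

    public static void main(String[] args) {
        boolean[] results = fillBeyondCapacity();
        System.out.println(results[0] + " " + results[1] + " " + results[2]); // prints true true false
    }
}
```

A call to put(3) at the same point would instead wait until a consumer removed an element.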

take: blocking removal of the queue head
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) // queue is empty: wait
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}
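
Putting put and take together, here is a small producer/consumer sketch. PutTakeDemo and produceAndConsume are hypothetical names; the capacity of 2 is chosen only so that the producer is forced to block from time to time.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PutTakeDemo {
    // A producer thread puts 1..items; the caller takes them all and returns their sum.
    static int produceAndConsume(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < items; i++) {
                sum += queue.take(); // waits while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10)); // prints 55
    }
}
```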

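💡 ReentrantLock.lockInterruptibly lets another thread interrupt the waiting thread by calling its Thread.interrupt method: the wait ends right away, the lock is not acquired, and an InterruptedException is thrown. ReentrantLock.lock does not respond to Thread.interrupt while waiting: even when an interrupt is detected, it keeps trying to acquire the lock and goes back to waiting if the attempt fails. Only after the lock has finally been acquired does it re-assert the current thread's interrupted status, so that the caller can handle the interrupt afterwards.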
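
The difference can be demonstrated with a thread that waits for a lock held by someone else. This is a sketch under simple assumptions: InterruptibleLockDemo and waiterWasInterrupted are illustrative names, and the main thread simply holds the lock for the whole test.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptibleLockDemo {
    // Returns true if the waiting thread left lockInterruptibly via InterruptedException.
    static boolean waiterWasInterrupted() {
        ReentrantLock lock = new ReentrantLock();
        AtomicBoolean interrupted = new AtomicBoolean(false);

        lock.lock(); // the calling thread holds the lock, so the waiter cannot get it
        try {
            Thread waiter = new Thread(() -> {
                try {
                    lock.lockInterruptibly(); // waits, but stays responsive to interrupts
                    lock.unlock();            // not reached in this scenario
                } catch (InterruptedException e) {
                    interrupted.set(true);    // gave up waiting without the lock
                }
            });
            waiter.start();
            waiter.interrupt(); // works whether or not the waiter is already parked
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(waiterWasInterrupted()); // prints true
    }
}
```

Had the waiter called lock.lock() instead, join() would never return here, because the waiter would keep waiting for a lock that is only released after join().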

LinkedBlockingQueue

Constructor
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null); // create an empty sentinel node
}

put
/**
 * Inserts the specified element at the tail of this queue, waiting if
 * necessary for space to become available.
 *
 * @throws InterruptedException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // acquire putLock
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) {
            notFull.await(); // queue is full: block
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal(); // still room: wake another waiting producer
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

offer
/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and {@code false} if this queue
 * is full.
 * When using a capacity-restricted queue, this method is generally
 * preferable to method {@link BlockingQueue#add add}, which can fail to
 * insert an element only by throwing an exception.
 *
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity) // capacity check before taking the lock
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // acquire putLock
    try {
        if (count.get() < capacity) {
            enqueue(node); // enqueue (append to the linked list)
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal(); // wake another waiting producer
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

ArrayBlockingQueue and LinkedBlockingQueue enqueue and dequeue on the same principle.

For the differences, see https://zhidao.baidu.com/question/553304139806031732.html

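Case study: the database connection pool implemented by Tomcat

Why does Tomcat build its database connection pool on LinkedBlockingQueue rather than ArrayBlockingQueue?

Because ArrayBlockingQueue does not split its lock: producers and consumers share a single lock.
LinkedBlockingQueue does split its lock: producers use putLock and consumers use takeLock. In a connection pool, taking from the queue and putting into it should be kept apart, so LinkedBlockingQueue is the better fit.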
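
To illustrate the idea (and not Tomcat's actual code), here is a minimal pool sketch on top of LinkedBlockingQueue. SimplePool, borrow, giveBack and idleCount are hypothetical names, and the "connection" is just a generic object supplied by a factory.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class SimplePool<T> {
    private final LinkedBlockingQueue<T> idle;

    // Pre-fills the pool with `size` resources created by the factory.
    SimplePool(int size, Supplier<T> factory) {
        idle = new LinkedBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.offer(factory.get());
        }
    }

    // Waits up to timeoutMillis for an idle resource; returns null on timeout or interrupt.
    T borrow(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS); // consumer side: takeLock
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Returns a resource to the pool.
    void giveBack(T resource) {
        idle.offer(resource); // producer side: putLock
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        SimplePool<String> pool = new SimplePool<>(1, () -> "conn");
        String conn = pool.borrow(10);
        System.out.println(conn);            // conn
        System.out.println(pool.borrow(10)); // null: pool exhausted, timed out
        pool.giveBack(conn);
        System.out.println(pool.idleCount()); // 1
    }
}
```

Borrowing and returning touch different locks inside the queue, so a thread returning a connection does not have to wait behind threads that are borrowing.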

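PriorityBlockingQueue: a thread-safe priority blocking queue

PriorityQueue is not thread-safe.

Ordering is implemented with a heap, so the head of the queue is always the smallest element according to the ordering in use.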
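
A small sketch of the resulting behaviour: elements come out in priority order regardless of insertion order. PriorityDemo and pollAll are illustrative names, and natural Integer ordering is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PriorityDemo {
    // Inserts the values as given and returns them in the order poll() yields them.
    static List<Integer> pollAll(int... values) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        for (int v : values) {
            queue.put(v); // never blocks: the queue is unbounded
        }
        List<Integer> out = new ArrayList<>();
        Integer next;
        while ((next = queue.poll()) != null) {
            out.add(next); // smallest remaining element first
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(pollAll(3, 1, 2)); // prints [1, 2, 3]
    }
}
```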

DelayQueue: a delay queue

![](并发容器类list-set-queue/图片 1.png)

poll

take: blocking retrieval, no polling loop needed

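A typical use of a delay queue is scheduled tasks. In the thread pool area, ScheduledThreadPoolExecutor is a class that runs timed tasks on a thread pool, and internally it implements its own delay-queue-like structure, DelayedWorkQueue.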
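
A minimal DelayQueue sketch, assuming a hand-written element type. DelayedTask and DelayQueueDemo are hypothetical classes created for this example; the delays of 50 ms and 200 ms are arbitrary.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    final String name;
    private final long readyAtNanos; // absolute time at which the task becomes available

    DelayedTask(String name, long delayMillis) {
        this.name = name;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the task has expired.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

class DelayQueueDemo {
    // Queues a slow and a fast task and returns the name of the one that take() yields first.
    static String firstExpired() {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.put(new DelayedTask("slow", 200));
        queue.put(new DelayedTask("fast", 50));
        try {
            return queue.take().name; // blocks until the earliest delay has elapsed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstExpired()); // prints fast
    }
}
```

take waits exactly as long as needed, whereas poll returns null straight away while no element has expired, which would force the caller into a polling loop.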