AQS抽象队列同步器详解及工具类拓展

AQS抽象队列同步器详解及工具类拓展

信号量和栅栏和倒计数器

CountDownLatch

Java1.5被引入的一个工具类,常被称为:倒计数器。创建对象时,传入指定数值作为线程参与的数量;

await​:方法等待计数器值变为0,在这之前,线程进入等待状态;

countdown:计数器数值减一,直到为0;

经常用于等待其他线程执行到某一节点,在继续执行当前线程代码

使用场景实例:

1、统计线程执行的情况

2、压力测试中,使用countDownLatch实现最大程度的并发处理

3、多个线程之间,相互通信,比如线程异步调用完接口,结果通知

demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.wyj.jvm.cdl.benchmark;

import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// 高并发测试场景 2000并发操作
public class BenchmarkTeatsCDL {

@Test
public void benchmarkTest() throws IOException, Exception {
CountDownLatch countDownLatch = new CountDownLatch(2000); // 2000个参与的线程
for(int i =0; i < 2000; i++) {
new Thread(() -> {
try {
countDownLatch.countDown();// -1
// TODO 并发执行这段代码
// service.method;
countDownLatch.await(); // 等待计数器归0
System.out.println(Thread.currentThread() + "就绪");
} catch (Exception e) {

}
}).start();
}
}
}

CyclicBarrier

也是1.5加入的,又称为“线程栅栏”

创建对象时,执行栅栏线程数量。

await:等指定数量的线程都处于等待状态时,继续执行后续代码。

barrierAction:线程数量到了指定量之后,自动触发执行指定任务。

和CountDownLatch重要区别在于,CyclicBarrier对象可多次触发执行;

典型场景:

1、数据量比较大时,实现批量插入数据到数据库;

2、数据统计,30个线程统计30天数据,全部统计完毕后,执行汇总

demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.wyj.jvm.cdl.benchmark;

import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

// 高并发测试场景 2000并发操作
public class BenchmarkTeatsCDL {

@Test
public void benchmarkTest() throws IOException, Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(500); // 2000个参与的线程
for(int i =0; i < 2000; i++) {
// 场景调整:压力测试,直接怼2000线程~~ 2000线程,分成4波去发起,500一批,
new Thread(() -> {
try {
cyclicBarrier.await(); // 等待栅栏开启。500
System.out.println(Thread.currentThread() + "就绪");
// TODO 并发执行这段代码
// service.method;
} catch (Exception e) {

}
}).start();
}
}
}

Semaphore

又称”信号量“,控制多个线程争抢许可。

acquire:获取一个许可,如果没有就等待,

release:释放一个许可。

availablePermits:方法得到可用的许可数目

典型场景

1、代码并发处理限流

demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.wyj.jvm.semaphore;


import java.util.Random;
import java.util.concurrent.Semaphore;

// 信号量机制
public class SemaphoreDemo {
public static void main(String[] args) {
SemaphoreDemo semaphoreTest = new SemaphoreDemo();
int N = 9; // 客人数量
Semaphore semaphore = new Semaphore(5); // 手牌数量,限制请求数量
for (int i = 0; i < N; i++) {
String vipNo = "vip-00" + i;
new Thread(() -> {
try {
semaphore.acquire(); // 获取令牌,没拿到的就等
// System.out.println(semaphore.count);
semaphoreTest.service(vipNo); // 实现了对service方法的限流

semaphore.release(); // 释放令牌,令牌数+1
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

// 限流 控制5个线程 同时访问
public void service(String vipNo) throws InterruptedException {
System.out.println("楼上出来迎接贵宾一位,贵宾编号" + vipNo + ",...");
Thread.sleep(new Random().nextInt(3000));
System.out.println("欢送贵宾出门,贵宾编号" + vipNo);
}

}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
楼上出来迎接贵宾一位,贵宾编号vip-000,...
楼上出来迎接贵宾一位,贵宾编号vip-001,...
楼上出来迎接贵宾一位,贵宾编号vip-002,...
楼上出来迎接贵宾一位,贵宾编号vip-003,...
楼上出来迎接贵宾一位,贵宾编号vip-004,...
欢送贵宾出门,贵宾编号vip-000
楼上出来迎接贵宾一位,贵宾编号vip-005,...
欢送贵宾出门,贵宾编号vip-003
楼上出来迎接贵宾一位,贵宾编号vip-006,...
欢送贵宾出门,贵宾编号vip-002
楼上出来迎接贵宾一位,贵宾编号vip-007,...
欢送贵宾出门,贵宾编号vip-004
楼上出来迎接贵宾一位,贵宾编号vip-008,...
欢送贵宾出门,贵宾编号vip-005
欢送贵宾出门,贵宾编号vip-001
欢送贵宾出门,贵宾编号vip-008
欢送贵宾出门,贵宾编号vip-006
欢送贵宾出门,贵宾编号vip-007

上面三个jdk类其实内部实现相似,都是将线程先挂起,达到某条件的时候,再运行。

自己手写一个CountDownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.wyj.jvm.cdl;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

public class MyCDL {
AtomicInteger count = null; // 共享资源 -- 可以被一定数量线程
// 需要锁池
LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();

public MyCDL(int count) {
this.count = new AtomicInteger(count);
}

public void await(){ // 等待计数器归零
waiters.add(Thread.currentThread());
while(this.count.get() != 0) {
// 挂起线程
LockSupport.park(); // 挂起,等待被唤醒...
}
waiters.remove(Thread.currentThread());
}

public void countDown() { // 计数器-1
if (count.decrementAndGet() == 0) {
// 释放锁之后,要唤醒线程
for (Thread waiter : waiters) {
LockSupport.unpark(waiter);
}
}
}

public static void main(String[] args) throws Exception {
MyCDL cdl = new MyCDL(2000); //2000个参与的线程
for (int i = 0; i < 2000; i++) {
int finalI = i;
new Thread(() -> {
try {
cdl.countDown(); // -1
System.out.println("线程" + finalI + "就绪");
cdl.await();// 等待计数器归0,如果计数器不归0 则等待

// TODO 并发执行这段代码
System.out.println(Thread.currentThread() + "运行了");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}


}

手写一个Semaphore

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.wyj.jvm.semaphore;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// 自定义的信号量实现
public class MySemaphore {
AtomicInteger count = null;
// 需要锁池
LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();

public MySemaphore(int num) {
this.count = new AtomicInteger(num); // 令牌数量 数值
}

public void acquire() { // 获取令牌,没有令牌就等待
// 进入等待列表
waiters.add(Thread.currentThread());
for (; ; ) {
int current = count.get();
int n = current - 1; // 发出一个令牌
if (current <= 0 || n < 0) {
// 挂起线程
LockSupport.park();
}
if (count.compareAndSet(current, n)) {
break;
}
}
waiters.remove(Thread.currentThread());
}

public void release() { // 释放令牌 -- 令牌数量+1
if (this.count.incrementAndGet() > 0) {
// 释放锁之后,要唤醒下一个等待的线程
Thread next = waiters.poll();
waiters.peek();
LockSupport.unpark(next);
}
}
}

同步锁的本质——— 排队

同步的方式:独享锁— 单个队列窗口(如lock),共享锁— 多个队列窗口(如CountDownLatch)

抢锁的方式:插队抢(不公平锁)、先来后到抢锁(公平锁)

没抢到锁的处理方式:快速尝试多次(CAS自旋锁)、阻塞等待

唤醒阻塞线程的方式(叫号器):全部通知、通知下一个

jdk将上面的各种同步器通过模板方法模式,抽象为AbstractQueuedSynchronizer类,简称AQS(抽象队列同步器)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.wyj.jvm;

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/** 自己写的AQS */
public class AQSdemo {
// 同步资源状态
volatile AtomicInteger state = new AtomicInteger(0);
// 当前锁的拥有者
protected volatile AtomicReference<Thread> owner = new AtomicReference<>();
// java q 线程安全
public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();

// 独占
public void acquire() {
// 塞到等待锁的集合中
waiters.offer(Thread.currentThread());
while (!tryAcquire()) {
// 挂起这个线程
LockSupport.park();
}
// 后续,等待其他线程释放锁,收到通知之后继续循环
waiters.remove(Thread.currentThread());
}

public void release() {
// cas 修改 owner 拥有者
if (tryRelease()) {
Thread waiter = waiters.peek();
LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
}
}
// 由实现类自己去实现获取的条件
public boolean tryAcquire() {
throw new UnsupportedOperationException();
}

public boolean tryRelease() {
throw new UnsupportedOperationException();
}


// 共享资源获取
public void acquireShared() {
// 塞到等待锁的集合中
waiters.offer(Thread.currentThread());
while (tryAcquireShared() < 0) {
// 挂起这个线程
LockSupport.park();
}
// 后续,等待其他线程释放锁,收到通知之后继续循环
waiters.remove(Thread.currentThread());
}

// 共享资源的释放
public void releaseShared() {
// cas 修改 owner 拥有者
if (tryReleaseShared()) {
Thread waiter = waiters.peek();
LockSupport.unpark(waiter); // 唤醒线程继续 抢锁
}
}

public int tryAcquireShared() {
throw new UnsupportedOperationException();
}

public boolean tryReleaseShared() {
throw new UnsupportedOperationException();
}

public AtomicInteger getState() {
return state;
}

public void setState(AtomicInteger state) {
this.state = state;
}
}
基于自己写的AQS实现上面的demp MyCDL
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.wyj.jvm.cdl;


import com.wyj.jvm.AQSdemo;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// CountDownLatch 自己实现
public class CDLdemo {
// Object AQS = new Oject(state, queue);
// AQS 具体实现对象(state、queue、owner)
AQSdemo aqSdemo = new AQSdemo() {
@Override
public int tryAcquireShared() { // 如果非等于0,代表当前还有线程没准备就绪,则认为需要等待
return this.getState().get() == 0 ? 1 : -1;
}

@Override
public boolean tryReleaseShared() { // 如果非等于0,代表当前还有线程没准备就绪,则不会通知继续执行
return this.getState().decrementAndGet() == 0;
}
};

public CDLdemo(int count) {
aqSdemo.setState(new AtomicInteger(count));
}

public void await() {
aqSdemo.acquireShared();
}

public void countDown() {
aqSdemo.releaseShared();
}

public static void main(String[] args) throws InterruptedException {
// 一个请求,后台需要调用多个接口 查询数据
CDLdemo cdLdemo = new CDLdemo(10); // 创建,计数数值
for (int i = 0; i < 10; i++) { // 启动九个线程,最后一个两秒后启动
int finalI = i;
new Thread(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是" + Thread.currentThread() + ".我执行接口-" + finalI +"调用了");
cdLdemo.countDown(); // 参与计数
// 不影响后续操作
}).start();
}

cdLdemo.await(); // 等待计数器为0
System.out.println("全部执行完毕.我来召唤神龙");
}
}
基于自己写的AQS实现上面的demp MySemaphore
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.wyj.jvm.semaphore;


import com.wyj.jvm.AQSdemo;

// 自定义的信号量实现
public class SemaphoreByAQS {
AQSdemo aqs = new AQSdemo() {
@Override
public int tryAcquireShared() { // 信号量获取, 数量 - 1
for(;;) {
int count = getState().get();
int n = count - 1;
if(count <= 0 || n < 0) {
return -1;
}
if(getState().compareAndSet(count, n)) {
return 1;
}
}
}

@Override
public boolean tryReleaseShared() { // state + 1
return getState().incrementAndGet() >= 0;
}
};

/** 许可数量 */
public SemaphoreByAQS(int count) {
aqs.getState().set(count); // 设置资源的状态
}

public void acquire() {
aqs.acquireShared();
} // 获取令牌

public void release() {
aqs.releaseShared();
} // 释放令牌
}

JDK中的AQS抽象队列同步器