并发系列之同步工具类

并发系列之同步工具类

简要

同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程的控制流。同步工具类包括阻塞队列:阻塞队列、闭锁(Latch)、信号量(Semaphore)以及栅栏(Barrier)。阻塞队列前两篇博客以及介绍过了,本篇博文就是介绍后三种同步工具类。

闭锁

闭锁可以延迟线程的进度知道其到达终止状态,其作用相当于一扇门:在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门一直时关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。当闭锁到达结束状态时,将不会再改变状态,因此这扇门将永远保持打开状态。闭锁可用于确保某些活动直到其他活动都完成直到其他活动都完成才继续执行,例如:

1:确保某个计算在其需要的所有资源都被初始化之后才继续执行。

2:确保某个服务在其依赖的所有其他服务都已经启动之后才启动。

3:等待直到某个操作的所有参与者(例如,在多玩家游戏中的所有玩家)都就绪再继续执行。在这种情况中,当所有玩家都准备就绪时,闭锁将到达结束状态。

CountDownLatch是一种灵活闭锁实现,它可以使一个或多个线程等待一组事件的发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到0,这表示所有需要等待的事件都已经发生。如果计数器的值非0,那么await会一直阻塞直到计数器为0,或者等待中的线程中断,或者等待超时。

下面是countDownLatch的使用范例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.concurrent.CountDownLatch;

/**
* Created by wyj on 2018/5/10
*
* 在计时测试中使用countDownLatch来启动和停止线程
*
* 测试n个线程并发执行某个任务时需要的时间
*/
public class TestHarness {

public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);

for (int i=0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end-start;
}

public static void main(String[] args) throws InterruptedException {
TestHarness testHarness = new TestHarness();
long time = testHarness.timeTasks(10, new Task());
System.out.println(time);
}
}
class Task implements Runnable
{
int i = 0 ;
public void run() {
System.out.println(Thread.currentThread().getName());
}
}

console

1
2
3
4
5
6
7
8
9
10
11
12
D:\software\java\jdk1.8.0\bin\java ...
Thread-0
Thread-5
Thread-7
Thread-4
Thread-6
Thread-2
Thread-9
Thread-1
Thread-3
Thread-8
484692

信号量

Semaphore中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定,在操作过程中可以首先获得许可(只要还有剩余的许可),并且在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。

下面是semaphore实现的一个容器的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class BoundedHashSet<T> {

private final Set<T> set;
private final Semaphore sem;

public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
}finally {
if (!wasAdded)
sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved)
sem.release();
return wasRemoved;
}

public static void main(String[] args) throws InterruptedException {
BoundedHashSet boundedHashSet = new BoundedHashSet(5);
for (int i = 0; i <= 10; i++ ) {
boundedHashSet.add(i);
System.out.println(i);
}
}
}

console

1
2
3
4
5
6
7
D:\software\java\jdk1.8.0\bin\java ...
0
1
2
3
4
//一直处于阻塞状态

栅栏

栅栏类似于闭锁,它能阻塞一组线程知道某个时间发生。栅栏于闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:这种算法通常讲一个问题拆分成一系列相互独立的子问题。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达了栅栏,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来“选举”产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时会(在一个子任务线程中)执行它,但在阻塞线程被释放之前是不可能执行的。

下面是CyclicBarrier的一个使用范例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

/**
* Created by wyj on 2018/5/10
*/
class CyclicBarrierTask implements Runnable {

private CyclicBarrier cyclicBarrier;
private int timeout;

public CyclicBarrierTask(CyclicBarrier cyclicBarrier, int timeout) {
this.cyclicBarrier = cyclicBarrier;
this.timeout = timeout;
}

@Override
public void run() {
CyclicBarrierTaskTest.print("正在running...");
try {
TimeUnit.MILLISECONDS.sleep(timeout);
CyclicBarrierTaskTest.print("到达栅栏处,等待其他线程到达");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
CyclicBarrierTaskTest.print("所有线程到达栅栏处,继续执行各自线程任务...");
}
}
public class CyclicBarrierTaskTest {

public static void print(String str) {
SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");
System.out.println("[" + dateFormat.format(new Date()) + "]"
+ Thread.currentThread().getName() + str);
}

public static void main(String[] args) {
int count = 5;
ExecutorService es = Executors.newFixedThreadPool(count);
CyclicBarrier cyclicBarrier = new CyclicBarrier(count, new Runnable() {
@Override
public void run() {
CyclicBarrierTaskTest.print("所有线程到达栅栏处,可以在此做一些处理...");
}
});
for (int i = 0; i < count; i++) {
es.execute(new CyclicBarrierTask(cyclicBarrier, (i + 1) * 1000));
}
}
}

console

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
D:\software\java\jdk1.8.0\bin\java ...
[16:11:09]pool-1-thread-5正在running...
[16:11:09]pool-1-thread-1正在running...
[16:11:09]pool-1-thread-2正在running...
[16:11:09]pool-1-thread-4正在running...
[16:11:09]pool-1-thread-3正在running...
[16:11:10]pool-1-thread-1到达栅栏处,等待其他线程到达
[16:11:11]pool-1-thread-2到达栅栏处,等待其他线程到达
[16:11:12]pool-1-thread-3到达栅栏处,等待其他线程到达
[16:11:13]pool-1-thread-4到达栅栏处,等待其他线程到达
[16:11:14]pool-1-thread-5到达栅栏处,等待其他线程到达
[16:11:14]pool-1-thread-5所有线程到达栅栏处,可以在此做一些处理...
[16:11:14]pool-1-thread-5所有线程到达栅栏处,继续执行各自线程任务...
[16:11:14]pool-1-thread-1所有线程到达栅栏处,继续执行各自线程任务...
[16:11:14]pool-1-thread-2所有线程到达栅栏处,继续执行各自线程任务...
[16:11:14]pool-1-thread-3所有线程到达栅栏处,继续执行各自线程任务...
[16:11:14]pool-1-thread-4所有线程到达栅栏处,继续执行各自线程任务...