并发系列之生产者消费者模式

并发系列之生产者消费者模式

简要

生产者消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好了解决方案。一般这种设计模式都是基于阻塞队列的,当数据生成时,生产者把数据放入队列,而当消费者准备处理数据时,将从队列中获取数据。

例子

以两个人洗盘子为例,二者的劳动分工也是一种生产者—消费者模式:其中一个人把洗好的盘子放在盘架上,而另一个从盘架上取出盘子并把他们烘干。在这个示例中,盘架相当于阻塞队列。如果盘架上没有盘子,那么消费者就会一直等待,直到有盘子需要烘干。如果盘架放满了,那么生产者就会停止清洗盘子直到盘架上有更多的空间。我们可以将这种类比扩展为多个生产者(虽然可能存在对水槽的竞争)和多个消费者,每个工人只需与盘架打交道。人们不需要知道究竟有多少生产者或消费者,或者谁生产了某个指定的工作项。

代码实现

创建“盘子”这个被消费的对象

1
2
3
4
'public class Plate {
private String id;
private String name;
}

创建一个生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Provider implements Runnable{

/**
* 共享缓存区
*/
private BlockingQueue<Plate> queue;
/**
* 多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程状态
*/
private volatile boolean isRunning = true;
/**
* id生成器
*/
private static AtomicInteger count = new AtomicInteger();

public Provider(BlockingQueue<Plate> queue) {
this.queue = queue;
}

@Override
public void run() {
while (isRunning) {
try {
Thread.sleep(1000);
//计数
int id = count.incrementAndGet();
Plate plate = new Plate(Integer.toString(id), "盘子" + id);
System.out.println("当前线程:" + Thread.currentThread().getName() + ",获取了盘子,id为:"
+ id + ",进行装载到公共缓冲区中...");
if (!this.queue.offer(plate, 2, TimeUnit.SECONDS)) {
System.out.println("提交缓冲区数据失败...");
// do something... 比如重新提交
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
public void stop() {
this.isRunning = false;
}
}

创建一个消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Consumer implements Runnable{

private BlockingQueue<Plate> queue;

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

@Override
public void run() {
while (true) {
try {
//获取数据
Plate plate = this.queue.take();
Thread.sleep(1000);
System.out.println("当前消费线程:" + Thread.currentThread().getName() +
", 洗盘子成功,消费数据为id: " + plate.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}

测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class MainTest {

public static void main(String[] args) throws Exception {
//内存缓冲区
BlockingQueue<Plate> queue = new LinkedBlockingQueue<Plate>(10);
//生产者
Provider p1 = new Provider(queue);

Provider p2 = new Provider(queue);
Provider p3 = new Provider(queue);
//消费者
Consumer c1 = new Consumer(queue);
Consumer c2 = new Consumer(queue);
Consumer c3 = new Consumer(queue);
//创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,
//没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)

ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(p1);
cachePool.execute(p2);
cachePool.execute(p3);
cachePool.execute(c1);
cachePool.execute(c2);
cachePool.execute(c3);

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// cachePool.shutdown();
// cachePool.shutdownNow();


}
}

console

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
当前线程:pool-1-thread-1,获取了盘子,id为:1,进行装载到公共缓冲区中...
当前线程:pool-1-thread-3,获取了盘子,id为:2,进行装载到公共缓冲区中...
当前线程:pool-1-thread-2,获取了盘子,id为:3,进行装载到公共缓冲区中...
当前线程:pool-1-thread-2,获取了盘子,id为:5,进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-6, 洗盘子成功,消费数据为id: 3
当前消费线程:pool-1-thread-5, 洗盘子成功,消费数据为id: 2
当前消费线程:pool-1-thread-4, 洗盘子成功,消费数据为id: 1
当前线程:pool-1-thread-3,获取了盘子,id为:4,进行装载到公共缓冲区中...
当前线程:pool-1-thread-1,获取了盘子,id为:6,进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-6, 洗盘子成功,消费数据为id: 5
当前线程:pool-1-thread-3,获取了盘子,id为:9,进行装载到公共缓冲区中...
当前线程:pool-1-thread-2,获取了盘子,id为:8,进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-5, 洗盘子成功,消费数据为id: 4
当前消费线程:pool-1-thread-4, 洗盘子成功,消费数据为id: 6
当前线程:pool-1-thread-1,获取了盘子,id为:7,进行装载到公共缓冲区中...
当前消费线程:pool-1-thread-4, 洗盘子成功,消费数据为id: 7
当前消费线程:pool-1-thread-5, 洗盘子成功,消费数据为id: 8
当前消费线程:pool-1-thread-6, 洗盘子成功,消费数据为id: 9