线程池-ForkJoin框架详解-FutureTask源码剖析

线程池-ForkJoin框架详解-FutureTask源码剖析

多线程应用

原本串行执行的程序,耗时3+2+5 = 10s的程序,时候缩短到<10。实际上是一种空间换时间的思想,对于用户来说响应时间是最重要的。

线程池

FutureTask

Callable VS Runnable

Callable的get方法有阻塞效果,如果任务结果还没有返回,那么就让线程等待,因此不需要像Runnable利用栅栏等待返回结果。

区别

1:方法名不同Callable为call(),Runnable为run()

2:call()有返回值,run()则无

3:call()会抛出异常,run()没有

源码解析

查看submit()源码

newTaskFor()

线程池里面任务 ——— 载体FutureTask, 线程去执行任务 ——— FutureTask

自己写一个FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.study.thread.future.service;

import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

// 我们想一想,这个功能怎么实现
// (jdk本质,就是利用一些底层API,为开发人员提供便利)
public class FutureTaskDemo<T> implements Runnable, Future { // 获取 线程异步执行结果 的方式
Callable<T> callable; // 业务逻辑在callable里面
T result = null;
volatile String state = "NEW"; // task执行状态
LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();// 定义一个存储等待者的集合

public FutureTaskDemo(Callable<T> callable) {
this.callable = callable;
}

@Override
public void run() {
try {
result = callable.call();
} catch (Exception e) {
e.printStackTrace();
// result = exception
} finally {
state = "END";
}

// 唤醒等待者
Thread waiter = waiters.poll();
while (waiter != null) {
LockSupport.unpark(waiter);

waiter = waiters.poll(); // 继续取出队列中的等待者
}
}

// 返回结果,
@Override
public T get() {
if ("END".equals(state)) {
return result;
}

waiters.offer(Thread.currentThread()); // 加入到等待队列,线程不继续往下执行

while (!"END".equals(state)) {
LockSupport.park(); // 线程通信的知识点
}
// 如果没有结束,那么调用get方法的线程,就应该进入等待
return result;
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}

@Override
public boolean isCancelled() {
return false;
}

@Override
public boolean isDone() {
return false;
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}

FutureTaskDemo此类仅供研究FutureTask的功能框架,实际生产中会存在线程不安全等问题,具体代码查看FutureTask源码

这是一个只实现基础功能的FutureTask,它继承了Runnable,再是利用park挂起获取result的线程,再result有结果返回时,unpark线程,实现Runnable的有返回值版本。

💡:为什么这个地方没有用CAS自旋锁的方式,而是park/unpark机制?

因为CAS自旋适合短时间内有返回的场景,比如入队列操作,而callable.call()方法是,调用方自己实现的,无法预估返回时间,用CAS自旋锁的形式会长时间占用CPU。

fork / join并发处理框架

用来做什么

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.study.thread.future.service;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.concurrent.*;

/**
* 并行调用http接口
*/
@Service
public class UserServiceForkJoin {
// 本质是一个线程池,默认的线程数量:CPU的核数
ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
@Autowired
private RestTemplate restTemplate;

/**
* 查询多个系统的数据,合并返回
*/
public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
// 其他例子, 查数据库的多个表数据,分多次查询
// fork/join
// forkJoinPool.submit()
ArrayList<String> urls = new ArrayList<>();
urls.add("http://www.tony.com/userinfo-api/get?userId=" + userId);
urls.add("http://www.tony.com/integral-api/get?userId=" + userId);

HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1);
ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest);

JSONObject result = forkJoinTask.get();
return result;
// RecursiveTask 有返回结果
// RecursiveAction 无返回结果

}
}

// 任务
class HttpJsonRequest extends RecursiveTask<JSONObject> {

RestTemplate restTemplate;
ArrayList<String> urls;
int start;
int end;

HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) {
this.restTemplate = restTemplate;
this.urls = urls;
this.start = start;
this.end = end;
}

// 就是实际去执行的一个方法入口(任务拆分)
@Override
protected JSONObject compute() {
int count = end - start; // 代表当前这个task需要处理多少数据
// 自行根据业务场景去判断是否是大任务,是否需要拆分
if (count == 0) {
String url = urls.get(start);
// TODO 如果只有一个接口调用,立刻调用
long userinfoTime = System.currentTimeMillis();
String response = restTemplate.getForObject(url, String.class);
JSONObject value = JSONObject.parseObject(response);
System.out.println(Thread.currentThread() + " 接口调用完毕" + (System.currentTimeMillis() - userinfoTime) + " #" + url);
return value;
} else { // 如果是多个接口调用,拆分成子任务 7,8, 9,10
System.out.println(Thread.currentThread() + "任务拆分一次");
int x = (start + end) / 2;
HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 负责处理哪一部分?
httpJsonRequest.fork();

HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 负责处理哪一部分?
httpJsonRequest1.fork();

// join获取处理结果
JSONObject result = new JSONObject();
result.putAll(httpJsonRequest.join());
result.putAll(httpJsonRequest1.join());
return result;
}
}
}

意图梳理

任务拆分

关键点:分解任务fork出新任务(fork就是将拆分好的任务加入workQueue),汇集join任务执行结果(join就是返回执行结果),这就是任务拆分

而在ExecutorService中,N个不同的任务需要submitN次

工作窃取

Fork-join和Future的另一个区别在于Fork-join里面不止一个workQueue,一个任务对应一个workQueue,每个workQueue对应一个Thread

Thread-2发下queue-2没有数据,Thread-2会去Thread-1中窃取任务,这就是工作窃取

运用场景:大数据计算