dubbo系列之网络通信-IO异步变同步

dubbo系列之网络通信-IO异步变同步

简要

dubbo 是基于netty NIO的非阻塞 并行调用通信。 (阻塞 非阻塞 异步 同步 区别 )
dubbo 的通信方式 有3类类型:

1.异步,有返回值

1
2
3
<dubbo:method name="sayHello" async="true"></dubbo:method>
Future<String> temp= RpcContext.getContext().getFuture();
hello=temp.get();

2.异步,无返回值

1
<dubbo:method name="sayHello" return="false"></dubbo:method>

3.异步,变同步(默认的通信方式)

源码介绍

三种不同通信方式在代码中判断就是在前面一直有提到过的DubboInvoker类

DubboInvoker类

doInvoker()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected Result doInvoke(final Invocation invocation) throws Throwable {
...
if (isOneway) {//2:异步,无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {//1:异步,有返回值
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {//3.异步,变同步(默认的通信方式)
RpcContext.getContext().setFuture(null);
//发送请求
return (Result) currentClient.request(inv, timeout).get();
}
}
...
}

那通信方式又是在哪设置的呢?

dubbo-demo-consumer.xml

1
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" />

原来的配置是这样的,默认就是进去3.异步,变同步。

1:异步,有返回值

1
2
3
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" >
<dubbo:method name="sayHello" async="true"></dubbo:method>
</dubbo:reference>

启动服务端和客户端

看一下DemoAction类的打印效果

console

打印的全是null了。

断点进入doInvoke()
1
2
3
4
5
else if (isAsync) {//1:异步,有返回值
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
}

我们修改一下DemoAction的hello打印值

start()
1
2
3
4
5
6
7
8
9
10
11
12
13
public void start() throws Exception {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
try {
String hello = demoService.sayHello("world" + i);
Future<String> temp = RpcContext.getContext().getFuture();
hello=temp.get();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(2000);
}
}

这样打印的结果就不再是null了

console1

2:异步,无返回值

dubbo-demo-consumer.xml

1
2
3
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" >
<dubbo:method name="sayHello" return="false"></dubbo:method>
</dubbo:reference>

DemoAction类

start()
1
2
3
4
5
6
7
8
9
10
11
12
13
 public void start() throws Exception {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
try {
String hello = demoService.sayHello("world" + i);
// Future<String> temp = RpcContext.getContext().getFuture();
// hello=temp.get();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
} catch (Exception e) {
e.printStackTrace();
}
Thread.sleep(2000);
}
}

打印结果就又变null了。

看了前面两种情况,我们大致了解了,由于demoService.sayHello(“world” + i)方法是同步的,所以在正常异步情况下打印这个就是null值,那怎么才能获取这个值呢?接着看第三者情况。

下面就是这篇博客的重点了。

3:异步,变同步

问题:当前线程怎么让它 “暂停,等结果回来后,再执行”?

看代码

DubboInvoker类

1
2
3
4
5
else {//3.异步,变同步(默认的通信方式)
RpcContext.getContext().setFuture(null);
//发送请求
return (Result) currentClient.request(inv, timeout).get();
}

HeaderExchangeChannel类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);//利用netty异步发送请求
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}

所以DubboInvoker里面是返回的DefaultFuture的get()方法。

DefaultFuture类

get()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final Condition done = lock.newCondition();//jdk多线程里面的condition
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {//在没有得到服务端返回结果时
long start = System.currentTimeMillis();
lock.lock();//加锁
try {
while (!isDone()) {//死循环----阻塞
done.await(timeout, TimeUnit.MILLISECONDS);//等待
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();//解锁
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();//返回response结果
}
1
2
3
public boolean isDone() {
return response != null;
}

当reponse拿到数据后,isDone()就为true,就会结束死循环。

我们再看一下上一篇博客介绍的consumer接收数据的处理

DefaultFuture类

doReceived
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void doReceived(Response res) {
lock.lock();//加锁
try {
response = res;//复制
if (done != null) {
done.signal();//唤醒
}
} finally {
lock.unlock();//解锁
}
if (callback != null) {
invokeCallback(callback);
}
}

所以等结果回来,就唤醒。这样,返回结果就不再是null了。

问题:socket是一个全双工的通信方式,那么在多线程的情况下,如何知道那个返回结果对应原先那条线程的调用?

看源码

DefaultFuture类

1
2
3
4
5
6
7
8
9
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();//id是全局唯一的
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);//将id和线程一起存储在map里面
CHANNELS.put(id, channel);
}

Request类

newId()——id初始化
1
2
3
4
private static long newId() {
// getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
return INVOKE_ID.getAndIncrement();
}
received()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());//map中删除这条数据,并将值赋给future
if (future != null) {
future.doReceived(response);//唤醒
} else {// 超时
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}

在consumer接收数据时,会将FUTURES中的这条数据删除。

所以上面问题的答案就是——>通过一个全局唯一的ID来做consumer 和 provider 来回传输。

单工 全双工 半双工的区别?

单工:在同一时间只允许一方向另一方传送信息,而另一方不能向一方传送

全双工:是指在发送数据的同时也能够接收数据,两者同步进行,这好像我们平时打电话一样,说话的同时也能够听到对方的声音。目前的网卡一般都支持全双工。

半双工:所谓半双工就是指一个时间段内只有一个动作发生。举个简单例子,一条窄窄的马路,同时只能有一辆车通过,当目前有两量车对开,这种情况下就只能一辆先过,等到头后另一辆再开,这个例子就形象的说明了半双工的原理。