Dubbo series: network communication, how the consumer sends a request

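Overview

Let's start with a diagram of Dubbo's network communication architecture (it can also be read as the call-chain design).

Dubbo network communication architecture diagram

Note: in the diagram, inherit means inheriting or implementing an interface, init means initialization, and call means invocation.

This post covers the red region of the diagram; everything below the red region was covered in the previous posts.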

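Source walkthrough

We continue down the call chain from the loadbalance step covered earlier.

FailoverClusterInvoker class

doInvoke()
Result result = invoker.invoke(invocation); // step into this call with the debugger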

ProtocolFilterWrapper class

buildInvokerChain()
// invoke() of the Invoker that buildInvokerChain() wraps around each filter
public Result invoke(Invocation invocation) throws RpcException {
    return filter.invoke(next, invocation);
}
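
To make the wrapping idea concrete, here is a minimal sketch of how a chain like this could be assembled. It is not Dubbo's actual code: SketchInvoker, SketchFilter and FilterChainSketch are hypothetical, simplified stand-ins (the invocation is just a String), and the only point is that each filter receives the next invoker and decides when to call it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for Dubbo's Invoker and Filter.
interface SketchInvoker {
    String invoke(String invocation);
}

interface SketchFilter {
    String invoke(SketchInvoker next, String invocation);
}

final class FilterChainSketch {
    // Wrap the target from the last filter to the first, so that
    // filters.get(0) ends up outermost and runs first.
    static SketchInvoker buildChain(SketchInvoker target, List<SketchFilter> filters) {
        SketchInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SketchFilter filter = filters.get(i);
            final SketchInvoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    // Runs a two-filter chain and returns the order in which things executed.
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        SketchFilter context = (next, inv) -> { trace.add("context"); return next.invoke(inv); };
        SketchFilter monitor = (next, inv) -> { trace.add("monitor"); return next.invoke(inv); };
        SketchInvoker target = inv -> { trace.add("target"); return "result:" + inv; };
        String result = buildChain(target, Arrays.asList(context, monitor)).invoke("sayHello");
        trace.add(result);
        return trace;
    }
}
```

Calling FilterChainSketch.demo() walks the chain in the same order as the walkthrough that follows: the context filter, then the monitor filter, then the target invoker.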

--------- The filter chain starts here ---------

ConsumerContextFilter class

invoke()
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    // Record the invoker, the invocation and both endpoints in the current RpcContext
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(),
                    invoker.getUrl().getPort());
    if (invocation instanceof RpcInvocation) {
        ((RpcInvocation) invocation).setInvoker(invoker);
    }
    try {
        return invoker.invoke(invocation);
    } finally {
        // Attachments are only valid for a single call
        RpcContext.getContext().clearAttachments();
    }
}

This filter populates the RpcContext: it records the current invoker, the invocation, and the local and remote addresses, attaches the invoker to the RpcInvocation, and clears the attachments once the call returns.
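
The pattern of a per-thread context with chained setters and a cleanup step in finally can be sketched as follows. This is an illustration under assumptions, not RpcContext itself: ContextSketch and invokeWithContext are hypothetical names, and the context is assumed to be held in a ThreadLocal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class ContextSketch {
    // One context object per thread (assumption for this sketch).
    private static final ThreadLocal<ContextSketch> LOCAL =
            ThreadLocal.withInitial(ContextSketch::new);

    private final Map<String, String> attachments = new HashMap<>();
    private String remoteAddress;

    static ContextSketch get() {
        return LOCAL.get();
    }

    // Setters return this so that calls can be chained.
    ContextSketch setRemoteAddress(String host, int port) {
        this.remoteAddress = host + ":" + port;
        return this;
    }

    ContextSketch setAttachment(String key, String value) {
        attachments.put(key, value);
        return this;
    }

    String getRemoteAddress() {
        return remoteAddress;
    }

    Map<String, String> getAttachments() {
        return attachments;
    }

    // Fill in the context, run the call, and always clear the attachments afterwards.
    static <T> T invokeWithContext(String host, int port, Supplier<T> call) {
        get().setRemoteAddress(host, port);
        try {
            return call.get();
        } finally {
            get().getAttachments().clear();
        }
    }
}
```

Inside the call the attachments and the remote address are visible; after it returns the attachments are gone, so they cannot leak into the next call made on the same thread.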

MonitorFilter class

invoke()
// Intercepts the invocation
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
        RpcContext context = RpcContext.getContext(); // the provider must read the context before invoke()
        String remoteHost = context.getRemoteHost();
        long start = System.currentTimeMillis(); // record the start timestamp
        getConcurrent(invoker, invocation).incrementAndGet(); // concurrency counter: one more call in flight
        try {
            Result result = invoker.invoke(invocation); // let the call chain continue
            collect(invoker, invocation, result, remoteHost, start, false);
            return result;
        } catch (RpcException e) {
            collect(invoker, invocation, null, remoteHost, start, true);
            throw e;
        } finally {
            getConcurrent(invoker, invocation).decrementAndGet(); // concurrency counter: call finished
        }
    } else {
        return invoker.invoke(invocation);
    }
}

This is the monitoring filter. When the URL carries a monitor parameter, it counts concurrent calls and collects timing and success or failure data around the downstream invocation; otherwise it simply passes the call through.
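
The counting and timing logic can be tried out in isolation with a sketch like the one below. MonitorSketch and its fields are hypothetical; in particular, keying the counter by a plain String and keeping success and failure totals in AtomicInteger fields are assumptions made for this example, not what Dubbo's collect() does.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class MonitorSketch {
    // One in-flight counter per "service.method" key.
    private final ConcurrentMap<String, AtomicInteger> concurrents = new ConcurrentHashMap<>();
    final AtomicInteger successes = new AtomicInteger();
    final AtomicInteger failures = new AtomicInteger();
    volatile long lastElapsedMs;

    AtomicInteger getConcurrent(String key) {
        return concurrents.computeIfAbsent(key, k -> new AtomicInteger());
    }

    <T> T invoke(String key, Supplier<T> call) {
        long start = System.currentTimeMillis();
        getConcurrent(key).incrementAndGet();
        try {
            T result = call.get();
            collect(start, false);
            return result;
        } catch (RuntimeException e) {
            collect(start, true);
            throw e;
        } finally {
            // Runs on both paths, so the counter never drifts upwards.
            getConcurrent(key).decrementAndGet();
        }
    }

    private void collect(long start, boolean error) {
        lastElapsedMs = System.currentTimeMillis() - start;
        (error ? failures : successes).incrementAndGet();
    }
}
```

The decrement sits in finally for the same reason as in the filter above: a failed call must still release its slot in the concurrency count.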

DubboInvoker class

doInvoke()
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    // Pick a client; with several connections, rotate through them
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            // One-way: send without expecting a response
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // Async: hand the future to the caller through RpcContext
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // Sync: send the request and block until the response arrives
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get(); // step into this call with the debugger
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION,
                "Invoke remote method timeout. method: " + invocation.getMethodName()
                        + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION,
                "Failed to invoke remote method: " + invocation.getMethodName()
                        + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
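
The three branches of doInvoke() (one-way, async, sync) differ only in what happens to the future. The sketch below imitates that split with CompletableFuture. Everything in it is hypothetical: InvokeModeSketch, the Mode enum, the echoing request() method and the ThreadLocal FUTURE slot are stand-ins chosen for the example, and IllegalStateException stands in for RpcException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class InvokeModeSketch {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Stands in for the future slot that the caller reads from the context.
    static final ThreadLocal<CompletableFuture<String>> FUTURE = new ThreadLocal<>();

    // Hypothetical client: answers "echo:<request>" on another thread.
    static CompletableFuture<String> request(String request) {
        return CompletableFuture.supplyAsync(() -> "echo:" + request);
    }

    static String invoke(String request, Mode mode, long timeoutMs) {
        switch (mode) {
            case ONEWAY:
                request(request);          // fire and forget, nobody waits for the reply
                FUTURE.set(null);
                return null;
            case ASYNC:
                FUTURE.set(request(request)); // the caller picks the future up later
                return null;
            default:
                FUTURE.set(null);
                try {
                    // Block until the reply arrives or the timeout expires.
                    return request(request).get(timeoutMs, TimeUnit.MILLISECONDS);
                } catch (TimeoutException e) {
                    throw new IllegalStateException("Invoke timeout: " + request, e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Invoke interrupted: " + request, e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Invoke failed: " + request, e);
                }
        }
    }
}
```

In SYNC mode the caller gets the value directly; in ASYNC mode the return value is empty and the caller later reads InvokeModeSketch.FUTURE.get() to obtain the result.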

HeaderExchangeChannel class

request()
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0");
    req.setTwoWay(true); // a response is expected
    req.setData(request);
    // Create the future before sending, so the response can be matched to it
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req); // send the request
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
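
The reason the future is created before channel.send(req) is that the response arrives later, on another thread, and has to be matched back to the waiting caller. One common way to do that is a table of pending futures keyed by request id. The sketch below shows that idea only; PendingRequestsSketch and its methods are hypothetical and deliberately much simpler than DefaultFuture.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Register the future under a fresh request id before the request is written out.
    long register(CompletableFuture<String> future) {
        long id = nextId.getAndIncrement();
        pending.put(id, future);
        return id;
    }

    // Called when a response carrying this id comes back from the network.
    void received(long id, String result) {
        CompletableFuture<String> future = pending.remove(id);
        if (future != null) {
            future.complete(result);
        }
    }

    // Called when sending fails, mirroring future.cancel() in the catch block above.
    void cancel(long id) {
        CompletableFuture<String> future = pending.remove(id);
        if (future != null) {
            future.cancel(false);
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Because lookups go by id, responses may arrive in any order over the same connection and each caller still receives its own reply.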

AbstractClient class

send()
public void send(Object message, boolean sent) throws RemotingException {
    // Reconnect first if configured to do so and the connection has dropped
    if (send_reconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel();
    // TODO: whether getChannel() may return null needs to be improved
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    channel.send(message, sent); // step into this call with the debugger
}

NettyChannel class

send()
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        // The final goal: send the data over the network through Netty's channel
        ChannelFuture future = channel.write(message);
        if (sent) {
            // Only when sent is true do we wait for the write to complete
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

Once we reach NettyChannel, the call boils down to Netty's own channel.write(message). That achieves the final goal: sending the data over the network through Netty's channel.

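Finally, a summary of the three steps circled in the architecture diagram.

Filter: filter classes such as ConsumerContextFilter and MonitorFilter.
Invoker: the DubboInvoker class.
The diagram shows the invoker as implemented by a protocol. Why is DubboInvoker tied to a protocol?

If you read the service-reference source carefully in the earlier posts, DubboInvoker should look familiar. It is created along this path, starting in the RegistryDirectory class:

-->refreshInvoker()

-->toInvokers()

    -->DubboProtocol class

        -->refer()
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

So DubboInvoker is created by DubboProtocol.refer() during service reference. That is why the diagram places the invoker under the protocol: DubboInvoker is the Invoker implementation that belongs to the Dubbo protocol.

Client: this is the NettyClient class.

The code above never shows NettyClient directly. That is because NettyClient is a subclass of AbstractClient, whose send() we stepped through.

Then what does Transporter, shown in the diagram as the client's implementation, mean?

Transporter: the network transport layer, a unified interface that abstracts over Netty and Mina.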
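
As a rough illustration of what "a unified interface over Netty and Mina" means, the sketch below hides two interchangeable transports behind one interface and picks one by name. All names here (ClientSketch, TransporterSketch, TransportersSketch, the registry map) are hypothetical; Dubbo selects its transporter through its own extension mechanism, which this example does not attempt to reproduce.

```java
import java.util.HashMap;
import java.util.Map;

// What the upper layers see: a client that can send a message.
interface ClientSketch {
    String send(String message);
}

// The unified transport interface: each transport knows how to create a client.
interface TransporterSketch {
    ClientSketch connect(String address);
}

final class TransportersSketch {
    private static final Map<String, TransporterSketch> REGISTRY = new HashMap<>();

    static {
        // Two fake transports; real ones would open Netty or Mina connections.
        REGISTRY.put("netty", address -> message -> "netty->" + address + ":" + message);
        REGISTRY.put("mina", address -> message -> "mina->" + address + ":" + message);
    }

    // Callers depend only on the interface; the name decides the implementation.
    static ClientSketch connect(String transporterName, String address) {
        TransporterSketch transporter = REGISTRY.get(transporterName);
        if (transporter == null) {
            throw new IllegalArgumentException("No such transporter: " + transporterName);
        }
        return transporter.connect(address);
    }
}
```

Code written against ClientSketch stays the same whether "netty" or "mina" is chosen, which is the role the Transporter layer plays for the client in the diagram.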