dubbo系列之网络通信-provider的接收与发送原理

dubbo系列之网络通信-provider的接收与发送原理

简要

绿色区域在《dubbo系列之服务发布-流程》里面有提到过,这里简单回顾一下。

ThreadPool:线程池,在NettyServer类中设置了boss和wroker线程池。
doOpen()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 设置线程池,名为boss wroker
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
// 数量为本机CPU核数 + 1
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
Server:实现类就有NettyServer类以及MinaNetty类。transport:网络传输层,抽象mina和netty为统一接口,以Message为中心;

接下来,我们就要开始介绍上面架构图的红色区域,也是这篇博客的重点。

源码解析

NettyServer类

doOpen()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());//解码
pipeline.addLast("encoder", adapter.getEncoder());//编码
pipeline.addLast("handler", nettyHandler);//处理器
return pipeline;
}
});

进入处理器

NettyHandler类

messageReceived()——消息接收
1
2
3
4
5
6
7
8
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}

AllChannelHandler类

received()
1
2
3
4
5
6
7
8
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();//创建线程池
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));//线程池执行线程,debug进去
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}

ChannelEventRunnable类

run()

此时的state是RECEIVED

1
2
3
4
5
6
7
8
case RECEIVED:
try {
handler.received(channel, message);//debug
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;

DecodeHandler类

received()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}

if (message instanceof Request) {
decode(((Request) message).getData());//设置编码
}

if (message instanceof Response) {
decode(((Response) message).getResult());
}

handler.received(channel, message);//debug
}

HeaderExchangeHandler类

received()
1
2
3
4
5
6
7
else {
if (request.isTwoWay()) {
Response response = handleRequest(exchangeChannel, request);//网络通信接收处理,debug
channel.send(response);//把接受处理的结果,发回consumer
} else {
handler.received(exchangeChannel, request.getData());//
}
handleRequest()
1
Object result = handler.reply(channel, msg);

DubboProtocol类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);//debug
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
...
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);// debug
}
...
}
getInvoker()
1
2
3
4
5
6
7
8
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
...
// 从服务暴露里面提取exporter
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
...
//最终得到一个invoker,并返回
return exporter.getInvoker();
}

这样invoker的值就拿到了,再次回到DubboProtocol类,执行invoker.invoke(inv);

——————————————–中间省略一些monitor,context,filter———————————————————

AbstractProxyInvoker—-进入代理类

JavassistProxyFactory类

1
2
3
4
5
6
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()));
}
...
}

AbstractProxyInvoker是代理类,再往下执行就是执行JavassistProxyFactory类的getInvoker方法,然后就是进入真正执行的实现类 DemoServiceImpl.sayHello,完成类的代理。

这样response

response在debug下的执行值

这样通过Response这个类,provider接收的过程就完成了。。。。。。。。。。。。。。。

接着看channel.send(response),把接受处理的结果,发回consumer。

NettyChannel类

send()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);

boolean success = true;
int timeout = 0;
try {
//最终目的:通过netty的channel发送网络数据
ChannelFuture future = channel.write(message);
if (sent) {
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout);
}
...
}

这个地方就是和上一篇博客一样了,走到了channel.write(message);将数据发回consumer。

现在回过头看,博客一开始图上圈出来的红色区域。

Exporter:DubboProtocol类

Filter:ProtocolFilterWrapper ProtocolFilterWrapper ClassLoaderFilter GenericFilter TraceFilter MonitorFilter TimeoutFilter ExceptionFilter InvokerWrapper类

invoker:就是JavassistProxyFactory.AbstractProxyInvoker.doInvoke.通过上面的源码解析,我们能看出来invoker存储于DubboProtocol的ExporterMap里面的DubboExporter.getInvoker().

implement:最终实现类 DemoServiceImpl.sayHello

至此:网络通信的架构图就说完了。