dubbo系列之服务发布-流程

dubbo系列之服务发布-流程

先上两张经典图,大致了解一下发布流程官方文档服务发布序列图

发布活动图

源码构成

dubbo-demo-provider.xml——注册文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!-- 提供方应用信息,用于计算依赖关系 -->
<dubbo:application name="demo-provider"/>

<!-- 使用multicast广播注册中心暴露服务地址 -->
<dubbo:registry address="multicast://224.5.6.7:1234"/>

<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880"/>

<!-- 声明需要暴露的服务接口 -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>

<!-- 和本地bean一样实现服务 -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

可以看到服务是通过dubbo的schema service进行注入的,那我们找到DubboNameSpaceHandler文件,即dubbo的命名空间处理器,找到dubbo:service标签解析行。

DubboNameSpaceHandler()
1
2
3
4
5
6
7
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
public void init() {
//省略一些其他加载语句
//这就话就是用于加载服务
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
}
}
ServiceBean类

ServiceBean类可以看到实现了ApplicationListener,这个接口就是spring的时间机制,看看ServiceBean继承这个接口实现的方法。

1
2
3
4
5
6
7
8
9
10
public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();//服务暴露的开始
}
}
}

export()

–>ServiceConfig.export()

-->doExport()

  -->doExportUrls()
1
2
3
4
5
6
7
private void doExportUrls() {
//加载注册中心,从dubbo.properties里面组装registry的url信息
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {//for循环,代表了一个服务可以有多个通信协议,例如 tcp协议 http协议,默认是tcp协议
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}

doExportUrlsFor1Protocol()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略一段代码
//配置为none不暴露
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
暴露本地服务和暴露远程服务的区别是什么?

1.暴露本地服务:指暴露在用一个JVM里面,不用通过调用zk来进行远程通信。例如:在同一个服务,自己调用自己的接口,就没必要进行网络IP连接来通信。
2.暴露远程服务:指暴露给远程客户端的IP和端口号,通过网络来实现通信。

本地暴露

exportLocal(url)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SuppressWarnings({"unchecked", "rawtypes"})
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(NetUtils.LOCALHOST)
.setPort(0);
Exporter<?> exporter = protocol.export(
//创建代理类
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
1
2
3
Exporter<?> exporter = protocol.export(
//创建代理类
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));接下来的代码流程就用一张图代替了,很清晰

这句话是本地暴露的关键,看官方给的服务提供者暴露一个服务的详细过程,其实也就是这句代码

服务提供者暴露一个服务的详细过程

为了方便理解,介绍一下一些类和方法的概念:

proxyFactory

proxyFactory:就是为了获取一个接口的代理类,例如获取一个远程接口的代理。
它有2个方法,代表2个作用
a.getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象。
b.getProxy :针对client端,创建接口的代理对象,例如DemoService的接口。

Wrapper

它类似spring的BeanWrapper,它就是包装了一个接口或一个类,可以通过wrapper对实例对象进行赋值 取值以及指定方法的调用。

Invoker

它是一个可执行的对象,能够根据方法的名称、参数得到相应的执行结果。

1
它里面有一个很重要的方法 Result invoke(Invocation invocation);
Invocation

包含了需要执行的方法和参数等重要信息,目前它只有2个实现类RpcInvocation MockInvocation

它有3种类型的Invoker
1.本地执行类的Invoker

1
server端:要执行 demoService.sayHello,就通过InjvmExporter来进行反射执行demoService.sayHello就可以了。

2.远程通信类的Invoker
client端:要执行 demoService.sayHello,它封装了DubboInvoker进行远程通信,发送要执行的接口给server端。
server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.

3.多个远程通信执行类的Invoker聚合成集群版的Invoker
client端:要执行 demoService.sayHello,就要通过AbstractClusterInvoker来进行负载均衡,DubboInvoker进行远程通信,发送要执行的接口给server端。
server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.

Protocol

1.export:暴露远程服务(用于服务端),就是将proxyFactory.getInvoker创建的代理类 invoker对象,通过协议暴露给外部。
2.refer:引用远程服务(用于客户端), 通过proxyFactory.getProxy来创建远程的动态代理类,例如DemoService的远程动态接口。

exporter

维护invoder的生命周期。

exchanger

信息交换层,封装请求响应模式,同步转异步。

介绍完上面一些概念,接着看看本地暴露接下来的步骤,用一张清晰的图表示。

本地暴露流程图

最终本地暴露的目的就是

1
exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter

远程暴露

doExportUrlsFor1Protocol()
1
2
3
4
5
//原理和本地暴露一样都是为了获取一个Invoker对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//远程暴露的关键:将Invoker对象封装到protocol协议对象中,同时开启socket服务监听端口。这里socket通信是使用netty框架来处理的
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);

远程暴露的关键就是invoker转换导exporter的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-->proxyFactory.getInvoker//原理和本地暴露一样都是为了获取一个Invoker对象
-->protocol.export(invoker)
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("registry");
-->extension.export(arg0)
-->ProtocolFilterWrapper.export//AOP的过滤
-->ProtocolListenerWrapper.export//AOP的监听
-->RegistryProtocol.export//进入目标类
-->doLocalExport(originInvoker)
-->getCacheKey(originInvoker);//读取 dubbo://192.168.100.51:20880/
-->protocol.export
-->Protocol$Adpative.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("dubbo");//和本地暴露不同的是从"injvm"变成了"dubbo"
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->buildInvokerChain//创建8个filter
-->ProtocolListenerWrapper.export
———1.netty服务暴露的开始——- –>DubboProtocol.export
1
2
3
4
-->serviceKey(url)//组装key=com.alibaba.dubbo.demo.DemoService:20880
-->目的:exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService:20880, this=DubboExporter
-->openServer(url)//netty服务开启
-->createServer(url)
——–2.信息交换层 exchanger 开始————–>Exchangers.bind(url, requestHandler)//exchaanger是一个信息交换层
1
2
3
4
5
6
7
8
-->getExchanger(url)
-->getExchanger(type)
-->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
-->HeaderExchanger.bind
-->Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
-->new HeaderExchangeHandler(handler)//this.handler = handler
-->new DecodeHandler
-->new AbstractChannelHandlerDelegate//this.handler = handler;
———3.网络传输层 transporter———————>Transporters.bind
1
2
3
4
5
6
7
8
9
10
-->getTransporter()  
-->ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
-->Transporter$Adpative.bind
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty");
-->extension.bind(arg0, arg1)
-->NettyTransporter.bind
--new NettyServer(url, listener)
-->AbstractPeer //this.url = url; this.handler = handler;
-->AbstractEndpoint//codec timeout=1000 connectTimeout=3000
-->AbstractServer //bindAddress accepts=0 idleTimeout=600000
———4.打开断开,暴露netty服务——————————–>doOpen()
1
2
3
4
5
6
7
8
-->设置 NioServerSocketChannelFactory boss worker的线程池 线程个数为3
-->设置编解码 hander
-->bootstrap.bind(getBindAddress())
-->new HeaderExchangeServer
-->this.server=NettyServer
-->heartbeat=60000
-->heartbeatTimeout=180000
-->startHeatbeatTimer()//这是一个心跳定时器,采用了线程池,如果断开就心跳重连。

还是贴一张图,理解更清晰一点

dubbo远程服务暴露总结

DubboProtocol()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service.
//组装key=com.alibaba.dubbo.demo.DemoService:20880
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
//目的:exporterMap.put(key, this)//key=com.alibaba.dubbo.demo.DemoService:20880, this=DubboExporter
exporterMap.put(key, exporter);

//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//开启netty服务
openServer(url);
//返回exporter
return exporter;
}

参考链接

[]: https://www.jianshu.com/p/893f7e6e0c58