RocketMQ路由中心NameServer

RocketMQ路由中心NameServer

NameServer架构设计

RocketMQ网络部署图

Producer同一时间,与NameServer集群中其中一台建立长连接。

Producer与Broker之间的Master保持长连接。

Consumer同一时间与NameServer集群中一台建立长连接。

Consumer与所有Broker建立长连接。

Producer发送某一主题的消息到消息服务器,消息服务器负责该消息的持久化存储。消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。

Consumer订阅感兴趣的主题,消息服务器根据订阅信息(路由信息)将消息推送到消费者(PUSH模式)或者消息消费者主动向消费服务器拉取消息(PULL模式),从而实现消息生产者和消息消费者解耦。

Broker消息服务器在启动时向所有NameServer注册。Broker 部署相对复杂,Broker 分为 Master 不 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能 对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 Name Server 集群中的所有节 点建立长连接,定时注册 Topic 信息到所有 Name Server。

NameServer和每台Broker服务器保持长连接,并间隔30s检测Broker是否存活。如果检测到Broker宕机,则从路由注册表中将其移除。NameServer本身的高可用可通过部署多台NameServer服务器来实现,但彼此之间互不通信,也就是NameServer服务器之间在某一时刻的数据并不会完全相同,但这对消息发送不会造成任何影响。

NameServer启动流程

NameServer启动的主类——NamesrvStartup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public static NamesrvController main0(String[] args) {
// 将参数设置为系统的全局变量,可以在项目的任何一个地方通过System.getProperty("变量");来获得
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
try {
//PackageConflictDetect.detectFastjson();

Options options = ServerUtil.buildCommandlineOptions(new Options());// 初始化Option
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}

final NamesrvConfig namesrvConfig = new NamesrvConfig();// NameServer业务参数
final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // NameServer网络参数
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {// -c 可以指定 NameServer 的配置文件,如果不指定,则使用默认值
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);

namesrvConfig.setConfigStorePath(file);

System.out.printf("load config properties file OK, " + file + "%n");
in.close();
}
}

if (commandLine.hasOption('p')) {// -p 打印 NameServer 的配置参数信息。打印完参数后退出进程
MixAll.printObjectProperties(null, namesrvConfig);
MixAll.printObjectProperties(null, nettyServerConfig);
System.exit(0);
}

MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);

if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}

LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);

MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);

final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);

// remember all configs to prevent discard
// 记住所有配置以防止丢弃
controller.getConfiguration().registerConfig(properties);
// ①NamesrvController类实例化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册JVM钩子函数代码,以便监听Broker,移除处于不激活状态的Broker
// 在JVM退出之前,调用ShutdownHookThread服务来进行关闭服务,释放资源,实现优雅停机。
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));

controller.start();

String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf(tip + "%n");

return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}

return null;
}
①NamesrvController类实例化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public boolean initialize() {
// KVConfigManager类默认是从 /namesrv/kvConfig.json 配置文件中加载NameServer的配置参数.将配置参数加载保存到configTable变量中。
this.kvConfigManager.load();
// 创建NettyServer网络处理对象
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 线程池,用来处理broker和client提交的请求,其工作线程的大小可配置
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
//②注册 NameServer 服务接受请求的处理类
this.registerProcessor();
// 定时任务1:NameServer每隔10s扫描一次Broker,移除处于不激活状态的Broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
// ④RouteInfoManager类存储所有请求数据
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 定时任务2:NameServer每隔10min打印一次KV配置
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);

return true;
}
②注册 NameServer 服务接受请求的处理类
1
2
3
4
5
6
7
8
9
10
11
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {

this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
// ③注册核心业务处理器DefaultRequestProcessor,这里是把这个处理器注册到了remotingServer中去了,通过channelEventListener监听到broker和client的请求,
// 然后交给DefaultRequestProcessor进行处理,最后还是丢到线程池里面处理,根据不同的request code交给不同的processor进行处理,像registerBroker等。
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
③请求分发processRequest
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// (1)RemotingCommand:自定义的协议,携带请求参数和响应(2)ChannelHandlerContext:netty的数据结构,携带channel相关的信息。
// processRequest 请求分发
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",
request.getCode(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}

switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:// 向Namesrv追加KV配置
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:// 从Namesrv获取KV配置
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:// 从Namesrv删除KV配置
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER:// 注册一个Broker,数据都是持久化的,如果存在则覆盖配置
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:// 卸载一个Broker,数据都是持久化的
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:// 根据Topic获取Broker Name、topic配置信息
return this.getRouteInfoByTopic(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_INFO:// 获取注册到Name Server的所有Broker集群信息
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:// 去掉BrokerName的写权限
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:// 从Name Server获取完整Topic列表
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:// 从Namesrv删除Topic配置
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:// 通过NameSpace获取所有的KV List
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:// 获取指定集群下的所有 topic
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:// 获取所有系统内置 Topic 列表
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:// 单元化相关 topic
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:// 获取含有单元化订阅组的 Topic 列表
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:// 获取含有单元化订阅组的非单元化
return this.getHasUnitSubUnUnitTopicList(ctx, request);
case RequestCode.UPDATE_NAMESRV_CONFIG:// 更新Name Server配置
return this.updateConfig(ctx, request);
case RequestCode.GET_NAMESRV_CONFIG:// 获取NameSrv配置
return this.getConfig(ctx, request);
default:
break;
}
return null;
}

④RouteInfoManager类存储所有请求数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class RouteInfoManager {
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 表示一个broker距离上次发心跳包的最长时间,即120秒
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
// 使用可重入读写锁实现并发安全、使用轻量级的非线程安全容器实现高效并发
// ReentrantLock是一个排他锁,同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相对于排他锁,提高了并发性
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// 用于管理topic和属于这个topic的队列的映射关系
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 用于管理某个broker和它对应的信息
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 用于管理broker集群和集群中对应的broker的映射关系
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 用于管理broker的存活信息
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 用于管理broker和过滤服务列表
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
//....
}

这里的lock是ReentrantReadWriteLock非公平锁,优点在于允许多个线程同时访问,但不允许写线程和读线程、写线程和写线程同时访问。相当于排它锁,提高了并发性。

这样整个NameSrv的启动就完成了。