RocketMQ-producer启动
启动流程图
先上一张producer的启动流程图,上面清晰的标注着不同的模块,后面的源码解析就围绕这几个模块来展开。
源码解析
Producer.java
1 | public class Producer { |
我们本篇博客就是围绕 producer.start();来展开。
DefaultMQProducer
参数设置
1 | // 生产者相应的topic |
ClientConfig —— DefaultMQProducer的父类
底层通信参数配置
1 | // 服务的服务器地址 |
1:start()
1 | public void start(final boolean startFactory) throws MQClientException { |
1.1MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook)
1 | private static MQClientManager instance = new MQClientManager(); |
此时的instance是空的
getAndCreateMQClientInstance()
1 | public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) { |
从获取MQClientInstance的过程,我们可以看出一个ip客户端下面的应用,只有在启动多个进程下才会创建多个MQClientInstance对象,否则只会有一个。
new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);——初始化MQClientInstance对象
1 | public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { |
1.2registerProducer()
1 | public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { |
registerProducer()就是将组名key,以及客户端实例value作为键值对,存入MQ客户端实例下的map中,即注册相应生产者组名及生产者。
1.4.MQClientInstance.start()
1 | public void start() throws MQClientException { |
1.4.1MQClientAPIImpl.start()
1 | public void start() { |
1.4.1.1NettyRemotingClient.start()
1 | public void start() { |
可以看到在NettyRemotingClient类中,调用的netty的语法,创建了一个连接。
1.4.2startScheduledTask()——启动定时任务
1 | private void startScheduledTask() { |
1.5MQClientInstance.sendHeartbeatToAllBrokerWithLock()
1 | public void sendHeartbeatToAllBrokerWithLock() { |
Producer定时发送心跳将producer信息(其实就是procduer的group)定时发送到, brokerAddrTable集合中列出的broker上去,Producer发送消息只发送到master的broker机器,在通过broker的主从复制机制拷贝到broker的slave上去。
这样整个producer的启动过程就大致的解析完毕了。