RocketMQ-producer启动

RocketMQ-producer启动

启动流程图

RocketMQ-producer启动/producer启动图.png

先上一张producer的启动流程图,上面清晰的标注着不同的模块,后面的源码解析就围绕这几个模块来展开。

源码解析

Producer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 构造producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 设置namesrv的地址,不同的地址用分号隔开
producer.setNamesrvAddr("119.23.211.22:9876;47.98.135.44:9876");
// 启动producer
producer.start();

for (int i = 0; i <= 10; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
"kkk",// key
("Hello RocketMQ " + i).getBytes()// body
);
// 发送消息并返回地址
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}

producer.shutdown();
}
}

我们本篇博客就是围绕 producer.start();来展开。

DefaultMQProducer

参数设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 生产者相应的topic
private String createTopicKey = MixAll.DEFAULT_TOPIC;
// 默认消息队列的数量
private volatile int defaultTopicQueueNums = 4;
// 发送消息的timeout
private int sendMsgTimeout = 3000;
// 消息超过此容量则需压缩
private int compressMsgBodyOverHowmuch = 1024 * 4;
// 消息发送失败后重发次数的上限
private int retryTimesWhenSendFailed = 2;
// 异步消息发送失败后重发次数的上限
private int retryTimesWhenSendAsyncFailed = 2;
// 发送一个broker失败后是否选择换一个broker发送
private boolean retryAnotherBrokerWhenNotStoreOK = false;
// 最大消息大小,4M
private int maxMessageSize = 1024 * 1024 * 4; // 4M
ClientConfig —— DefaultMQProducer的父类
底层通信参数配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 服务的服务器地址
private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
// 客户端IP
private String clientIP = RemotingUtil.getLocalAddress();
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
// 从命名的服务器中拉出主题信息的频率
private int pollNameServerInterval = 1000 * 30;
// 消息代理的心跳间隔
private int heartbeatBrokerInterval = 1000 * 30;
// 消费者偏移持续时间间隔
private int persistConsumerOffsetInterval = 1000 * 5;
private boolean unitMode = false;
private String unitName;
// 设置是否选择VIP消息队列(高优先级)
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean useTLS = TlsSystemConfig.tlsEnable;
1:start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void start(final boolean startFactory) throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:// 服务只是被创建,为启动完成
// 先假设会启动失败,修改服务状态,保证不会重复配置,以免客户端同一个进程中重复启动(只有状态为CREATE_JUST时才启动该Producer;其他状态均不执行启动过程)
this.serviceState = ServiceState.START_FAILED;
// 不能为空、长度不能大于255、不能等于"DEFAULT_PRODUCER";
this.checkConfig();

if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// 构建该Producer的ClientID,等于IP地址@instanceName
this.defaultMQProducer.changeInstanceNameToPID();
}
// 获取客户端实例
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 以主题名"TBW102"为key值,新初始化的TopicPublishInfo对象为value值存入DefaultMQProducerImpl.topicPublishInfoTable变量中;
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

if (startFactory) {
mQClientFactory.start();
}

log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}

this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
1.1MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook)
1
2
3
4
5
6
7
private static MQClientManager instance = new MQClientManager();
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
private ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable =
new ConcurrentHashMap<String, MQClientInstance>();
public static MQClientManager getInstance() {
return instance;
}

此时的instance是空的

instance

getAndCreateMQClientInstance()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 构造clientIp,格式为ip@instanceName@unitName
String clientId = clientConfig.buildMQClientId();
// 用clientId在map中获取MQClientInstance
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
// 创建一个新的MQClientInstance
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
// 将新创建的clientID和MQClientInstance存入map
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}

return instance;
}

从获取MQClientInstance的过程,我们可以看出一个ip客户端下面的应用,只有在启动多个进程下才会创建多个MQClientInstance对象,否则只会有一个。

new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);——初始化MQClientInstance对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex;
this.nettyClientConfig = new NettyClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
// 初始化ClientRemotingProcessor对象,处理接受的事件请求
this.clientRemotingProcessor = new ClientRemotingProcessor(this);
// 初始化MQClientAPIImpl对象,在初始化过程中,完成remotingClient完成了netty远程客户端的创建,用以管理具体的网络访问
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

if (this.clientConfig.getNamesrvAddr() != null) {
//若ClientConfig.namesrvAddr不为空,则拆分成数组存入MQClientAPIImpl.remotingClient变量中;
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
}

this.clientId = clientId;

this.mQAdminImpl = new MQAdminImpl(this);
// 初始化PullMessageService服务线程,供DefaultMQPushConsumer端使用
this.pullMessageService = new PullMessageService(this);
// 初始化RebalanceService服务线程,供Consumser端使用
this.rebalanceService = new RebalanceService(this);
// 初始化producerGroup等于"CLIENT_INNER_PRODUCER"的DefaultMQProducer对象;
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
this.defaultMQProducer.resetClientConfig(clientConfig);
// 初始化ConsumerStatsManager服务线程
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

log.info("created a new client Instance, FactoryIndex: {} ClinetID: {} {} {}, serializeType={}",
this.instanceIndex,
this.clientId,
this.clientConfig,
MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
}
1.2registerProducer()
1
2
3
4
5
6
7
8
9
10
11
12
13
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}

MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}

return true;
}

registerProducer()就是将组名key,以及客户端实例value作为键值对,存入MQ客户端实例下的map中,即注册相应生产者组名及生产者。

1.4.MQClientInstance.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void start() throws MQClientException {

synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// 1.如果在producer中未设置nameSrvAddr,则去查找
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 2.启动client端远程通信,并获取当前客户端ip
this.mQClientAPIImpl.start();
// 3.启动各种定时任务
this.startScheduledTask();
// 4.启动拉取服务消息
this.pullMessageService.start();
// 5.启动消费端负载均衡服务
this.rebalanceService.start();
// 6.启动推服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
1.4.1MQClientAPIImpl.start()
1
2
3
4
5
6
7
8
9
10
11
public void start() {
this.remotingClient.start();
//Get app info
try {
String localAddress = RemotingUtil.getLocalAddress();
projectGroupPrefix = this.getProjectGroupByIp(localAddress, 3000);
log.info("The client[{}] in project group: {}", localAddress, projectGroupPrefix);
}
catch (Exception e) {
}
}
1.4.1.1NettyRemotingClient.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(//
nettyClientConfig.getClientWorkerThreads(), //
new ThreadFactory() {

private AtomicInteger threadIndex = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
}
});

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
//
.option(ChannelOption.TCP_NODELAY, true)
//
.option(ChannelOption.SO_KEEPALIVE, false)
//
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
//
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
//
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(//
defaultEventExecutorGroup, //
new NettyEncoder(), //
new NettyDecoder(), //
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),//
new NettyConnetManageHandler(), //
new NettyClientHandler());
}
});

this.timer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
}
catch (Exception e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);

if (this.channelEventListener != null) {
this.nettyEventExecuter.start();
}
}

可以看到在NettyRemotingClient类中,调用的netty的语法,创建了一个连接。

1.4.2startScheduledTask()——启动定时任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private void startScheduledTask() {
// 如果当前客户端没有指定setNamesrvAddr,启动查找NamesrvAddr地址服务,每两分钟一次.
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
// 启动从nameService更新topic路由信息服务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
// 启动清理已经不存在的broker服务 和 启动发送心跳服务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
// 启动consumer offset持久化
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
// 启动调整消费线程池
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
}
1.5MQClientInstance.sendHeartbeatToAllBrokerWithLock()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
// 向所有在MQClientInstance.brokerAddrTable列表中的Broker发送心跳消息
this.sendHeartbeatToAllBroker();
// 向Filter过滤服务器发送REGISTER_MESSAGE_FILTER_CLASS请求码,更新过滤服务器中的Filterclass文件
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}

Producer定时发送心跳将producer信息(其实就是procduer的group)定时发送到, brokerAddrTable集合中列出的broker上去,Producer发送消息只发送到master的broker机器,在通过broker的主从复制机制拷贝到broker的slave上去。

这样整个producer的启动过程就大致的解析完毕了。