RocketMQ-安装部署教程

RocketMQ-安装部署教程

前期准备

环境准备

Java8安装

Linux系统

安装包

rocketmq-4.2.0.zip

安装

解压缩zip安装包到指定文件夹

1
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-4,2,0

部署

参考mq官方quickstart文档 :http://rocketmq.apache.org/docs/quick-start/

启动Name Server
1
2
3
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

注:启动过程中可能报错显示内存不足,报错信息如下:

1
2
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 33554432 bytes for committing reserved memory.

需修改bin文件夹下的runserver.sh文件,改成

1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m"

xms:初始化内存 xmx:最大内存 xmn:新生代内存 permsize:永久区 maxPermSize:最大永久区

一般建议是按官方推荐的内存设置,否则可能发挥不出mq的性能。

同理,修改runbroker.sh文件,改成

1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m

再同理,修改tools.sh文件,改成

1
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"
启动Broker
1
2
3
> nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...

注:官网上启动Broker用的命令就是nohup sh bin/mqbroker -n localhost:9876 & ,这样启动后在后来运行Producer发送的时候会出现

1
2
3
4
com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TopicTest1
See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist for further details.
at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:542)
...

autoCreateTopicEnable在broker配置文件里面默认是false,如果在配置文件里面改成true,就可以直接用nohup sh bin/mqbroker -n localhost:9876 &启动了。

发送&接收信息

在发送/接收消息之前,我们需要告诉客户端服务器的ip。RocketMQ提供了多种实现此目的的方法。为简单起见,我们使用环境变量namesrvaddr

1
2
3
4
5
6
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

看见日志飞速刷,就是启动成功了。

关闭服务器
1
2
3
4
5
6
7
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK

本地测试类

ProducerTest——生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ProducerTest {
private static DefaultMQProducer producer = null;

public static void main(String[] args) {
System.out.print("[----------]Start");
boolean result = false;
try {
ProducerStart();
System.out.println("producer 启动成功");
SendMessage("qch_20170706", "hello rocketmq!");
}finally {
producer.shutdown();
}
System.out.print("[----------]Succeed");
}

private static boolean ProducerStart() {
producer = new DefaultMQProducer("pro_qch_test");
producer.setNamesrvAddr("119.23.211.22:9876");
producer.setInstanceName(UUID.randomUUID().toString());
producer.setVipChannelEnabled(false);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
return false;
}
return true;
}

private static boolean SendMessage(String topic, String str) {
Message msg = new Message(topic, str.getBytes());
try {
producer.send(msg);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
return false;
}
return true;
}
}

ConsumerTest——消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ConsumerTest {
public static void main(String[] args) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("con_qch_test");
consumer.setInstanceName(UUID.randomUUID().toString());
consumer.setConsumeMessageBatchMaxSize(32);
consumer.setNamesrvAddr("119.23.211.22:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for(MessageExt me : list) {
System.out.print(new String(me.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
consumer.subscribe("qch_20170706", "*");
consumer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}

注:在本地启动的时候,可以会出现这个错误

RocketMQ-安装部署教程

需要在conf/broker.conf文件里面添加brokerIP1=你的服务器IP,然后启动broker的时候用

1
sh bin/mqbroker -n localhost:9876 autoCrenohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true -c ~/software/rocketmq-4.2.0/conf/broker.conf &

看一下broker.log文件

1
2
3
4
5
6
7
8
9
10
11
12
13
2018-05-09 19:43:45 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONS-HTTP-PROXY, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2018-05-09 19:43:45 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=FILTERSRV_CONSUMER, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2018-05-09 19:43:45 INFO main - load exist subscription group, SubscriptionGroupConfig [groupName=CID_ONSAPI_PULL, consumeEnable=true, consumeFromMinEnable=true, consumeBroadcastEnable=true, retryQueueNums=1, retryMaxTimes=16, brokerId=0, whichBrokerWhenConsumeSlowly=1, notifyConsumerIdsChangedEnable=true]
2018-05-09 19:43:45 INFO main - load /root/store/config/subscriptionGroup.json OK
2018-05-09 19:43:45 INFO main - load /root/store/config/consumerFilter.json OK
2018-05-09 19:43:45 INFO main - load /root/store/config/delayOffset.json OK
2018-05-09 19:43:46 INFO main - Set user specified name server address: localhost:9876
2018-05-09 19:43:46 INFO PullRequestHoldService - PullRequestHoldService service started
2018-05-09 19:43:47 INFO main - register broker to name server localhost:9876 OK
2018-05-09 19:43:47 INFO main - The broker[broker-a, 119.23.211.22:10911] boot success. serializeType=JSON and name server is localhost:9876
2018-05-09 19:43:56 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-05-09 19:43:56 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 974140 bytes
2018-05-09 19:43:57 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK

再看一下nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true & 命令启动的broker的log日志

1
2
3
4
5
2018-05-09 19:39:02 INFO main - register broker to name server localhost:9876 OK
2018-05-09 19:39:02 INFO main - The broker[izwz9f1jdvkslcnjfonvbiz, 172.18.213.113:10911] boot success. serializeType=JSON and name server is localhost:9876
2018-05-09 19:39:12 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-05-09 19:39:12 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 974140 bytes
2018-05-09 19:39:12 INFO BrokerControllerScheduledThread1 - register broker to name server localhost:9876 OK

貌似多加载了config文件夹下的几个配置。

真是个深渊巨坑。。。。。。。。。。

END