安装
参考:https://hub.docker.com/r/foxiswho/rocketmq
- 基础环境
基于虚拟机启动Ubuntu18,用Docker跑RoketMQ。
192.168.204.186是我虚拟机的IP。
- 创建文件夹
mkdir logs
mkdir conf
mkdir store
chmod -R 777 logs
chmod -R 777 conf
chmod -R 777 store
.
├── conf
│ ├── broker.conf
├── logs
└── store
vim ./conf/broker.conf
:在conf
下创建broker.conf
文件
注意修改
brokerIP1
值
#所属集群名字
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样,如果在broker-a.properties使用:broker-a,
#在broker-b.properties使用:broker-b
brokerName=broker-a
#0 表示Master,>0 表示Slave
brokerId=0
#nameServer地址,分号分割
#namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
namesrvAddr=rmqnamesrv:9876
# 服务器IP
brokerIP1=192.168.204.186
#启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
#brokerIP1=192.168.0.253
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 !!!这里仔细看是false,false,false
#原因下篇博客见~ 哈哈哈哈
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨4点
deleteWhen=04
#文件保留时间,默认48小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
#storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
#commitLog 存储路径
#storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
#消费队列存储
#storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
#消息索引存储路径
#storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
#checkpoint 文件存储路径
#storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
#abort 文件存储路径
#abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
- 网络
docker network create --driver bridge rmq-network
- server
docker run -d -v $(pwd)/logs:/home/rocketmq/logs \
--name rmqnamesrv --network=rmq-network\
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 9876:9876 \
foxiswho/rocketmq:4.8.0 \
sh mqnamesrv
- broker
docker run -d \
-v $(pwd)/logs:/home/rocketmq/logs \
-v $(pwd)/store:/home/rocketmq/store \
-v $(pwd)/conf:/home/rocketmq/conf \
--name rmqbroker --network=rmq-network\
-e "NAMESRV_ADDR=rmqnamesrv:9876" \
-e "JAVA_OPT_EXT=-Xms512M -Xmx512M -Xmn128m" \
-p 10911:10911 -p 10912:10912 -p 10909:10909 \
foxiswho/rocketmq:4.8.0 \
sh mqbroker -c /home/rocketmq/conf/broker.conf
- console
docker run -d --name rmqconsole --network=rmq-network \
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" \
-p 8180:8080 -t styletang/rocketmq-console-ng:1.0.0
整合SpringBoot
- 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
</exclusion>
</exclusions>
</dependency>
- 配置
#RocketMQ
rocketmq.name-server=192.168.204.186:9876
rocketmq.producer.group=order-group
rocketmq.producer.sendMsgTimeout=300000
- 测试
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {SeckillOrderStarter.class})//指向启动类
public class RocketMQTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@org.junit.Test
public void testLoad() {
Message<String> msg = MessageBuilder.withPayload("hello rocketmq").build();
rocketMQTemplate.send("broker-a", msg);
}
}