RocketMQ的docker安装和SpringBoot的集成

1.Docker安装

1.1创建docker-compose.yml文件

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      - 9876:9876
    networks:
      rmq:
        aliases:
          - rmqnamesrv
  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - 8090:8080
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole
networks:
  rmq:
    name: rmq
    driver: bridge

1.2.新建broker.conf文件

# 集群名称
brokerClusterName = DefaultCluster
# 节点名称

brokerName = broker-a
# broker id节点ID, 0 表示 master, 其他的正整数表示 slave,不能小于0

brokerId = 0
# 在每天的什么时间删除已经超过文件保留时间的 commit log,默认值04

deleteWhen = 04
# 以小时计算的文件保留时间 默认值72小时

fileReservedTime = 72
# Broker角色

brokerRole = ASYNC_MASTER
# 刷盘方式

flushDiskType = ASYNC_FLUSH
# Broker服务地址,内部使用填内网ip,如果是需要给外部使用填公网ip,自行更改

brokerIP1 = 192.168.11.99

1.3. 执行docker-compose up -d 即可成功启动

1.4.访问控制台

2.SpringBoot集成RockerMQ

2.1.引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

2.2.配置服务器地址

#Rocketmq配置
rocketmq.name-server=192.168.11.99:9876
# 必须指定生产者组
rocketmq.producer.group=group01
# 消息发送超时时长,默认3s
rocketmq.producer.send-message-timeout=3000
# 同步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-failed=3
# 异步发送消息失败重试次数,默认2
rocketmq.producer.retry-times-when-send-async-failed=3

2.3创建生产者 

@Service
@Slf4j
public class Producer1 {
    @Autowired
    RocketMQTemplate rocketMQTemplate;
    public void sendMassage(String msg){
        Message<String> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.send("topic_01",message);
    }
    //rocketMQ的延迟队列
    public void sendMassage2(String msg){
        Message<String> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend("topic_01",message,3000,4);
        log.debug("生产者发送消息成功:{}"+msg);
    }
    //死信队列
    public void sendMassage3(String msg){
//        Message<String> message = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.convertAndSend("topic_01",msg);
        log.debug("生产者发送消息成功:{}"+msg);
    }
}

2.4创建消费者

@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_01",consumerGroup ="group_205" )
public class Consumer1 implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {


    @Override
    public void onMessage(String massage) {
          System.out.println("消费者1接收到消息:"+massage);
          int a =1/0;
          log.debug("消费者1接收消息成功:{}"+massage);

    }
    //死信队列需要重写RocketMQPushConsumerLifecycleListener方法
    @Override
    public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
        //设置最大尝试次数2次
        defaultMQPushConsumer.setMaxReconsumeTimes(2);
        //设置最大抓取次数
        defaultMQPushConsumer.setPullBatchSize(5);
    }

4.延迟消息

RocketMQ支持指定级别的延迟消息,即只能设置预设的几个时间等级的延迟,而不是任意时间延迟。目前RocketMQ社区版并不支持任意时间的精确延迟,RocketMQ在4.x版本只能够支持18种内置的延迟消息(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),具体实现方式是在发送消息时设置消息的延迟等级。

5.死信队列

代码如上

 

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-11 23:58:04       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-11 23:58:04       106 阅读
  3. 在Django里面运行非项目文件

    2024-04-11 23:58:04       87 阅读
  4. Python语言-面向对象

    2024-04-11 23:58:04       96 阅读

热门阅读

  1. JQuery

    2024-04-11 23:58:04       32 阅读
  2. ubuntu下使用ndk编译libevnet

    2024-04-11 23:58:04       32 阅读
  3. 算法刷题记录 Day37

    2024-04-11 23:58:04       34 阅读
  4. 代码随想录训练营16day:二叉树5

    2024-04-11 23:58:04       33 阅读
  5. 21. 面试指导-高频面试题详解

    2024-04-11 23:58:04       34 阅读
  6. Frp多端口映射

    2024-04-11 23:58:04       31 阅读
  7. day04-MQ

    day04-MQ

    2024-04-11 23:58:04      28 阅读
  8. 二叉树链式存储详解

    2024-04-11 23:58:04       39 阅读
  9. c++手机通讯录管理系统280

    2024-04-11 23:58:04       39 阅读
  10. [树莓派]树莓派Raspbian系统安装tesseract-ocr实现OCR

    2024-04-11 23:58:04       37 阅读