RocketMQ源码级实现原理-消息发送流程与高可用设计

如何学习一个中间件?

1. 通读该中间件的官方文档,从全局了解这个中间件有哪些主要功能

2. 把源码下载下来,从不同模块的包下的test测试案例开始

3. 先主线、然后再细到各个小分支去

消息发送者启动流程

MQClientInstance

1. 一般,一台物理机上只会有一个MQClientInstance实例,一个MQClientInstance实例中保存了不同的producerGroup的producer,也保存了不同consumerGroup的consumer

多个不同的producerGroup的producer,都会调用producer.start(),producer.start()内部都是使用的同一个MQClientInstance实例,也就是调用的同一个一个MQClientInstance.start()方法,此时就有线程安全问题。所以,此时就用了synchronized(this)保证只启动一次

2. 236行启动了很多的定时任务

有定时任务,是producer/consumer每30s去nameserver中拉取一次topic路由信息

有定时任务,consumer每30s去把consumer自己内存中保存的消费进度去做持久化

3. 启动了消息拉取的线程PullMessageService

可以看到,一个MQClientInstance实例内部只会启动一个PullMessageService线程

消息发送流程

每次send()发送消息的过程中,都会先去本地缓存中找有没有topic路由信息,如果有则直接选择topic下的一个queue发送消息。如果本地缓存中没有找到topic路由信息,则发送远程请求去nameserver中查找当前topic对应的路由信息,查找到了则直接选择topic下的一个queue发送消息。如果远程还没有查找到当前topic对应的路由信息,则看有没有开启auto_create_topic,但是生产上我们一般都是关闭的。

消息发送的重试机制

实际就是写了一个for循环,发送失败一次,则int++,进行第二次重试发送消息

RocketMQ的一个好的实践:命令细化,每个功能都有一个唯一的命令与之对应

如何让MASTER和SLAVE在同一套代码下,有不同的行为

Broker端

当消息通过netty发送网络请求到broker端后,会走到DefaultMessageStorage#putMessage()进行消息刷盘操作

RocketMQ底层使用的netty作为通信框架,来发送网络请求

Dubbo是一个RPC框架,底层的通信框架也还是netty,所谓的RPC框架只是一种调用方式,可以让使用者能够像调用本地方法一样的来调用远程方法

消息发送高可用机制

一、消息重试

相关推荐

  1. RockerMQ发送消息流程

    2024-07-16 03:34:01       59 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-16 03:34:01       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-16 03:34:01       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-16 03:34:01       57 阅读
  4. Python语言-面向对象

    2024-07-16 03:34:01       68 阅读

热门阅读

  1. 双缓存机制

    2024-07-16 03:34:01       16 阅读
  2. CNN -1 神经网络-概述

    2024-07-16 03:34:01       18 阅读
  3. 输入两个整数,输出最大公约数与最小公倍数。

    2024-07-16 03:34:01       17 阅读
  4. linux压缩/解压缩命令

    2024-07-16 03:34:01       18 阅读
  5. python pandas处理股票量化数据:笔记4

    2024-07-16 03:34:01       17 阅读
  6. Vue3中的ref函数

    2024-07-16 03:34:01       19 阅读
  7. qt 获取父控件

    2024-07-16 03:34:01       19 阅读