RabbitMQ线程和连接模型详解

1. 线程、信道、连接、请求的概念

客户端(生产者)和服务端(服务端)之间建立连接。例如TCP连接,是一个长连接,也是较为稳定的连接,开销也较大。一般而言主客户端之间需要一个连接。但服务器需要连接多个客户端。

客户端的消息(请求)需要进过经过信道到达服务端。信道是一种逻辑上的连接通道,多个信道复用了同一个连接。(RabbitMQ默认的最大通道数2047)

2. RabbitMQ的线程模型

rabbitmq.client源码:

通过断点查找发现原来是 ConsumerWorkService MQ工作线程池这个类控制的,工作线程池和心跳线程会在消费者(服务端)启动时初始化。

这个类构造函数里有一个executor参数,当这个参数为空时,就会创建一个Executors.newFixedThreadPool,代码如下:

amqp-client的package com.rabbitmq.client.impl下有类

final public class ConsumerWorkService {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerWorkService.class);
    private static final int MAX_RUNNABLE_BLOCK_SIZE = 256;
    private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.availableProcessors());
    private final ExecutorService executor;
    private final boolean privateExecutor;
    private final WorkPool<Channel, Runnable> workPool;
    private final int shutdownTimeout;

    public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
        this.privateExecutor = (executor == null);
        if (executor == null) {
            LOGGER.debug("Creating executor service with {} thread(s) for consumer work service", DEFAULT_NUM_THREADS);
            this.executor = Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory);
        } else {
            this.executor = executor;
        }
        this.workPool = new WorkPool<>(queueingTimeout);
        this.shutdownTimeout = shutdownTimeout;
    }

这个类定义RabbitMQ服务端(消费者)的线程模型,底层也是一个ExecutorService的线程池模型,默认的线程数量是可用的CPU核数(处理器数)。

3. 浅浅的看看连接模型

从一般性的客户端的rabbitmq的使用出发

ConnectionFactory factory = new ConnectionFactory();
factory.setHost(ip);
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();

建立连接时一般会用到ConnectionFactory的newConnection()

public Connection newConnection(Address[] addrs) throws IOException, TimeoutException {
    return newConnection(this.sharedExecutor, Arrays.asList(addrs), null);
}

连接会传入一个共享的线程池,broker服务端和客户端的连接共享这个连接池,这个属性表示内部使用共享的唯一一个ExecutorService
设置这个属性就可以一直传到ConsumerWorkService中。

Set the executor to use for consumer operation dispatch by default for newly created connections. All connections that use this executor share it. It’s developer’s responsibility to shut down the executor when it is no longer needed.

默认情况下,为新创建的连接设置用于消费者操作调度的线程池。所有使用此线程池的连接都共享它。当不再需要线程池时,关闭它是开发人员的责任。

通过设置shareExecutorService,无论多少个channel,都可以统一控制线程数量、队列数量,根据实际情况进行配置。

也可以传入其他的executor。

4. RabbitMQ连接的具体过程

rabbitmq采用的amqp协议,是一个高级的应用层协议。

  1. 将 AMQP 0-9-1 的连接头写入底层套接字,包含指定的版本信息(客户端告诉 broker 自己使用的协议及版本,底层使用 java 自带的 socket)。

  2. 客户端等待 broker 发送的 Connection.Start (broker 告诉客户端 通信的协议和版本、SASL认证机制(详细见)、语言环境以及RabbitMQ的版本信息和支持能力)。

  3. 客户端接收后 发送 Connection.StartOk (客户端告诉 broker 连接使用的帐号和密码、认证机制、语言环境、客户的信息以及能力)。

  4. 客户端等待 broker 发送的 Connection.Tune (broker 与 客户端 进行参数协商)。

  5. 客户端接收后 发送 Connection.TuneOk (客户端 参数 [ChannelMax、FrameMax、Heartbeat] 协商完成后告诉 broker)。

  6. 客户端发送 Connection.Open (客户端 告诉 broker 打开一个连接,并请求设置_virtualHost [vhost])。

  7. broker 接收到后返回 Connection.OpenOk (client 对 vhost 进行验证,成功则返回如下此信息)。

  8. 客户端发送 Channel.Open,broker 接收到后返回 Channel.OpenOk (客户端 创建通道;broker 收到并创建通道完成)。

  9. 客户端发送 Confirm.Select,broker 接收到后返回 Confirm.SelectOk(客户端告诉 broker 消息需要使用 confirm的机制,broker收到并回复)。。

  10. 客户端发送消息 Basic.Publish,broker 应答返回 Basic.Ack。

  11. 期间 客户端和 broker 会相互检查彼此的心跳 heartbeat。

  12. 客户端 关闭通道 Channel.Close,broker 应答返回 Channel.CloseOk。

  13. 客户端 关闭连接 Connection.Close,broker 应答返回 Connection.CloseOk。

rabbitmq具体源码细节和spring中的是类似的,可以参考我的另一篇文章,分析地很详细。敲详细的springframework-amqp-rabbit源码解析

不同在于消息底层是用帧来包装,有分心跳帧和携带方法或信息的帧。Consumer也是带有queue的。

客户端单个connection对应的单个channel实际上是单线程的,每次收到Socket消息都触发处理逻辑,从任务队列里面取出一定的任务进行依次处理,如果一个channel订阅了多个topic的话也是单线程依次处理的。

相关推荐

  1. RabbitMQ线连接模型详解

    2024-07-20 17:06:04       23 阅读
  2. Uinx线详解

    2024-07-20 17:06:04       31 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-20 17:06:04       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-20 17:06:04       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-20 17:06:04       45 阅读
  4. Python语言-面向对象

    2024-07-20 17:06:04       55 阅读

热门阅读

  1. 探索现代Web开发:WebKit的剪贴板API革新

    2024-07-20 17:06:04       26 阅读
  2. Node.js 路由

    2024-07-20 17:06:04       18 阅读
  3. JDK版本详解

    2024-07-20 17:06:04       18 阅读
  4. Zookeeper是什么,为什么要用,怎么用?

    2024-07-20 17:06:04       23 阅读
  5. 【c++】用c++类做一个猜数字游戏

    2024-07-20 17:06:04       18 阅读
  6. execjs._exceptions.ProgramError: SyntaxError: 语法错误

    2024-07-20 17:06:04       18 阅读
  7. MySQL自增主键出现不连续的原因?

    2024-07-20 17:06:04       19 阅读