如何用Redis实现消息队列

lpush+brpop

#include <cpp_redis/cpp_redis>
#include <iostream>
#include <thread>

void producer(cpp_redis::client& client) {
    while (true) {
        std::string message;
        std::cout << "Enter message: ";
        std::getline(std::cin, message);
        client.lpush("message_queue", { message }, [](cpp_redis::reply& reply) {});
    }
}

void consumer(cpp_redis::client& client) {
    while (true) {
        client.brpop({ "message_queue" }, [](cpp_redis::reply& reply) {
            if (reply.is_array() && reply.as_array().size() == 2) {
                std::string message = reply.as_array()[1].as_string();
                std::cout << "Received message: " << message << std::endl;
            }
        });
    }
}

int main() {
    cpp_redis::client client;
    client.connect("127.0.0.1", 6379, [](const std::string& host, std::size_t port, cpp_redis::connect_state status) {
        if (status == cpp_redis::connect_state::ok) {
            std::cout << "Connected to Redis server" << std::endl;
        } else {
            std::cerr << "Failed to connect to Redis server" << std::endl;
        }
    });

    std::thread producer_thread([&] { producer(client); });
    std::thread consumer_thread([&] { consumer(client); });

    producer_thread.join();
    consumer_thread.join();

    return 0;
}

缺点 

1.brpop是一个阻塞操作,会阻塞 Redis 的事件循环。如果队列中没有消息,消费者线程将一直阻塞在这里,浪费了资源。尤其是在高并发的情况下,大量的阻塞请求可能会导致性能下降。

2.brpop函数每次只能由一个消费者调用,这意味着无法实现多个消费者同时从队列中获取消息。如果需要实现多个消费者同时消费消息,需要通过其他方式实现。

3.不支持重复消费。

发布/订阅模型

#include <iostream>
#include <cpp_redis/cpp_redis>
#include <thread>

void subscriber() {
    cpp_redis::subscriber sub;

    // 订阅频道
    sub.connect();
    sub.subscribe("message_channel", [](const std::string& chan, const std::string& msg) {
        std::cout << "Received message on channel " << chan << ": " << msg << std::endl;
    });

    // 运行监听事件循环
    sub.commit();
    std::this_thread::sleep_for(std::chrono::seconds(5)); // 等待5秒,让程序能接收到一些消息
    sub.disconnect();
}

void publisher() {
    cpp_redis::client client;

    // 发布消息
    client.connect();
    client.publish("message_channel", "Hello, world!");
    client.disconnect();
}

int main() {
    // 创建发布者线程
    std::thread pub_thread(publisher);

    // 创建订阅者线程
    std::thread sub_thread(subscriber);

    // 等待线程结束
    pub_thread.join();
    sub_thread.join();

    return 0;
}

ps:

sub.subscribe()方法用于订阅一个或多个频道,参数为频道名称和一个回调函数,当接收到消息时,将调用该回调函数。

Lambda 表达式作为回调函数,接收两个参数:频道名称和消息内容。

在 Lambda 表达式中,消息的频道名称和内容被打印到控制台上。

缺点

1.发布/订阅功能是一种 fire-and-forget 模式,消息一旦发布就无法确保每个订阅者都能成功接收到。如果某个订阅者在消息发布后断开连接或者未能及时处理消息,那么该消息可能会丢失。

2.Redis 不会对发布的消息进行持久化存储,如果 Redis 服务器重启或者出现故障,未处理的消息可能会丢失。虽然可以通过配置 Redis 实例来启用持久化功能,但这增加了系统的复杂性和成本。

3.不支持重复消费。

4.每个消费者再订阅一个队列的时候,在redis中会给每个消费者分配一块内存消费消息的缓冲区,当由大量的消息过来的时候,消费者消费不及时,造成消息堆积达到一定阈值的时候,redis就会强制让这个消费者下线。

Stream

#include <cpp_redis/cpp_redis>
#include <iostream>

void producer(cpp_redis::client& client) {
    // 发送消息到名为 message_stream 的 Stream 中
    client.xadd("message_stream", "*", {{"key1", "value1"}, {"key2", "value2"}}, [](cpp_redis::reply& reply) {
        std::cout << "Message ID: " << reply.as_string() << std::endl;
    });
    client.sync_commit();
}

void consumer(cpp_redis::client& client) {
    // 从名为 message_stream 的 Stream 中读取消息
    client.xread({{"message_stream", "0"}}, [](cpp_redis::reply& reply) {
        if (reply.is_array()) {
            for (auto& msg : reply.as_array()) {
                std::cout << "Received message: " << msg.as_array()[1].as_string() << std::endl;
            }
        }
    });
    client.sync_commit();
}

int main() {
    cpp_redis::client client;
    client.connect();

    // 生产者发送消息
    producer(client);

    // 消费者消费消息
    consumer(client);

    client.disconnect();
    return 0;
}

ps 

1.支持重复消费。Stream数据结构中每个消息都有消息ID,保证消息的消费需要用到这个消息ID。当一组消费者处理完消息后,需要执行XACK命令告知Redis,这时候Redis会把这条消息标记为“处理完成”。当消费者在消费消息的时候宕机了,这时候消费者不会发送XACK,当消费者重新上线后,会将消息重新发给消费者。

2.它是持久化的,保证了数据不会丢失。

相关推荐

  1. 如何Redis实现消息队列

    2024-04-01 13:20:04       46 阅读
  2. 如何基于Redis实现消息队列

    2024-04-01 13:20:04       31 阅读
  3. springboot redis 实现消息队列

    2024-04-01 13:20:04       42 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-01 13:20:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-01 13:20:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-01 13:20:04       82 阅读
  4. Python语言-面向对象

    2024-04-01 13:20:04       91 阅读

热门阅读

  1. Codeforces Round 932 (Div. 2)(A,B,C,D)

    2024-04-01 13:20:04       33 阅读
  2. [蓝桥杯 2016 国 C] 赢球票

    2024-04-01 13:20:04       43 阅读
  3. 专升本-大数据

    2024-04-01 13:20:04       41 阅读
  4. 银联扫码接口开通流程及注意事项

    2024-04-01 13:20:04       46 阅读
  5. 【Spring】通过Spring收集自定义注解标识的方法

    2024-04-01 13:20:04       46 阅读
  6. 03-28 周四 Linux 并行工具使用xargs和parallel

    2024-04-01 13:20:04       41 阅读
  7. 装饰器模式:灵活增强功能的利器

    2024-04-01 13:20:04       40 阅读
  8. 手机投屏到电脑

    2024-04-01 13:20:04       37 阅读
  9. Leetcode 2810. 故障键盘

    2024-04-01 13:20:04       37 阅读