kafka如何保证消息不丢失 不重复消费 消息的顺序

如何保证消息的不丢失

消息为什么会丢失

在这里插入图片描述
想要保证消息不丢失就要首先知道消息为什么会丢失,在哪个环节会丢失,然后在丢失的环节做处理

1.生产者生产消息发送到broker,broker收到消息后会给生产者发送一个ack指令.生产者接收到broker发送成功的指令,这个时候我们就可以认为消息发送成功了.没有接收到ack指令我们就认为消息发送失败.

 public <T,Throwable> void sendEventByKafka(String topic, String content ,T t, KafkaSendErrorCallback<T, java.lang.Throwable> function) {
   
        kafkaTemplate.send(topic, content).addCallback(success -> {
   
            log.info("执行kafka消息发送kafka成功!");
            log.info(content);
        }, failure -> {
   
            log.error("执行kafka消息发送kafka失败!");
            //失败的消息保存到消息表
            function.saveMqDb(t,failure);
        });
    }

上述的逻辑有个前提条件就是,确定broker确实是接受并保存了消息.需要设置ack的级别
acks=0(不等待确认):
在这种模式下,生产者发送消息后不会等待来自Broker的任何确认。它会立即继续发送下一条消息。
这是最低延迟的选项,但也是最不可靠的,因为生产者无法知道消息是否已经成功到达Broker。
acks=1(Leader确认):
在这种模式下,生产者发送消息后会等待Broker的领导者(Leader)确认。领导者会确认消息已经被接收,但不一定已经被完全复制到所有的副本。
这种模式提供了一定程度的可靠性,因为生产者知道消息至少已经被领导者接收,但仍然可能丢失消息,因为它们可能还没有被复制到其他副本。
acks=all(全部确认):
这是最可靠的确认模式,在这种模式下,生产者发送消息后会等待所有的ISR(In-Sync Replicas,同步副本)确认。ISR是分区的所有副本中与领导者保持同步的副本集合。
在这种模式下,消息只有在被领导者和所有同步副本都确认接收后才被视为已提交。这确保了消息的可靠性。

如何保证不重复消费

2.不重复消费,在处理业务时,用唯一建来处理,如果没有唯一建,可以借助消息表来做,处理完了之后给这条消息打个已处理的标记.

.消费者接受消息处理业务给broker发送ack,broker认为消息消费成功,删除这条消息

    @KafkaListener(id = KafkaConstants.MESSAGE_GROUP, topics = KafkaConstants.MESSAGE_TOPIC, concurrency = "5")
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
   
       try{
   
           //处理业务数据
       }catch (Exception e){
   
           //消费失败后的处理,保存到消息表
       }finally {
   
           //ack确认
           acknowledgment.acknowledge();
       }

    }

如何保证消息的顺序

为什么顺序会乱.kafka在生产者生产消息的时候使我们代码控制的,可以保证顺序,比如付款成功后我先发送一个修改订单状态的消息,再发送一个扣减库存的消息,再发送一个物流通知的消息 一个topic 一个partion 代码写入的顺序就是消息的顺序.如果只有一个消费者监听一个partion也是可以保证顺序的.但是多个消费者监听同一个partion消费者2执行完成 消费者1.3还没有执行.这样顺序就乱了.
一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。
写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。(我们就是这么干的,相同的key的数据在一个队列里面,然后使用多线程开worker按照key不同进行分别消费)

相关推荐

  1. Kafka怎么保证消息发送丢失

    2023-12-08 15:34:03       46 阅读
  2. RabbitMQ如何保证消息丢失

    2023-12-08 15:34:03       51 阅读
  3. RabbitMQ 如何保证消息丢失

    2023-12-08 15:34:03       35 阅读
  4. 如何保证消息队列丢失消息(以kafka为例)

    2023-12-08 15:34:03       42 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-08 15:34:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-08 15:34:03       100 阅读
  3. 在Django里面运行非项目文件

    2023-12-08 15:34:03       82 阅读
  4. Python语言-面向对象

    2023-12-08 15:34:03       91 阅读

热门阅读

  1. 学习redis(待完善)

    2023-12-08 15:34:03       53 阅读
  2. 基于MATLAB车辆防碰撞系统仿真

    2023-12-08 15:34:03       48 阅读
  3. 【力扣100】4.移动零

    2023-12-08 15:34:03       60 阅读
  4. ChatGPT的进化史

    2023-12-08 15:34:03       63 阅读
  5. openmmlab加载自训练权重

    2023-12-08 15:34:03       60 阅读
  6. 利用 Python 进行数据分析实验(六)

    2023-12-08 15:34:03       51 阅读
  7. vs2022专业版永久密钥

    2023-12-08 15:34:03       60 阅读
  8. C++提高编程—01.函数模板

    2023-12-08 15:34:03       49 阅读
  9. 一篇文章了解JDK的前世今生

    2023-12-08 15:34:03       54 阅读
  10. 面试题目总结(三)

    2023-12-08 15:34:03       54 阅读
  11. 微信小程序跳转到外部小程序

    2023-12-08 15:34:03       59 阅读