7.消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长时间的任务并且只完成了部分突然就挂掉了,会发生什么情况?

RabbitMQ一旦向消费者传递了一条消息,便立即将该消息标记为删除。这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,也无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制

消息应答机制

消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq可以把该消息删除了。

1.自动应答

并不完善。需要一个良好的环境,不发生极端的情况。使用较少。容易产生消息都是。

2.手动应答

Channel.basicAck用于肯定确认,RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃。

Channel.basicNack用于否定确认。

Channel.basicReject用于否定确认,不处理该消息了直接拒绝,可以将其丢弃。比basicNack方法少了一个参数。

multiple 手动应答的好处在于可以批量应答且减少网络拥堵。

basicAck方法第二个参数,multiple为true表示批量。

批量应答,会将信道中的消息都进行应答。如果不是批量应答,只会应答信道中当前这条消息。

建议不批量应答,以免造成消息的丢失。

消息自动重新入队

如果消费者由于某些原因失去连接(通道已关闭,连接已关闭,TCP连接丢失),导致消息未发送ack确认,RabbitMQ发现消息未完全处理,并将其重新排队。如果此时其他消费者可以处理,它将很快将其重新分配给其他消费者。这样即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

如果防止消息丢失?

rabbitmq中有消息应答机制,不要采用自动应答,需要一个良好的条件以及不能发生极端的情况,不建议使用。而是应该采用手动应答,手动应答又分为批量应答和非批量应答。批量应答会将信道里的消息都进行应答,不建议使用,以免造成消息丢失。非批量应答只会应答当前这条消息。

如果消息真的发生了丢失怎么办?

应该将消息自动重新入队

 代码

package com.xkj.org.mq.ack;

import com.rabbitmq.client.Channel;
import com.xkj.org.utils.RabbitMQUtil;

import java.io.IOException;
import java.util.Scanner;

public class Task01 {

    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发送消息
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        }
    }
}
package com.xkj.org.mq.ack;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xkj.org.utils.RabbitMQUtil;

import java.io.IOException;

public class Worker01 {

    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtil.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接受到消息:"+ new String(message.getBody(), "UTF-8"));
            //第一个参数,消息标记tag
            //第二个参数,false非批量应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("work1 消息消费被中断");
        };
        System.out.println("worker1等待1s接收消息.......");
        //设置手动应答
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    }

}
package com.xkj.org.mq.ack;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xkj.org.utils.RabbitMQUtil;

import java.io.IOException;

public class Worker02 {

    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMQUtil.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("接受到消息:"+ new String(message.getBody(), "UTF-8"));
            //第一个参数,消息标记tag
            //第二个参数,false非批量应答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("work2 消息消费被中断");
        };
        System.out.println("worker2等待10s接收消息.......");
        //设置手动应答
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    }

}

相关推荐

  1. 7、无消息丢失配置怎么实现?

    2024-07-22 20:02:02       45 阅读
  2. Git(7)之提交消息模板

    2024-07-22 20:02:02       47 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-22 20:02:02       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-22 20:02:02       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-22 20:02:02       45 阅读
  4. Python语言-面向对象

    2024-07-22 20:02:02       55 阅读

热门阅读

  1. MLlib机器学习入门:用Spark打造预测模型

    2024-07-22 20:02:02       18 阅读
  2. python绘制函数调用图总结

    2024-07-22 20:02:02       16 阅读
  3. PHP 表单验证:邮件和URL

    2024-07-22 20:02:02       17 阅读
  4. org.apache.ibatis.session是什么?

    2024-07-22 20:02:02       13 阅读
  5. C语言 指针方法 有一个班4个学生,5门课程

    2024-07-22 20:02:02       12 阅读
  6. C++中的explicit关键字

    2024-07-22 20:02:02       13 阅读
  7. 数组常见的实例方法

    2024-07-22 20:02:02       14 阅读
  8. 精通Gradle发布配置:打造自动化部署的高速公路

    2024-07-22 20:02:02       16 阅读
  9. 力扣283.移动零

    2024-07-22 20:02:02       15 阅读
  10. SAP 如何修改统驭科目类型

    2024-07-22 20:02:02       15 阅读
  11. 部署Mojo模型:生产环境中的智能自动化

    2024-07-22 20:02:02       15 阅读