从 01 开始 从 01 开始
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档

晓林

程序猿,自由职业者,博主,英语爱好者,健身达人
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是 MQ
      • RabbitMQ 介绍
      • RabbitMQ 的安装-Windows
      • RabbitMQ 的安装-Linux
      • RabbitMQ 的安装-Docker
      • RabbitMQ 的插件
      • RabbitMQ 用户
      • HelloWorld 程序
      • WorkQueues
      • 消息应答
        • 自动应答
        • 如何手动应答消息
        • Multiple 批量应答
        • 消息自动重新入队
        • 消息手动应答代码
        • 测试
        • 源码
      • RabbitMQ 持久化和预取值
      • 发布确认
      • 交换机
      • Topics 交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ 集群
      • 镜像队列
      • Haproxy + Keepalive 实现高可用负载均衡
      • Federation
      • Shovel
    • MyCat

    • Lombok

  • SpringMVC

  • SpringBoot

  • Java
  • 主流框架
  • MQ
2023-06-05
目录

消息应答

# 70.消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分,突然它挂掉了,会发生什么情况?

RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了,RabbitMQ 可以把该消息删除了。 ‍

# 自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了

当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

一句话,自动应答不是很靠谱,比较需要良好的网络环境,所以一般用手动应答的。

# 如何手动应答消息

不使用自动应答的情况下,有如下方法用于手动应答消息:

  • Channel.basicAck:用于肯定确认, RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃
  • Channel.basicNack:用于否定确认
  • Channel.basicReject:用于否定确认,与 Channel.basicNack 相比少一个参数,不处理该消息,直接拒绝,可以将其丢弃了 ‍

# Multiple 批量应答

手动应答的好处是可以批量应答并且减少网络拥堵。例如调用 basicAck 方法进行应答:

channel.basicAck(deliveryTag, true);
1

第一个参数的含义:消息的标记 tag,可以理解为是每个消息的“主键”

第二个参数的含义:

  • true:代表批量应答 channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答(如下图)
  • false:同上面相比只会应答 tag = 8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
  • 我们一般不使用批量应答,避免还有消息未处理完,就被应答了的情况。一般是处理完一个消息,就应答一个 ‍

# 消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到有消息未完全处理,并将对其重新排队。

如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

示意图:

  1. C1 消费者处理消息 1
  2. C1 失去连接
  3. 消息 1 重新入队
  4. C2 处理消息 1

# 消息手动应答代码

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,然后我们在一个消费者处理消息的时候,先将其停止(也就是失去连接),然后观察消息是否会重新入队,给到其他消费者处理。

我们新建一个包 demo3,里面放代码。 ‍

# 生产者

package com.peterjxl.rabbitmq.demo3;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * 消息在手动应答时不丢失,放回队列重新消费
 */
public class Task3 {

    // 队列名称
    private final static String task_queue_name = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(task_queue_name, false, false, false, null);
        // 从控制台接收消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", task_queue_name, null, message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

接下来我们写消费者。我们设置两个消费者有一段睡眠的时间,并且时间不同,以此来模拟两个消费者的效率不一样。 ‍

# 新建睡眠工具类

该类用于沉睡一段时间:

package com.peterjxl.rabbitmq.util;

public class SleepUtils {
    public static void sleep(int second) {
        try {
            Thread.sleep(1000L * second);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

# 消费者 03

‍ 主要是设置为手动应答,并沉睡:

SleepUtils.sleep(10);
System.out.println("接收到消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
1
2
3

完整代码:

package com.peterjxl.rabbitmq.demo3;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.peterjxl.rabbitmq.util.SleepUtils;
import com.rabbitmq.client.Channel;
/**
 * 消息在手动应答时不丢失,放回队列重新消费
 */
public class Worker03 {
    // 队列名称
    private final static String task_queue_name = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C1消费者等待消息处理,处理时间较短(效率高)...");

        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(task_queue_name, autoAck, (consumerTag, message) -> {
            // 接收消息并处理
            System.out.println("接收到消息:" + new String(message.getBody()));
            // 休眠1秒
            SleepUtils.sleep(1);
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        }, consumerTag -> {
            System.out.println("消息消费被中断");
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

‍

# 消费者 04

我们复制一份消费者 03 ,修改输出语句和 沉睡的时间为 30 秒即可。完整代码:

package com.peterjxl.rabbitmq.demo3;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.peterjxl.rabbitmq.util.SleepUtils;
import com.rabbitmq.client.Channel;

/**
 * 消息在手动应答时不丢失,放回队列重新消费
 */
public class Worker04 {
    // 队列名称
    private final static String task_queue_name = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C2消费者等待消息处理,处理时间较长(效率低)...");

        // 采用手动应答
        boolean autoAck = false;
        channel.basicConsume(task_queue_name, autoAck, (consumerTag, message) -> {
            SleepUtils.sleep(30);
            System.out.println("接收到消息:" + new String(message.getBody()));
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        }, consumerTag -> {
            System.out.println("消息消费被中断");
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 测试

我们依次运行 Task3、Worker03、Worker04;

然后我们在 Task3 中输入消息 aa 和 bb,可以看到 Worker03 收到了 aa,Worker04 收到了 bb:

‍

然后我们发送 cc 和 dd,并停止 Work04:可以看到 Worker03 处理了消息 dd

# 源码

本项目已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo3,读者可以通过切换分支来查看本文的示例代码

上次更新: 2025/5/17 12:26:09
WorkQueues
RabbitMQ 持久化和预取值

← WorkQueues RabbitMQ 持久化和预取值→

最近更新
01
学点统计学:轻松识破一本正经的胡说八道
06-05
02
2025 年 5 月记
05-31
03
《贫穷的本质》很棒,但可能不适合你
05-27
更多文章>
Theme by Vdoing | Copyright © 2022-2025 | 粤 ICP 备 2022067627 号 -1 | 粤公网安备 44011302003646 号 | 点击查看十年之约
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式