从 01 开始 从 01 开始
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 新闻合订本
    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档

晓林

程序猿,自由职业者,博主,英语爱好者,健身达人
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 新闻合订本
    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是 MQ
      • RabbitMQ 介绍
      • RabbitMQ 的安装-Windows
      • RabbitMQ 的安装-Linux
      • RabbitMQ 的安装-Docker
      • RabbitMQ 的插件
      • RabbitMQ 用户
      • HelloWorld 程序
      • WorkQueues
      • 消息应答
      • RabbitMQ 持久化和预取值
        • 队列持久化
        • 消息持久化
        • 不公平分发
        • 预取值
        • 测试
        • 源码
      • 发布确认
      • 交换机
      • Topics 交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ 集群
      • 镜像队列
      • Haproxy + Keepalive 实现高可用负载均衡
      • Federation
      • Shovel
    • MyCat

    • Lombok

  • SpringMVC

  • SpringBoot

  • Java
  • 主流框架
  • MQ
2023-06-05
目录

RabbitMQ 持久化和预取值

# 80.RabbitMQ 持久化和预取值

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失?

默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会丢弃队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。 ‍

# 队列持久化

要队列实现持久化,需要在声明队列的时候把 durable 参数设置为持久化: ‍

channel.queueDeclare(QUEUE_NAME, true, false, false, null);
1

这里复习下参数说明:

  • 第一个参数:队列名
  • 第二个参数:是否持久化,默认 false,表示保存在内存中(不持久化)
  • 第三个参数:是否独占队列,默认 false,表示不独占队列(消息共享),true 则表示只供一个消费者消费
  • 第四个参数:最后一个消费者断开连接后,是否自动删除队列,默认 false,表示不自动删除
  • 第五个参数:队列的其他参数,如:存活时间 ‍ 但是需要注意的是,如果一个现有的队列,在声明的时候不是持久化的,想要修改为持久化:需要把原先队列先删除,然后重新创建一个持久化的队列。

不能直接声明(queueDeclare),会出现错误:

inequivalent arg 'durable' for queue 'hello' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
1

我们可以先试着删除队列后新建,我们点进某个队列:

‍

然后点击删除:

然后再新建:

package com.peterjxl.rabbitmq.demo4;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;

public class Producer {
  
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare("hello", true, false, false, null);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12

‍ 此时能在可视化插件中看到 hello 队列被重新创建了,并且有个关键字 D,这是持久化 Durable 的首字母。这个时候即使重启 RabbitMQ 队列也依然存在

# 消息持久化

要想让消息实现持久化,需要发送消息的时候添加属性 MessageProperties.PERSISTENT_TEXT_PLAIN:

channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "Hello".getBytes());
1

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在这样的情况:当消息存储到硬盘的过程中宕机了,那么就不能真正持久化。但对于我们的简单任务队列而言,这已经绰绰有余了。

如果需要更强有力的持久化策略,在讲发布确认时会讲解。

# 不公平分发

在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢。

这个时候我们还是采用轮训分发的话,就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以在消费者代码中设置参数 channel.basicQos(1);

int prefetchCount = 1;
channel.basicQos(prefetchCount);
1
2

‍ 同时,我们可以看到控制台中会列出该消费者的 Prefetch Count(预取值)

‍

意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 RabbitMQ 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。

测试:我们可以在上一节的案例程序中,给 Worker03 和 Worker04 都设置上这个值,然后发送多个消息:可以看到 Worker04 只处理了一个,而 Worker03 处理了多个。

# 预取值

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息;另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此有时候我们希望能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认

例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag = 6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。

消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取值将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器),应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同, 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。

预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下。对于大多数应用来说,稍微高一点的值将是最佳的。

# 测试

我们可以设置 Worker03 的预取值是 2,睡眠时间是 10 秒,Worker04 的预取值是 5,然后我们可以快速发送 7 条消息:

‍

由于预取值的关系,可以看到 Worker03 只处理了 2 条,而 Worker04 处理了 5 条。 ‍

# 源码

本项目已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo4,读者可以通过切换分支来查看本文的示例代码

上次更新: 2025/6/3 09:31:54
消息应答
发布确认

← 消息应答 发布确认→

最近更新
01
新闻合订本 2025-10
10-31
02
2025 年 10 月记
10-30
03
用 AI 批量优化思源笔记排版
10-15
更多文章>
Theme by Vdoing | Copyright © 2022-2025 | 粤 ICP 备 2022067627 号 -1 | 粤公网安备 44011302003646 号 | 点击查看十年之约
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式