从 01 开始 从 01 开始
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • RSS
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金等
  • 职场规划
  • 关于离职
  • 杂谈
  • 📖 读书

    • 读书工具
    • 读书笔记
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档

晓林

程序猿,自由职业者,博主,英语爱好者,健身达人
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • RSS
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金等
  • 职场规划
  • 关于离职
  • 杂谈
  • 📖 读书

    • 读书工具
    • 读书笔记
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是 MQ
      • RabbitMQ 介绍
      • RabbitMQ 的安装-Windows
      • RabbitMQ 的安装-Linux
      • RabbitMQ 的安装-Docker
      • RabbitMQ 的插件
      • RabbitMQ 用户
      • HelloWorld 程序
      • WorkQueues
      • 消息应答
      • RabbitMQ 持久化和预取值
      • 发布确认
      • 交换机
      • Topics 交换机
      • 死信队列
        • 应用场景
        • 死信的来源
        • 需求
        • 消费者 1
        • 生产者
        • 测试
        • 消费者 2
        • 队列达到最大长度
        • 消息被拒
        • 源码
      • 延迟队列
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ 集群
      • 镜像队列
      • Haproxy + Keepalive 实现高可用负载均衡
      • Federation
      • Shovel
    • MyCat

    • Lombok

  • SpringMVC

  • SpringBoot

  • Java
  • 主流框架
  • MQ
2023-06-05
目录

死信队列

# 120.死信队列

死信,顾名思义就是无法被消费的消息。

一般来说,producer 发送消息,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列 ‍

# 应用场景

  • 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中
  • 用户在商城下单成功,并点击去支付后,在指定时间未支付时,订单自动失效

当放到死信队列后,后续还有机会能取出来消费 ‍

# 死信的来源

  • 消息 TTL 过期(例如一定时间内未支付,就认为订单失败)
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝 (basic.reject 或 basic.nack) 并且 requeue=false. ‍

# 需求

下面我们来实践下。需求如下

‍

  1. 首先是一个生产者,两个消费者
  2. 生产者发送消息给交换机
  3. 交换机将消息放到队列中
  4. 正常的消息由 C1 消费
  5. 由于一些问题,发生了死信
  6. 死信会被放到一个死信交换机
  7. 死信队列由 C2 消费 ‍

# 消费者 1

我们先声明几个交换机和队列的名字,然后写一个消费消息的代码:

package com.peterjxl.rabbitmq.demo9;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;

public class Consumer01 {

    public static final String NORMAL_EXCHANGE = "normal_exchange";
    public static final String DEAD_EXCHANGE = "dead_exchange";
    public static final String NORMAL_QUEUE = "normal_queue";
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();

        channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
            System.out.println("Consumer01接收到消息:" + new String(message.getBody()));
        }, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

‍ 然后我们声明交换机:

channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
1
2

‍ 声明队列:注意死信队列和我们之前的定义方式不一样,因为要正常队列中的信息,通过转换后才能转发给死信队列,需要设置一些参数。

// 要传入队列配置对象
Map<String, Object> arguments = new HashMap<>();

// 过期时间,单位毫秒,这里设置10秒。
arguments.put("x-message-ttl", 10000);

// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);

// 正常队列设置死信routing-key
arguments.put("x-dead-letter-routing-key", "lisi");

channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
1
2
3
4
5
6
7
8
9
10
11
12
13
14

‍ 其实我们还可以设置队列的过期时间:

// 过期时间,单位毫秒,这里设置10秒。
arguments.put("x-message-ttl", 10000);
1
2

不仅仅是队列可以设置过期时间 TTL,生产者也可以设置 TTL,例如可以每次都不同的 TTL;而队列不能修改 TTL,所以通常是生产者设置 TTL

绑定交换机和队列:

channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
1
2

# 生产者

‍ 此时我们发送消息的时候,就需要指定参数了。先设置下:

 // 设置消息的TTL时间, 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

1
2
3

‍ 发送消息的时候指定:

   channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
1

完整代码:我们发送十个信息:

package com.peterjxl.rabbitmq.demo9;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

public class ProducerDemo9 {

    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 设置消息的TTL时间, 单位是ms
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

        // 发送死信消息,设置TTL
        channel.basicPublish(NORMAL_EXCHANGE, "lisi", null, "这是一条消息".getBytes());
        for (int i = 1; i < 11; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 测试

我们先启动下消费者,创建好交换机和队列,然后停止;

停止消费者后,可以看到后台有这两个队列:

可以看到有 DLX 和 DLK:

DLX:x-dead-letter-exchange 说明该队列配置了死信交换机,

DLK:全称 x-dead-letter-routing-key,也就是有设置死信消息的 routing key ‍ 也可以看到有交换机:

‍

正常队列和死信队列的绑定状态也正常:

‍

然后启动生产者,由于没有消费者,因此过了 TTL 后,就会有死信。我们看到的现象应该是这样:

  1. 首先正常队列中有 10 个消息
  2. 随着时间的推移,消息一个个的过期,然后就会转发到死信队列中
  3. 最后所有消息都在死信队列中 ‍ 实验结果:

‍

PS:由于我们 10 个消息都是很快发送完的,所以可能看不到正常队列逐个减少、死信队列逐个增多的情况

# 消费者 2

消费者 2 就是消费死信队列的消息即可,非常简单

package com.peterjxl.rabbitmq.demo9;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;

public class Consumer02 {

    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        channel.basicConsume(DEAD_QUEUE, true, (consumerTag, message) -> {
            System.out.println("Consumer02接收到消息:" + new String(message.getBody()));
        }, consumerTag -> {});
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

‍ 运行结果:

Consumer02接收到消息:info1
Consumer02接收到消息:info2
Consumer02接收到消息:info3
Consumer02接收到消息:info4
Consumer02接收到消息:info5
Consumer02接收到消息:info6
Consumer02接收到消息:info7
Consumer02接收到消息:info8
Consumer02接收到消息:info9
Consumer02接收到消息:info10
1
2
3
4
5
6
7
8
9
10

消费者 1 的代码最复杂,定义了交换机、队列和绑定关系等,而生产者只需发送消息,消费者 2 只需消费死信队列的消息。 ‍

# 队列达到最大长度

之前我们仅仅演示了 TTL 的情况导致死信,接下来我们演示其他两种情况,首先是队列达到最大长度

我们在 Consumer01 里添加如下配置:

// 设置正常队列的长度限制
arguments.put("x-max-length", 6);
1
2

‍ 注意:由于我们修改了队列的参数,因此得先删除,然后再重新运行。此时能看到有个 Lim 的字眼,这是 Limit 的缩写。

然后我们开始发送消息,这里我们就不设置 TTL 了,修改 ProducerDemo9 的发送消息代码,改为传入 null:

channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
1

‍ 然后我们先停止消费者,然后启动生产者,这样消息就会积压在正常队列中,然后剩下 4 个消息就会被转发到死信队列:

# 消息被拒

还剩下一个场景:消费者拒绝消息,此时信息也会变成死信

为了不让之前的消息影响到接下来的实验,我们先启动消费者 1 和 2,处理掉队列中的信息,处理完后队列是空的:

‍

然后我们将限制队列长度的代码注释掉,然后我们在后台删掉这个队列。 ‍ 假设我们要拒绝 info5 这个消息:

channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
    String msg = new String(message.getBody());
    if(msg.equals("info5")){
        System.out.println("Consumer01接收到消息:" + msg + ",此消息被拒绝");
        channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
    }else {
        System.out.println("Consumer01接收到消息:" + msg);
    }
}, consumerTag -> {});
1
2
3
4
5
6
7
8
9

‍ 然后我们修改为开启手动应答(不批量):

 channel.basicConsume(NORMAL_QUEUE, false, (consumerTag, message) -> {
        String msg = new String(message.getBody());
        if(msg.equals("info5")){
            System.out.println("Consumer01接收到消息:" + msg + ",此消息被拒绝");
            channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
        }else {
            System.out.println("Consumer01接收到消息:" + msg);
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        }
    }, consumerTag -> {});
1
2
3
4
5
6
7
8
9
10

‍ 此时我们启动消费者 1,然后再启动生产者:能看到死信队列有一个消息

我们点进去 dead_quque,然后可以获取信息:确实是 info5 被拒绝后放到死信队列了

消费者 1 的输出:

Consumer01接收到消息:info1
Consumer01接收到消息:info2
Consumer01接收到消息:info3
Consumer01接收到消息:info4
Consumer01接收到消息:info5,此消息被拒绝
Consumer01接收到消息:info6
Consumer01接收到消息:info7
Consumer01接收到消息:info8
Consumer01接收到消息:info9
Consumer01接收到消息:info10
1
2
3
4
5
6
7
8
9
10

启动消费者 2 的输出:

Consumer02接收到消息:info5
1

‍

# 源码

已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo9,读者可以通过切换分支来查看本文的示例代码 ‍

上次更新: 2025/5/17 12:26:09
Topics 交换机
延迟队列

← Topics 交换机 延迟队列→

最近更新
01
吐槽一下《僵尸校园》
05-15
02
2025 年 4 月记
04-30
03
山西大同 “订婚强奸案” 将会给整个社会带来的影响有多严重? - 知乎 转载
04-26
更多文章>
Theme by Vdoing | Copyright © 2022-2025 | 粤 ICP 备 2022067627 号 -1 | 粤公网安备 44011302003646 号 | 点击查看十年之约
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式