从 01 开始 从 01 开始
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 新闻合订本
    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档

晓林

程序猿,自由职业者,博主,英语爱好者,健身达人
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 新闻合订本
    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是 MQ
      • RabbitMQ 介绍
      • RabbitMQ 的安装-Windows
      • RabbitMQ 的安装-Linux
      • RabbitMQ 的安装-Docker
      • RabbitMQ 的插件
      • RabbitMQ 用户
      • HelloWorld 程序
      • WorkQueues
      • 消息应答
      • RabbitMQ 持久化和预取值
      • 发布确认
      • 交换机
      • Topics 交换机
      • 死信队列
      • 延迟队列
        • 使用场景
        • 整合 SpringBoot
        • 添加依赖
        • 修改配置文件
        • 配置 Swagger
        • 需求
        • 配置文件类
        • 生产者
        • 消费者
        • 测试
        • 优化
        • 源码
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ 集群
      • 镜像队列
      • Haproxy + Keepalive 实现高可用负载均衡
      • Federation
      • Shovel
    • MyCat

    • Lombok

  • SpringMVC

  • SpringBoot

  • Java
  • 主流框架
  • MQ
2023-06-05
目录

延迟队列

# 130.延迟队列

之前讲安装 RabbitMQ 可视化插件的时候讲过

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

# 使用场景

  1. 订单在十分钟之内未支付则自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 ‍ 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下

其实延迟队列是死信队列的一种,当消息一直没被处理,达到了 TTL 后,就会被放到死信队列中。 ‍ 例如这是用户下单后,要在 30 分钟内完成付款的流程图:

# 整合 SpringBoot

为了方便演示,我们先整合下 SpringBoot。在 IDEA 中新建一个:

这里我们不使用 SpringBoot3

‍

为了不让版本造成影响,我们改为使用低版本的 SpringBoot

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.11.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
1
2
3
4
5
6

‍

# 添加依赖

‍

<!--RabbitMQ 依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.67_noneautotype2</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<!--swagger-->
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.9.2</version>
</dependency>

<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger-ui</artifactId>
    <version>2.9.2</version>
</dependency>

<!--RabbitMQ 测试依赖-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

‍

# 修改配置文件

我们修改 src/main/resources/application.properties,添加如下内容:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root123
1
2
3
4

‍

# 配置 Swagger

Swagger 是一个文档框架,这里我们只需会用就行,新建一个 config 包并新增代码:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;


@Configuration
@EnableSwagger2
public class SwaggerConfig
{@Bean
public Docket webApiConfig(){
    return new Docket(DocumentationType.SWAGGER_2)
            .groupName("webApi")
            .apiInfo(webApiInfo())
            .select()
            .build();
}
    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("peterjxl", "https:www.peterjxl.com","perterjxl@qq.com"))
                .build();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

如果使用高版本的 SpringBoot 话,启动时可能会遇到报错:

Failed to start bean ‘documentationPluginsBootstrapper‘; nested exception is java.lang.NullPointerEx
1

# 需求

在继续演示之前,我们说下需求。

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y(用作延迟交换机),它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

‍

也就是为 QA 和 QB 都绑定一个死信交换机 Y。 ‍

# 配置文件类

之前我们整合 SpringBoot 之前,都是在消费者中声明队列和交换机的;但有了 SpringBoot,我们就可以通过配置文件来声明队列和交换机。消费者和生产者各司其职,不用在消费者或生产者中声明了,使得代码更加简单明了

首先新建一个 Spring 的配置文件类:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLQueueConfig {
}
1
2
3
4
5
6
7

‍ 然后我们定义各个交换机和队列的名称:

//普通交换机
private static final String X_EXCHANGE = "X";

//死信交换机
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";

//普通队列
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";

//死信队列
private static final String DEAD_LETTER_QUEUE = "QD";
1
2
3
4
5
6
7
8
9
10
11
12

‍ 然后我们声明两个交换机:

// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
    return new DirectExchange(X_EXCHANGE);
}

// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange(){
    return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
1
2
3
4
5
6
7
8
9
10
11

声明 QA、QB 和死信队列 QD

// 声明普通队列A ttl为10s
@Bean("queueA")
public Queue queueA(){
    Map<String, Object> args = new HashMap<>(3);
    // 统一设置队列中的所有消息的过期时间,单位毫秒
    args.put("x-message-ttl", 10000);

    // 统一设置队列的死信交换机
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

    // 统一设置队列的死信routingKey
    args.put("x-dead-letter-routing-key", "YD");

    return (Queue) QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

// 声明普通队列A ttl为40s
@Bean("queueB")
public Queue queueB(){
    Map<String, Object> args = new HashMap<>(3);
    // 统一设置队列中的所有消息的过期时间,单位毫秒
    args.put("x-message-ttl", 40000);

    // 统一设置队列的死信交换机
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

    // 统一设置队列的死信routingKey
    args.put("x-dead-letter-routing-key", "YD");

    return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}

// 声明死信队列QD
@Bean("queueD")
public Queue queueD(){
    return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

然后是声明交换机和队列的绑定关系:

// 声明QA绑定关系
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}

// 声明QB绑定关系
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}

// 声明QD绑定关系
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){
    return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 生产者

我们使用发送 HTTP 请求的方式,来发送消息。当访问指定的链接,就发送指定的消息。

新建一个 Controller 类:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

}
1
2
3
4
5
6
7
8
9
10
11
12

Slf4j 注解是日志的注解

然后我们发送消息:

@Autowired
private RabbitTemplate rabbitTemplate;

// 开始发消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
    log.info("当前时间:{}, 发送一条信息给两个TTL队列:{}" , new Date(), message);
    rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
    rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
}
1
2
3
4
5
6
7
8
9
10

‍ 首先我们使用 RabbitMQ 提供的模板对象来发送,因此定义了一个 RabbitTemplate 对象。

然后 log.info,记录了日志;然后使用 convertAndSend 方法发送消息(第一个参数是交换机的名字,第二个是 routing key,第三个是消息) ‍

# 消费者

package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class DeadLetterQueueConsumer {

     @RabbitListener(queues = "QD")
     public void receiveD(Message message, Channel channel){
         String msg = new String(message.getBody());
         log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg);
     }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

‍

# 测试

我们访问 localhost:8080/ttl/sendMsg/嘻嘻嘻,此时页面是空白的(这很正常,我们没写返回值),然后我们可以看到控制台有输出


当前时间:Tue May 30 07:15:47 CST 2023, 发送一条信息给两个TTL队列:嘻嘻嘻
当前时间:Tue May 30 07:15:57 CST 2023,收到死信队列的消息:消息来自ttl为10s的队列:嘻嘻嘻
当前时间:Tue May 30 07:16:27 CST 2023,收到死信队列的消息:消息来自ttl为40s的队列:嘻嘻嘻
1
2
3
4

‍ 从时间来看,确实是发了消息后 10 秒,队列 QA 消费消息;而 QB 也是 40 秒后消费消息 ‍

# 优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

解决方法:由生产者发送消息的时候指定 TTL。

我们新增一个队列 QC,绑定关系如下,该队列不设置 TTL 时间:

‍

代码:在 TTLQueueConfig 中添加如下代码

private static final String QUEUE_C = "QC";


// 声明普通队列C
@Bean("queueC")
public Queue queueC(){
    Map<String, Object> args = new HashMap<>(3);

    // 统一设置队列的死信交换机
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

    // 统一设置队列的死信routingKey
    args.put("x-dead-letter-routing-key", "YD");

    return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}

 // 声明QC绑定关系
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

在 SendMsgController 中添加:

// 开始发消息, 带过期时间
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
    log.info("当前时间:{}, 发送一条时长{}毫秒TTL信息给队列QC:{}" , new Date(), ttlTime, message);
  
    rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl为" + ttlTime + "毫秒的队列:" + message, msg -> {
        msg.getMessageProperties().setExpiration(ttlTime);
        return msg;
    });
}
1
2
3
4
5
6
7
8
9
10

测试:我们重启,然后分别访问如下两个链接

http://localhost: 8080/ttl/sendExpirationMsg/你好 1/20000,这是 20 秒

http://localhost: 8080/ttl/sendExpirationMsg/你好 2/2000,这是 2 秒 ‍ 结果:

当前时间:Tue May 30 07:54:49 CST 2023, 发送一条时长20000毫秒TTL信息给队列QC:你好1
当前时间:Tue May 30 07:54:54 CST 2023, 发送一条时长2000毫秒TTL信息给队列QC:你好2
当前时间:Tue May 30 07:55:09 CST 2023,收到死信队列的消息:消息来自ttl为20000毫秒的队列:你好1
当前时间:Tue May 30 07:55:09 CST 2023,收到死信队列的消息:消息来自ttl为2000毫秒的队列:你好2
1
2
3
4

可以看到消息 1,确实是 20 秒后才被消费;但为什么消息 2,明明应该是 2 秒的,也是 20 秒后被消费呢?

我们之前就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

# 源码

已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo1,读者可以通过切换分支来查看本文的示例代码

上次更新: 2025/5/17 12:26:09
死信队列
延迟插件

← 死信队列 延迟插件→

最近更新
01
新闻合订本 2025-10
10-31
02
2025 年 10 月记
10-30
03
用 AI 批量优化思源笔记排版
10-15
更多文章>
Theme by Vdoing | Copyright © 2022-2025 | 粤 ICP 备 2022067627 号 -1 | 粤公网安备 44011302003646 号 | 点击查看十年之约
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式