从 01 开始 从 01 开始
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档

晓林

程序猿,自由职业者,博主,英语爱好者,健身达人
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是 MQ
      • RabbitMQ 介绍
      • RabbitMQ 的安装-Windows
      • RabbitMQ 的安装-Linux
      • RabbitMQ 的安装-Docker
      • RabbitMQ 的插件
      • RabbitMQ 用户
      • HelloWorld 程序
      • WorkQueues
      • 消息应答
      • RabbitMQ 持久化和预取值
      • 发布确认
      • 交换机
      • Topics 交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
        • 下载插件
        • 注意事项
        • 需求
        • 配置类
        • 生产者
        • 消费者
        • 测试
        • 总结
        • 源码
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ 集群
      • 镜像队列
      • Haproxy + Keepalive 实现高可用负载均衡
      • Federation
      • Shovel
    • MyCat

    • Lombok

  • SpringMVC

  • SpringBoot

  • Java
  • 主流框架
  • MQ
2023-06-05
目录

延迟插件

# 140.延迟插件

如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。

为此,RabbitMQ3.5.7 及以后的版本提供了 rabbitmq-delayed-message-exchange 插件来做延时消息任务。 ‍

# 下载插件

我们可以去 GitHub (opens new window) 下载该插件,所有发行版:Releases · rabbitmq/rabbitmq-delayed-message-exchange (opens new window)

注意,插件的版本,最好和 RabbitMQ 版本一致。

一些老旧的版本,githup 上已经不再提供相应的 ez 文件了,只有源码,这样情况下只能下载网上已经编译好的,或者自己编译。 ‍ 读者也可以去我的 百度云网盘 (opens new window) 下载安装包,路径为编程资料/Java 相关/06.主流框架/30.MQ ‍ 然后我们将下载后的 ez 文件,放到安装目录的 plugins 目录下(之前讲安装 RabbitMQ 可视化插件的时候讲过)。然后启用,启用方法和之前启用可视化插件一样,在命令行输入:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1

‍ 启用后记得重启 RabbitMQ

> rabbitmq-service stop
RabbitMQ 服务正在停止...
RabbitMQ 服务已成功停止。

> rabbitmq-service start
RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。
1
2
3
4
5
6
7

然后我们访问可视化界面,在新增交换机的时候,可以看到多了一个选项:

这个交换机类型就可以实现延迟消息,也就是由交换机来完成延迟,而不是队列。

# 注意事项

注意,该插件的限制还是有很多的,GitHub (opens new window) 的说明文档也说了:

This plugin adds delayed-messaging (or scheduled-messaging) to RabbitMQ. Its current design has plenty of limitation (documented below), consider using an external scheduler and a data store that fits your needs first.

This plugin badly needs a new design (opens new window) and a reimplementation from the ground up.

If you accept the limitations, please read on.

....

This plugin requires Erlang 23.2 or later versions (opens new window), same as RabbitMQ 3.8.16+. ‍ 大意:目前的设计的有很多的限制的(可以参考下面的文档),可以考虑用一个第三方定时任务框架和数据库来完成你的需求。

这个插件迫切地需要一个新的设计和重构,如果你接受这些限制,请继续。

要求至少 23.2 及之上的 Erlang 版本,RabbitMQ 要求 3.8.16 以上的版本 ‍

# 需求

我们上一篇博客中,架构是这样的:基于死信来完成延迟

‍

当我们使用插件后:实现起来会简单一点,只需一个交换机一个队列即可,真正延迟的地方是在交换机,重点是配置延迟交换机

接下来我们说下需求:很简单,一个交换机一个队列,一个生产者一个消费者

# 配置类

先声明队列、交换机和 routingkey:

private static final String DELAYED_QUEUE_NAME = "delayed.queue";

private static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
1
2
3
4
5

‍ 然后声明交换机:

@Bean
public CustomExchange delayedExchange(){
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");   //延迟类型为direct
    return new CustomExchange(
            DELAYED_EXCHANGE_NAME,  //交换机名称
            "x-delayed-message",    //交换机类型
            true,                   //是否持久化
            false,                  //是否自动删除
            args                    //参数
    );
}
1
2
3
4
5
6
7
8
9
10
11
12

注意我们返回的交换机类型是 CustomExchange,而不是直接交换机、删除交换机等,因为延迟交换机是新出现的,因此我们返回一个自定义的交换机。 ‍ 然后是队列和绑定:

//声明队列
@Bean
public Queue delayedQueue(){
    return new Queue(DELAYED_QUEUE_NAME);
}


//绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(
        @Qualifier("delayedQueue") Queue delayedQueue,
        @Qualifier("delayedExchange") CustomExchange delayedExchange){
    return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

完整代码:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayedQueueConfig {


    private static final String DELAYED_QUEUE_NAME = "delayed.queue";

    private static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";

    private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";


    //声明队列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }
  
    //声明交换机
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");   //延迟类型为direct
        return new CustomExchange(
                DELAYED_EXCHANGE_NAME,  //交换机名称
                "x-delayed-message",    //交换机类型
                true,                   //是否持久化
                false,                  //是否自动删除
                args                    //参数
        );
    }
  
    //绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(Queue delayedQueue, CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# 生产者

‍ 在 SendMsgController 新增一个方法:

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayMsg(@PathVariable String message, @PathVariable Integer delayTime){
    log.info("当前时间:{}, 发送一条时长{}毫秒延迟信息给延迟队列delayed.queue:{}" , new Date(), delayTime, message);

    rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", "消息来自delayed.exchange交换机的延迟队列:" + message, msg -> {
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
}
1
2
3
4
5
6
7
8
9

‍

# 消费者

在 consumer 包下新建一个 DelayedQueueConsumer 类:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;

@Slf4j
@Component
public class DelayedQueueConsumer {

     @RabbitListener(queues = "delayed.queue")
     public void receiveDelayedQueue(Message message){
         String msg = new String(message.getBody());
         log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
     }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

‍

# 测试

我们重启服务,然后访问以下路径

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
1
2

‍ 测试结果符合预期:第二个消息被先消费掉了

当前时间:Wed May 31 07:46:37 CST 2023, 发送一条时长20000毫秒延迟信息给延迟队列delayed.queue:come on baby1
当前时间:Wed May 31 07:46:40 CST 2023, 发送一条时长2000毫秒延迟信息给延迟队列delayed.queue:come on baby2
当前时间:Wed May 31 07:46:42 CST 2023,收到延迟队列的消息:消息来自delayed.exchange交换机的延迟队列:come on baby2
当前时间:Wed May 31 07:46:57 CST 2023,收到延迟队列的消息:消息来自delayed.exchange交换机的延迟队列:come on baby1
1
2
3
4

# 总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用

RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 自带的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 Kafka 的时间轮,这些方式各有特点,看需要适用的场景 ‍

# 源码

已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo2,读者可以通过切换分支来查看本文的示例代码

上次更新: 2025/5/17 12:26:09
延迟队列
发布确认高级

← 延迟队列 发布确认高级→

最近更新
01
学点统计学:轻松识破一本正经的胡说八道
06-05
02
2025 年 5 月记
05-31
03
《贫穷的本质》很棒,但可能不适合你
05-27
更多文章>
Theme by Vdoing | Copyright © 2022-2025 | 粤 ICP 备 2022067627 号 -1 | 粤公网安备 44011302003646 号 | 点击查看十年之约
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式