从01开始 从01开始
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)

peterjxl

人生如逆旅,我亦是行人
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是MQ
      • RabbitMQ介绍
      • RabbitMQ的安装-Windows
      • RabbitMQ的安装-Linux
      • RabbitMQ的安装-Docker
      • RabbitMQ的插件
      • RabbitMQ用户
      • HelloWorld程序
      • WorkQueues
      • 消息应答
      • RabbitMQ持久化和预取值
      • 发布确认
      • 交换机
      • Topics交换机
      • 死信队列
      • 延迟队列
        • 使用场景
        • 整合SpringBoot
        • 添加依赖
        • 修改配置文件
        • 配置Swagger
        • 需求
        • 配置文件类
        • 生产者
        • 消费者
        • 测试
        • 优化
        • 源码
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ集群
      • 镜像队列
      • Haproxy+Keepalive实现高可用负载均衡
      • Federation
      • Shovel
      • RabbitMQ
    • MyCat

    • Lombok

    • 主流框架
  • SpringMVC

  • SpringBoot

  • Java并发

  • Java源码

  • JVM

  • 韩顺平

  • Java
  • Java
  • 主流框架
  • MQ
2023-06-05
目录

延迟队列

# 130.延迟队列

之前讲安装RabbitMQ可视化插件的时候讲过   ‍

‍

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

‍

‍

# 使用场景

  1. 订单在十分钟之内未支付则自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
  3. 用户注册成功后,如果三天内没有登陆则进行短信提醒。
  4. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  5. 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

‍

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?

如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。

但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下

其实延迟队列是死信队列的一种,当消息一直没被处理,达到了TTL后,就会被放到死信队列中。

‍

例如这是用户下单后,要在30分钟内完成付款的流程图:

​(https://image.peterjxl.com/blog/image-20230525072635-w79qql7.png)​

‍

‍

# 整合SpringBoot

为了方便演示,我们先整合下SpringBoot。在IDEA中新建一个:

​​(https://image.peterjxl.com/blog/image-20230525080028-222hhg8.png)​​

‍

‍

这里我们不使用SpringBoot3

​(https://image.peterjxl.com/blog/image-20230525080040-y51aad7.png)

‍

为了不让版本造成影响,我们改为使用低版本的SpringBoot

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.11.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
1
2
3
4
5
6

‍

# 添加依赖

‍

<!--RabbitMQ 依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.67_noneautotype2</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<!--swagger-->
<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger2</artifactId>
    <version>2.9.2</version>
</dependency>

<dependency>
    <groupId>io.springfox</groupId>
    <artifactId>springfox-swagger-ui</artifactId>
    <version>2.9.2</version>
</dependency>

<!--RabbitMQ 测试依赖-->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

‍

# 修改配置文件

我们修改src/main/resources/application.properties,添加如下内容:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root123
1
2
3
4

‍

# 配置Swagger

Swagger是一个文档框架,这里我们只需会用就行,新建一个config包并新增代码:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;


@Configuration
@EnableSwagger2
public class SwaggerConfig
{@Bean
public Docket webApiConfig(){
    return new Docket(DocumentationType.SWAGGER_2)
            .groupName("webApi")
            .apiInfo(webApiInfo())
            .select()
            .build();
}
    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("peterjxl", "https:www.peterjxl.com","perterjxl@qq.com"))
                .build();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

‍

‍

如果使用高版本的SpringBoot话,启动时可能会遇到报错:

Failed to start bean ‘documentationPluginsBootstrapper‘; nested exception is java.lang.NullPointerEx
1

‍

‍

‍

# 需求

在继续演示之前,我们说下需求。

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y(用作延迟交换机),它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:

​(https://image.peterjxl.com/blog/image-20230526075803-dl5f5ts.png)​

‍

也就是为QA和QB都绑定一个死信交换机Y。

‍

# 配置文件类

之前我们整合SpringBoot之前,都是在消费者中声明队列和交换机的;但有了SpringBoot,我们就可以通过配置文件来声明队列和交换机。消费者和生产者各司其职,不用在消费者或生产者中声明了,使得代码更加简单明了

‍

‍

首先新建一个Spring的配置文件类:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLQueueConfig {
}
1
2
3
4
5
6
7

‍

然后我们定义各个交换机和队列的名称:

//普通交换机
private static final String X_EXCHANGE = "X";

//死信交换机
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";

//普通队列
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";

//死信队列
private static final String DEAD_LETTER_QUEUE = "QD";
1
2
3
4
5
6
7
8
9
10
11
12

‍

然后我们声明两个交换机:

// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
    return new DirectExchange(X_EXCHANGE);
}

// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange(){
    return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
1
2
3
4
5
6
7
8
9
10
11

‍

‍

声明QA、QB和死信队列QD

// 声明普通队列A ttl为10s
@Bean("queueA")
public Queue queueA(){
    Map<String, Object> args = new HashMap<>(3);
    // 统一设置队列中的所有消息的过期时间,单位毫秒
    args.put("x-message-ttl", 10000);

    // 统一设置队列的死信交换机
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

    // 统一设置队列的死信routingKey
    args.put("x-dead-letter-routing-key", "YD");

    return (Queue) QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}

// 声明普通队列A ttl为40s
@Bean("queueB")
public Queue queueB(){
    Map<String, Object> args = new HashMap<>(3);
    // 统一设置队列中的所有消息的过期时间,单位毫秒
    args.put("x-message-ttl", 40000);

    // 统一设置队列的死信交换机
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

    // 统一设置队列的死信routingKey
    args.put("x-dead-letter-routing-key", "YD");

    return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}

// 声明死信队列QD
@Bean("queueD")
public Queue queueD(){
    return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

‍

‍

然后是声明交换机和队列的绑定关系:

// 声明QA绑定关系
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}

// 声明QB绑定关系
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}

// 声明QD绑定关系
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange){
    return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

‍

‍

# 生产者

我们使用发送HTTP请求的方式,来发送消息。当访问指定的链接,就发送指定的消息。

新建一个Controller类:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

}
1
2
3
4
5
6
7
8
9
10
11
12

Slf4j注解是日志的注解

‍

‍

然后我们发送消息:

@Autowired
private RabbitTemplate rabbitTemplate;

// 开始发消息
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
    log.info("当前时间:{}, 发送一条信息给两个TTL队列:{}" , new Date(), message);
    rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列:" + message);
    rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列:" + message);
}
1
2
3
4
5
6
7
8
9
10

‍

首先我们使用RabbitMQ提供的模板对象来发送,因此定义了一个RabbitTemplate对象。

然后log.info,记录了日志;然后使用convertAndSend方法发送消息(第一个参数是交换机的名字,第二个是routing key,第三个是消息)

‍

# 消费者

package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;


import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class DeadLetterQueueConsumer {

     @RabbitListener(queues = "QD")
     public void receiveD(Message message, Channel channel){
         String msg = new String(message.getBody());
         log.info("当前时间:{},收到死信队列的消息:{}", new Date(), msg);
     }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

‍

# 测试

我们访问 localhost:8080/ttl/sendMsg/嘻嘻嘻​,此时页面是空白的(这很正常,我们没写返回值),然后我们可以看到控制台有输出


当前时间:Tue May 30 07:15:47 CST 2023, 发送一条信息给两个TTL队列:嘻嘻嘻
当前时间:Tue May 30 07:15:57 CST 2023,收到死信队列的消息:消息来自ttl为10s的队列:嘻嘻嘻
当前时间:Tue May 30 07:16:27 CST 2023,收到死信队列的消息:消息来自ttl为40s的队列:嘻嘻嘻
1
2
3
4

‍

从时间来看,确实是发了消息后10秒,队列QA消费消息;而QB也是40秒后消费消息

‍

# 优化

第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

解决方法:由生产者发送消息的时候指定TTL。

我们新增一个队列 QC,绑定关系如下,该队列不设置TTL 时间:

​(https://image.peterjxl.com/blog/image-20230530072619-ea0mcg1.png)​

‍

代码:在TTLQueueConfig​中添加如下代码

private static final String QUEUE_C = "QC";


// 声明普通队列C
@Bean("queueC")
public Queue queueC(){
    Map<String, Object> args = new HashMap<>(3);

    // 统一设置队列的死信交换机
    args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);

    // 统一设置队列的死信routingKey
    args.put("x-dead-letter-routing-key", "YD");

    return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
}

 // 声明QC绑定关系
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange){
    return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

‍

‍

在SendMsgController​中添加:

// 开始发消息, 带过期时间
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message, @PathVariable String ttlTime){
    log.info("当前时间:{}, 发送一条时长{}毫秒TTL信息给队列QC:{}" , new Date(), ttlTime, message);
  
    rabbitTemplate.convertAndSend("X", "XC", "消息来自ttl为" + ttlTime + "毫秒的队列:" + message, msg -> {
        msg.getMessageProperties().setExpiration(ttlTime);
        return msg;
    });
}
1
2
3
4
5
6
7
8
9
10

‍

‍

测试:我们重启,然后分别访问如下两个链接

http://localhost:8080/ttl/sendExpirationMsg/你好1/20000,这是20秒

http://localhost:8080/ttl/sendExpirationMsg/你好2/2000,这是2秒

‍

结果:

当前时间:Tue May 30 07:54:49 CST 2023, 发送一条时长20000毫秒TTL信息给队列QC:你好1
当前时间:Tue May 30 07:54:54 CST 2023, 发送一条时长2000毫秒TTL信息给队列QC:你好2
当前时间:Tue May 30 07:55:09 CST 2023,收到死信队列的消息:消息来自ttl为20000毫秒的队列:你好1
当前时间:Tue May 30 07:55:09 CST 2023,收到死信队列的消息:消息来自ttl为2000毫秒的队列:你好2
1
2
3
4

可以看到消息1,确实是20秒后才被消费;但为什么消息2,明明应该是2秒的,也是20秒后被消费呢?

我们之前就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡”,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

‍

‍

# 源码

已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo1,读者可以通过切换分支来查看本文的示例代码

在GitHub上编辑此页 (opens new window)
上次更新: 2023/6/7 08:46:24
死信队列
延迟插件

← 死信队列 延迟插件→

Theme by Vdoing | Copyright © 2022-2023 粤ICP备2022067627号-1 粤公网安备 44011302003646号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式