延迟插件
# 140.延迟插件
如果不能实现在消息粒度上的 TTL,并使其在设置的TTL 时间及时死亡,就无法设计成一个通用的延时队列。
为此,RabbitMQ3.5.7及以后的版本提供了 rabbitmq-delayed-message-exchange 插件来做延时消息任务。
# 下载插件
我们可以去GitHub (opens new window)下载该插件,所有发行版:Releases · rabbitmq/rabbitmq-delayed-message-exchange (opens new window)
注意,插件的版本,最好和RabbitMQ版本一致。
一些老旧的版本,githup上已经不再提供相应的ez文件了,只有源码,这样情况下只能下载网上已经编译好的,或者自己编译。
读者也可以去我的百度云网盘 (opens new window)下载安装包,路径为编程资料/Java相关/06.主流框架/30.MQ
然后我们将下载后的ez文件,放到安装目录的plugins目录下(之前讲安装RabbitMQ可视化插件的时候讲过)。然后启用,启用方法和之前启用可视化插件一样,在命令行输入:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启用后记得重启RabbitMQ
> rabbitmq-service stop
RabbitMQ 服务正在停止...
RabbitMQ 服务已成功停止。
> rabbitmq-service start
RabbitMQ 服务正在启动 .
RabbitMQ 服务已经启动成功。
2
3
4
5
6
7
然后我们访问可视化界面,在新增交换机的时候,可以看到多了一个选项:
这个交换机类型就可以实现延迟消息,也就是由交换机来完成延迟,而不是队列。
# 注意事项
注意,该插件的限制还是有很多的,GitHub (opens new window)的说明文档也说了:
This plugin adds delayed-messaging (or scheduled-messaging) to RabbitMQ. Its current design has plenty of limitation (documented below), consider using an external scheduler and a data store that fits your needs first.
This plugin badly needs a new design (opens new window) and a reimplementation from the ground up.
If you accept the limitations, please read on.
....
This plugin requires Erlang 23.2 or later versions (opens new window), same as RabbitMQ 3.8.16+.
大意:目前的设计的有很多的限制的(可以参考下面的文档),可以考虑用一个第三方定时任务框架和数据库来完成你的需求。
这个插件迫切地需要一个新的设计和重构,如果你接受这些限制,请继续。
要求至少23.2及之上的Erlang版本,RabbitMQ要求3.8.16以上的版本
# 需求
我们上一篇博客中,架构是这样的:基于死信来完成延迟
当我们使用插件后:实现起来会简单一点,只需一个交换机一个队列即可,真正延迟的地方是在交换机,重点是配置延迟交换机
接下来我们说下需求:很简单,一个交换机一个队列,一个生产者一个消费者
# 配置类
先声明队列、交换机和routingkey:
private static final String DELAYED_QUEUE_NAME = "delayed.queue";
private static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
2
3
4
5
然后声明交换机:
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); //延迟类型为direct
return new CustomExchange(
DELAYED_EXCHANGE_NAME, //交换机名称
"x-delayed-message", //交换机类型
true, //是否持久化
false, //是否自动删除
args //参数
);
}
2
3
4
5
6
7
8
9
10
11
12
注意我们返回的交换机类型是CustomExchange
,而不是直接交换机、删除交换机等,因为延迟交换机是新出现的,因此我们返回一个自定义的交换机。
然后是队列和绑定:
//声明队列
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
//绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
完整代码:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
private static final String DELAYED_QUEUE_NAME = "delayed.queue";
private static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
private static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//声明队列
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
//声明交换机
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct"); //延迟类型为direct
return new CustomExchange(
DELAYED_EXCHANGE_NAME, //交换机名称
"x-delayed-message", //交换机类型
true, //是否持久化
false, //是否自动删除
args //参数
);
}
//绑定
@Bean
public Binding delayedQueueBindingDelayedExchange(Queue delayedQueue, CustomExchange delayedExchange){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 生产者
在SendMsgController
新增一个方法:
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayMsg(@PathVariable String message, @PathVariable Integer delayTime){
log.info("当前时间:{}, 发送一条时长{}毫秒延迟信息给延迟队列delayed.queue:{}" , new Date(), delayTime, message);
rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", "消息来自delayed.exchange交换机的延迟队列:" + message, msg -> {
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
2
3
4
5
6
7
8
9
# 消费者
在consumer包下新建一个DelayedQueueConsumer
类:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DelayedQueueConsumer {
@RabbitListener(queues = "delayed.queue")
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 测试
我们重启服务,然后访问以下路径
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
2
测试结果符合预期:第二个消息被先消费掉了
当前时间:Wed May 31 07:46:37 CST 2023, 发送一条时长20000毫秒延迟信息给延迟队列delayed.queue:come on baby1
当前时间:Wed May 31 07:46:40 CST 2023, 发送一条时长2000毫秒延迟信息给延迟队列delayed.queue:come on baby2
当前时间:Wed May 31 07:46:42 CST 2023,收到延迟队列的消息:消息来自delayed.exchange交换机的延迟队列:come on baby2
当前时间:Wed May 31 07:46:57 CST 2023,收到延迟队列的消息:消息来自delayed.exchange交换机的延迟队列:come on baby1
2
3
4
# 总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用
RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 自带的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 Kafka 的时间轮,这些方式各有特点,看需要适用的场景
# 源码
已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo2,读者可以通过切换分支来查看本文的示例代码