死信队列
# 120.死信队列
死信,顾名思义就是无法被消费的消息。 一般来说,producer发送消息,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列
# 应用场景
- 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中
- 用户在商城下单成功,并点击去支付后,在指定时间未支付时,订单自动失效
当放到死信队列后,后续还有机会能取出来消费
# 死信的来源
- 消息 TTL 过期(例如一定时间内未支付,就认为订单失败)
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝 (
basic.reject 或 basic.nack
) 并且requeue=false.
# 需求
下面我们来实践下。需求如下
- 首先是一个生产者,两个消费者
- 生产者发送消息给交换机
- 交换机将消息放到队列中
- 正常的消息由C1消费
- 由于一些问题,发生了死信
- 死信会被放到一个死信交换机
- 死信队列由C2消费
# 消费者1
我们先声明几个交换机和队列的名字,然后写一个消费消息的代码:
package com.peterjxl.rabbitmq.demo9;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
public class Consumer01 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String DEAD_EXCHANGE = "dead_exchange";
public static final String NORMAL_QUEUE = "normal_queue";
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
System.out.println("Consumer01接收到消息:" + new String(message.getBody()));
}, consumerTag -> {});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
然后我们声明交换机:
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
2
声明队列:注意死信队列和我们之前的定义方式不一样,因为要正常队列中的信息,通过转换后才能转发给死信队列,需要设置一些参数。
// 要传入队列配置对象
Map<String, Object> arguments = new HashMap<>();
// 过期时间,单位毫秒,这里设置10秒。
arguments.put("x-message-ttl", 10000);
// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 正常队列设置死信routing-key
arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
2
3
4
5
6
7
8
9
10
11
12
13
14
其实我们还可以设置队列的过期时间:
// 过期时间,单位毫秒,这里设置10秒。
arguments.put("x-message-ttl", 10000);
2
不仅仅是队列可以设置过期时间TTL,生产者也可以设置TTL,例如可以每次都不同的TTL;而队列不能修改TTL,所以通常是生产者设置TTL
绑定交换机和队列:
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
2
# 生产者
此时我们发送消息的时候,就需要指定参数了。先设置下:
// 设置消息的TTL时间, 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
2
3
发送消息的时候指定:
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
完整代码:我们发送十个信息:
package com.peterjxl.rabbitmq.demo9;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
public class ProducerDemo9 {
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
// 设置消息的TTL时间, 单位是ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 发送死信消息,设置TTL
channel.basicPublish(NORMAL_EXCHANGE, "lisi", null, "这是一条消息".getBytes());
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 测试
我们先启动下消费者,创建好交换机和队列,然后停止;
停止消费者后,可以看到后台有这两个队列:
可以看到有DLX和DLK:
DLX:x-dead-letter-exchange说明该队列配置了死信交换机,
DLK:全称x-dead-letter-routing-key,也就是有设置死信消息的routing key
也可以看到有交换机:
正常队列和死信队列的绑定状态也正常:
然后启动生产者,由于没有消费者,因此过了TTL后,就会有死信。我们看到的现象应该是这样:
- 首先正常队列中有10个消息
- 随着时间的推移,消息一个个的过期,然后就会转发到死信队列中
- 最后所有消息都在死信队列中
实验结果:
PS:由于我们10个消息都是很快发送完的,所以可能看不到正常队列逐个减少、死信队列逐个增多的情况
# 消费者2
消费者2就是消费死信队列的消息即可,非常简单
package com.peterjxl.rabbitmq.demo9;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
public class Consumer02 {
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.basicConsume(DEAD_QUEUE, true, (consumerTag, message) -> {
System.out.println("Consumer02接收到消息:" + new String(message.getBody()));
}, consumerTag -> {});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
运行结果:
Consumer02接收到消息:info1
Consumer02接收到消息:info2
Consumer02接收到消息:info3
Consumer02接收到消息:info4
Consumer02接收到消息:info5
Consumer02接收到消息:info6
Consumer02接收到消息:info7
Consumer02接收到消息:info8
Consumer02接收到消息:info9
Consumer02接收到消息:info10
2
3
4
5
6
7
8
9
10
消费者1的代码最复杂,定义了交换机、队列和绑定关系等,而生产者只需发送消息,消费者2只需消费死信队列的消息。
# 队列达到最大长度
之前我们仅仅演示了TTL的情况导致死信,接下来我们演示其他两种情况,首先是队列达到最大长度
我们在Consumer01
里添加如下配置:
// 设置正常队列的长度限制
arguments.put("x-max-length", 6);
2
注意:由于我们修改了队列的参数,因此得先删除,然后再重新运行。此时能看到有个Lim的字眼,这是Limit的缩写。
然后我们开始发送消息,这里我们就不设置TTL了,修改ProducerDemo9
的发送消息代码,改为传入null:
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, message.getBytes());
然后我们先停止消费者,然后启动生产者,这样消息就会积压在正常队列中,然后剩下4个消息就会被转发到死信队列:
# 消息被拒
还剩下一个场景:消费者拒绝消息,此时信息也会变成死信
为了不让之前的消息影响到接下来的实验,我们先启动消费者1和2,处理掉队列中的信息,处理完后队列是空的:
然后我们将限制队列长度的代码注释掉,然后我们在后台删掉这个队列。
假设我们要拒绝 info5
这个消息:
channel.basicConsume(NORMAL_QUEUE, true, (consumerTag, message) -> {
String msg = new String(message.getBody());
if(msg.equals("info5")){
System.out.println("Consumer01接收到消息:" + msg + ",此消息被拒绝");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01接收到消息:" + msg);
}
}, consumerTag -> {});
2
3
4
5
6
7
8
9
然后我们修改为开启手动应答(不批量):
channel.basicConsume(NORMAL_QUEUE, false, (consumerTag, message) -> {
String msg = new String(message.getBody());
if(msg.equals("info5")){
System.out.println("Consumer01接收到消息:" + msg + ",此消息被拒绝");
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01接收到消息:" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
}, consumerTag -> {});
2
3
4
5
6
7
8
9
10
此时我们启动消费者1,然后再启动生产者:能看到死信队列有一个消息
我们点进去dead_quque,然后可以获取信息:确实是info5被拒绝后放到死信队列了
消费者1的输出:
Consumer01接收到消息:info1
Consumer01接收到消息:info2
Consumer01接收到消息:info3
Consumer01接收到消息:info4
Consumer01接收到消息:info5,此消息被拒绝
Consumer01接收到消息:info6
Consumer01接收到消息:info7
Consumer01接收到消息:info8
Consumer01接收到消息:info9
Consumer01接收到消息:info10
2
3
4
5
6
7
8
9
10
启动消费者2的输出:
Consumer02接收到消息:info5
# 源码
已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo9,读者可以通过切换分支来查看本文的示例代码