发布确认高级
# 150.发布确认高级
在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。
于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?
# 确认机制
为此,我们可以加一个缓存,生产者先发消息给缓存,由缓存交给 MQ;当 MQ 宕机后,缓存会定期尝试重发,直到重发成功,架构图:
此时 RabbitMQ 宕机有几种情况:交换机不存在,队列不存在,或者都不存在
代码架构图:
我们先模拟交换机收不到消息的情况。我们先写一个简单的生产者和消费者,等链路是通的之后,再模拟交换机宕机的情况。
# 添加配置
声明交换机,队列和绑定关系:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 发布确认高级
*/
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
// 声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return new Queue(CONFIRM_QUEUE_NAME);
}
// 绑定
@Bean
public Binding bindingConfirmQueue(
@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 生产者
package com.peterjxl.learnrabbitmq.springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 开始发消息, 测试确认
@GetMapping("/sendMessage/{message}")
public void sendConfirmMsg(@PathVariable String message){
rabbitTemplate.convertAndSend("confirm.exchange", "key1", message);
log.info("发送时间: {}, 发送内容: {}", new Date(), message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 消费者
package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class ConfirmConsumer {
@RabbitListener(queues = "confirm.queue")
public void receiveConfirmMessage(Message msg) {
log.info("接收时间: {}, 接收内容: {}", new Date(), new String(msg.getBody()));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 测试
http://localhost: 8080/confirm/sendMessage/大家好 1
控制台输出:
发送时间: Wed May 31 19:35:48 CST 2023, 发送内容: 大家好1
接收时间: Wed May 31 19:35:48 CST 2023, 接收内容: 大家好1
2
可以看到是正常的,接下来我们模拟出问题的情况。
# 回调
虽然一般不会出问题(后续我们会搭建 RabbitMQ 集群),但还是得先预防,为此我们得定义一个回调方法,我们发送消息后, 不管 RabbitMQ 是否响应,都会调用该回调方法,然后我们判断是否成功,如果失败则需要保存消息,待后续继续发送。
在 RabbitTemplate 的源码中,就有定义该接口:ConfirmCallback
public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener, ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
//..............
@Nullable
private ConfirmCallback confirmCallback;
@FunctionalInterface
public interface ConfirmCallback {
void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
}
//..............
}
2
3
4
5
6
7
8
9
10
11
12
13
该接口是一个函数式接口,里面只有一个方法 confirm,该方法有 3 个参数:
- var1:发送的内容
- var2:是否发送成功
- var3:失败的原因
所以我们实现该接口,并定义回调方法:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{
// 交换机确认回调方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : null;
if(ack){
log.info("交换机已经收到Id为: {} 的消息", id);
}
else{
log.info("交换机还未收到Id为: {} 的消息,由于原因: {}", id, cause);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
我们定义了该接口,但是我们并没有注入到 RabbitTemplate 对象中,所以当我们调用 RabbitTemplate 的回调接口时,还是调用原本的。因此我们得注入:
public class MyCallBack implements RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
// 注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
}
2
3
4
5
6
7
8
9
10
11
12
小结下步骤:
- 定义一个类实现该接口
- 注入 RabbitTemplate 对象
- 使用 PostConstruct 注解,在所有
@Component
注解之后执行 init 方法,完成自定义接口的配置
# 启用回调的配置
除了自定义回调方法外,还需在配置文件中启用该配置(application.properties 添加第 5 行):
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root123
spring.rabbitmq.publisher-confirm-type=correlated
2
3
4
5
publisher-confirm-type 有 3 种取值:
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑
要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
有点类似同步确认,发送一条消息就确认一条
# 修改生产者
回调方法中,有个参数是 correlationData
,该参数并不是凭空出现的,而是我们自己定义的。在我们发送消息的时候,convertAndSend
方法有很多个重载,其中有一个是这样的:
该方法需要我们传一个 CorrelationData
参数:
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 开始发消息, 测试确认
@GetMapping("/sendMessage/{message}")
public void sendConfirmMsg(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlationData);
log.info("发送时间: {}, 发送内容: {}", new Date(), message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
我们重启,重新访问 http://localhost: 8080/confirm/sendMessage/大家好 1,控制台输出:
发送时间: Wed May 31 20:18:37 CST 2023, 发送内容: 大家好1
交换机已经收到Id为: 1 的消息
接收时间: Wed May 31 20:18:37 CST 2023, 接收内容: 大家好1
2
3
也就是正常调用了我们的回调方法。
接下来我们模拟发送失败的情况:我们故意写错交换机的名字(加个 123)
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// 开始发消息, 测试确认
@GetMapping("/sendMessage/{message}")
public void sendConfirmMsg(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("confirm.exchange123", "key1", message, correlationData);
log.info("发送时间: {}, 发送内容: {}", new Date(), message);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
控制台输出:可以看到有相关的报错信息,没有交换机名为“confirm.exchange123”
发送时间: Wed May 31 20:19:57 CST 2023, 发送内容: 大家好1
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)
交换机还未收到Id为: 1 的消息,由于原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)
2
3
同理,我们故意写错 routing key 试试。这里发送 2 条消息,并且我们设置每条消息的内容都不一样(加上了 routing key)
@GetMapping("/sendMessage/{message}")
public void sendConfirmMsg(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("confirm.exchange", "key1", message + "key1", correlationData);
log.info("发送时间: {}, 发送内容: {}, routing: key1", new Date(), message);
CorrelationData correlationData2 = new CorrelationData("2");
rabbitTemplate.convertAndSend("confirm.exchange", "key123", message+ "key12", correlationData2);
log.info("发送时间: {}, 发送内容: {}, routing: key2", new Date(), message);
}
2
3
4
5
6
7
8
9
10
11
测试结果:
发送时间: Wed May 31 20:26:57 CST 2023, 发送内容: 大家好1, routing: key1
发送时间: Wed May 31 20:26:57 CST 2023, 发送内容: 大家好1, routing: key2
交换机已经收到Id为: 2 的消息
交换机已经收到Id为: 1 的消息
接收时间: Wed May 31 20:26:57 CST 2023, 接收内容: 大家好1key1
2
3
4
5
可以看到交换机收到了 2 个消息,但是由于有一个 routing key 写错了,消费者只收到了一个。
这是有问题的,因为明明该交换机没成功发送消息(队列那一步出错了),但是没有回调,这是因为我们只定义了交换机的回调。
# 回退消息
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。
可以通过设置 mandatory 参数,在当消息传递过程中不可达目的地时,将消息返回给生产者。我们启用回退的设置,然后定义一个回退接口。
在 application.properties 中添加:
spring.rabbitmq.publisher-returns=true
在 MyCallBack 中,新实现一个接口 ReturnCallback,并定义回调方法,然后注入:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
// 注入
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
// 交换机确认回调方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : null;
if(ack){
log.info("交换机已经收到Id为: {} 的消息", id);
}
else{
log.info("交换机还未收到Id为: {} 的消息,由于原因: {}", id, cause);
}
}
// 可以在消息传递过程中不可达目的地时将消息返回给生产者
// 只有不可达目的地的时候才会回调
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routinyKey) {
log.error("消息: {}, 被交换机: {} 退回, 退回原因: {}, 路由key: {}", new String(message.getBody()), exchange, replyText, routinyKey);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
测试结果:
发送时间: Wed May 31 20:52:07 CST 2023, 发送内容: 大家好1, routing: key1
交换机已经收到Id为: 1 的消息
发送时间: Wed May 31 20:52:07 CST 2023, 发送内容: 大家好1, routing: key2
消息: 大家好1key12, 被交换机: confirm.exchange 退回, 退回原因: NO_ROUTE, 路由key: key123
交换机已经收到Id为: 2 的消息
接收时间: Wed May 31 20:52:07 CST 2023, 接收内容: 大家好1key1
2
3
4
5
6
可以看到发送了 2 个消息(第 1 和第 3 行)
交换机也收到了 2 个消息(第 2 和第 5 行)
消费者收到一个消息(第 6 行)
被回退一个消息(第 4 行)
# 源码
已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo3,读者可以通过切换分支来查看本文的示例代码