从 01 开始 从 01 开始
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 新闻合订本
    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档

晓林

程序猿,自由职业者,博主,英语爱好者,健身达人
首页
  • 📚 计算机基础

    • 计算机简史
    • 数字电路
    • 计算机组成原理
    • 操作系统
    • Linux
    • 计算机网络
    • 数据库
    • 编程工具
    • 装机
  • 🎨 前端

    • Node
  • JavaSE
  • Java 高级
  • JavaEE

    • 构建、依赖管理
    • Ant
    • Maven
    • 日志框架
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • 环境管理和配置管理-科普篇
    • Servlet
  • Spring

    • Spring基础
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC

    • SpringMVC 基础
  • SpringBoot

    • SpringBoot 基础
  • Windows 使用技巧
  • 手机相关技巧
  • 最全面的输入法教程
  • 最全面的浏览器教程
  • Office
  • 图片类工具
  • 效率类工具
  • 最全面的 RSS 教程
  • 码字工具
  • 各大平台
  • 校招
  • 五险一金
  • 职场规划
  • 关于离职
  • 杂谈
  • 自媒体
  • 📖 读书

    • 读书工具
    • 走进科学
  • 🌍 英语

    • 从零开始学英语
    • 英语兔的相关视频
    • Larry 想做技术大佬的相关视频
  • 🏛️ 政治

    • 新闻合订本
    • 反腐
    • GFW
    • 404 内容
    • 审查与自我审查
    • 互联网
    • 战争
    • 读书笔记
  • 💰 经济

    • 关于税
    • 理财
  • 💪 健身

    • 睡眠
    • 皮肤
    • 口腔健康
    • 学会呼吸
    • 健身日志
  • 🏠 其他

    • 驾驶技能
    • 租房与买房
    • 厨艺
  • 电影

    • 电影推荐
  • 电视剧
  • 漫画

    • 漫画软件
    • 漫画推荐
  • 游戏

    • Steam
    • 三国杀
    • 求生之路
  • 小说
  • 关于本站
  • 关于博主
  • 打赏
  • 网站动态
  • 友人帐
  • 从零开始搭建博客
  • 搭建邮件服务器
  • 本站分享
  • 🌈 生活

    • 2022
    • 2023
    • 2024
    • 2025
  • 📇 文章索引

    • 文章分类
    • 文章归档
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是 MQ
      • RabbitMQ 介绍
      • RabbitMQ 的安装-Windows
      • RabbitMQ 的安装-Linux
      • RabbitMQ 的安装-Docker
      • RabbitMQ 的插件
      • RabbitMQ 用户
      • HelloWorld 程序
      • WorkQueues
      • 消息应答
      • RabbitMQ 持久化和预取值
      • 发布确认
      • 交换机
      • Topics 交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
        • 确认机制
        • 添加配置
        • 生产者
        • 消费者
        • 测试
        • 回调
        • 启用回调的配置
        • 修改生产者
        • 回退消息
        • 源码
      • 备份交换机
      • 其他知识点
      • RabbitMQ 集群
      • 镜像队列
      • Haproxy + Keepalive 实现高可用负载均衡
      • Federation
      • Shovel
    • MyCat

    • Lombok

  • SpringMVC

  • SpringBoot

  • Java
  • 主流框架
  • MQ
2023-06-05
目录

发布确认高级

# 150.发布确认高级

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。

于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

# 确认机制

为此,我们可以加一个缓存,生产者先发消息给缓存,由缓存交给 MQ;当 MQ 宕机后,缓存会定期尝试重发,直到重发成功,架构图:

‍

此时 RabbitMQ 宕机有几种情况:交换机不存在,队列不存在,或者都不存在 ‍ 代码架构图:

我们先模拟交换机收不到消息的情况。我们先写一个简单的生产者和消费者,等链路是通的之后,再模拟交换机宕机的情况。 ‍

# 添加配置

声明交换机,队列和绑定关系:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 发布确认高级
 */
@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String CONFIRM_ROUTING_KEY = "key1";

    // 声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return new Queue(CONFIRM_QUEUE_NAME);
    }

    // 绑定
    @Bean
    public Binding bindingConfirmQueue(
            @Qualifier("confirmQueue")    Queue confirmQueue,
            @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 生产者

‍

package com.peterjxl.learnrabbitmq.springbootrabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 开始发消息, 测试确认
    @GetMapping("/sendMessage/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        rabbitTemplate.convertAndSend("confirm.exchange", "key1", message);
        log.info("发送时间: {}, 发送内容: {}", new Date(), message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

‍

# 消费者

package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = "confirm.queue")
    public void receiveConfirmMessage(Message msg) {
        log.info("接收时间: {}, 接收内容: {}", new Date(), new String(msg.getBody()));
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

‍

# 测试

http://localhost: 8080/confirm/sendMessage/大家好 1 ‍ 控制台输出:

发送时间: Wed May 31 19:35:48 CST 2023, 发送内容: 大家好1
接收时间: Wed May 31 19:35:48 CST 2023, 接收内容: 大家好1
1
2

‍ 可以看到是正常的,接下来我们模拟出问题的情况。 ‍

# 回调

虽然一般不会出问题(后续我们会搭建 RabbitMQ 集群),但还是得先预防,为此我们得定义一个回调方法,我们发送消息后, 不管 RabbitMQ 是否响应,都会调用该回调方法,然后我们判断是否成功,如果失败则需要保存消息,待后续继续发送。 ‍ 在 RabbitTemplate 的源码中,就有定义该接口:ConfirmCallback

public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener, ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {

    //..............
    @Nullable
    private ConfirmCallback confirmCallback;
  
    @FunctionalInterface
    public interface ConfirmCallback {
        void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
    }

    //..............
}
1
2
3
4
5
6
7
8
9
10
11
12
13

‍ 该接口是一个函数式接口,里面只有一个方法 confirm,该方法有 3 个参数:

  • var1:发送的内容
  • var2:是否发送成功
  • var3:失败的原因 ‍ 所以我们实现该接口,并定义回调方法:
package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{

  
    // 交换机确认回调方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : null;
        if(ack){
            log.info("交换机已经收到Id为: {} 的消息", id);
        }
        else{
            log.info("交换机还未收到Id为: {} 的消息,由于原因: {}", id, cause);
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

我们定义了该接口,但是我们并没有注入到 RabbitTemplate 对象中,所以当我们调用 RabbitTemplate 的回调接口时,还是调用原本的。因此我们得注入:

public class MyCallBack implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12

‍ 小结下步骤:

  1. 定义一个类实现该接口
  2. 注入 RabbitTemplate 对象
  3. 使用 PostConstruct 注解,在所有 @Component 注解之后执行 init 方法,完成自定义接口的配置 ‍

# 启用回调的配置

除了自定义回调方法外,还需在配置文件中启用该配置(application.properties 添加第 5 行):

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root123
spring.rabbitmq.publisher-confirm-type=correlated
1
2
3
4
5

publisher-confirm-type 有 3 种取值:

  • NONE:禁用发布确认模式,是默认值

  • CORRELATED:发布消息成功到交换器后会触发回调方法

  • SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑

    要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

    有点类似同步确认,发送一条消息就确认一条

# 修改生产者

回调方法中,有个参数是 correlationData,该参数并不是凭空出现的,而是我们自己定义的。在我们发送消息的时候,convertAndSend 方法有很多个重载,其中有一个是这样的:

‍

该方法需要我们传一个 CorrelationData 参数:

public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 开始发消息, 测试确认
    @GetMapping("/sendMessage/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlationData);
        log.info("发送时间: {}, 发送内容: {}", new Date(), message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

我们重启,重新访问 http://localhost: 8080/confirm/sendMessage/大家好 1,控制台输出:

发送时间: Wed May 31 20:18:37 CST 2023, 发送内容: 大家好1
交换机已经收到Id为: 1 的消息
接收时间: Wed May 31 20:18:37 CST 2023, 接收内容: 大家好1
1
2
3

‍ 也就是正常调用了我们的回调方法。 ‍ 接下来我们模拟发送失败的情况:我们故意写错交换机的名字(加个 123)

public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 开始发消息, 测试确认
    @GetMapping("/sendMessage/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend("confirm.exchange123", "key1", message, correlationData);
        log.info("发送时间: {}, 发送内容: {}", new Date(), message);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

‍ 控制台输出:可以看到有相关的报错信息,没有交换机名为“confirm.exchange123”

发送时间: Wed May 31 20:19:57 CST 2023, 发送内容: 大家好1
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)
交换机还未收到Id为: 1 的消息,由于原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)
1
2
3

同理,我们故意写错 routing key 试试。这里发送 2 条消息,并且我们设置每条消息的内容都不一样(加上了 routing key)

@GetMapping("/sendMessage/{message}")
public void sendConfirmMsg(@PathVariable String message){
    CorrelationData correlationData = new CorrelationData("1");
    rabbitTemplate.convertAndSend("confirm.exchange", "key1", message + "key1", correlationData);
    log.info("发送时间: {}, 发送内容: {}, routing: key1", new Date(), message);


    CorrelationData correlationData2 = new CorrelationData("2");
    rabbitTemplate.convertAndSend("confirm.exchange", "key123", message+ "key12", correlationData2);
    log.info("发送时间: {}, 发送内容: {}, routing: key2", new Date(), message);
}
1
2
3
4
5
6
7
8
9
10
11

测试结果:

发送时间: Wed May 31 20:26:57 CST 2023, 发送内容: 大家好1, routing: key1
发送时间: Wed May 31 20:26:57 CST 2023, 发送内容: 大家好1, routing: key2
交换机已经收到Id为: 2 的消息
交换机已经收到Id为: 1 的消息
接收时间: Wed May 31 20:26:57 CST 2023, 接收内容: 大家好1key1
1
2
3
4
5

可以看到交换机收到了 2 个消息,但是由于有一个 routing key 写错了,消费者只收到了一个。

这是有问题的,因为明明该交换机没成功发送消息(队列那一步出错了),但是没有回调,这是因为我们只定义了交换机的回调。

# 回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

可以通过设置 mandatory 参数,在当消息传递过程中不可达目的地时,将消息返回给生产者。我们启用回退的设置,然后定义一个回退接口。 ‍ 在 application.properties 中添加:

spring.rabbitmq.publisher-returns=true
1

‍ 在 MyCallBack 中,新实现一个接口 ReturnCallback,并定义回调方法,然后注入:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    // 交换机确认回调方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : null;
        if(ack){
            log.info("交换机已经收到Id为: {} 的消息", id);
        }
        else{
            log.info("交换机还未收到Id为: {} 的消息,由于原因: {}", id, cause);
        }
    }

    // 可以在消息传递过程中不可达目的地时将消息返回给生产者
    // 只有不可达目的地的时候才会回调
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routinyKey) {
        log.error("消息: {}, 被交换机: {} 退回, 退回原因: {}, 路由key: {}", new String(message.getBody()), exchange, replyText, routinyKey);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

测试结果:

发送时间: Wed May 31 20:52:07 CST 2023, 发送内容: 大家好1, routing: key1
交换机已经收到Id为: 1 的消息
发送时间: Wed May 31 20:52:07 CST 2023, 发送内容: 大家好1, routing: key2
消息: 大家好1key12, 被交换机: confirm.exchange 退回, 退回原因: NO_ROUTE, 路由key: key123
交换机已经收到Id为: 2 的消息
接收时间: Wed May 31 20:52:07 CST 2023, 接收内容: 大家好1key1
1
2
3
4
5
6

‍ 可以看到发送了 2 个消息(第 1 和第 3 行)

交换机也收到了 2 个消息(第 2 和第 5 行)

消费者收到一个消息(第 6 行)

被回退一个消息(第 4 行) ‍

# 源码

已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo3,读者可以通过切换分支来查看本文的示例代码

上次更新: 2025/5/17 12:26:09
延迟插件
备份交换机

← 延迟插件 备份交换机→

最近更新
01
新闻合订本 2025-10
10-31
02
2025 年 10 月记
10-30
03
用 AI 批量优化思源笔记排版
10-15
更多文章>
Theme by Vdoing | Copyright © 2022-2025 | 粤 ICP 备 2022067627 号 -1 | 粤公网安备 44011302003646 号 | 点击查看十年之约
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式