从01开始 从01开始
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)

peterjxl

人生如逆旅,我亦是行人
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是MQ
      • RabbitMQ介绍
      • RabbitMQ的安装-Windows
      • RabbitMQ的安装-Linux
      • RabbitMQ的安装-Docker
      • RabbitMQ的插件
      • RabbitMQ用户
      • HelloWorld程序
      • WorkQueues
      • 消息应答
      • RabbitMQ持久化和预取值
      • 发布确认
      • 交换机
      • Topics交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
        • 确认机制
        • 添加配置
        • 生产者
        • 消费者
        • 测试
        • 回调
        • 启用回调的配置
        • 修改生产者
        • 回退消息
        • 源码
      • 备份交换机
      • 其他知识点
      • RabbitMQ集群
      • 镜像队列
      • Haproxy+Keepalive实现高可用负载均衡
      • Federation
      • Shovel
      • RabbitMQ
    • MyCat

    • Lombok

    • 主流框架
  • SpringMVC

  • SpringBoot

  • Java并发

  • Java源码

  • JVM

  • 韩顺平

  • Java
  • Java
  • 主流框架
  • MQ
2023-06-05
目录

发布确认高级

# 150.发布确认高级

在生产环境中由于一些不明原因,导致 RabbitMQ 重启,在重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。

于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?   ‍

‍

‍

# 确认机制

为此,我们可以加一个缓存,生产者先发消息给缓存,由缓存交给MQ;当MQ宕机后,缓存会定期尝试重发,直到重发成功,架构图:

​(https://image.peterjxl.com/blog/image-20230531075756-v1r9qr3.png)​

‍

此时RabbitMQ宕机有几种情况:交换机不存在,队列不存在,或者都不存在

‍

代码架构图:

​(https://image.peterjxl.com/blog/image-20230531075819-trefrxc.png)​

‍

‍

我们先模拟交换机收不到消息的情况。我们先写一个简单的生产者和消费者,等链路是通的之后,再模拟交换机宕机的情况。

‍

# 添加配置

声明交换机,队列和绑定关系:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 发布确认高级
 */
@Configuration
public class ConfirmConfig {

    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String CONFIRM_ROUTING_KEY = "key1";

    // 声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return new Queue(CONFIRM_QUEUE_NAME);
    }

    // 绑定
    @Bean
    public Binding bindingConfirmQueue(
            @Qualifier("confirmQueue")    Queue confirmQueue,
            @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

‍

‍

# 生产者

‍

package com.peterjxl.learnrabbitmq.springbootrabbitmq.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;


@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 开始发消息, 测试确认
    @GetMapping("/sendMessage/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        rabbitTemplate.convertAndSend("confirm.exchange", "key1", message);
        log.info("发送时间: {}, 发送内容: {}", new Date(), message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

‍

‍

‍

‍

# 消费者

package com.peterjxl.learnrabbitmq.springbootrabbitmq.comsumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Slf4j
@Component
public class ConfirmConsumer {

    @RabbitListener(queues = "confirm.queue")
    public void receiveConfirmMessage(Message msg) {
        log.info("接收时间: {}, 接收内容: {}", new Date(), new String(msg.getBody()));
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

‍

# 测试

http://localhost:8080/confirm/sendMessage/大家好1

‍

控制台输出:

发送时间: Wed May 31 19:35:48 CST 2023, 发送内容: 大家好1
接收时间: Wed May 31 19:35:48 CST 2023, 接收内容: 大家好1
1
2

‍

可以看到是正常的,接下来我们模拟出问题的情况。

‍

# 回调

虽然一般不会出问题(后续我们会搭建RabbitMQ集群),但还是得先预防,为此我们得定义一个回调方法,我们发送消息后, 不管RabbitMQ是否响应,都会调用该回调方法,然后我们判断是否成功,如果失败则需要保存消息,待后续继续发送。

‍

在RabbitTemplate的源码中,就有定义该接口:ConfirmCallback

public class RabbitTemplate extends RabbitAccessor implements BeanFactoryAware, RabbitOperations, MessageListener, ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {

    //..............
    @Nullable
    private ConfirmCallback confirmCallback;
  
    @FunctionalInterface
    public interface ConfirmCallback {
        void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);
    }

    //..............
}
1
2
3
4
5
6
7
8
9
10
11
12
13

‍

该接口是一个函数式接口,里面只有一个方法confirm,该方法有3个参数:

  • var1:发送的内容
  • var2:是否发送成功
  • var3:失败的原因

‍

所以我们实现该接口,并定义回调方法:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback{

  
    // 交换机确认回调方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : null;
        if(ack){
            log.info("交换机已经收到Id为: {} 的消息", id);
        }
        else{
            log.info("交换机还未收到Id为: {} 的消息,由于原因: {}", id, cause);
        }
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

‍

‍

我们定义了该接口,但是我们并没有注入到RabbitTemplate对象中,所以当我们调用RabbitTemplate的回调接口时,还是调用原本的。因此我们得注入:

public class MyCallBack implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12

‍

小结下步骤:

  1. 定义一个类实现该接口
  2. 注入RabbitTemplate对象
  3. 使用PostConstruct注解,在所有@Component​注解之后执行init方法,完成自定义接口的配置

‍

# 启用回调的配置

除了自定义回调方法外,还需在配置文件中启用该配置(application.properties添加第5行):

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root123
spring.rabbitmq.publisher-confirm-type=correlated
1
2
3
4
5

publisher-confirm-type有3种取值:

  • NONE:禁用发布确认模式,是默认值

  • CORRELATED:发布消息成功到交换器后会触发回调方法

  • SIMPLE:经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑

    要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

    有点类似同步确认,发送一条消息就确认一条

‍

‍

# 修改生产者

回调方法中,有个参数是correlationData​,该参数并不是凭空出现的,而是我们自己定义的。在我们发送消息的时候,convertAndSend​方法有很多个重载,其中有一个是这样的:

​(https://image.peterjxl.com/blog/image-20230531201030-wp2drkw.png)​

‍

该方法需要我们传一个CorrelationData​参数:

public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 开始发消息, 测试确认
    @GetMapping("/sendMessage/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlationData);
        log.info("发送时间: {}, 发送内容: {}", new Date(), message);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

‍

‍

我们重启,重新访问http://localhost:8080/confirm/sendMessage/大家好1,控制台输出:

发送时间: Wed May 31 20:18:37 CST 2023, 发送内容: 大家好1
交换机已经收到Id为: 1 的消息
接收时间: Wed May 31 20:18:37 CST 2023, 接收内容: 大家好1
1
2
3

‍

也就是正常调用了我们的回调方法。

‍

接下来我们模拟发送失败的情况:我们故意写错交换机的名字(加个123)

public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    // 开始发消息, 测试确认
    @GetMapping("/sendMessage/{message}")
    public void sendConfirmMsg(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend("confirm.exchange123", "key1", message, correlationData);
        log.info("发送时间: {}, 发送内容: {}", new Date(), message);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

‍

控制台输出:可以看到有相关的报错信息,没有交换机名为“confirm.exchange123”

发送时间: Wed May 31 20:19:57 CST 2023, 发送内容: 大家好1
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)
交换机还未收到Id为: 1 的消息,由于原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm.exchange123' in vhost '/', class-id=60, method-id=40)
1
2
3

‍

‍

同理,我们故意写错routing key试试。这里发送2条消息,并且我们设置每条消息的内容都不一样(加上了routing key)

@GetMapping("/sendMessage/{message}")
public void sendConfirmMsg(@PathVariable String message){
    CorrelationData correlationData = new CorrelationData("1");
    rabbitTemplate.convertAndSend("confirm.exchange", "key1", message + "key1", correlationData);
    log.info("发送时间: {}, 发送内容: {}, routing: key1", new Date(), message);


    CorrelationData correlationData2 = new CorrelationData("2");
    rabbitTemplate.convertAndSend("confirm.exchange", "key123", message+ "key12", correlationData2);
    log.info("发送时间: {}, 发送内容: {}, routing: key2", new Date(), message);
}
1
2
3
4
5
6
7
8
9
10
11

‍

‍

测试结果:

发送时间: Wed May 31 20:26:57 CST 2023, 发送内容: 大家好1, routing: key1
发送时间: Wed May 31 20:26:57 CST 2023, 发送内容: 大家好1, routing: key2
交换机已经收到Id为: 2 的消息
交换机已经收到Id为: 1 的消息
接收时间: Wed May 31 20:26:57 CST 2023, 接收内容: 大家好1key1
1
2
3
4
5

可以看到交换机收到了2个消息,但是由于有一个routing key 写错了,消费者只收到了一个。

这是有问题的,因为明明该交换机没成功发送消息(队列那一步出错了),但是没有回调,这是因为我们只定义了交换机的回调。

‍

‍

# 回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

可以通过设置 mandatory 参数,在当消息传递过程中不可达目的地时,将消息返回给生产者。我们启用回退的设置,然后定义一个回退接口。

‍

在application.properties中添加:

spring.rabbitmq.publisher-returns=true
1

‍

在MyCallBack中,新实现一个接口ReturnCallback,并定义回调方法,然后注入:

package com.peterjxl.learnrabbitmq.springbootrabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // 注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    // 交换机确认回调方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : null;
        if(ack){
            log.info("交换机已经收到Id为: {} 的消息", id);
        }
        else{
            log.info("交换机还未收到Id为: {} 的消息,由于原因: {}", id, cause);
        }
    }

    // 可以在消息传递过程中不可达目的地时将消息返回给生产者
    // 只有不可达目的地的时候才会回调
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routinyKey) {
        log.error("消息: {}, 被交换机: {} 退回, 退回原因: {}, 路由key: {}", new String(message.getBody()), exchange, replyText, routinyKey);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

‍

‍

‍

测试结果:

发送时间: Wed May 31 20:52:07 CST 2023, 发送内容: 大家好1, routing: key1
交换机已经收到Id为: 1 的消息
发送时间: Wed May 31 20:52:07 CST 2023, 发送内容: 大家好1, routing: key2
消息: 大家好1key12, 被交换机: confirm.exchange 退回, 退回原因: NO_ROUTE, 路由key: key123
交换机已经收到Id为: 2 的消息
接收时间: Wed May 31 20:52:07 CST 2023, 接收内容: 大家好1key1
1
2
3
4
5
6

‍

可以看到发送了2个消息(第1和第3行)

交换机也收到了2个消息(第2和第5行)

消费者收到一个消息(第6行)

被回退一个消息(第4行)

‍

# 源码

已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo3,读者可以通过切换分支来查看本文的示例代码

在GitHub上编辑此页 (opens new window)
上次更新: 2023/6/7 08:46:24
延迟插件
备份交换机

← 延迟插件 备份交换机→

Theme by Vdoing | Copyright © 2022-2023 粤ICP备2022067627号-1 粤公网安备 44011302003646号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式