从01开始 从01开始
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)

peterjxl

人生如逆旅,我亦是行人
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是MQ
      • RabbitMQ介绍
      • RabbitMQ的安装-Windows
      • RabbitMQ的安装-Linux
      • RabbitMQ的安装-Docker
      • RabbitMQ的插件
      • RabbitMQ用户
      • HelloWorld程序
      • WorkQueues
      • 消息应答
      • RabbitMQ持久化和预取值
      • 发布确认
      • 交换机
      • Topics交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ集群
      • 镜像队列
        • 如果主节点宕机
        • 创建协议
        • 创建队列
        • 停止node1
        • 源码
      • Haproxy+Keepalive实现高可用负载均衡
      • Federation
      • Shovel
      • RabbitMQ
    • MyCat

    • Lombok

    • 主流框架
  • SpringMVC

  • SpringBoot

  • Java并发

  • Java源码

  • JVM

  • 韩顺平

  • Java
  • Java
  • 主流框架
  • MQ
2023-06-05
目录

镜像队列

# 190.镜像队列

即使在主节点创建了一个队列,该队列也不会同步在其余节点中报错;一旦主节点挂了,那么消息就丢失了。为此可以用镜像队列   ‍

‍

‍

# 如果主节点宕机

如果只有主节点有队列,那么宕机后就会丢失信息。虽然可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后,在被写入磁盘之前有一个短暂的间隔,这时候宕机也会丢失。

虽然通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。

我们目前创建一个队列:

package com.peterjxl.rabbitmq.demo11Mirror;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.101");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        String message = "Hello World";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("发送消息完成");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

‍

运行,然后可以在后台看到:

​(https://image.peterjxl.com/blog/image-20230603165631-z834x5n.png)​

‍

然后我们在node1上关闭服务:

rabbitmqctl stop_app
1

‍

‍

此时我们后台是连不上了:

​(https://image.peterjxl.com/blog/image-20230603165851-cz38e28.png)​

‍

但我们可以连node2,然后可以看到node1是宕机的:

​(https://image.peterjxl.com/blog/image-20230603165934-4objm2o.png)​

‍

‍

并且队列也是宕机状态,有多少个消息(ready、unacked和total都不显示了)。

​(https://image.peterjxl.com/blog/image-20230603170022-dmb89wk.png)

‍

​

此时我们获取消息也是不行的,即使连的是其他节点:

package com.peterjxl.rabbitmq.demo11Mirror;

import com.rabbitmq.client.*;

public class Consumer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.103");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收到消息:" + message);
        }, consumerTag -> {
            System.out.println("接收消息被中断");
        });
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

‍

运行结果:队列hello已经被停止了(stopped),因为消息是在磁盘上

queue 'hello' in vhost '/' process is stopped by supervisor, class-id=60, method-id=20)
1

‍

我们重启下node1:

rabbitmqctl start_app
1

‍

过了一阵子,就可以看到状态正常了:

​(https://image.peterjxl.com/blog/image-20230603171723-z0lnc0j.png)​

‍

但是消息已经丢失了

​(https://image.peterjxl.com/blog/image-20230603171821-cdkvjht.png)​

‍

‍

‍

# 创建协议

引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

当然,镜像队列也有缺点,那就是重复了,比如有100w个消息,那么其他节点也会存,有点浪费空间。

‍

随便找一个节点添加 policy

​​(https://image.peterjxl.com/blog/image-20230603172320-qxd2wgp.png)​​

‍

然后我们开始填写协议的信息:

​(https://image.peterjxl.com/blog/image-20230603172706-xvuebm3.png)​

‍

ha-params是指2份(主机+备机,一共2份),ha-sync-mode同步模式(自动)

‍

# 创建队列

在 node1上创建一个队列发送一条消息,集群会帮我们创建一个镜像队列(可能在node2或node3)

我们创建一个新的队列

package com.peterjxl.rabbitmq.demo11Mirror;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static final String QUEUE_NAME = "mirrior_hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.101");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        String message = "Hello World";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("发送消息完成");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

‍

运行,此时可以看到队列有个“+1”的字眼

​(https://image.peterjxl.com/blog/image-20230603173140-wcm9g96.png)​

‍

‍

点进去可以看到详情:镜像队列在node2上。

​(https://image.peterjxl.com/blog/image-20230603173210-4s9ravu.png)​

‍

‍

# 停止node1

我们停止node1:

[root@node1 ~]# rabbitmqctl stop_app
1

‍

此时node1宕机:

​(https://image.peterjxl.com/blog/image-20230603173333-blp3lhm.png)​

‍

然后此时node3也会有个镜像队列:

​(https://image.peterjxl.com/blog/image-20230603173345-m8eorwi.png)​

‍

然后我们获取消息:

package com.peterjxl.rabbitmq.demo11Mirror;

import com.rabbitmq.client.*;

public class Consumer {
    public static final String QUEUE_NAME = "mirrior_hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.103");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收到消息:" + message);
        }, consumerTag -> {
            System.out.println("接收消息被中断");
        });
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

‍

运行结果:正常获取到消息

接收到消息:Hello World
1

‍

‍

# 源码

本项目已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo11,读者可以通过切换分支来查看本文的示例代码

在GitHub上编辑此页 (opens new window)
上次更新: 2023/6/7 08:46:24
RabbitMQ集群
Haproxy+Keepalive实现高可用负载均衡

← RabbitMQ集群 Haproxy+Keepalive实现高可用负载均衡→

Theme by Vdoing | Copyright © 2022-2023 粤ICP备2022067627号-1 粤公网安备 44011302003646号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式