镜像队列
# 190.镜像队列
即使在主节点创建了一个队列,该队列也不会同步在其余节点中报错;一旦主节点挂了,那么消息就丢失了。为此可以用镜像队列
# 如果主节点宕机
如果只有主节点有队列,那么宕机后就会丢失信息。虽然可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后,在被写入磁盘之前有一个短暂的间隔,这时候宕机也会丢失。
虽然通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
我们目前创建一个队列:
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
运行,然后可以在后台看到:
(https://image.peterjxl.com/blog/image-20230603165631-z834x5n.png)
然后我们在node1上关闭服务:
rabbitmqctl stop_app
此时我们后台是连不上了:
(https://image.peterjxl.com/blog/image-20230603165851-cz38e28.png)
但我们可以连node2,然后可以看到node1是宕机的:
(https://image.peterjxl.com/blog/image-20230603165934-4objm2o.png)
并且队列也是宕机状态,有多少个消息(ready、unacked和total都不显示了)。
(https://image.peterjxl.com/blog/image-20230603170022-dmb89wk.png)
此时我们获取消息也是不行的,即使连的是其他节点:
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.103");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
}, consumerTag -> {
System.out.println("接收消息被中断");
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
运行结果:队列hello已经被停止了(stopped),因为消息是在磁盘上
queue 'hello' in vhost '/' process is stopped by supervisor, class-id=60, method-id=20)
我们重启下node1:
rabbitmqctl start_app
过了一阵子,就可以看到状态正常了:
(https://image.peterjxl.com/blog/image-20230603171723-z0lnc0j.png)
但是消息已经丢失了
(https://image.peterjxl.com/blog/image-20230603171821-cdkvjht.png)
# 创建协议
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
当然,镜像队列也有缺点,那就是重复了,比如有100w个消息,那么其他节点也会存,有点浪费空间。
随便找一个节点添加 policy
(https://image.peterjxl.com/blog/image-20230603172320-qxd2wgp.png)
然后我们开始填写协议的信息:
(https://image.peterjxl.com/blog/image-20230603172706-xvuebm3.png)
ha-params是指2份(主机+备机,一共2份),ha-sync-mode同步模式(自动)
# 创建队列
在 node1上创建一个队列发送一条消息,集群会帮我们创建一个镜像队列(可能在node2或node3)
我们创建一个新的队列
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static final String QUEUE_NAME = "mirrior_hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
运行,此时可以看到队列有个“+1”的字眼
(https://image.peterjxl.com/blog/image-20230603173140-wcm9g96.png)
点进去可以看到详情:镜像队列在node2上。
(https://image.peterjxl.com/blog/image-20230603173210-4s9ravu.png)
# 停止node1
我们停止node1:
[root@node1 ~]# rabbitmqctl stop_app
此时node1宕机:
(https://image.peterjxl.com/blog/image-20230603173333-blp3lhm.png)
然后此时node3也会有个镜像队列:
(https://image.peterjxl.com/blog/image-20230603173345-m8eorwi.png)
然后我们获取消息:
package com.peterjxl.rabbitmq.demo11Mirror;
import com.rabbitmq.client.*;
public class Consumer {
public static final String QUEUE_NAME = "mirrior_hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.103");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息:" + message);
}, consumerTag -> {
System.out.println("接收消息被中断");
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
运行结果:正常获取到消息
接收到消息:Hello World
# 源码
本项目已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo11,读者可以通过切换分支来查看本文的示例代码