从01开始 从01开始
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)

peterjxl

人生如逆旅,我亦是行人
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是MQ
      • RabbitMQ介绍
      • RabbitMQ的安装-Windows
      • RabbitMQ的安装-Linux
      • RabbitMQ的安装-Docker
      • RabbitMQ的插件
      • RabbitMQ用户
      • HelloWorld程序
      • WorkQueues
      • 消息应答
      • RabbitMQ持久化和预取值
      • 发布确认
      • 交换机
      • Topics交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ集群
      • 镜像队列
      • Haproxy+Keepalive实现高可用负载均衡
      • Federation
        • 启用插件
        • 原理
        • 创建交换机和队列
        • 配置upstream
        • 添加 policy
        • Federation Queue
      • Shovel
      • RabbitMQ
    • MyCat

    • Lombok

    • 主流框架
  • SpringMVC

  • SpringBoot

  • Java并发

  • Java源码

  • JVM

  • 韩顺平

  • Java
  • Java
  • 主流框架
  • MQ
2023-06-05
目录

Federation

# 210.Federation

Federation是联邦的意思,接下来我们讲讲Federation Exchange和Federation Queue。   ‍

# 为什么要用Federation Exchange

假设我们目前有2个broker,一个在北京,一个在深圳(异地灾备),彼此之间相距甚远,网络延迟是一个不得不面对的问题。两个节点之间需要同步数据,将数据同步给另一个节点的我们称为上游,另一个则称为下游。

有一个在北京的业务(Client 北京) 需要连接(broker 北京),向其中的交换器 exchangeA 发送消息,此时的网络延迟很小,(Client 北京)可以迅速将消息发送至 exchangeA 中,就算在开启了 publisherconfirm 机制或者事务机制的情况下,也可以迅速收到确认信息。

此时又有个在深圳的业务(Client 深圳)需要向 exchangeA 发送消息, 那么(Client 深圳) (broker 北京)之间有很大的网络延迟,(Client 深圳) 将发送消息至 exchangeA 会经历一定的延迟,尤其是在开启了 publisherconfirm 机制或者事务机制的情况下,(Client 深圳) 会等待很长的延迟时间来接收(broker 北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。

将业务(Client 深圳)部署到北京的机房可以解决这个问题,但是如果(Client 深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房。

使用 Federation 插件就可以很好地解决这个问题。

‍

# 启用插件

首先需要保证每台节点单独运行。在每台机器上开启 federation 相关插件

rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
1
2

‍

‍

‍

然后我们登录后台,可以看到多了两个选项:

​(https://image.peterjxl.com/blog/image-20230603190128-cchsbwv.png)​

‍

如果登录后没有看到,则可以试下重启RabbitMQ。实在不行就重启Linux,然后重新配置集群

‍

# 原理

我们假设node1是上游,node2是下游,上游的数据会同步给下游

​(https://image.peterjxl.com/blog/image-20230603181418-bn7nm9p.png)​

‍

既然是联邦交换机,那么就是上游的交换机,同步数据给下游的交换机,同步的过程就交给我们刚刚配置的联邦插件。配置过程:

  1. 在node2创建联邦交换机,队列和绑定
  2. 在node2上配置node1的地址(upstream)
  3. 添加 policy

‍

‍

# 创建交换机和队列

新写一个类,创建交换机、队列和绑定关系:

package com.peterjxl.rabbitmq.demo12Federation;

import com.rabbitmq.client.*;

public class Consumer {

    public static final String QUEUE_NAME = "mirror_hello";
    public static final String EXCHANGE_NAME = "fed_exchange";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.56.103");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("node2_queue", true, false, false, null);
        channel.queueBind("node2_queue", EXCHANGE_NAME, "routeKey");
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

‍

‍

运行,此时后台可以看到有创建成功

​(https://image.peterjxl.com/blog/image-20230603214543-7vqwe5i.png)​

‍

‍

# 配置upstream

然后我们在node2后台配置:

​​​(https://image.peterjxl.com/blog/image-20230603215117-4e51vyu.png)​​​

‍

然后配置上游:

​​(https://image.peterjxl.com/blog/image-20230603215401-0kqdvwr.png)​​

‍

# 添加 policy

然后配置规则:

​(https://image.peterjxl.com/blog/image-20230603215726-gdkt41u.png)​

‍

‍

‍

​(https://image.peterjxl.com/blog/image-20230603215654-rj3v4tm.png)​

‍

然后点击Add / update policy

‍

然后我们可以查看下联邦的状态,此时可以看到是正常的:

​(https://image.peterjxl.com/blog/image-20230603215841-uuzb8l4.png)​

‍

‍

‍

# Federation Queue

之前我们是按交换机来同步数据的,其实也可以通过队列来同步。联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。

一个联邦队列可以连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息的需求,原理图:

​(https://image.peterjxl.com/blog/image-20230603220231-x8g76v3.png)​

‍

‍

我们将node1结点的数据同步给node2,因此先将node2节点的队列联邦到node1上,这样2个消费者可以消费相同的数据(消费者1可能在北京,而消费者2可能在深圳)。

配置步骤和我们之前配置联邦交换机类似:

  1. 创建交换机、队列
  2. 配置upstream(之前我们已经配过了)
  3. 添加 policy

‍

​(https://image.peterjxl.com/blog/image-20230603222059-wb4hh9c.png)​

在GitHub上编辑此页 (opens new window)
上次更新: 2023/6/7 08:46:24
Haproxy+Keepalive实现高可用负载均衡
Shovel

← Haproxy+Keepalive实现高可用负载均衡 Shovel→

Theme by Vdoing | Copyright © 2022-2023 粤ICP备2022067627号-1 粤公网安备 44011302003646号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式