从01开始 从01开始
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)

peterjxl

人生如逆旅,我亦是行人
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是MQ
      • RabbitMQ介绍
      • RabbitMQ的安装-Windows
      • RabbitMQ的安装-Linux
      • RabbitMQ的安装-Docker
      • RabbitMQ的插件
      • RabbitMQ用户
      • HelloWorld程序
      • WorkQueues
        • 工作队列原理
        • 轮训分发
        • 工具类
        • 工作线程代码
        • 启动多个工作线程
        • 发送消息
        • 源码
      • 消息应答
      • RabbitMQ持久化和预取值
      • 发布确认
      • 交换机
      • Topics交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ集群
      • 镜像队列
      • Haproxy+Keepalive实现高可用负载均衡
      • Federation
      • Shovel
      • RabbitMQ
    • MyCat

    • Lombok

    • 主流框架
  • SpringMVC

  • SpringBoot

  • Java并发

  • Java源码

  • JVM

  • 韩顺平

  • Java
  • Java
  • 主流框架
  • MQ
2023-06-05
目录

WorkQueues

# 60.WorkQueues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。   当有多个工作线程时,这些工作线程将一起处理这些任务。

‍

# 工作队列原理

‍

简单来说,就是生产者会发送大量消息;

然后此时会有多个消费者(也叫工作线程)处理消息。

​(https://image.peterjxl.com/blog/image-20230522071201-fbijapa.png)

‍

​

# 轮训分发

相当于一个工作线程处理完后,由下一个工作线程处理消息。

在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。

‍

‍

# 工具类

为了减少重复代码,我们可以定义一个工具类:

package com.peterjxl.rabbitmq.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitMQUtils {
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root123");
        return factory.newConnection().createChannel();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

‍

‍

# 工作线程代码

我们定义一个消费者。第16行的输出“C1等待接收消息...”表明当前是工作线程1

package com.peterjxl.rabbitmq.demo2;


import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;

/**
 * 这是一个工作线程,用于处理消息(相当于之前的消费者)
 */
public class Worker01 {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C1等待接收消息.......");
        // 消费者接收消息
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> {
            System.out.println("接收到的消息:" + new String(message.getBody()));
        }, consumerTag -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        });
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

‍

‍

# 启动多个工作线程

我们可以将该类启动多次,造成多个线程的样子。

我们先启动main方法:启动后该方法是运行的。

​(https://image.peterjxl.com/blog/image-20230522074216-fkc69u0.png)​

‍

‍

然后我们点击编辑配置:

​(https://image.peterjxl.com/blog/image-20230522074239-h28x132.png)​

‍

‍

点击修改选项:

​(https://image.peterjxl.com/blog/image-20230522074406-em43tlg.png)​

‍

设置运行多个实例:

​(https://image.peterjxl.com/blog/image-20230522074421-htb8uy1.png)​

‍

然后我们修改下输出语句为:

System.out.println("C2等待接收消息.......");
1

‍

然后再次运行,此时就由2个线程了:

​(https://image.peterjxl.com/blog/image-20230522074625-g9rof0w.png)​

‍

‍

# 发送消息

接下来我们发送多个消息,来查看两个工作线程处理消息的情况。

‍

接下来定义一个类,用来发送消息;这次我们改为从控制台中读取消息

package com.peterjxl.rabbitmq.demo2;

import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;


public class Task01 {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMQUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 从控制台中输入消息
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String message = scanner.next();
            // 发送消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息完成:" + message);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

‍

我们运行该方法,然后分别发送“AA”,“BB”,“CC”,“DD”。

不管是哪个工作线程接受到了“AA”,那么另一个就应该是处理“BB”,然后轮流处理“CC”,“DD”。

发送:

​(https://image.peterjxl.com/blog/image-20230522075529-pgx2jh3.png)​

‍

两个工作线程处理的结果:成功轮询

​(https://image.peterjxl.com/blog/image-20230522075553-d07oe91.png)​

‍

# 源码

已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo2,读者可以通过切换分支来查看本文的示例代码

在GitHub上编辑此页 (opens new window)
上次更新: 2023/6/7 08:46:24
HelloWorld程序
消息应答

← HelloWorld程序 消息应答→

Theme by Vdoing | Copyright © 2022-2023 粤ICP备2022067627号-1 粤公网安备 44011302003646号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式