从01开始 从01开始
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)

peterjxl

人生如逆旅,我亦是行人
首页
  • 计算机科学导论
  • 数字电路
  • 计算机组成原理

    • 计算机组成原理-北大网课
  • 操作系统
  • Linux
  • Docker
  • 计算机网络
  • 计算机常识
  • Git
  • JavaSE
  • Java高级
  • JavaEE

    • Ant
    • Maven
    • Log4j
    • Junit
    • JDBC
    • XML-JSON
  • JavaWeb

    • 服务器软件
    • Servlet
  • Spring
  • 主流框架

    • Redis
    • Mybatis
    • Lucene
    • Elasticsearch
    • RabbitMQ
    • MyCat
    • Lombok
  • SpringMVC
  • SpringBoot
  • 学习网课的心得
  • 输入法
  • 节假日TodoList
  • 其他
  • 关于本站
  • 网站日记
  • 友人帐
  • 如何搭建一个博客
GitHub (opens new window)
  • JavaSE

  • JavaSenior

  • JavaEE

  • JavaWeb

  • Spring

  • 主流框架

    • Redis

    • Mybatis

    • Lucene

    • Elasticsearch

    • MQ

      • RabbitMQ-尚硅谷
      • 什么是MQ
      • RabbitMQ介绍
      • RabbitMQ的安装-Windows
      • RabbitMQ的安装-Linux
      • RabbitMQ的安装-Docker
      • RabbitMQ的插件
      • RabbitMQ用户
      • HelloWorld程序
        • 需求
        • 准备环境
        • 生产者
        • 消费者
        • 源码
      • WorkQueues
      • 消息应答
      • RabbitMQ持久化和预取值
      • 发布确认
      • 交换机
      • Topics交换机
      • 死信队列
      • 延迟队列
      • 延迟插件
      • 发布确认高级
      • 备份交换机
      • 其他知识点
      • RabbitMQ集群
      • 镜像队列
      • Haproxy+Keepalive实现高可用负载均衡
      • Federation
      • Shovel
      • RabbitMQ
    • MyCat

    • Lombok

    • 主流框架
  • SpringMVC

  • SpringBoot

  • Java并发

  • Java源码

  • JVM

  • 韩顺平

  • Java
  • Java
  • 主流框架
  • MQ
2023-06-05
目录

HelloWorld程序

# 50.HelloWorld程序

安装好RabbitMQ后,下一步就是使用了 我们将用 Java 编写两个程序:

  • 发送单个消息的生产者
  • 接收消息并打印出来的消费者

并介绍 Java API 中的一些细节。

‍

# 需求

我们使用生产者,发消息给MQ

然后消费者从队列中取出消息

​(https://image.peterjxl.com/blog/image-20230521203625-psnbky8.png)​

# 准备环境

我们新建一个Maven工程,并引入依赖:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>5.8.0</version>
</dependency>
1
2
3
4
5

‍

引入日志框架:


<dependency>
  <groupId>org.apache.logging.log4j</groupId>
  <artifactId>log4j-to-slf4j</artifactId>
  <version>2.13.3</version>
</dependency>

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-api</artifactId>
  <version>1.7.32</version>
</dependency>

<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>1.7.21</version>
</dependency>

<dependency>
  <groupId>log4j</groupId>
  <artifactId>log4j</artifactId>
  <version>1.2.17</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

‍

# 生产者

我们先声明一个队列的名字,将来后期用该队列存储信息:

package com.peterjxl.rabbitmq.demo;
public class Producer {
    public static final String QUEUE_NAME = "hello";
}
1
2
3
4

‍

然后我们写一个main方法来发送信息。有点类似Mybatis,我们不直接创建连接,而是用工厂模式:

// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123");
1
2
3
4
5
6

‍

然后我们创建一个连接:

Connection connection = factory.newConnection();
1

‍

之前我们说过,一个Connection是客户端和MQ的TCP连接,为了避免频繁创建,我们是使用信道的。这里我们使用connection对象创建信道:

 Channel channel = connection.createChannel();
1

‍

下一步应该就是配置交换机和队列了,但为了方便学习,我们先不使用交换机,而是直接创建一个队列:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
1

参数说明:

  • 第一个参数:队列名
  • 第二个参数:是否持久化,默认false,表示保存在内存中(不持久化)
  • 第三个参数:是否独占队列,默认false,表示不独占队列(消息共享),true则表示只供一个消费者消费
  • 第四个参数:最后一个消费者断开连接后,是否自动删除队列,默认false,表示不自动删除
  • 第五个参数:队列的其他参数,如:存活时间

‍

‍

下一步就是发送消息了:

String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成");
1
2
3

‍

参数说明:

  • 第一个参数:交换机名称,如果没有则指定空字符串(表示使用默认的交换机)
  • 第二个参数:路由key,简单模式中可以使用队列名称
  • 第三个参数:消息其他属性
  • 第四个参数:消息内容

‍

然后我们运行该方法,然后可以在RabbitMQ的可视化界面中看到有消息了:

​(https://image.peterjxl.com/blog/image-20230521221421-kpy7vx9.png)​

‍

‍

我们点击hello,可以看到该队列的详情:ready的意思是有一个消息,已经准备好被消费了

​​(https://image.peterjxl.com/blog/image-20230521223009-6na78pp.png)​​

‍

‍

完整代码:

package com.peterjxl.rabbitmq.demo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String message = "Hello World";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("发送消息完成");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

‍

# 消费者

同样的,我们也是创建connection和channel,然后获取消息,完整代码:

package com.peterjxl.rabbitmq.demo;

import com.rabbitmq.client.*;

public class Consumer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收到消息:" + message);
        };

        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("接收消息被中断");
        };

        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

‍

‍

关键方法:

channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
1

参数说明:

  • 第一个参数:队列名称
  • 第二个参数:是否自动确认,设置为true表示消息接收到自动向MQ回复接收到了,MQ则会将消息从队列中删除;设置为false则需要手动确认
  • 第三个参数:消费者未成功消费的回调函数,可以用Lambda
  • 第四个参数:消费者中断消费的回调函数,可以用Lambda

为此,在调用该方法之前我们定义了2个回调函数:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("接收到消息:" + message);
};

CancelCallback cancelCallback = consumerTag -> {
    System.out.println("接收消息被中断");
};
1
2
3
4
5
6
7
8

‍

‍

‍

运行结果:

接收到消息:Hello World
1

‍

同时在控制台也看到消息被清零了:

​(https://image.peterjxl.com/blog/image-20230521222947-9xllcwf.png)​

‍

‍

# 源码

本项目已将源码上传到Gitee (opens new window)或GitHub (opens new window)上。并且创建了分支demo1,读者可以通过切换分支来查看本文的示例代码

在GitHub上编辑此页 (opens new window)
上次更新: 2023/6/7 08:46:24
RabbitMQ用户
WorkQueues

← RabbitMQ用户 WorkQueues→

Theme by Vdoing | Copyright © 2022-2023 粤ICP备2022067627号-1 粤公网安备 44011302003646号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式