WorkQueues
# 60.WorkQueues
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。
当有多个工作线程时,这些工作线程将一起处理这些任务。
# 工作队列原理
简单来说,就是生产者会发送大量消息;
然后此时会有多个消费者(也叫工作线程)处理消息。
# 轮训分发
相当于一个工作线程处理完后,由下一个工作线程处理消息。
在这个案例中我们会启动两个工作线程,一个消息发送线程,我们来看看他们两个工作线程是如何工作的。
# 工具类
为了减少重复代码,我们可以定义一个工具类:
package com.peterjxl.rabbitmq.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQUtils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root123");
return factory.newConnection().createChannel();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 工作线程代码
我们定义一个消费者。第 16 行的输出“C1 等待接收消息...”表明当前是工作线程 1
package com.peterjxl.rabbitmq.demo2;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
/**
* 这是一个工作线程,用于处理消息(相当于之前的消费者)
*/
public class Worker01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
System.out.println("C1等待接收消息.......");
// 消费者接收消息
channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 启动多个工作线程
我们可以将该类启动多次,造成多个线程的样子。
我们先启动 main 方法:启动后该方法是运行的。
然后我们点击编辑配置:
点击修改选项:
设置运行多个实例:
然后我们修改下输出语句为:
System.out.println("C2等待接收消息.......");
1
然后再次运行,此时就由 2 个线程了:
# 发送消息
接下来我们发送多个消息,来查看两个工作线程处理消息的情况。
接下来定义一个类,用来发送消息;这次我们改为从控制台中读取消息
package com.peterjxl.rabbitmq.demo2;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 从控制台中输入消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
// 发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
我们运行该方法,然后分别发送“AA”,“BB”,“CC”,“DD”。
不管是哪个工作线程接受到了“AA”,那么另一个就应该是处理“BB”,然后轮流处理“CC”,“DD”。
发送:
两个工作线程处理的结果:成功轮询
# 源码
已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo2,读者可以通过切换分支来查看本文的示例代码
上次更新: 2024/10/3 10:01:52
- 01
- 中国网络防火长城简史 转载10-12
- 03
- 公告:博客近期 RSS 相关问题10-02