消息应答
# 70.消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分,突然它挂掉了,会发生什么情况?
RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了,RabbitMQ 可以把该消息删除了。
# 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了
当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
一句话,自动应答不是很靠谱,比较需要良好的网络环境,所以一般用手动应答的。
# 如何手动应答消息
不使用自动应答的情况下,有如下方法用于手动应答消息:
Channel.basicAck
:用于肯定确认, RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃Channel.basicNack
:用于否定确认Channel.basicReject
:用于否定确认,与 Channel.basicNack 相比少一个参数,不处理该消息,直接拒绝,可以将其丢弃了
# Multiple 批量应答
手动应答的好处是可以批量应答并且减少网络拥堵。例如调用 basicAck 方法进行应答:
channel.basicAck(deliveryTag, true);
第一个参数的含义:消息的标记 tag,可以理解为是每个消息的“主键”
第二个参数的含义:
- true:代表批量应答 channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答(如下图)
- false:同上面相比只会应答 tag = 8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
- 我们一般不使用批量应答,避免还有消息未处理完,就被应答了的情况。一般是处理完一个消息,就应答一个
# 消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到有消息未完全处理,并将对其重新排队。
如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
示意图:
- C1 消费者处理消息 1
- C1 失去连接
- 消息 1 重新入队
- C2 处理消息 1
# 消息手动应答代码
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,然后我们在一个消费者处理消息的时候,先将其停止(也就是失去连接),然后观察消息是否会重新入队,给到其他消费者处理。
我们新建一个包 demo3,里面放代码。
# 生产者
package com.peterjxl.rabbitmq.demo3;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* 消息在手动应答时不丢失,放回队列重新消费
*/
public class Task3 {
// 队列名称
private final static String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(task_queue_name, false, false, false, null);
// 从控制台接收消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", task_queue_name, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
接下来我们写消费者。我们设置两个消费者有一段睡眠的时间,并且时间不同,以此来模拟两个消费者的效率不一样。
# 新建睡眠工具类
该类用于沉睡一段时间:
package com.peterjxl.rabbitmq.util;
public class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000L * second);
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
2
3
4
5
6
7
8
9
10
11
12
# 消费者 03
主要是设置为手动应答,并沉睡:
SleepUtils.sleep(10);
System.out.println("接收到消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
2
3
完整代码:
package com.peterjxl.rabbitmq.demo3;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.peterjxl.rabbitmq.util.SleepUtils;
import com.rabbitmq.client.Channel;
/**
* 消息在手动应答时不丢失,放回队列重新消费
*/
public class Worker03 {
// 队列名称
private final static String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
System.out.println("C1消费者等待消息处理,处理时间较短(效率高)...");
// 采用手动应答
boolean autoAck = false;
channel.basicConsume(task_queue_name, autoAck, (consumerTag, message) -> {
// 接收消息并处理
System.out.println("接收到消息:" + new String(message.getBody()));
// 休眠1秒
SleepUtils.sleep(1);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {
System.out.println("消息消费被中断");
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 消费者 04
我们复制一份消费者 03 ,修改输出语句和 沉睡的时间为 30 秒即可。完整代码:
package com.peterjxl.rabbitmq.demo3;
import com.peterjxl.rabbitmq.util.RabbitMQUtils;
import com.peterjxl.rabbitmq.util.SleepUtils;
import com.rabbitmq.client.Channel;
/**
* 消息在手动应答时不丢失,放回队列重新消费
*/
public class Worker04 {
// 队列名称
private final static String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMQUtils.getChannel();
System.out.println("C2消费者等待消息处理,处理时间较长(效率低)...");
// 采用手动应答
boolean autoAck = false;
channel.basicConsume(task_queue_name, autoAck, (consumerTag, message) -> {
SleepUtils.sleep(30);
System.out.println("接收到消息:" + new String(message.getBody()));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}, consumerTag -> {
System.out.println("消息消费被中断");
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 测试
我们依次运行 Task3、Worker03、Worker04;
然后我们在 Task3 中输入消息 aa 和 bb,可以看到 Worker03 收到了 aa,Worker04 收到了 bb:
然后我们发送 cc 和 dd,并停止 Work04:可以看到 Worker03 处理了消息 dd
# 源码
本项目已将源码上传到 Gitee (opens new window) 或 GitHub (opens new window) 上。并且创建了分支 demo3,读者可以通过切换分支来查看本文的示例代码