Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

发布与确认原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配队列之后,broker就会发送一个确认给生产者(包含消息唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好吃在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送吓一跳消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理nack消息。

发布确认的策略

单个发布确认

这是一种简单的确认方式,他是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布之后,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有消息被确认的时候才返回,如果在指定时间范围内这个消息没有确认,那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条消息的吞吐量。当然对于一些应用来说就已经足够了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void publishMessageSign() throws Exception{
Channel channel = RabbitUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认模式
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发消息
for (int i = 0; i < 1000; i++) {
String msg = i + "";
channel.basicPublish("",queueName,null,msg.getBytes());
boolean flg = channel.waitForConfirms();
if (flg){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("耗费时间:"+(end - begin));
}

批量确认发布

单个确认发布这种方式十分的缓慢,我们可以经过逻辑的修改,先发布一批消息然后一起确认,这样的话可以极高的提高吞吐量,当然这种方式也有缺点:当发生故障导致发布出现问题时,不知道哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息,然后重现发布丢失的消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void publishMessageBatch() throws Exception{
Channel channel = RabbitUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.confirmSelect();
channel.queueDeclare(queueName,true,false,false,null);
long begin = System.currentTimeMillis();
//设置多少甜确认一次
int confirmSize = 100;
for (int i = 1; i < 1000+1; i++) {
String msg = i + "";
channel.basicPublish("",queueName,null,msg.getBytes());
if (i % confirmSize == 0){
channel.waitForConfirms();
System.out.println("消息确认成功");

}
}
long end = System.currentTimeMillis();
System.out.println("耗费时间:"+(end - begin));
}

异步确认发布

异步确认发布虽然在代码量个和逻辑上比上面两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

image-20220105221320439

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static void publishMessageAsync() throws Exception{
Channel channel = RabbitUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
channel.confirmSelect();
//记录队列
ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
long begin = System.currentTimeMillis();
//消息成功回调
ConfirmCallback ackCallBack = (deliveryTag,multiple)->{
System.out.println(multiple);
if (multiple){
ConcurrentNavigableMap<Long, String> map = concurrentSkipListMap.headMap(deliveryTag);
map.clear();
}else {
concurrentSkipListMap.remove(deliveryTag);
}
System.out.println("消息已接收"+deliveryTag);
};
//消息失败回调
ConfirmCallback nackCallBack = (deliveryTag,multiple)->{
System.out.println("未确认消息"+deliveryTag);
};
//消息监听器 监听哪些消息成功,哪些消息失败
channel.addConfirmListener(ackCallBack,nackCallBack);
//批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = i + "";
channel.basicPublish("",queueName,null,msg.getBytes());
//记录发送的消息
concurrentSkipListMap.put(channel.getNextPublishSeqNo(),msg);

}

long end = System.currentTimeMillis();
System.out.println("异步发布耗费时间: "+(end - begin));
}

评论