本文从本人博客搬运,原文格式更加美观,可以移步原文阅读:RabbitMQ实现消费端限流与非公平分配
默认情况下,rabbitmq在分发消息给消费者时,处理方式是将所有消息按照消费者的数量平均分配,一次性发送给所有消费者,然后等待消费者的响应:
ack
,代表消费成功,rabbitmq会从队列中删除该条消息。响应ack
分为两种情况: ack
给rabbitmqack
的时机nack
,则代表消费失败,rabbitmq不会删除该消息,并且会尝试重新发送消息(默认重发的次数无限制)。响应nack
同样分为两种情况: nack
给rabbitmqnack
的时机,并且可以控制回复nack
的同时是否要求该消息重新入队,如果不要求重新入队,那么rabbitmq会直接删除该消息而不是尝试重发上述rabbitmq分发消息的默认策略会存在2个问题:
在rabbitmq中可以用Qos
机制解决以上问题。Qos
机制的原理是当消费者有一定数量prefetchCount
(可手动配置)的消息未被ack
确认时,rabbitmq不会给消费者发送新的消息。这样就很好地解决了上述2个问题:
prefetchCount
数量的消息给消费者,而不是发送所有消息,此时未被确认的消息数量就是prefetchCount
。消费者每处理完1条消息并回复ack
时,rabbitmq在收到ack
后,此时未被确认的消息数量为prefetchCount-1
,这时rabbitmq才会再发送1条消息给消费者。如此直到mq发送完所有消息ack
的速度快,那么就会促使rabbitmq将剩余的消息更多地发给它,达到一种能者多劳的效果首先在yml中添加rabbitmq的配置,其中关键配置是prefetch
,代表多少消息未被ack
时,rabbitmq不会给消费者发送新的消息
spring:rabbitmq:host: 192.168.153.130port: 5672username: guestpassword: guest#virtual-host:listener:simple:prefetch: 2 # 代表多少消息未被ack时,rabbitmq不会给消费者发送新的消息
先解释一下rabbitmq控制台队列标签中,针对消息的几个状态的说明:
Ready
:代表保存在rabbitmq本地,未发送给消费者的消息数量Unack
:代表已经发送给消费者,但是还未收到消费者ack
或者nack
响应的消息数量Total
:队列中消息总数量,Ready
+ Unack
之和然后我们先用下列代码给rabbitmq发送5条消息
@Test
public void testSendMessage() {for (int i = 0; i < 5; i++) {vertAndSend("test.direct", "queue", new User("baobao" + i, 18, new Date()));}
}
此时控制台的初始状态如下
然后我们创建消费者
@Component
@Slf4j
public class UserConsumer {@RabbitListener(queues = "test.queue")public void handleUserMessage(User user) throws InterruptedException {log.info("开始处理消息");log.String());// 休眠23秒,模拟消息处理的时间很长TimeUnit.SECONDS.sleep(23);log.info("消息处理结束");}
}
启动消费者后观察日志打印和控制台消息数量的变化:
prefetch
为2,所以rabbitmq刚开始会发送2条消息给消费者,消费者开始顺序处理消息ack
给rabbitmq,此时Unacked
消息数量由2减少为1,小于了prefetch
,rabbitmq就会再发送一条消息给消费者,发送后Ready
消息数量减1,Unacked
消息数量又增加为2,达到prefetch
,此时rabbitmq又会停止继续发送消息给消费者本文发布于:2024-02-02 09:24:10,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170683704942859.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |