RabbitMQ高级特性——消费端限流,prefetch的设置

阅读: 评论:0

RabbitMQ高级特性——消费端限流,prefetch的设置

RabbitMQ高级特性——消费端限流,prefetch的设置

大伙可以到我的RabbitMQ专栏获取更多信息

demo示例这里拿

概述

在之前介绍的MQ的文章中——什么是MQ:消息队列的基本概念和优劣势,我提到了MQ对于处理高并发场景的能力,即削峰填谷

  1. 当请求并发增高的时候,所有业务请求首先放入MQ
  2. 业务系统则在保证系统正常平稳的情况下,尽力从MQ获取业务信息,按照自己的“节奏”处理业务
  3. 过多的业务积压在MQ中,拉长了整体业务的执行时长,这就是填谷

 上述过程中,第二步其实就是对消费端的限流,帮助MQ的各个消费者平稳的度过业务请求的高峰,其实还有一个场景就是当消费端因为某些情况下停止服务了一段时间,或者是宕机,或者是系统升级,在这个停止服务的过程中,在MQ中挤压了大量等待消费者消化的message,一旦消费者重启启动后提供业务服务,这时候如果不对消费端限流的话,队列中的message将一并倾泻给消费端,这很可能瞬间压垮消费端服务。所以在高并发系统的设计中,服务限流是一种很好的办法,并且是非常的必要的。

代码示例

消费者端配置:

主要注意prefetch参数,代表消费端限流数量,每个消费者未确认的未处理消息的最大数量。

server:port: 2002spring:rabbitmq:host: 127.0.0.1port: 5672username: LeoLeepassword: lyl512240816virtual-host: /LeoLeelistener:simple:acknowledge-mode: manual #消费者端确认模式:none自动确认 manual手动确认 auto通过抛出异常的类型,来做响应的处理concurrency: 1 #当前监听的数量max-concurrency: 5 #最大监听数量retry:enabled: true #是否支持重试max-attempts: 4 #最大重试次数,默认为3prefetch: 2 #消费端限流5 每个消费者未确认的未处理消息的最大数量direct:acknowledge-mode: manual    #acknowledgeMode设置为手动模式

接收消息监听器:

需要注意的是,代码中我故意屏蔽了手动确认的方法,导致消费者端在接收到消息后始终无法确认,这样就会阻塞消费者端继续从MQ获取消息,达到验证限流的目的

package com.leolee.rabbitmq.MsgListener;import com.rabbitmq.client.Channel;
import org.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;SimpleDateFormat;
import java.util.Date;/*** @ClassName QosListener* @Description: 消费者限流 QOS:服务质量(Quality of Service)* 1.确保ack机制为手动确认:acknowledge-mode: manual* 2.设置消费限流配置:prefetch: 2 代表消费端每次从MQ拉去两条消息处理,知道手动的确认完毕后,才继续从MQ拉去消息* @Author LeoLee* @Date 2020/11/8* @Version V1.0**/
@Component
public class QosListener implements ChannelAwareMessageListener {@RabbitListener(queues = "boot_queue")@Overridepublic void onMessage(Message message, Channel channel) throws Exception {System.out.println("-----------------------------------------");System.out.println("msg:" + new Body()));System.out.println("处理业务逻辑用时需要3秒中,当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));Thread.sleep(3000);//        channel.MessageProperties().getDeliveryTag(), true);}
}

消息生产者:

连续发送10条消息

    /** 功能描述: <br>* 〈测试客户端限流,多发几条给客户端〉* @Param: []* @Return: void* @Author: LeoLee* @Date: 2020/11/8 3:06*/@Testpublic void testQos() {for (int i = 1; i <= 10; i++) {vertAndSend(RabbitMQConfig.EXCHANGE_NAME, &#st", "test msg send:[" + i + "]");}}

执行结果:

可以清楚的看到,消费者由于配置了prefetch: 2并且接收代码中没有消息确认,剩下队列中的8条消息,始终无法被获取到,达到了限流的目的。

 

 

本文发布于:2024-01-31 14:53:29,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170668401129316.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:特性   高级   RabbitMQ   端限流   prefetch
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23