RabbitMQ是主流开源消息队列中间件, 遵循AMQP(Advanced Message Queuing Protocol高级消息队列协议), AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求较低。
RabbitMQ 消息结构
Simple 简单队列
多消费者模式(无交换机)
多消费模式(交换机)
RCP通讯模式
如果我们需要将一个函数运行在远程计算机上并且等待从那儿获取结果,这种模式通常被称为远程过程调用(Remote Procedure Call)或者RPC。
RabbitMQ Exchange 根据不同分发策略发送消息, 目前共有四种类型: direct、topic、fanout、headers。 headers 匹配 AMQP 消息的 header 而不是路由键, 且headers 交换器和 direct 交换器完全一致, 但性能差距较多, 所以主要看以下三种类型:
Direct Exchange
Topic Exchange
Topic模式, 所发送的消息都会被转发到订阅了该Topic 队列并且与RouteKey匹配的消费端中。该模式需要先绑定Exchange与Queue。
Fanout Exchange
可以理解为广播模式, 所有发送的消息都会被转发到与该Exchange绑定的所有队列中。
RabbitMQ有三种模式: Direct、Fanout、Topic。不同模式对性能影响不大, 但在Direct模式下消息发布的性能要比其他模式好些。
RabbitMQ单机吞吐性能:
发布: 13988消息/秒
订阅: 15689消息/秒
发布: 18669消息/秒
订阅: 26896消息/秒
非持久化模式相比持久化模式,在发布性能上有33%提升, 在订阅性能上有70%左右提升。
配置优化
vm_memory_high_watermark:用于配置内存阈值,建议小于0.5,因为Erlang GC在最坏情况下会消耗一倍的内存。
vm_memory_high_watermark_paging_ratio:用于配置paging阈值,该值为1时,直接触发内存满阈值,block生产者。
io_thread_pool_size:CPU大于或等于16核时,将Erlang异步线程池数目设为100左右,提高文件IO性能。
hipe_compile:开启Erlang HiPE编译选项(相当于Erlang的jit技术),能够提高性能20%-50%。在Erlang R17后HiPE已经相当稳定,RabbitMQ官方也建议开启此选项。
queue_index_embed_msgs_below:默认值为4096, RabbitMQ 3.5版本引入了将小消息直接存入队列索引(queue_index)的优化,消息持久化直接在amqqueue进程中处理,不再通过msg_store进程。由于消息在5个内部队列中是有序的,所以不再需要额外的位置索引(msg_store_index)。该优化提高了系统性能10%左右。
queue_index_max_journal_entries:默认值为262144,队列的索引日志超过该阈值将刷新到磁盘, journal文件是queue_index为避免过多磁盘寻址添加的一层缓冲(内存文件)。对于生产消费正常的情况,消息生产和消费的记录在journal文件中一致,则不用再保存;对于无消费者情况,该文件增加了一次多余的IO操作。
线程优化: 产者使用多线程发送数据到queue, 三到五个线程性能发送最佳; 消费者的数据处理,使用二到三个线程接收性能是最佳, 具体还需根据CPU核心数决定; 实际当中发送的速率比接收处理的速率要高, 接收者线程 = 发送者线程 * 1.5倍
网上安装教程资料很多, 这里不做赘述, 参考信息:
Erlang下载地址
Windows下安装
Linux下安装
通过spring-boot-starter-amqp组件, 可以实现RabbitMQ的集成, 不需要再作繁琐的配置, RabbitMQ主要有三种模式: Direct、Topic 和 Fanout。配置和使用上存在一些差别, 下面对这三种模式的集成使用做具体讲解。
Spring Boot Starter 下面提供了spring-boot-starter-amqp组件, 能够快速集成各种遵循AMQP协议的消息中间件。
创建spring-boot-mq-rabbitmq工程
工程配置
server:port: 12612
spring:application:name: mq-rabbitmq# rabbitmq 配置rabbitmq:password: guestusername: guestport: 5672addresses: 127.0.0.1#开启发送失败返回publisher-returns: true#开启发送确认publisher-confirms: true
设置RabbitMQ连接地址信息, 开启发送失败返回与发送确认, 确保RabbitMQ能够成功收到所发送的消息。
MAVEN依赖配置:
这里采用Spring Boot 版本为2.1.6.RELEASE。
spring-boot-mq父工程依赖
<dependencies><!-- AMQP消息队列组件依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- Spring boot Web服务组件依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- Apache Commons 工具依赖 --><dependency><groupId>org.apachemons</groupId><artifactId>commons-lang3</artifactId></dependency>
</dependencies>
需要演示多个消息队列组件的使用, 将公用依赖放置父工程。
spring-boot-mq-rabbitmq工程依赖
直接继承spring-boot-mq父工程依赖, 无须再依赖其他组件。
Direct模式配置
基于JavaConfig配置,
新建com.mirson.spring.boot.fig.RabbitDirectConfig:
@Configuration
public class RabbitDirectConfig {public final static String DIRECT_QUEUE = "directQueue";public final static String DIRECT_EXCHANGE = "directExchange";public static final String DIRECT_ROUTINGKEY = "directRoutingKey";@Beanpublic DirectExchange defaultExchange() {return new DirectExchange(DIRECT_EXCHANGE);}// direct模式队列@Beanpublic Queue directQueue() {return new Queue(DIRECT_QUEUE, true);}@Beanpublic Binding binding() {return BindingBuilder.bind(directQueue()).to(defaultExchange()).with(DIRECT_ROUTINGKEY);} }
定义QUEUE、EXCHANGE 和 ROUTING名称, 并配置对应BEAN, 最后做绑定配置。
定义消费端
com.mirson.spring.boot.sume.DirectReceiver
@Component
@Log4j2
public class DirectReceiver {@RabbitListener(queues= "#{directQueue.name}")public void recvOne(String obj) {log.info("DirectReceiver, receive the queue msg:"+obj);}}
注意,RabbitListener与监听对应的队列名称,因上面JavaConfig的配置并未指定别名, 则采用默认名称: directQueue.name
封装发送端
com.mirson.spring.boot.mq.rabbit.provider.RabbitDirectSender
@Component
@Log4j2
public class RabbitDirectSender implements RabbitTemplate.ConfirmCallback {private RabbitTemplate rabbitTemplate;/*** 构造方法注入*/@Autowiredpublic RabbitDirectSender(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;//这是是设置回调能收到发送到响应,confirm()在下面解释rabbitTemplate.setConfirmCallback(this);}/*** 发送消息* @param content*/public void sendMsg(String content) {CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());//convertAndSend(exchange:交换机名称,routingKey:路由关键字,object:发送的消息内容,correlationData:消息vertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE, RabbitDirectConfig.DIRECT_ROUTINGKEY, content, correlationId);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info(" 发送数据id:" + correlationData);if (ack) {log.info("消息已确认发送");} else {log.info("消息发送失败:" + cause);}}
}
提供发送接口
Controller层定义com.mirson.spring.boot.mq.rabbit.provider.ProviderController
@RestController
public class ProviderController {@Autowiredprivate RabbitDirectSender rabbitDirectSender;private static final String SUCCESS = "success";/*** Direct模式, 发送消息* @return*/@GetMapping("/directMessage")public String directMessage() {rabbitDirectSender.sendMsg("from directMessage, random number: " + Int(0, 100));return SUCCESS;}}
引入rabbitDirectSender,提供directMessage接口, 每次请求, 发送一个0至100的随机数。
测试验证
请求接口地址
127.0.0.1:12612/directMessage
查看控制台日志
消息监听成功接收到所发送的数据:
DirectReceiver, receive the queue msg:from directMessage, random number: 95
Topic模式配置
新建com.mirson.spring.boot.fig.RabbitTopicConfig
@Configuration
public class RabbitTopicConfig {public static final String TOPIC_EXCHANGE = "topicExchange";public static final String TOPIC_ROUTINGKEY = "topicRoutingKey";public static final String TOPIC_ONE= ";public static final String TOPIC_TWO= "topic.two";public static final String TOPIC_ALL= "topic.all";public static final String QUEUE_ONE= ";public static final String QUEUE_TWO= "queue.two";public static final String QUEUE_ALL = "queue.all";//创建队列1@Bean(name = QUEUE_ONE)public Queue queueOne(){return new Queue(TOPIC_ONE);}//创建队列2@Bean(name = QUEUE_TWO)public Queue queueTwo(){return new Queue(TOPIC_TWO);}//创建队列, 监听所有@Bean(name = QUEUE_ALL)public Queue queueAll(){return new Queue(TOPIC_ALL);}//创建交换机@Beanpublic TopicExchange exchange(){return new TopicExchange(TOPIC_EXCHANGE);}//绑定队列1@BeanBinding bindingOne(@Qualifier(QUEUE_ONE)Queue queuemessage, TopicExchange exchange){return BindingBuilder.bind(queuemessage).to(exchange).with(TOPIC_ONE);}//绑定队列2@BeanBinding bindingTwo(@Qualifier(QUEUE_TWO)Queue queuemessages, TopicExchange exchange){return BindingBuilder.bind(queuemessages).to(exchange).with(TOPIC_TWO);}//绑定队列, 监听所有@BeanBinding bindingAll(@Qualifier(QUEUE_ALL)Queue queuemessages, TopicExchange exchange){return BindingBuilder.bind(queuemessages).to(exchange).with("topic.#");}}
监听器配置
新增com.mirson.spring.boot.sume.TopicReceiver:
@Component
@Log4j2
public class TopicReceiver {/*** 监听TOPIC_ONE队列数据* @param obj*/@RabbitListener(queues=RabbitTopicConfig.TOPIC_ONE)public void recvOne(String obj) {log.info("TopicReceiver, receive the topic one msg:"+obj);}/*** 监听TOPIC_TWO队列数据* @param obj*/@RabbitListener(queues=RabbitTopicConfig.TOPIC_TWO)public void recvTwo(String obj) {log.info("TopicReceiver, receive the topic two msg:"+obj);}/*** 监听TOPIC_THREE队列数据* @param obj*/@RabbitListener(queues=RabbitTopicConfig.TOPIC_ALL)public void recvAll(String obj) {log.info("TopicReceiver, receive all topic msg:"+obj);}
}
配置了三个TOPIC的监听器, 注意RabbitListener配置的主题名称(TOPIC_ONE), 而非队列名称(QUEUE_ONE)。
Topic发送接口
修改com.mirson.spring.boot.mq.rabbit.provider.ProviderController, 增加:
/*** Topic模式, 发送消息* @param topic* @return*/@GetMapping("/topicMessage")public String topicMessage(String topic) {if(StringUtils.isEmpty(topic)){return "require topic param.";}String content = "from topicMessage, random number: " + Int(0, 100);CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());vertAndSend(RabbitTopicConfig.TOPIC_EXCHANGE, topic, content, correlationId);return SUCCESS;}
接口接收一个topic参数, 用于指定发送的目标topic, 便于测试验证。
测试验证
请求地址
往不同主题分别发送数据:
127.0.0.1:12612/topicMessage?topic
127.0.0.1:12612/topicMessage?topic=topic.two
127.0.0.1:12612/topicMessage?topicst
查看监听日志:
第一条往发送的消息, 和topic.all都能接收到;
第二条往topic.two发送的消息, topic.two和topic.all都能接收到;
第三条往st发送的消息, 只有topic.all能接收到, 因为该主题采用的是匹配模式;
Topic模式配置
新建com.mirson.spring.boot.fig.RabbitFanoutConfig
@Configuration
public class RabbitFanoutConfig {public static final String QUEUE_ONE = ";public static final String QUEUE_TWO = "fanout.two";public static final String QUEUE_THREE = "fanout.three";/*** 广播路由器名称*/public static final String FANOUT_EXCHANGE = "fanoutExchange";/*** 配置队列* @return*/@Bean(name=QUEUE_ONE)public Queue queueOne() {return new Queue(QUEUE_ONE);}@Bean(name=QUEUE_TWO)public Queue queueTwo() {return new Queue(QUEUE_TWO);}@Bean(name=QUEUE_THREE)public Queue queueThree() {return new Queue(QUEUE_THREE);}//配置广播路由器@BeanFanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}// 绑定队列@BeanBinding bindingExchangeOne(@Qualifier(QUEUE_ONE) Queue queueOne, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueOne).to(fanoutExchange);}@BeanBinding bindingExchangeTwo(@Qualifier(QUEUE_TWO) Queue queueTwo, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueTwo).to(fanoutExchange);}@BeanBinding bindingExchangeThree(@Qualifier(QUEUE_THREE) Queue queueThree, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queueThree).to(fanoutExchange);}}
定义监听器
com.mirson.spring.boot.sume.FanoutReceiver:
@Component
@Log4j2
public class FanoutReceiver {/*** 队列监听器, QUEUE_ONE* @param obj*/@RabbitListener(queues= RabbitFanoutConfig.QUEUE_ONE)public void recvOne(String obj) {log.info("FanoutReceiver, receive the queue one msg:"+obj);}/*** 队列监听器, QUEUE_TWO* @param obj*/@RabbitListener(queues=RabbitFanoutConfig.QUEUE_TWO)public void recvTwo(String obj) {log.info("FanoutReceiver, receive the queue two msg:"+obj);}/*** 队列监听器, QUEUE_THREE* @param obj*/@RabbitListener(queues=RabbitFanoutConfig.QUEUE_THREE)public void recvAll(String obj) {log.info("FanoutReceiver, receive the queue three msg:"+obj);}
}
配置三个监听器, 打印接收到的数据日志信息。
发送接口
修改com.mirson.spring.boot.mq.rabbit.provider.ProviderController, 增加:
/*** faout模式, 发送消息* @return*/@GetMapping("/fanoutMessage")public String fanoutMessage() {String content = "from fanoutMessage, random number: " + Int(0, 100);CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());vertAndSend(RabbitFanoutConfig.FANOUT_EXCHANGE, "", content, correlationId);return SUCCESS;}
定义接口, /fanoutMessage用于发送fanout广播消息, 指定交换器名称, 不需指定路由, 发送随机数进行测试。
测试验证
请求地址
127.0.0.1:12612/fanoutMessage
查看控制台结果
可以看到, 发送了一条数据, 三个监听队列都打印了所发送的数据, fanout模式能够正常接收消息。
这里介绍了RabbitMQ三种不同模式的集成配置与使用,大家根据生产环境的需要, 选择不同的模式使用。 通过Spring Boot Start AMQP基于JavaConfig注解模式, 能够方便我们集成使用, 消息的发送与监听, 不需要再做繁琐配置, 非常简便, 如果更改设置, 只需在配置文件修改对应属性即可。
本文发布于:2024-01-29 05:19:07,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170647674912973.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |