在金融互联网领域广泛应用,在阿里双11活动经历过多次考验, 经过严苛的生产验证,有比较高的可靠性,在数据处理上有比较高的稳定性, 能从最大程度上保证消息不易丢失,如果业务上有一定的规模, 且对数据的一致性,稳定性要求严苛, 那么可以采用RocketMQ, 比如金融互联网领域, 支付场景、交易场景等。如果有借助消息队列实现分布式事务, RocketMQ可以作为首选。
Spring Boot 官方提供了spring-boot-starter-activemq 对ActiveMQ的支持, 但并没有提供对RocketMQ的支持, 这不代表Spring Boot 本身不支持, RocketMQ 官方给我们提供了RocketMQ-Spring 框架, 整合了RocketMQ与Spring Boot, 主要提供3个特性:
RocketMQTemplate
用来统一发送消息,包括同步、异步发送消息和事务消息@RocketMQTransactionListener
注解用来处理事务消息的监听和回查@RocketMQMessageListener
注解用来消费消息简要安装说明, 可参考RocketMQ的官方文档。
下载RocketMQ 安装文件
如果不能连接, 采用其他镜像下载: .3.2/rocketmq-all-4.3.2-bin-release.zip
启动 NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
采用RocketMQ官方提供得rocketmq-spring-boot-starter作为集成组件。
创建spring-boot-mq-rocket父级工程
MAVEN依赖:
<properties><rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
</properties><dependencies><!-- RocketMq与Spring Boot 集成组件依赖--><dependency><groupId>ketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq-spring-boot-starter-version}</version></dependency>
</dependencies>
创建rocketmq-basic工程
工程依赖直接继承父级依赖, 无须添加其他依赖组件。
工程配置:
server:port: 12613
spring:application:name: rocketmq-basic# RocketMQ配置
rocketmq:name-server: 10.10.20.15:9876producer:group: basic-group
配置填写RocketMQ地址信息, 如果是集群,多个以逗号分割。
创建启动类
com.mirson.spring.ket.basic.startup.RocketMqBasicApplication
@SpringBootApplication
@ComponentScan(basePackages = {"com.mirson"})
public class RocketMqBasicApplication {public static void main(String[] args) {SpringApplication.run(RocketMqBasicApplication.class, args);}
}
扫描包含com.mirson包下所有路径。
定义监听器
com.mirson.spring.sume.StringConsumer:
@Service
@RocketMQMessageListener(topic = RabbitMqConfig.TOPIC, consumerGroup = RabbitMqConfig.CONSUME_GROUP_STRING)
@Log4j2
public class StringConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("StringConsumer => receive: " + message);}
}
提供接口
com.mirson.spring.ket.basic.provider.RocketMqProviderContorller
@RestController
@Log4j2
public class RocketMqProviderContorller {@Resourceprivate RocketMQTemplate rocketMQTemplate;/*** 生产者发送字符类型消息* @return*/@GetMapping("/sendString")public String sendString() {String msg = "random number: " + Int(0, 100);// Send stringSendResult sendResult = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC, msg);log.info("send result: " + SendStatus());return msg;}...
}
提供sendString接口, 每次请求发送一个随机数, 通过RocketMQTemplate的syncSend同步方法发送数据。
如果发送成功, 会返回状态: SEND_OK。
调用验证
访问接口地址:127.0.0.1:12613/sendString
查看监听器日志
可以看到, String类型的普通消息监听器, 正常接收到消息。
RocketMQ原生消息,除了发送的数据, 还可以获取RocketMQ内置的系统信息, 比如消息ID, 主机名称,时间戳, 队列信息等。
定义监听器
com.mirson.spring.sume.MessageExtConsumer
@Service
@RocketMQMessageListener(topic = RabbitMqConfig.TOPIC_EXT, selectorExpression = "tag1", consumerGroup = RabbitMqConfig.CONSUME_GROUP_EXT)
@Log4j2
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener{@Overridepublic void onMessage(MessageExt message) {log.info("MessageExtConsumer => receive msgId:{}, msgData:{} ", MsgId(), new Body()));}/*** 自定义消费者的开始位置,这里设置的是当前时间* @param consumer*/@Overridepublic void prepareStart(DefaultMQPushConsumer consumer) {// set consumer consume message from nowconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));}
}
订阅的主题为RabbitMqConfig.TOPIC_EXT, 订阅的Group为RabbitMqConfig.CONSUME_GROUP_EXT。打印接收到的消息ID与数据。
实现RocketMQPushConsumerLifecycleListener接口, 可以自定义消费者的开始消息位置, 这里设置的是当前时间。
提供发送接口
com.mirson.spring.ket.basic.provider.RocketMqProviderContorller, 增加接口:
/*** 发送RocketMQ 原生消息* @return*/
@GetMapping("/sendStringExt")
public String sendStringExt() {String msg = "random number: " + Int(0, 100);try {SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_EXT + ":tag1", msg);log.info("result: " + SendStatus());}catch(Exception e) {(e.getMessage(), e);}// Send String Ext Messagereturn msg;
}
测试验证
访问接口:
127.0.0.1:12613/sendStringExt
查看监听器日志
能够正常接收到消息, 并打印出了Rocketmq封装的消息ID。
Spring Message 是一种消息传输规范, RocketMQ可以支持, 在Spring Cloud Stream 中采用的就是Spring Message作为消息传输规范, 这是一个用于构建基于消息的微服务应用框架。
定义传输对象
在实际消息交互当中, 不会传输简单的数据结构, 一般传递的是业务对象,这里定义一个订单对象:
com.mirson.spring.ket.basic.bo.Order
@Data
public class Order implements Serializable {private static final long serialVersionUID = -1L;/*** 订单ID*/private String orderId;/*** 创建时间*/private Date createDate;}
消息交互当中, 默认会通过序列化传递, 需要实现序列化接口。
定义监听器
com.mirson.spring.sume.OrderSpringMessageConsumer
@Service
@RocketMQMessageListener(topic = RabbitMqConfig.TOPIC_SPRING_MESSAGE, consumerGroup = RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE)
@Log4j2
public class OrderSpringMessageConsumer implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {log.info("OrderSpringMessageConsumer => receive order: " + order);}}
定义发送接口
com.mirson.spring.ket.basic.provider.RocketMqProviderContorller
/*** 发送RocketMQ Spring Message封装消息* @return*/
@GetMapping("/sendSpringMessage")
public String sendSpringMessage() {String msg = "random number: " + Int(0, 100);Order order = new Order();order.setOrderId(UUID.randomUUID().toString());order.setCreateDate(new Date());// Send Spring Message With OrderSendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_SPRING_MESSAGE, MessageBuilder.withPayload(order).build());log.info("send result: " + SendStatus());return msg;
}
测试验证
调用发送接口
127.0.0.1:12613/sendSpringMessage
查看监听器日志
能够正常接收并打印出完整的订单数据。
本文发布于:2024-01-29 05:19:31,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170647677612976.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |