springboot集成Rabbitmq 手动确认消息

阅读: 评论:0

springboot集成Rabbitmq 手动确认消息

springboot集成Rabbitmq 手动确认消息

springboot集成Rabbitmq 手动确认消息

pom文件:
 <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.4.0</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
yml文件:
#配置rabbitMq 服务器
server:port: 9096#配置rabbitMq 服务器
spring:rabbitmq:host: 127.0.0.1port: 5672username: rootpassword: root#虚拟host 可以不设置,使用server默认hostvirtual-host: /#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: truetemplate:# 消息发送失败返回到队列中, yml需要配置 publisher-returns: truemandatory: truelistener:simple:# NONE 自动确认 RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。#      所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。#      一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。# MANUAL 手动确认 需要人为地获取到channel之后调用方法向server发送ack(或消费失败时的nack)信息。# AUTO 由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。acknowledge-mode: manual #手动ACKredis:host: 127.0.0.1 # Redis服务器地址database: 1 # Redis数据库索引(默认为0)port: 6379 # Redis服务器连接端口password: 123456 # Redis服务器连接密码(默认为空)timeout: 5000ms # 连接超时时间(毫秒)
config配置选其一

************************************第一种方式******************************************package com.fig;slf4j.Slf4j;
import org.springframework.tion.ConnectionFactory;
import org.springframework.RabbitTemplate;
import t.annotation.Bean;
import t.annotation.Configuration;@Slf4j
@Configuration
public class RabbitMqConfig {@Beanpublic RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(connectionFactory);//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);//消息发送成功的回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("ConfirmCallback:     "+"相关数据:"+correlationData);log.info("ConfirmCallback:     "+"确认情况:"+ack);log.info("ConfirmCallback:     "+"原因:"+cause);});//发生异常时的消息返回提醒rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {log.info("ReturnCallback:     "+"消息:"+message);log.info("ReturnCallback:     "+"回应码:"+replyCode);log.info("ReturnCallback:     "+"回应信息:"+replyText);log.info("ReturnCallback:     "+"交换机:"+exchange);log.info("ReturnCallback:     "+"路由键:"+routingKey);});return rabbitTemplate;}
}************************************第二种方式******************************************package com.fig;slf4j.Slf4j;
import org.Message;
import org.springframework.tion.CorrelationData;
import org.springframework.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import t.annotation.Configuration;import javax.annotation.Resource;@Slf4j
@Configuration
public class RabbitInitializingBean implements InitializingBean {@Resourceprivate RabbitTemplate rabbitTemplate;//初始化@Overridepublic void afterPropertiesSet() throws Exception {//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate.setMandatory(true);//消息发送成功的回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("ConfirmCallback:     " + "相关数据:" + correlationData);log.info("ConfirmCallback:     " + "确认情况:" + ack);log.info("ConfirmCallback:     " + "原因:" + cause);}});//发生异常时的消息返回提醒rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("ReturnCallback:     " + "消息:" + message);log.info("ReturnCallback:     " + "回应码:" + replyCode);log.info("ReturnCallback:     " + "回应信息:" + replyText);log.info("ReturnCallback:     " + "交换机:" + exchange);log.info("ReturnCallback:     " + "路由键:" + routingKey);}});}
}
主题设置TopicRabbitConfig:
package com.fig;import org.*;
import t.annotation.Bean;
import t.annotation.Configuration;@Configuration
public class TopicRabbitConfig {public final static String STUDENT_QUEUE = "topic.student.queue";//队列public final static String DELAY_QUEUE = "delay.queue";//延迟public final static String STUDENT_EXCHANGE  = "topic.student";//交换机public final static String DELAY_EXCHANGE  = &#hange";//延迟public final static String STUDENT_KEY = "topic.student.key";//路由public final static String DELAY_KEY = "delay.key";//延迟public final static Long EXPIRATION = 5000L;//过期时间@BeanTopicExchange delayEexchange() {return new TopicExchange(DELAY_EXCHANGE);}@Beanpublic Queue delayQueue() {// 设置超时队列return QueueBuilder.durable(DELAY_QUEUE)// DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器.withArgument("x-dead-letter-exchange", TopicRabbitConfig.STUDENT_EXCHANGE)// dead letter携带的routing key,配置处理队列的路由key.withArgument("x-dead-letter-routing-key", STUDENT_KEY).build();}@Beanpublic Binding delayBinding() {return BindingBuilder.bind(delayQueue()).to(delayEexchange()).with(DELAY_KEY);}@Beanpublic Queue studentQueue() {return new Queue(STUDENT_QUEUE);}@BeanTopicExchange studentExchange() {return new TopicExchange(STUDENT_EXCHANGE);}@BeanBinding bindingExchangeStudent() {return BindingBuilder.bind(studentQueue()).to(studentExchange()).with(STUDENT_KEY);}}
监听类ReceiveHandler:
package com.jy.springbootRabbitmq.Handler;import onvert.Convert;
import util.ClassUtil;
import util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.fig.TopicRabbitConfig;
import com.ity.student;
import com.ity.RetrySendmsg;
import com.rabbitmq.client.Channel;
slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.Message;
import org.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.RabbitTemplate;
import org.StringRedisTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.UUID;@Slf4j
@Component
public class ReceiveHandler {@Resourceprivate RabbitTemplate rabbitTemplate;@Resourceprivate StringRedisTemplate stringRedisTemplate;/*** 监听消息* @param channel* @param message* @param dto* @throws IOException*/@RabbitListener(queues = {TopicRabbitConfig.STUDENT_QUEUE})public void studentMessage(Channel channel, Message message, student dto) throws IOException {try {log.info("=============>>>监听队列数据:"+ dto);channel.MessageProperties().getDeliveryTag(), false);} catch (Exception e) {channel.MessageProperties().getDeliveryTag(), false);resendMessage(TopicRabbitConfig.DELAY_EXCHANGE, TopicRabbitConfig.DELAY_KEY, dto,message);}}/*** 重试机制代码* @param exchange* @param routingKey* @param obj* @param message*/public void resendMessage(String exchange,String routingKey,Object obj,Message message) {String messageId = MessageProperties().getMessageId();if(StrUtil.isNotBlank(messageId) && Int(stringRedisTemplate.opsForValue().get(messageId)) >3) {log.info("===========>>>重试三次失败后依然失败存库人工处理"&#String());RetrySendmsg retrySendmsg = new RetrySendmsg();retrySendmsg.MessageProperties().getReceivedExchange());retrySendmsg.MessageProperties().getReceivedRoutingKey());retrySendmsg.JsonStr(obj));retrySendmsg.ClassName(obj, false));//TODO insert方法return;}vertAndSend(exchange, routingKey, obj, msg -> {MessageProperties messageProperties = MessageProperties();//设置消息IDString new_messageId = Str(UUID.randomUUID());if(StrUtil.isBlank(messageId)) {messageProperties.setMessageId(new_messageId);messageProperties.Str(TopicRabbitConfig.EXPIRATION));stringRedisTemplate.opsForValue().set(new_messageId,"1");}else {messageProperties.setMessageId(messageId);messageProperties.Str(TopicRabbitConfig.EXPIRATION));stringRedisTemplate.opsForValue().set(messageId, Int(stringRedisTemplate.opsForValue().get(messageId)) + 1 + "");}return msg;});}
}
实体类:
package com.ity;import lombok.Data;import java.io.Serializable;@Data
public class student implements Serializable{private static final long serialVersionUID = 1L;private String studentName;private String studentCode;public student(String studentName, String studentCode) {this.studentName = studentName;this.studentCode = studentCode;}}package com.ity;import lombok.Data;
import lombok.EqualsAndHashCode;@Data
@EqualsAndHashCode(callSuper = false)
public class RetrySendmsg {private static final long serialVersionUID = 1L;private String exchange;private String routingKey;private String msgBody;private String newMsgBody;private String className;private Integer status = 0;private Integer isSend = 0;
}

参考:

结束END

本文发布于:2024-02-01 16:29:07,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170677614837949.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   springboot   Rabbitmq
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23