springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制

阅读: 评论:0

springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制

springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制

转载:=distribute.-task-blog-2%7Edefault%l&depth_1-utm_source=distribute.-task-blog-2%7Edefault%l

1.springboot项目搭建

可本地启动activemq项目,直接官网下载后,启动即可。
springboot项目搭建两个模块(mq-producer,mq-consumer),分别模拟生产值和消费者;

2.队列消息(点对点模式Queue),主题订阅模式(Topic)

队列消息是点对点模式,生产方生产一条消息只能给一个消费方消费。
主题订阅模式,生产方的一条消息可同时下发给多个订阅该主题的消费方去使用和消费。

3.消息手动确认

消息手动确认,主要用于消费方是否由于异常等原因,未处理完该消息;
因此只有手动确认后,给出回馈消息,该消息才确认消费完成,否则consumer重启后会重新消费该消息。控制面板中依旧存在Messages Enqueued 中。
activemq的消息确认机制,本文中使用的是通过INDIVIDUAL_ACKNOWLEDGE = 4, 单条消息确认,是 activemq 独有。

4.消息重发

消息重发机制,主要用户消费方是否由于异常等原因,未处理完该消息,需要在一定时间后重发该消息,继续处理。
消息重发可通过RedeliveryPolicy,定义各种参数,如消息重发的次数,间隔时间等;
本文只简单介绍activemq使用以上四点时的关键步骤,如需进一步了解可查阅更多资料。
给出主要实现代码

项目结构截图

mq-producer

mq-consumer

一、 配置文件(mq-producer, mq-consumer模块项目的配置文件相同即可)

注:需修改 server.port,避免端口冲突

#ActiveMQ配置
## 基础配置
spring:activemq:broker-url: tcp://localhost:61616user: adminpassword: adminpool:   # 线程池配置enabled: falsemax-connections: 50
#  jms:  ## 开启支持发布订阅模式,默认点对点模式
#    pub-sub-domain: truemq:topic: mq.topicqueue: mq.queue## 集群配置
#spring.activemq.broker-url: failover:(tcp://10.10.2.137:61616,tcp://10.10.2.138:61616,tcp://10.10.2.139:61616)
server:port: 8078

其中 spring.jms.pub-sub-domain参数如果设置为true,则开启发布订阅模式,无法使用点对点队列模式。由于在代码中需要同时使用两种模式,故此处配置去掉。

 

二、生产者(mq-producer)

① 定义主题和队列对象给spring管理,注入使用

package fig;import org.apache.activemqmand.ActiveMQQueue;
import org.apache.activemqmand.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Bean;
import t.annotation.Configuration;import javax.jms.Queue;
import javax.jms.Topic;@Configuration
public class JmsConfig {@Value("${mq.queue}")private String mqQueue ;@Value("${mq.topic}")private String mqTopic ;@Bean//交给spring管理,方便后续注入public Queue queue(){return new ActiveMQQueue(mqQueue) ;}@Bean //将主题对象交给spring管理public Topic topic(){return new ActiveMQTopic(mqTopic) ;}}

② 消息生产方法类

package com.cdm.service;import org.apache.activemqmand.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.JmsMessagingTemplate;
import org.springframework.stereotype.Service;import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.Topic;@Service
public class ProduceService {@Autowiredprivate Queue queue ;@Autowiredprivate JmsMessagingTemplate  jmsTemplate ; //用来发送消息到broker的对象/*** 发送消息* @param destinationName 是发送到的队列名* @param message 是待发送的消息*/public void sendMsg(String destinationName, String message){System.out.println("发送消息=====>>>>>:" + message);Destination destination = new ActiveMQQueue(destinationName) ;vertAndSend(destination, message);}/*** message是待发送的消息* @param message  是待发送的消息*/public void sendMsg(final String message){System.out.println("发送消息=====>>>>>:" + message);vertAndSend(this.queue, message);}//=======发布订阅相关代码=========@Autowiredprivate Topic topic;public void publish(String msg) {pic, msg);}}

③ 测试发送消息和发布主题消息

package ller;import com.cdm.service.ProduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;@RestController
public class ProduceController {@Autowiredprivate ProduceService produceService ;@RequestMapping("/queue")@ResponseBodypublic String sendQueue(){String msgStr = System.currentTimeMillis() + ".MQ.Queue" ;produceService.sendMsg(msgStr);System.out.println("点对点通信, message:" + msgStr);return msgStr ;}@RequestMapping("/topic")@ResponseBodypublic String sendTopic(){String msgStr = System.currentTimeMillis() + ".MQ.Topic" ;produceService.publish(msgStr);System.out.println("发布订阅模式通信, message:" + msgStr);return msgStr ;}
}

④主类

package com.cdm;import org.apache.activemqmand.ActiveMQQueue;
import org.apache.activemqmand.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import t.annotation.Bean;
import t.annotation.Configuration;import javax.jms.Queue;
import javax.jms.Topic;
import javax.swing.*;@SpringBootApplication
public class ProducerApplication {public static void main(String[] args) {System.out.println("****** Producer 启动****** : " );SpringApplication.run(ProducerApplication.class, args) ;}
}

⑤测试

localhost:8078/queue

localhost:8078/topic

发送消息=====>>>>>:1624353880692.MQ.Queue
点对点通信, message:1624353880692.MQ.Queue
发布订阅模式通信, message:1624353894689.MQ.Topic
发布订阅模式通信, message:1624353974750.MQ.Topic
发送消息=====>>>>>:1624353989749.MQ.Queue
点对点通信, message:1624353989749.MQ.Queue
发送消息=====>>>>>:1624354058811.MQ.Queue
点对点通信, message:1624354058811.MQ.Queue
发布订阅模式通信, message:1624355610134.MQ.Topic

 三、消费者(mq-consumer)

1.上面提到,同时支持点对点和主题订阅模式。因此还需要自定义 topicListenerContainerFactory 和 queueListenerContainerFactory,如下。

2.支持手动确认接收消息,此处为方便大家查看不同的配置方式,定义用订阅主题方式不用手动确认,而队列模式支持ack手动确认。想使用那种方式都可以,主要是定义ContainerFactory的配置中这一项

    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(4);

这里也普及下activemq的消息确认机制:

AUTO_ACKNOWLEDGE = 1    自动确认
CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
SESSION_TRANSACTED = 0    事务提交并确认
INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认 activemq 独有
ACK模式描述了Consumer与broker确认消息的方式(时机),比如当消息被Consumer接收之后,Consumer将在何时确认消息。
对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,通过ACK,可以在consumer(/producer)
与Broker之间建立一种简单的“担保”机制.
手动确认和单条消息确认需要手动的在客户端调用message.acknowledge();

3.支持消息未确认重发机制 , 即在 jmsConnectionFactory 中定义 RedeliveryPolicy,具体实现如下。

① 消息同时支持两种模式和手动确认机制的配置类

package fig;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.fig.DefaultJmsListenerContainerFactory;
import org.fig.JmsListenerContainerFactory;import javax.jms.ConnectionFactory;@Configuration
public class JmsConfig {@Value("${mq.queue}")private String mqQueue ;@Value("${mq.topic}")private String mqTopic ;@Value("${spring.activemq.broker-url}")private String brokerUrl;@Value("${spring.activemq.user}")private String userName;@Value("${spring.activemq.password}")private String password;/*** @title 发布-订阅*/@Beanpublic JmsListenerContainerFactory<?> topicListenerContainerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setPubSubDomain(true);factory.setConnectionFactory(connectionFactory);return factory ;}/*** @title 点对点*/@Beanpublic JmsListenerContainerFactory<?> queueListenerContainerFactory(ConnectionFactory connectionFactory){DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();factory.setPubSubDomain(false);factory.setConnectionFactory(connectionFactory);factory.setSessionTransacted(false);factory.setSessionAcknowledgeMode(4);return factory ;}@Beanpublic ConnectionFactory connectionFactory(){ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory() ;connectionFactory.setBrokerURL(brokerUrl);connectionFactory.setUserName(userName);connectionFactory.setPassword(password);connectionFactory.setTrustAllPackages(true);connectionFactory.setMaxThreadPoolSize(ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE);RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();//定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次//是否在每次尝试重新发送失败后,增长这个等待时间redeliveryPolicy.setUseExponentialBackOff(true);//重发次数,默认为6次   这里设置为1次redeliveryPolicy.setMaximumRedeliveries(2);//重发时间间隔,默认为1秒redeliveryPolicy.setInitialRedeliveryDelay(5000);//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是valueredeliveryPolicy.setBackOffMultiplier(2);//最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第//二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。redeliveryPolicy.setMaximumRedeliveryDelay(1000);connectionFactory.setRedeliveryPolicy(redeliveryPolicy);return connectionFactory;}}

② 队列消费方法

package com.cdmponent;import org.springframework.beans.factory.annotation.Value;
import t.annotation.Configuration;
import org.springframework.jms.JmsException;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;@Component
@Configuration
public class QueueConsumer {@Value("${mq.queue}")private String mqQueue ;@Value("${mq.topic}")private String mqTopic ;@JmsListener(destination = "${mq.queue}", containerFactory = "queueListenerContainerFactory")public void receive3(Message message, Session session) throws JMSException {TextMessage textMessage = (TextMessage) message;System.out.println("queue 消费者 receive3 = " + Text());try {if (Text().contains("MQ.Queue")){throw new JMSException("故意抛出的异常");}message.acknowledge();} catch (JMSException e) {System.out.println(String.format("触发重发机制msg = %s", Text()));ver();}}}

 ③ 两个主题消费

package com.cdmponent;import t.annotation.Configuration;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;@Component
@Configuration
public class TopicConsumer {@JmsListener(destination="${mq.topic}", containerFactory = "topicListenerContainerFactory")public void receive1(String text){System.out.println("topic 消费者:receive1= " + text);}@JmsListener(destination="${mq.topic}", containerFactory = "topicListenerContainerFactory")public void receive2(String text){System.out.println("topic 消费者:receive2= " + text);}}

④主类

package com.cdm;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.security.auth.login.Configuration;@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args) ;}
}

 通过生产者项目中发送测试请求,可以发现消费者接收消息的日志。

 

测试:分别运行两个主类,运行MQ:binwin64activemq.bat, 

浏览器发送请求:

localhost:8078/queue

localhost:8078/topic

可以在MQ上看到,也可在mq-consumer的控制台信息看到数据:

重发:

 

 

 

本文发布于:2024-02-01 16:28:14,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170677609137943.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:队列   重发   机制   消息   主题
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23