Sprngboot 整合 mqtt 完整代码示例

阅读: 评论:0

Sprngboot 整合 mqtt 完整代码示例

Sprngboot 整合 mqtt 完整代码示例

代码教程

pom文件引入依赖

        <!--mqtt--><dependency><groupId&lipse.paho</groupId><artifactId&lipse.paho.client.mqttv3</artifactId><version>1.2.4</version></dependency>
spring:application:name: test-porjectmqtt:url: tcp://${MQTT_HOST:172.16.10.201:1883}client-id: ${spring.application.name}topic:- ${MQTT_TOPIC:/iot/#}

配置类


import com.workface.fullymechanizeminemon.listener.MqttSubscribeListener;
import com.ool.utils.StringUtil;
lipse.paho.client.mqttv3.IMqttClient;
lipse.paho.client.mqttv3.MqttClient;
lipse.paho.client.mqttv3.MqttConnectOptions;
lipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.t.properties.EnableConfigurationProperties;
import t.annotation.Bean;
import t.annotation.Configuration;@Configuration
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfiguration {private static final Logger log = Logger(MqttConfiguration.class);@Autowiredprivate MqttProperties mqttProperties;public MqttConfiguration() {}@Beanpublic MqttConnectOptions mqttConnectOptions() {MqttConnectOptions connectOptions = new MqttConnectOptions();connectOptions.setServerURIs(new String[]{Url()});if (StringUtil.isNotBlank(Url())) {connectOptions.setUserName(Username());}if (StringUtil.isNotBlank(Password())) {connectOptions.setPassword(Password().toCharArray());}connectOptions.setKeepAliveInterval(60);return connectOptions;}@Beanpublic IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {IMqttClient mqttClient = new MqttClient(Url(), ClientId());t(options);for(int x = 0; x < Topic().length; ++x) {mqttClient.subscribe(Topic()[x], new MqttSubscribeListener());}return mqttClient;}
}
MqttProperties属性类

import lombok.Data;
import org.t.properties.ConfigurationProperties;@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties {private String url;private String clientId;private String username;private String password;private String[] topic;
}
MqttSubscribeListener 订阅监听类

import com.workface.fullymechanizeminemon.event.MqttSubscribeEvent;
import com.ool.utils.SpringUtil;
lipse.paho.client.mqttv3.IMqttMessageListener;
lipse.paho.client.mqttv3.MqttMessage;public class MqttSubscribeListener implements IMqttMessageListener {@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) {String content = new Payload());MqttSubscribeEvent event = new MqttSubscribeEvent(s, content);SpringUtil.publishEvent(event);}
}
MqttEventListener 事件监听类

import lkit.StringPool;
import com.fig.mqtt.MqttProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import t.annotation.Configuration;import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;@Configuration
public class MqttEventListener {private static final Logger log = Logger(MqttEventListener.class);@Resourceprivate MqttProperties mqttProperties;private String processTopic (String topic) {List<String> topics = Arrays.Topic());for (String wild : topics) {wild = place(StringPool.HASH, StringPool.EMPTY);if (topic.startsWith(wild)) {place(wild, StringPool.EMPTY);}}return StringPool.EMPTY;}//    private static List<AcceptPointDTO> toPoints (Object source) {
//        String str = String(source);
//        List<AcceptPointDTO> data = JSONArray.parseArray(str, AcceptPointDTO.class);
//        return data;
//    }//    @Async
//    @EventListener(MqttSubscribeEvent.class)
//    public void listen (MqttSubscribeEvent event) {
//        String topic = Topic());
//        Object source = Source();
//        List<AcceptPointDTO> data = toPoints(source);
//        if (Func.isEmpty(data)) {
//            return;
//        }
//        ConcurrentHashMap<String, WebSocketService> webSocketMap = WebSocketMap();
//        if (!Func.isEmpty(webSocketMap) && webSocketMap.size()>=1){
//            for (Map.Entry<String, WebSocketService> entry : Set()) {
//                WebSocketService webSocketService = Value();
//                try {
//                    webSocketService.JSONString(data));
//                } catch (IOException e) {
//                    e.printStackTrace();
//                }
//            }
//        }
//        I_POINT_VALUE_SERVICE.savePointValue(data);
//    }
}
MqttUtil 工具类

import com.ool.utils.SpringUtil;
lipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MqttUtil {private static final Logger log = Logger(MqttUtils.class);public MqttUtils() {}public static IMqttClient getClient() {IMqttClient client = (IMqttClient) Bean(IMqttClient.class);MqttConnectOptions options = (Bean(MqttConnectOptions.class);if (!client.isConnected()) {log.info("client:" + ClientId() + "未连接,初始化连接");try {t(options);} catch (MqttException var3) {throw new RuntimeException("mqtt客户端连接失败", var3);}}return client;}public static boolean publish(String topic, String message) {try {getClient().publish(topic, new Bytes()));return true;} catch (MqttException var3) {("mqtt-message 发送失败", var3);return false;}}public static boolean subscribe(String topic, IMqttMessageListener listener) {try {getClient().subscribe(topic, listener);return true;} catch (MqttException var3) {("客户端订阅{0}失败", topic);return false;}}
}
SpringUtil 工具类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.fig.BeanDefinition;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import t.ApplicationContext;
import t.ApplicationContextAware;
import t.ApplicationEvent;
import t.ConfigurableApplicationContext;
import org.springframework.lang.Nullable;public class SpringUtil implements ApplicationContextAware {private static final Logger log = Logger(SpringUtil.class);private static ConfigurableApplicationContext context;public static DefaultListableBeanFactory getBeanFactory() {return (BeanFactory();}public static <T> T getBean(Class<T> clazz) {return clazz == null ? null : Bean(clazz);}public static <T> T getBean(String beanId) {return beanId == null ? null : (T) Bean(beanId);}public static <T> T getBean(String beanName, Class<T> clazz) {if (null != beanName && !"".im())) {return clazz == null ? null : Bean(beanName, clazz);} else {return null;}}public static ApplicationContext getContext() {return context == null ? null : context;}public static void publishEvent(ApplicationEvent event) {if (context != null) {try {context.publishEvent(event);} catch (Exception var2) {(Message());}}}public static void registerBeanDefinition(String beanName, BeanDefinition definition) {getBeanFactory().registerBeanDefinition(beanName, definition);}public void setApplicationContext(@Nullable ApplicationContext context) throws BeansException {t = (ConfigurableApplicationContext)context;}public static String getProperty(String prop) {Environment().getProperty(prop);}
}

相关知识

MQTT(Message Queuing Telemetry Transport)是一种轻量级、开放的通信协议,特别适用于物联网(IoT)环境中的设备间通信。它具有简单、高效、可靠的特点,被广泛应用于各种物联网应用场景。

MQTT协议是基于发布/订阅(Publish/Subscribe)模式的,其中有三个核心角色:消息发布者(Publisher)、消息代理(Broker)和消息订阅者(Subscriber)。消息发布者将消息发布到消息代理,消息代理再将消息转发给订阅者。这种模式使得设备之间的通信变得灵活、可扩展,并且能够实现异步通信。

下面将对MQTT协议的几个关键概念进行详细解释。

  1. 主题(Topic) 主题是MQTT中的一个重要概念,用于标识消息的类型或者内容。主题可以以层次结构的形式组织,使用斜杠(/)进行分隔。例如,"sensors/temperature"表示温度传感器的主题。发布者和订阅者可以根据主题进行消息的发布和订阅,实现精确的消息传递。

  2. QoS(Quality of Service) MQTT提供了三个不同的消息传递质量级别:QoS 0、QoS 1和QoS 2。QoS级别用于确保消息的可靠性和顺序传递。QoS 0是最低级别,消息发送者只发送一次消息,不保证可靠性。而QoS 1和QoS 2则通过消息确认、重传机制和顺序控制来保证消息的可靠性和顺序传递。不同的应用场景可以根据需求选择适当的QoS级别。

  3. 客户端(Client) MQTT协议中,设备是通过客户端与消息代理进行通信的。客户端可以是发布者或订阅者,也可以是两者兼备。每个客户端都有一个唯一的客户端ID用于标识自己。在与消息代理建立连接之前,客户端需要发送连接请求,并进行身份验证(如用户名和密码)。

  4. 遗嘱消息(Will Message) 遗嘱消息是在客户端异常断开连接时由消息代理发布的消息。在连接请求中,客户端可以设置遗嘱消息的内容和主题。当客户端意外下线时,消息代理会将遗嘱消息发布出去,以通知其他订阅了该主题的客户端。

  5. 保留消息(Retained Message) 保留消息是指消息代理会将最新的发布消息保留起来,并在有新的订阅者连接时发送给它们。发布消息时,可以设置消息的保留标志,使消息代理将其保留。这样,新的订阅者可以获取到最新的消息状态,而不必等待下一次消息发布。

  6. 安全性(Security) MQTT协议提供了一些安全机制来保护数据的传输和设备的身份验证。可以使用TLS/SSL协议来加密整个MQTT连接,确保数据的机密性和完整性。同时,还可以使用用户名和密码进行客户端的身份验证。

  7. 使用场景 MQTT协议在物联网领域有广泛的应用,例如:

  • 传感器网络:将传感器节点连接到云端,实时获取和传输传感器数据。
  • 物联网设备管理:用于设备的远程监控、配置和控制。
  • 智能家居:实现家庭设备的互联互通,比如灯光控制、温度调节等。
  • 工业自动化:实现设备间的实时通信和数据交换。
  • 能源监测:用于能源系统的数据采集、分析和管理。

总结: MQTT协议是一种轻量级、开放、可靠的通信协议,适用于物联网环境中设备间的通信。它采用发布/订阅模式,具有简单、高效的特点,并支持多种QoS级别保证消息的可靠性。MQTT协议在物联网应用中发挥着重要的作用,促进了智能设备的互联互通,推动了物联网技术的发展

本文发布于:2024-01-29 18:57:15,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170652583917567.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:示例   完整   代码   Sprngboot   mqtt
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23