• [技术干货] springboot集成mqtt-转载
     前言 随着物联网的火热,MQTT的应用逐渐增多  曾经也有幸使用过mqtt,今天正好总结下MQTT的使用;  一、MQTT是什么? 可以把他理解为,也是一种mq消息,设计简单且轻量级,通讯报文开销小,占用的网络带宽和资源较少,适用于低带宽、不稳定网络环境下的通讯。 MQTT采用发布/订阅模式,分为发布者和订阅者两个角色,需要一个中介来协调发布者和订阅者之间的消息传递,这个中介就是MQTT代理(Broker)。 MQTT协议在物联网领域应用广泛,包括智能家居、工业自动化、智能交通系统等。  个人简单总结:  每个客户端可以订阅一个或者多个主题(发消息,收消息) 每个客户端不订阅主题,也可以发送主题消息(只接受消息,不发送消息) 客户端A发送消息给客户端B流程为: 客户端A>>>Broker>>>客户端B ---  前置条件: a: 客户端A 发送主题消息,且与客户端B的订阅主题一致 b: 客户端B 订阅主题 二、继承步骤 1.安装MQTT 这里直接采用windows版本,解压版,比较快  下载地址 MQTT-windows版本 解压后,在bin文件下执行运行命令 .\emqx console 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码admin/public  2.创建项目,引入依赖 大致分为如下步骤:  yml配置 主题 用户名 密码 根据配置创建客户端实例,实例订阅主题 实现 MqttCallback 接口 1. 重连处理 connectionLost 2. 消息接受处理 messageArrived 3. 消息发生成功处理 deliveryComplete 根据客户端信息发送某个主题的消息 3. 对应步骤2的代码 yml配置 server:   port: 8081 # 下面这里要看你自己的需求 customer:   mqtt:     broker: tcp://127.0.0.1:1883     clientList:       #发布客户端ID       - clientId: nxys_service         #监听主题 同时订阅多个主题使用 - 分割开         subscribeTopic: mqtt/publish         #用户名         userName: admin         #密码         password: public       #接受客户端ID       - clientId: receive_service         #监听主题 同时订阅多个主题使用 - 分割开         subscribeTopic: mqtt/receive         #用户名         userName: admin         #密码         password: public  实例信息获取 /**  * Mqtt配置类  */ @Data @Configuration @ConfigurationProperties(prefix = "customer.mqtt") public class MqttConfig {     /**      * mqtt broker地址      */     String broker;     /**      * 需要创建的MQTT客户端      */     List<MqttClient> clientList; } /**  * MQTT客户端  */ @Data public class MqttClient {     /**      * 客户端ID      */     private String clientId;     /**      * 监听主题      */     private String subscribeTopic;     /**      * 用户名      */     private String userName;     /**      * 密码      */     private String password; } 根据信息创建实例,订阅主题 /**  * MQTT客户端创建  */ @Component @Slf4j public class MqttClientCreate {     @Resource     private MqttClientManager mqttClientManager;     @Autowired     private MqttConfig mqttConfig;      /**      * 创建MQTT客户端      */     @PostConstruct     public void createMqttClient() {         List<MqttClient> mqttClientList = mqttConfig.getClientList();          for (MqttClient mqttClient : mqttClientList) {             log.info("{}", mqttClient);             //创建客户端,客户端ID:demo,回调类跟客户端ID一致             mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());         }     } } /**  * MQTT客户端管理类,如果客户端非常多后续可入redis缓存  */ @Slf4j @Component public class MqttClientManager {     @Value("${customer.mqtt.broker}")     private String mqttBroker;     @Resource     private MqttCallBackContext mqttCallBackContext;     /**      * 存储MQTT客户端      */     public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();      public static MqttClient getMqttClientById(String clientId) {         return MQTT_CLIENT_MAP.get(clientId);     }      /**      * 创建mqtt客户端      *      * @param clientId       客户端ID      * @param subscribeTopic 订阅主题,可为空      * @param userName       用户名,可为空      * @param password       密码,可为空      * @return mqtt客户端      */     public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {         MemoryPersistence persistence = new MemoryPersistence();          try {             MqttClient client = new MqttClient(mqttBroker, clientId, persistence);             MqttConnectOptions connOpts = new MqttConnectOptions();             if (null != userName && !"".equals(userName)) {                 connOpts.setUserName(userName);             }              if (null != password && !"".equals(password)) {                 connOpts.setPassword(password.toCharArray());             }              connOpts.setCleanSession(true);              if (null != subscribeTopic && !"".equals(subscribeTopic)) {                 AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);                  if (null == callBack) {                     callBack = mqttCallBackContext.getCallBack("default");                 }                  callBack.setClientId(clientId);                 callBack.setConnectOptions(connOpts);                 client.setCallback(callBack);             }              //连接mqtt服务端broker             client.connect(connOpts);             // 订阅主题             if (null != subscribeTopic && !"".equals(subscribeTopic)) {                 if (subscribeTopic.contains("-"))                     client.subscribe(subscribeTopic.split("-"));                 else //                    if (!subscribeTopic.equals("mqtt/receive"))                 {                     client.subscribe(subscribeTopic);                 }             }              MQTT_CLIENT_MAP.putIfAbsent(clientId, client);         } catch (MqttException e) {             log.error("Create mqttClient failed!", e);         }     } } 实现 MqttCallback 接口 /**  * MQTT回调抽象类  */ @Slf4j public abstract class AbsMqttCallBack implements MqttCallback {     private String clientId;      private MqttConnectOptions connectOptions;       public String getClientId() {         return clientId;     }       public void setClientId(String clientId) {         this.clientId = clientId;     }       public MqttConnectOptions getConnectOptions() {         return connectOptions;     }       public void setConnectOptions(MqttConnectOptions connectOptions) {         this.connectOptions = connectOptions;     }       /**      * 失去连接操作,进行重连      *      * @param throwable 异常      */     @Override     public void connectionLost(Throwable throwable) {         try {             if (null != clientId) {                 if (null != dconnectOptions) {                     MqttClientManager.getMqttClientById(clientId).connect(connectOptions);                 } else {                     MqttClientManager.getMqttClientById(clientId).connect();                 }             }           } catch (Exception e) {             log.error("{} reconnect failed!", e);         }     }       /**      * 接收订阅消息      * @param topic    主题      * @param mqttMessage 接收消息      * @throws Exception 异常      */     @Override     public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {         String content = new String(mqttMessage.getPayload());          handleReceiveMessage(topic, content);     }       /**      * 消息发送成功      *      * @param iMqttDeliveryToken toke      */     @Override     public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {         log.info("消息发送成功");     }         /**      * 处理接收的消息      * @param topic   主题      * @param message 消息内容      */     protected abstract void handleReceiveMessage(String topic, String message); }  /**  * 默认回调  */ @Slf4j @Component("default") public class DefaultMqttCallBack extends AbsMqttCallBack {      /**      * @param topic   主题      * @param message 消息内容      */     @Override     protected void handleReceiveMessage(String topic, String message) {         log.info("接收到主题---{}", topic);         log.info("接收到消息---{}", message);         // 你自己的消息处理业务      } } /**  * MQTT订阅回调环境类  */ @Component @Slf4j public class MqttCallBackContext {     private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();      /**      * 默认构造函数      *      * @param callBackMap 回调集合      */     public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {         this.callBackMap.clear();         this.callBackMap.putAll(callBackMap);     }      /**      * 获取MQTT回调类      *      * @param clientId 客户端ID      * @return MQTT回调类      */     public AbsMqttCallBack getCallBack(String clientId) {         return this.callBackMap.get(clientId);     } } 发送消息 @RestController public class SendController {     @Resource     MqttClientManager mqttClientManager;      @RequestMapping("/sendMessage")     public String sendMessage(String topic){         try {             MqttMessage mqttMessage = new MqttMessage("你好".getBytes());             mqttClientManager.getMqttClientById("nxys_service").publish(topic,mqttMessage);             return "发送成功";         } catch (Exception e) {             e.printStackTrace();             return "发送失败";         }     } } 3 测试 启动订阅,查看MQTT 管理页面  测试发送消息,查看发送情况,接受情况 http://localhost:8081/sendMessage?topic=mqtt/receive  总结 文中涉及的所有代码: MQTT-Demo  mqtt 启动后访问地址 http://localhost:18083/#/ 用户名/密码: admin/public 每个客户端可以订阅一个或者多个主题 每个客户端不订阅主题,也可以发送主题消息 客户端A发送消息给客户端B流程为: 客户端A>>>Broker>>>客户端B ---  前置条件: a: 客户端A 发送主题消息,且与客户端B的订阅主题一致 b: 客户端B 订阅主题 1 2 3 4 5 mqtt启动命令 在bin目录下,cmd 执行  .\emqx console ————————————————                              版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。                          原文链接:https://blog.csdn.net/qq_32419139/article/details/136176948 
  • [问题求助] 在MQTT协议设备OTA升级实践中设备侧如何通过Https请求下载升级包,我用PC MQTTfx可以下载,但是用MCU一直不行
    PC根据文档可以下载根据文档这里格式不知道有没有错,下面第一个红色框是请求红色是发送的,绿色是接收,不明白400错误那里怎么解决,我用的ESP8266透传,已经接上了https 443接口
  • [问题求助] 物联网应用中的通信网络构架
    物联网应用中的通信网络构架有哪些,各自的特点和适用场景是什么?
  • [问题求助] 如何使用云计算和大数据技术
    如何使用云计算和大数据技术,对物联网数据进行存储、分析和挖掘?
  • [问题求助] 物联网应用中的安全问题如何解决
    物联网应用中的安全问题如何解决?有哪些常见的安全威胁和防御措施?
  • [问题求助] 在物联网项目中,如何进行设备管理和监控?
    在物联网项目中,如何进行设备管理和监控?有哪些常见的设备管理工具和方法?
  • [问题求助] 物联网应用中的数据采集和处理如何实现
    物联网应用中的数据采集和处理如何实现?有哪些常见的数据处理方式和算法?
  • [问题求助] 如何合适的硬件平台和传感选择器设备
    如何合适的硬件平台和传感选择器设备,以便于快速构建物联网应用?有哪些评估指标和选择原则?
  • [问题求助] 在物联网项目中,常见的通信协议有哪些?
    它们各自的特点和适用场景是什么?
  • 物联网在哪些场景和行业领域得到广泛应用?
    请举例说明。
  • [问题求助] MQTT协议适用于哪些应用场景?
    在智能家居、工业自动化或农业等领域有哪些成功案例?
  • [问题求助] MQTT协议在大规模物联网部署中的可扩展性如何?
    有哪些策略可以提高系统的性能和吞吐量?
  • [问题求助] MQTT协议的安全性和数据的机密性
    如何确保MQTT协议的安全性和数据的机密性?
  • [问题求助] 在MQTT协议中,什么是主题(Topic)
    如何定义和使用主题?
  • [问题求助] MQTT协议处理断开连接和网络不稳定的情况
    MQTT协议如何处理断开连接和网络不稳定的情况?
总条数:174 到第
上滑加载中