-
前言 随着物联网的火热,MQTT的应用逐渐增多 曾经也有幸使用过mqtt,今天正好总结下MQTT的使用; 一、MQTT是什么? 可以把他理解为,也是一种mq消息,设计简单且轻量级,通讯报文开销小,占用的网络带宽和资源较少,适用于低带宽、不稳定网络环境下的通讯。 MQTT采用发布/订阅模式,分为发布者和订阅者两个角色,需要一个中介来协调发布者和订阅者之间的消息传递,这个中介就是MQTT代理(Broker)。 MQTT协议在物联网领域应用广泛,包括智能家居、工业自动化、智能交通系统等。 个人简单总结: 每个客户端可以订阅一个或者多个主题(发消息,收消息) 每个客户端不订阅主题,也可以发送主题消息(只接受消息,不发送消息) 客户端A发送消息给客户端B流程为: 客户端A>>>Broker>>>客户端B --- 前置条件: a: 客户端A 发送主题消息,且与客户端B的订阅主题一致 b: 客户端B 订阅主题 二、继承步骤 1.安装MQTT 这里直接采用windows版本,解压版,比较快 下载地址 MQTT-windows版本 解压后,在bin文件下执行运行命令 .\emqx console 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码admin/public 2.创建项目,引入依赖 大致分为如下步骤: yml配置 主题 用户名 密码 根据配置创建客户端实例,实例订阅主题 实现 MqttCallback 接口 1. 重连处理 connectionLost 2. 消息接受处理 messageArrived 3. 消息发生成功处理 deliveryComplete 根据客户端信息发送某个主题的消息 3. 对应步骤2的代码 yml配置 server: port: 8081 # 下面这里要看你自己的需求 customer: mqtt: broker: tcp://127.0.0.1:1883 clientList: #发布客户端ID - clientId: nxys_service #监听主题 同时订阅多个主题使用 - 分割开 subscribeTopic: mqtt/publish #用户名 userName: admin #密码 password: public #接受客户端ID - clientId: receive_service #监听主题 同时订阅多个主题使用 - 分割开 subscribeTopic: mqtt/receive #用户名 userName: admin #密码 password: public 实例信息获取 /** * Mqtt配置类 */ @Data @Configuration @ConfigurationProperties(prefix = "customer.mqtt") public class MqttConfig { /** * mqtt broker地址 */ String broker; /** * 需要创建的MQTT客户端 */ List<MqttClient> clientList; } /** * MQTT客户端 */ @Data public class MqttClient { /** * 客户端ID */ private String clientId; /** * 监听主题 */ private String subscribeTopic; /** * 用户名 */ private String userName; /** * 密码 */ private String password; } 根据信息创建实例,订阅主题 /** * MQTT客户端创建 */ @Component @Slf4j public class MqttClientCreate { @Resource private MqttClientManager mqttClientManager; @Autowired private MqttConfig mqttConfig; /** * 创建MQTT客户端 */ @PostConstruct public void createMqttClient() { List<MqttClient> mqttClientList = mqttConfig.getClientList(); for (MqttClient mqttClient : mqttClientList) { log.info("{}", mqttClient); //创建客户端,客户端ID:demo,回调类跟客户端ID一致 mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword()); } } } /** * MQTT客户端管理类,如果客户端非常多后续可入redis缓存 */ @Slf4j @Component public class MqttClientManager { @Value("${customer.mqtt.broker}") private String mqttBroker; @Resource private MqttCallBackContext mqttCallBackContext; /** * 存储MQTT客户端 */ public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>(); public static MqttClient getMqttClientById(String clientId) { return MQTT_CLIENT_MAP.get(clientId); } /** * 创建mqtt客户端 * * @param clientId 客户端ID * @param subscribeTopic 订阅主题,可为空 * @param userName 用户名,可为空 * @param password 密码,可为空 * @return mqtt客户端 */ public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) { MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient client = new MqttClient(mqttBroker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); if (null != userName && !"".equals(userName)) { connOpts.setUserName(userName); } if (null != password && !"".equals(password)) { connOpts.setPassword(password.toCharArray()); } connOpts.setCleanSession(true); if (null != subscribeTopic && !"".equals(subscribeTopic)) { AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId); if (null == callBack) { callBack = mqttCallBackContext.getCallBack("default"); } callBack.setClientId(clientId); callBack.setConnectOptions(connOpts); client.setCallback(callBack); } //连接mqtt服务端broker client.connect(connOpts); // 订阅主题 if (null != subscribeTopic && !"".equals(subscribeTopic)) { if (subscribeTopic.contains("-")) client.subscribe(subscribeTopic.split("-")); else // if (!subscribeTopic.equals("mqtt/receive")) { client.subscribe(subscribeTopic); } } MQTT_CLIENT_MAP.putIfAbsent(clientId, client); } catch (MqttException e) { log.error("Create mqttClient failed!", e); } } } 实现 MqttCallback 接口 /** * MQTT回调抽象类 */ @Slf4j public abstract class AbsMqttCallBack implements MqttCallback { private String clientId; private MqttConnectOptions connectOptions; public String getClientId() { return clientId; } public void setClientId(String clientId) { this.clientId = clientId; } public MqttConnectOptions getConnectOptions() { return connectOptions; } public void setConnectOptions(MqttConnectOptions connectOptions) { this.connectOptions = connectOptions; } /** * 失去连接操作,进行重连 * * @param throwable 异常 */ @Override public void connectionLost(Throwable throwable) { try { if (null != clientId) { if (null != dconnectOptions) { MqttClientManager.getMqttClientById(clientId).connect(connectOptions); } else { MqttClientManager.getMqttClientById(clientId).connect(); } } } catch (Exception e) { log.error("{} reconnect failed!", e); } } /** * 接收订阅消息 * @param topic 主题 * @param mqttMessage 接收消息 * @throws Exception 异常 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { String content = new String(mqttMessage.getPayload()); handleReceiveMessage(topic, content); } /** * 消息发送成功 * * @param iMqttDeliveryToken toke */ @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { log.info("消息发送成功"); } /** * 处理接收的消息 * @param topic 主题 * @param message 消息内容 */ protected abstract void handleReceiveMessage(String topic, String message); } /** * 默认回调 */ @Slf4j @Component("default") public class DefaultMqttCallBack extends AbsMqttCallBack { /** * @param topic 主题 * @param message 消息内容 */ @Override protected void handleReceiveMessage(String topic, String message) { log.info("接收到主题---{}", topic); log.info("接收到消息---{}", message); // 你自己的消息处理业务 } } /** * MQTT订阅回调环境类 */ @Component @Slf4j public class MqttCallBackContext { private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>(); /** * 默认构造函数 * * @param callBackMap 回调集合 */ public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) { this.callBackMap.clear(); this.callBackMap.putAll(callBackMap); } /** * 获取MQTT回调类 * * @param clientId 客户端ID * @return MQTT回调类 */ public AbsMqttCallBack getCallBack(String clientId) { return this.callBackMap.get(clientId); } } 发送消息 @RestController public class SendController { @Resource MqttClientManager mqttClientManager; @RequestMapping("/sendMessage") public String sendMessage(String topic){ try { MqttMessage mqttMessage = new MqttMessage("你好".getBytes()); mqttClientManager.getMqttClientById("nxys_service").publish(topic,mqttMessage); return "发送成功"; } catch (Exception e) { e.printStackTrace(); return "发送失败"; } } } 3 测试 启动订阅,查看MQTT 管理页面 测试发送消息,查看发送情况,接受情况 http://localhost:8081/sendMessage?topic=mqtt/receive 总结 文中涉及的所有代码: MQTT-Demo mqtt 启动后访问地址 http://localhost:18083/#/ 用户名/密码: admin/public 每个客户端可以订阅一个或者多个主题 每个客户端不订阅主题,也可以发送主题消息 客户端A发送消息给客户端B流程为: 客户端A>>>Broker>>>客户端B --- 前置条件: a: 客户端A 发送主题消息,且与客户端B的订阅主题一致 b: 客户端B 订阅主题 1 2 3 4 5 mqtt启动命令 在bin目录下,cmd 执行 .\emqx console ———————————————— 版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 原文链接:https://blog.csdn.net/qq_32419139/article/details/136176948
-
PC根据文档可以下载根据文档这里格式不知道有没有错,下面第一个红色框是请求红色是发送的,绿色是接收,不明白400错误那里怎么解决,我用的ESP8266透传,已经接上了https 443接口
-
物联网应用中的通信网络构架有哪些,各自的特点和适用场景是什么?
-
如何使用云计算和大数据技术,对物联网数据进行存储、分析和挖掘?
-
物联网应用中的安全问题如何解决?有哪些常见的安全威胁和防御措施?
-
在物联网项目中,如何进行设备管理和监控?有哪些常见的设备管理工具和方法?
-
物联网应用中的数据采集和处理如何实现?有哪些常见的数据处理方式和算法?
-
如何合适的硬件平台和传感选择器设备,以便于快速构建物联网应用?有哪些评估指标和选择原则?
-
它们各自的特点和适用场景是什么?
-
请举例说明。
-
在智能家居、工业自动化或农业等领域有哪些成功案例?
-
有哪些策略可以提高系统的性能和吞吐量?
-
如何确保MQTT协议的安全性和数据的机密性?
-
如何定义和使用主题?
-
MQTT协议如何处理断开连接和网络不稳定的情况?
上滑加载中
推荐直播
-
华为云云原生FinOps解决方案,为您释放云原生最大价值
2024/04/24 周三 16:30-18:00
Roc 华为云云原生DTSE技术布道师
还在对CCE集群成本评估感到束手无策?还在担心不合理的K8s集群资源申请和过度浪费?华为云容器服务CCE全新上线云原生FinOps中心,为用户提供多维度集群成本可视化,结合智能规格推荐、混部、超卖等成本优化手段,助力客户降本增效,释放云原生最大价值。
去报名 -
鲲鹏开发者创享日·江苏站暨数字技术创新应用峰会
2024/04/25 周四 09:30-12:00
鲲鹏专家团
这是华为推出的旨在和众多技术大牛、行业大咖一同探讨最前沿的技术思考,分享最纯粹的技术经验,进行最真实的动手体验,为开发者提供一个深度探讨与交流的平台。
即将直播 -
产教融合专家大讲堂·第①期《高校人才培养创新模式经验分享》
2024/04/25 周四 16:00-18:00
于晓东 上海杉达学院信息科学与技术学院副院长;崔宝才 天津电子信息职业技术学院电子与通信技术系主任
本期直播将与您一起探讨高校人才培养创新模式经验。
去报名
热门标签