-
上三张为对应的属性上报,物理模型定义,下为用arduino的代码,#include <ESP8266WiFi.h>//添加ESP8266用于WiFi的头文件 #include <PubSubClient.h>//添加用于MQTT客户端的头文件 #include <DHTesp.h>//添加dht11头文件char TempHum[]={"{"hum":00}"};//声明1个数组,json格式,用于存放温湿度char num[]={"0123456789"};//声明1个字符数组const char *ssid = "";//wifi的名字const char *password = "";//wifi的密码#define BrokerAddress "" //定义阿里云MQTT服务地址 #define BrokerPort //定义MQTT服务端口 #define ClientID "" //定义MQTT Client ID #define UserName "" //定义MQTT User Name #define Password "" //定义MQTT Password#define SubscribeTopic "$oc/devices//sys/messages/up" //定义订阅的Topic $oc/devices/{device_id}/sys/properties/report #define PublishTopic "$oc/devices//sys/properties/report" //定义发布的Topic $oc/devices//sys/messages/upDHTesp dht; /******************************************************/ void MqttCallback(char* topic, byte* payload, unsigned int length); WiFiClient EspClient; PubSubClient client(BrokerAddress, BrokerPort, &MqttCallback, EspClient);void SetupWifi() { delay(10); Serial.print("Connecting to "); Serial.println(ssid);WiFi.mode(WIFI_STA); WiFi.begin(ssid, password);while(WiFi.status()!=WL_CONNECTED){ delay(500); Serial.print("."); }Serial.println("WiFi connected"); Serial.println("IP address: "); Serial.println(WiFi.localIP()); }void MqttCallback(char* topic, byte* payload, unsigned int length) { Serial.print("Message arrived ["); Serial.print(topic); Serial.print("] "); for (int i = 0; i < length; i++) { Serial.print((char)payload[i]); } Serial.println(); }void setup() { Serial.begin(115200);//设置串口 SetupWifi();//设置wifi client.setServer(BrokerAddress,1883);//设置MQTT服务地址和端口 client.setCallback(MqttCallback);//回调函数 client.setClient(EspClient); client.connect(ClientID,UserName,Password);//设备接入物联网云平台 if(!client.connected())//判断是否接入成功 Serial.print("Connected Failed!\n"); else{ Serial.print("Connected Ok!\n"); client.subscribe(SubscribeTopic);//订阅Topic } dht.setup(2, DHTesp::DHT11); }void loop() { client.loop();//关键,可以侦测回调函数中数据是否到来 int h = dht.getHumidity();//读取湿度 int t = dht.getTemperature();//读取温度TempHum[7]=num[h/10];//存放湿度 TempHum[8]=num[h%10];TempHum[17]=num[t/10];//存放温度 TempHum[18]=num[t%10]; client.publish(PublishTopic,TempHum);//每隔一定时间向平台发送1次温湿度数据 delay(10000); }该如何实现属性显示
-
现在硬件能上报属性,转发规则如下,然后我在android studio上写了一个用iam账户获取token 的代码,然后怎么使用这个token请求调用api查询设备影子数据的代码一直不对,请问各路大神这要怎么操作
-
应用使用AMQPS接收消息,为什么好多信息都接收不全,还有AMQP消息队列详情中订阅列表什么意思,我只用一个应用侧,为啥这么多客户端IDimport threadingimport timeimport signalimport sysimport socketimport jsonfrom proton import SSLDomainfrom proton.handlers import MessagingHandlerfrom proton.reactor import Container# 重连次数reconnectTimes = 0# 用于控制主线程的事件循环running = True# TCP/IP 服务器配置tcp_host = '10.16.47.108'tcp_port = 8080def current_time_millis():return str(int(round(time.time() * 1000)))class AmqpClient(MessagingHandler):def __init__(self, host, port, accessKey, accessCode, queueName, instanceId):super(AmqpClient, self).__init__()self.host = hostself.port = portself.accessKey = accessKeyself.accessCode = accessCodeself.queueName = queueNameself.instanceId = instanceIddef on_start(self, event):# 接入域名,请参见AMQP客户端接入说明文档。url = "amqps://%s:%s" % (self.host, self.port)timestamp = current_time_millis()userName = "accessKey=" + self.accessKey + "|timestamp=" + timestamp + "|instanceId=" + self.instanceIdpassWord = self.accessCode# 默认不校验服务端证书sslDomain = SSLDomain(SSLDomain.MODE_CLIENT)sslDomain.set_peer_authentication(SSLDomain.ANONYMOUS_PEER)self.conn = event.container.connect(url, user=userName, password=passWord, heartbeat=60, ssl_domain=sslDomain,reconnect=False)event.container.create_receiver(self.conn, source=self.queueName)def on_connection_opened(self, event):global reconnectTimesreconnectTimes = 0print("Connection established, remoteUrl: %s" % event.connection.hostname)def on_connection_closed(self, event):print("Connection closed: %s" % self)ReconnectThread("reconnectThread").start()def on_connection_error(self, event):print("Connection error: %s" % self)ReconnectThread("reconnectThread").start()def on_transport_error(self, event):if event.transport.condition:if event.transport.condition.info:print("%s: %s: %s" % (event.transport.condition.name, event.transport.condition.description,event.transport.condition.info))else:print("%s: %s" % (event.transport.condition.name, event.transport.condition.description))else:print("Unspecified transport error")ReconnectThread("reconnectThread").start()def on_message(self, event):message = event.messagecontent = message.bodyprint("Received message: content=%s" % content)# 解析消息内容try:data = json.loads(content)services = data.get('notify_data', {}).get('body', {}).get('services', [])for service in services:if service['service_id'] == 'GoodType':good_name = service['properties'].get('GoodName')good_id = service['properties'].get('GoodId')if good_name and good_id:send_tcp_message(good_name, good_id)except json.JSONDecodeError:print("Failed to decode JSON from message content")def send_tcp_message(good_name, good_id):message = {"ReportType": "Send","ReportCategory": "Good","GoodName": good_name,"GoodId": good_id}message_json = json.dumps(message)print(f"Prepared JSON message to send: {message_json}")try:with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.connect((tcp_host, tcp_port))s.sendall(message_json.encode('utf-8'))print(f"Sent TCP JSON message: {message_json}")except Exception as e:print(f"Failed to send TCP JSON message: {e}")class ReconnectThread(threading.Thread):def __init__(self, name):threading.Thread.__init__(self)self.name = namedef run(self):global reconnectTimesreconnectTimes += 1time.sleep(15 if reconnectTimes > 15 else reconnectTimes)Container(AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode, amqpQueueName, instanceId)).run()def signal_handler(sig, frame):global runningprint('Exiting...')running = Falsesys.exit(0)# 以下参数配置请参考连接配置说明# AMQP接入域名amqpHost = "95e74068ba.st1.iotda-app.cn-north-4.myhuaweicloud.com"# AMQP接入端口amqpPort = 5671# 接入凭证键值amqpAccessKey = 'ZyjyH9Ax'# 接入凭证密钥amqpAccessCode = 'ghc4ScCaA7vqzgIZqErEezDsvPO5FCaC'# 订阅队列名称amqpQueueName = '12345678'# 实例Id,同一Region购买多个标准版实例时需要填设置该参数。instanceId = '5b6a80f0-75d5-4091-b28a-7809838c524c'# 捕捉SIGINT信号(通常是CTRL+C)signal.signal(signal.SIGINT, signal_handler)# 创建并运行容器container = Container(AmqpClient(amqpHost, amqpPort, amqpAccessKey, amqpAccessCode, amqpQueueName, instanceId))# 启动一个线程来运行AMQP客户端thread = threading.Thread(target=container.run)thread.start()# 保持主线程存活,等待接收消息while running:time.sleep(1)
-
有没有WiFi模组AT命令用MQTT上华为云的AT指令文档,小熊派带的WIFI模块能使用MQTT上华为云吗,一般是使用什么方式上云的
-
新手学习,根据huaweicloud-sdk-nodejs-v3/README_CN.md at master · huaweicloud/huaweicloud-sdk-nodejs-v3 · GitHub 里提供的文档,出现如上问题,求解,
-
之前创建好设备后模拟MQTT客户端登陆能够正确登陆上,显示在线状态,现在不行了,查了异常运行日志显示这个问题已经尝试过换成域名解析IP还是失败
-
应用侧接收MQTT数据流传的信息,有没有例程,我根据教程,时好时坏
-
数据转发,应用侧接收老失败,有时候可以ClientConf.py:from typing import Optionalclass ClientConf:def __init__(self):# mqtt订阅地址self.__host: Optional[str] = "95e74068ba.st1.iotda-app.cn-north-4.myhuaweicloud.com"# mqtt订阅端口号self.__port: Optional[int] = 8883# mqtt接入凭据access_keyself.__access_key: Optional[str] = "glWckaFu"# mqtt接入凭据access_codeself.__access_code: Optional[str] = "x83VLjH7MAPikrIR36916sAStLoPXEK7"# mqtt订阅topicself.__topic: Optional[str] = "topic456"# 实例Id,同一Region购买多个标准版实例时需要填写该参数self.__instance_id: Optional[str] = "5b6a80f0-75d5-4091-b28a-7809838c524c"# mqtt qosself.__qos = 1# tcp 服务器 IPself.__tcp_ip: Optional[str] = "10.16.64.116"# tcp 服务器端口self.__tcp_port: Optional[int] = 11111@propertydef host(self):return self.__host@host.setterdef host(self, host):self.__host = host@propertydef port(self):return self.__port@port.setterdef port(self, port):self.__port = port@propertydef access_key(self):return self.__access_key@access_key.setterdef access_key(self, access_key):self.__access_key = access_key@propertydef access_code(self):return self.__access_code@access_code.setterdef access_code(self, access_code):self.__access_code = access_code@propertydef topic(self):return self.__topic@topic.setterdef topic(self, topic):self.__topic = topic@propertydef instance_id(self):return self.__instance_id@instance_id.setterdef instance_id(self, instance_id):self.__instance_id = instance_id@propertydef qos(self):return self.__qos@qos.setterdef qos(self, qos):self.__qos = qos@propertydef tcp_ip(self):return self.__tcp_ip@tcp_ip.setterdef tcp_ip(self, tcp_ip):self.__tcp_ip = tcp_ip@propertydef tcp_port(self):return self.__tcp_port@tcp_port.setterdef tcp_port(self, tcp_port):self.__tcp_port = tcp_portMqttClient.py:import osimport sslimport threadingimport timeimport tracebackimport secretsimport socketfrom typing import Optionalfrom ClientConf import ClientConfimport paho.mqtt.client as mqttimport jsonclass MqttClient:def __init__(self, client_conf: ClientConf):print("__init__ 66666")self.__host = client_conf.hostself.__port = client_conf.portself.__access_key = client_conf.access_keyself.__access_code = client_conf.access_codeself.__topic = client_conf.topicself.__instance_id = client_conf.instance_idself.__qos = client_conf.qosself.__tcp_ip = client_conf.tcp_ipself.__tcp_port = client_conf.tcp_portself.__paho_client: Optional[mqtt.Client] = Noneself.__connect_result_code = -1self.__default_backoff = 1000self.__retry_times = 0self.__min_backoff = 1 * 1000 # 1sself.__max_backoff = 30 * 1000 # 30sdef connect(self):print("connect 66666")self.__valid_params()rc = self.__connect()while rc != 0:# 退避重连low_bound = int(self.__default_backoff * 0.8)high_bound = int(self.__default_backoff * 1.0)random_backoff = secrets.randbelow(high_bound - low_bound)backoff_with_jitter = int(pow(2, self.__retry_times)) * (random_backoff + low_bound)wait_time_ms = self.__max_backoff if (self.__min_backoff + backoff_with_jitter) > self.__max_backoff else (self.__min_backoff + backoff_with_jitter)wait_time_s = round(wait_time_ms / 1000, 2)print("client will try to reconnect after " + str(wait_time_s) + " s")time.sleep(wait_time_s)self.__retry_times += 1self.close() # 释放之前的connectionrc = self.__connect()# rc为0表示建链成功,其它表示连接不成功if rc != 0:print("connect with result code: " + str(rc))if rc == 134:print("connect failed with bad username or password, ""reconnection will not be performed")passreturn rcdef __connect(self):try:print("__connect 66666")timestamp = self.current_time_millis()user_name = "accessKey=" + self.__access_key + "|timestamp=" + timestampif self.__instance_id:user_name = user_name + "|instanceId=" + self.__instance_idpass_word = self.__access_codeself.__paho_client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, "mqttClient")# 关闭自动重试, 采用手动重试的方式刷新时间戳self.__paho_client._reconnect_on_failure = False# 设置回调函数self._set_callback()# topic放在userdata中,回调函数直接拿topic订阅self.__paho_client.user_data_set(self.__topic)self.__paho_client.username_pw_set(user_name, pass_word)# 当前mqtt broker仅支持TLS1.2context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)# 不校验服务端证书context.verify_mode = ssl.CERT_NONEcontext.check_hostname = Falseself.__paho_client.tls_set_context(context)rc = self.__paho_client.connect(self.__host, self.__port)self.__connect_result_code = rcif rc == 0:threading.Thread(target=self.__paho_client.loop_forever, args=(1, False), name="MqttThread").start()# 等待建链time.sleep(1)except Exception as e:self.__connect_result_code = -1print("Mqtt connection error. traceback: " + traceback.format_exc())if self.__paho_client.is_connected():return 0else:return self.__connect_result_codedef __valid_params(self):print("__valid_params 66666")assert self.__access_key is not Noneassert self.__access_code is not Noneassert self.__topic is not None@staticmethoddef current_time_millis():return str(int(round(time.time() * 1000)))def _set_callback(self):print("_set_callback 6666 ")# 当平台响应连接请求时,执行self._on_connect()self.__paho_client.on_connect = self._on_connectprint("_on_disconnect ")# 当与平台断开连接时,执行self._on_disconnect()self.__paho_client.on_disconnect = self._on_disconnectprint("_on_subscribe ")# 当订阅topic时,执行self._on_subscribeself.__paho_client.on_subscribe = self._on_subscribe# 当接收到一个原始消息时,执行self._on_message()print("_on_message ")self.__paho_client.on_message = self._on_messagedef _on_connect(self, client, userdata, flags, rc: mqtt.ReasonCode, properties):print("_on_connect 66666 ")if rc == 0:print("Connected to Mqtt Broker! topic " + self.__topic)client.subscribe(userdata, 1)else:# 只有当用户名或密码错误,才不进行自动重连。# 如果这里不使用disconnect()方法,那么loop_forever会一直进行重连。if rc == 134:self.__paho_client.disconnect()print("Failed to connect. return code :" + str(rc.value) + ", reason" + rc.getName())def _on_subscribe(self, client, userdata, mid, granted_qos, properties):print("_on_subscribe 66666 ")print("Subscribed: " + str(mid) + " " + str(granted_qos) + " topic: " + self.__topic)def _on_message(self, client, userdata, message: mqtt.MQTTMessage):print("_on_message 66666666666")print("topic " + self.__topic + " Received message: " + message.payload.decode())# 通过TCP发送消息try:with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:s.connect((self.__tcp_ip, self.__tcp_port))s.sendall(message.payload)print("Message sent over TCP")except Exception as e:print("TCP send error. traceback: " + traceback.format_exc())# def _on_message(self, client, userdata, message: mqtt.MQTTMessage):# print("_on_message 66666666666")# print("topic " + self.__topic + " Received message: " + message.payload.decode())## try:# # 解析收到的 JSON 消息# received_msg = json.loads(message.payload.decode())## # 提取需要的部分# extracted_data = received_msg["notify_data"]["body"]## # 添加上报类型和类型# extracted_data["report_type"] = "Send"# extracted_data["type"] = "货物"## # 将提取和修改后的数据转换回 JSON# extracted_data_str = json.dumps(extracted_data)## # 打印发送的信息# print("Message to be sent over TCP: " + extracted_data_str)## # 通过 TCP 发送消息# with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:# s.connect((self.__tcp_ip, self.__tcp_port))# s.sendall(extracted_data_str.encode())# print("Message sent over TCP")# except Exception as e:# print("TCP send error. traceback: " + traceback.format_exc())def _on_disconnect(self, client, userdata, flags, rc, properties):print("Disconnect to Mqtt Broker. topic: " + self.__topic)# 断链后将客户端主动关闭,手动重连刷新时间戳try:self.__paho_client.disconnect()except Exception as e:print("Mqtt connection error. traceback: " + traceback.format_exc())self.connect()def close(self):print("close 66666 ")if self.__paho_client is not None and self.__paho_client.is_connected():try:self.__paho_client.disconnect()print("Mqtt connection close")except Exception as e:print("paho client disconnect failed. exception: " + str(e))else:passMqttDemo.py:import osfrom ClientConf import ClientConffrom MqttClient import MqttClientimport osfrom typing import Optionalfrom huaweicloudsdkcore.auth.credentials import BasicCredentials, DerivedCredentialsfrom huaweicloudsdkcore.region.region import Region as coreRegionfrom huaweicloudsdkcore.exceptions import exceptionsfrom huaweicloudsdkiotda.v5 import *def main():client_conf = ClientConf()client_conf.host = "95e74068ba.st1.iotda-app.cn-north-4.myhuaweicloud.com"client_conf.port = 8883client_conf.topic = "topic456"# mqtt接入凭据access_key可使用环境变量的方式注入client_conf.access_key = "glWckaFu"# mqtt接入凭据a ccess_code可使用环境变量的方式注入client_conf.access_code = "x83VLjH7MAPikrIR36916sAStLoPXEK7"client_conf.instance_id = "5b6a80f0-75d5-4091-b28a-7809838c524c"mqtt_client = MqttClient(client_conf)if mqtt_client.connect() != 0:print("init failed")return# 调用华为IoTDA服务接口if __name__ == "__main__":main()
-
使用pythonSDK示例失败# coding: utf-8import osfrom huaweicloudsdkcore.auth.credentials import BasicCredentialsfrom huaweicloudsdkcore.auth.credentials import DerivedCredentialsfrom huaweicloudsdkcore.region.region import Region as coreRegionfrom huaweicloudsdkcore.exceptions import exceptionsfrom huaweicloudsdkiotda.v5 import *if __name__ == "__main__":# The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.# In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environmentak = "jfdjkjCjk8C84"sk = "ljkjQjkfdjhvgdfffjWVzJDUB"endpoint = "85e44068ba.st1.iotda-app.cn-north-4.myhuaweicloud.com"credentials = BasicCredentials(ak, sk).with_derived_predicate(DerivedCredentials.get_default_derived_predicate())client = IoTDAClient.new_builder() \.with_credentials(credentials) \.with_region(coreRegion(id="cn-north-4", endpoint=endpoint)) \.build()try:request = UpdatePropertiesRequest()request.body = DevicePropertiesRequest(services="[{\"service_id\":\"Temperature\",\"properties\":{\"value\":57}}, {\"service_id\":\"Battery\",\"properties\":{\"level\":80}}]")response = client.update_properties(request)print(response)except exceptions.ClientRequestException as e:print(e.status_code)print(e.request_id)print(e.error_code)print(e.error_msg)
-
想问一下,这个调用修改设备属性dAPI超时的问题该怎么解决
-
在访问过程中,已经能够返回201值,说明连接完成,后使用python继续访问,获得成功,想问下各位是如何解决该问题的。
-
使用微信小程序开发工具,调用设备影子接口查看信息,报错如下,使用的是iam账号访问申请得到的token,同时资源空间内也能访问到主账号的资源
-
添加转发目标转发目标第三方应用服务(HTTP推送)平台支持根据规则配置将指定的设备数据推送到第三方应用服务器。您可以设置将不同类型的设备数据推送到对应地址进行数据处理。推送URL注:推送请求方法为POST鉴权这个url怎么写
-
怎么用nodered httprequest请求华为云上设备属性
-
为什么一次上报两个属性会失败,请问是数据格式问题,还是超出上报限制?
上滑加载中
推荐直播
-
全面解析华为云EI-API服务:理论基础与实践应用指南
2024/11/29 周五 18:20-20:20
Alex 华为云学堂技术讲师
本期直播给大家带来的是理论与实践结合的华为云EI-API的服务介绍。从“主要功能,应用场景,实践案例,调用流程”四个维度来深入解析“语音交互API,文字识别API,自然语言处理API,图像识别API及图像搜索API”五大场景下API服务,同时结合实验,来加深开发者对API服务理解。
正在直播 -
企业员工、应届毕业生、在读研究生共探项目实践
2024/12/02 周一 19:00-21:00
姚圣伟 在职软件工程师 昇腾社区优秀开发者 华为云云享专家 HCDG天津地区发起人
大神带你一键了解和掌握LeakyReLU自定义算子在ONNX网络中应用和优化技巧,在线分享如何入门,以及在工作中如何结合实际项目进行学习
即将直播 -
昇腾云服务ModelArts深度解析:理论基础与实践应用指南
2024/12/03 周二 14:30-16:30
Alex 华为云学堂技术讲师
如何快速创建和部署模型,管理全周期AI工作流呢?本期直播聚焦华为昇腾云服务ModelArts一站式AI开发平台功能介绍,同时结合基于ModelArts 的实践性实验,帮助开发者从理论到实验更好地理解和使用ModelArts。
去报名
热门标签