From 4015e7905f3fcd0590f91ada1b3920a06649b7d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Tue, 25 Feb 2025 08:14:19 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E3=80=91IoT:=20=E6=9B=B4=E6=96=B0=20MQTT=20=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E9=80=BB=E8=BE=91=EF=BC=8C=E9=87=8D=E6=9E=84=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=A4=84=E7=90=86=E5=92=8C=E9=87=8D=E8=BF=9E=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=EF=BC=8C=E4=BC=98=E5=8C=96=E9=85=8D=E7=BD=AE=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../upstream/IotDeviceUpstreamServer.java | 135 +++++++++++------- .../src/main/resources/application.yml | 2 +- .../src/main/resources/application-local.yaml | 17 --- 3 files changed, 88 insertions(+), 66 deletions(-) diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index bd0751511c..195e5b0009 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.upstream; -import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.IdUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; @@ -19,7 +19,6 @@ import io.vertx.mqtt.MqttClientOptions; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; -import java.util.UUID; /** * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器 @@ -31,6 +30,10 @@ import java.util.UUID; @Slf4j public class IotDeviceUpstreamServer { + private static final String PROPERTY_POST_TOPIC = "/event/property/post"; + private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒) + private static final int QOS_LEVEL = 1; + private final Vertx vertx; private final HttpServer server; private final MqttClient client; @@ -55,7 +58,7 @@ public class IotDeviceUpstreamServer { // 创建 MQTT 客户端 MqttClientOptions options = new MqttClientOptions() - .setClientId("yudao-iot-server-" + UUID.randomUUID()) + .setClientId("yudao-iot-server-" + IdUtil.fastSimpleUUID()) .setUsername(emqxProperties.getMqttUsername()) .setPassword(emqxProperties.getMqttPassword()) .setSsl(emqxProperties.isMqttSsl()); @@ -80,78 +83,114 @@ public class IotDeviceUpstreamServer { // 3. 添加 MQTT 断开重连监听器 client.closeHandler(v -> { log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); - // 等待 5 秒后重连,避免频繁重连 - vertx.setTimer(5000, id -> { - log.info("[closeHandler][开始重新连接 MQTT]"); - connectMqtt(); - }); + reconnectWithDelay(); }); // 4. 设置 MQTT 消息处理器 + setupMessageHandler(); + } + + /** + * 设置 MQTT 消息处理器 + */ + private void setupMessageHandler() { client.publishHandler(message -> { String topic = message.topicName(); String payload = message.payload().toString(); log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload); try { - // 4.1 处理设备属性上报消息: /{productKey}/{deviceName}/event/property/post - if (topic.contains("/event/property/post")) { - // 4.2 解析消息内容 - JSONObject jsonObject = JSONUtil.parseObj(payload); - String requestId = jsonObject.getStr("id"); - Long timestamp = jsonObject.getLong("timestamp"); - - // 4.3 从 topic 中解析设备标识 - String[] topicParts = topic.split("/"); - String productKey = topicParts[1]; - String deviceName = topicParts[2]; - - // 4.4 构建设备属性上报请求对象 - IotDevicePropertyReportReqDTO devicePropertyReportReqDTO = ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO() - .setRequestId(requestId) - .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now()) - .setProductKey(productKey).setDeviceName(deviceName)) - .setProperties(jsonObject.getJSONObject("params")); - - // 4.5 调用上游 API 处理设备上报数据 - deviceUpstreamApi.reportDeviceProperty(devicePropertyReportReqDTO); - log.info("[messageHandler][处理设备上行消息成功][topic: {}][devicePropertyReportReqDTO: {}]", - topic, JSONUtil.toJsonStr(devicePropertyReportReqDTO)); - } + handleMessage(topic, payload); } catch (Exception e) { log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e); } }); } + /** + * 处理 MQTT 消息 + */ + private void handleMessage(String topic, String payload) { + // 处理设备属性上报消息 + if (topic.contains(PROPERTY_POST_TOPIC)) { + handlePropertyPost(topic, payload); + } + } + + /** + * 处理设备属性上报 + */ + private void handlePropertyPost(String topic, String payload) { + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = topic.split("/"); + + // 构建设备属性上报请求对象 + IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO( + jsonObject, + topicParts[1], // productKey + topicParts[2] // deviceName + ); + + // 调用上游 API 处理设备上报数据 + deviceUpstreamApi.reportDeviceProperty(reportReqDTO); + log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", + topic, JSONUtil.toJsonStr(reportReqDTO)); + } + + /** + * 构建设备属性上报请求对象 + */ + private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, + String productKey, + String deviceName) { + return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO() + .setRequestId(jsonObject.getStr("id")) + .setProcessId(IotPluginCommonUtils.getProcessId()) + .setReportTime(LocalDateTime.now()) + .setProductKey(productKey) + .setDeviceName(deviceName)) + .setProperties(jsonObject.getJSONObject("params")); + } + + /** + * 重连 MQTT 客户端 + */ + private void reconnectWithDelay() { + vertx.setTimer(RECONNECT_DELAY, id -> { + log.info("[reconnectWithDelay][开始重新连接 MQTT]"); + connectMqtt(); + }); + } + /** * 连接 MQTT Broker 并订阅主题 */ private void connectMqtt() { - // 连接 MQTT Broker client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost()) .onSuccess(connAck -> { log.info("[connectMqtt][MQTT客户端连接成功]"); - // 连接成功后订阅主题 - String mqttTopics = emqxProperties.getMqttTopics(); - String[] topics = mqttTopics.split(","); - for (String topic : topics) { - client.subscribe(topic, 1) - .onSuccess(v -> log.info("[connectMqtt][成功订阅主题: {}]", topic)) - .onFailure(err -> log.error("[connectMqtt][订阅主题失败: {}]", topic, err)); - } - log.info("[connectMqtt][开始订阅设备上行消息主题]"); + subscribeToTopics(); }) .onFailure(err -> { log.error("[connectMqtt][连接 MQTT Broker 失败]", err); - // 连接失败后,等待 5 秒重试 - vertx.setTimer(5000, id -> { - log.info("[connectMqtt][准备重新连接 MQTT]"); - connectMqtt(); - }); + reconnectWithDelay(); }); } + /** + * 订阅设备上行消息主题 + */ + private void subscribeToTopics() { + String[] topics = emqxProperties.getMqttTopics().split(","); + for (String topic : topics) { + client.subscribe(topic, QOS_LEVEL) + .onSuccess(v -> log.info("[subscribeToTopics][成功订阅主题: {}]", topic)) + .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err)); + } + log.info("[subscribeToTopics][开始订阅设备上行消息主题]"); + } + /** * 停止所有 */ @@ -187,4 +226,4 @@ public class IotDeviceUpstreamServer { throw new RuntimeException(e); } } -} +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml index 7927313fd3..eb9f9f6a96 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/resources/application.yml @@ -14,6 +14,6 @@ yudao: mqtt-port: 1883 mqtt-ssl: false mqtt-username: yudao - mqtt-password: yudao + mqtt-password: 123456 mqtt-topics: "/+/#" auth-port: 8101 diff --git a/yudao-server/src/main/resources/application-local.yaml b/yudao-server/src/main/resources/application-local.yaml index bbb94772c7..0641547fc9 100644 --- a/yudao-server/src/main/resources/application-local.yaml +++ b/yudao-server/src/main/resources/application-local.yaml @@ -267,23 +267,6 @@ justauth: timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟 --- #################### iot相关配置 TODO 芋艿:再瞅瞅 #################### -iot: - emq: - # 账号 - username: haohao - # 密码 - password: ahh@123456 - # 主机地址 - hostUrl: tcp://chaojiniu.top:1883 - # 客户端Id,不能相同,采用随机数 ${random.value} - client-id: ${random.int} - # 默认主题 - default-topic: test - # 保持连接 - keepalive: 60 - # 清除会话(设置为false,断开连接,重连后使用原来的会话 保留订阅的主题,能接收离线期间的消息) - clearSession: true - pf4j: # pluginsDir: /tmp/ pluginsDir: ../plugins \ No newline at end of file