diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java index 74fbb8b2e5..3d12c4b594 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java @@ -67,4 +67,7 @@ public interface ErrorCodeConstants { ErrorCode OTA_UPGRADE_RECORD_DUPLICATE = new ErrorCode(1_050_008_007, "升级记录重复"); ErrorCode OTA_UPGRADE_RECORD_CANNOT_RETRY = new ErrorCode(1_050_008_008, "升级记录不能重试"); + // ========== MQTT 通信相关 1-050-009-000 ========== + ErrorCode MQTT_TOPIC_ILLEGAL = new ErrorCode(1_050_009_000, "topic illegal"); + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java index d504e5704f..382bb9ecf2 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/config/IotPluginEmqxAutoConfiguration.java @@ -1,9 +1,14 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.config; +import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler; import cn.iocoder.yudao.module.iot.plugin.emqx.downstream.IotDeviceDownstreamHandlerImpl; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.IotDeviceUpstreamServer; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttClient; +import io.vertx.mqtt.MqttClientOptions; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,19 +18,37 @@ import org.springframework.context.annotation.Configuration; * * @author haohao */ +@Slf4j @Configuration @EnableConfigurationProperties(IotPluginEmqxProperties.class) public class IotPluginEmqxAutoConfiguration { - @Bean(initMethod = "start", destroyMethod = "stop") - public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi, - IotPluginEmqxProperties emqxProperties) { - return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi); + @Bean + public Vertx vertx() { + return Vertx.vertx(); } @Bean - public IotDeviceDownstreamHandler deviceDownstreamHandler() { - return new IotDeviceDownstreamHandlerImpl(); + public MqttClient mqttClient(Vertx vertx, IotPluginEmqxProperties emqxProperties) { + MqttClientOptions options = new MqttClientOptions() + .setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID()) + .setUsername(emqxProperties.getMqttUsername()) + .setPassword(emqxProperties.getMqttPassword()) + .setSsl(emqxProperties.isMqttSsl()); + return MqttClient.create(vertx, options); } -} + @Bean(initMethod = "start", destroyMethod = "stop") + public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi, + IotPluginEmqxProperties emqxProperties, + Vertx vertx, + MqttClient mqttClient) { + return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient); + } + + @Bean + public IotDeviceDownstreamHandler deviceDownstreamHandler(MqttClient mqttClient) { + return new IotDeviceDownstreamHandlerImpl(mqttClient); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java index b1a8eebbf4..57445e5adb 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/downstream/IotDeviceDownstreamHandlerImpl.java @@ -1,8 +1,19 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.downstream; +import cn.hutool.core.util.IdUtil; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*; import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamHandler; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttClient; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; + +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.MQTT_TOPIC_ILLEGAL; /** * EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类 @@ -10,14 +21,61 @@ import cn.iocoder.yudao.module.iot.plugin.common.downstream.IotDeviceDownstreamH * * @author 芋道源码 */ +@Slf4j public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler { + private static final String SYS_TOPIC_PREFIX = "/sys/"; + + // 设备服务调用 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier} + // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply + private static final String SERVICE_TOPIC_PREFIX = "/thing/service/"; + + // 设置设备属性 标准 JSON + // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/property/set + // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply + private static final String PROPERTY_SET_TOPIC = "/thing/service/property/set"; + + private final MqttClient mqttClient; + + /** + * 构造函数 + * + * @param mqttClient MQTT客户端 + */ + public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) { + this.mqttClient = mqttClient; + } + @Override - public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) { - // 设备服务调用 - // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier} - // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply - return CommonResult.success(true); + public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) { + log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); + + // 验证参数 + if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) { + log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); + return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); + } + + try { + // 构建请求主题 + String topic = buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(), reqDTO.getIdentifier()); + + // 生成请求ID(如果没有提供) + String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId(); + + // 构建请求消息 + JSONObject request = buildServiceRequest(requestId, reqDTO.getIdentifier(), reqDTO.getParams()); + + // 发送消息 + publishMessage(topic, request); + + log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic); + return CommonResult.success(true); + } catch (Exception e) { + log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e); + return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); + } } @Override @@ -26,11 +84,34 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle } @Override - public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) { - // 设置设备属性 标准 JSON - // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/property/set - // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply - return CommonResult.success(true); + public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) { + log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); + + // 验证参数 + if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) { + log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO)); + return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); + } + + try { + // 构建请求主题 + String topic = buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName()); + + // 生成请求ID(如果没有提供) + String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId(); + + // 构建请求消息 + JSONObject request = buildPropertySetRequest(requestId, reqDTO.getProperties()); + + // 发送消息 + publishMessage(topic, request); + + log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic); + return CommonResult.success(true); + } catch (Exception e) { + log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e); + return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg()); + } } @Override @@ -43,4 +124,60 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle return CommonResult.success(true); } + /** + * 构建服务调用主题 + */ + private String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) { + return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + SERVICE_TOPIC_PREFIX + serviceIdentifier; + } + + /** + * 构建属性设置主题 + */ + private String buildPropertySetTopic(String productKey, String deviceName) { + return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + PROPERTY_SET_TOPIC; + } + + /** + * 构建服务调用请求 + */ + private JSONObject buildServiceRequest(String requestId, String serviceIdentifier, Map params) { + return new JSONObject() + .set("id", requestId) + .set("version", "1.0") + .set("method", "thing.service." + serviceIdentifier) + .set("params", params != null ? params : new JSONObject()); + } + + /** + * 构建属性设置请求 + */ + private JSONObject buildPropertySetRequest(String requestId, Map properties) { + return new JSONObject() + .set("id", requestId) + .set("version", "1.0") + .set("method", "thing.service.property.set") + .set("params", properties); + } + + /** + * 发布MQTT消息 + */ + private void publishMessage(String topic, JSONObject payload) { + mqttClient.publish( + topic, + Buffer.buffer(payload.toString()), + MqttQoS.AT_LEAST_ONCE, + false, + false); + log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload); + } + + /** + * 生成请求ID + */ + private String generateRequestId() { + return IdUtil.fastSimpleUUID(); + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java index baeddcb152..738860361e 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/IotDeviceUpstreamServer.java @@ -1,19 +1,21 @@ package cn.iocoder.yudao.module.iot.plugin.emqx.upstream; -import cn.hutool.core.util.IdUtil; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.plugin.emqx.config.IotPluginEmqxProperties; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceAuthVertxHandler; import cn.iocoder.yudao.module.iot.plugin.emqx.upstream.router.IotDeviceMqttMessageHandler; import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServer; import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.BodyHandler; import io.vertx.mqtt.MqttClient; -import io.vertx.mqtt.MqttClientOptions; import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + /** * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器 *

@@ -24,20 +26,26 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class IotDeviceUpstreamServer { - private static final int RECONNECT_DELAY = 5000; // 重连延迟时间(毫秒) + private static final int RECONNECT_DELAY_MS = 5000; // 重连延迟时间(毫秒) + private static final int CONNECTION_TIMEOUT_MS = 10000; // 连接超时时间(毫秒) + private static final String TOPIC_SEPARATOR = ","; // 主题分隔符 + private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE; // 默认QoS级别 private final Vertx vertx; private final HttpServer server; private final MqttClient client; private final IotPluginEmqxProperties emqxProperties; private final IotDeviceMqttMessageHandler mqttMessageHandler; + private volatile boolean isRunning = false; // 服务运行状态标志 public IotDeviceUpstreamServer(IotPluginEmqxProperties emqxProperties, - IotDeviceUpstreamApi deviceUpstreamApi) { + IotDeviceUpstreamApi deviceUpstreamApi, + Vertx vertx, + MqttClient client) { + this.vertx = vertx; this.emqxProperties = emqxProperties; + this.client = client; - // 创建 Vertx 实例 - this.vertx = Vertx.vertx(); // 创建 Router 实例 Router router = Router.router(vertx); router.route().handler(BodyHandler.create()); // 处理 Body @@ -45,14 +53,6 @@ public class IotDeviceUpstreamServer { .handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi)); // 创建 HttpServer 实例 this.server = vertx.createHttpServer().requestHandler(router); - - // 创建 MQTT 客户端 - MqttClientOptions options = new MqttClientOptions() - .setClientId("yudao-iot-server-" + IdUtil.fastSimpleUUID()) - .setUsername(emqxProperties.getMqttUsername()) - .setPassword(emqxProperties.getMqttPassword()) - .setSsl(emqxProperties.isMqttSsl()); - client = MqttClient.create(vertx, options); this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client); } @@ -60,25 +60,45 @@ public class IotDeviceUpstreamServer { * 启动 HTTP 服务器、MQTT 客户端 */ public void start() { + if (isRunning) { + log.warn("服务已经在运行中,请勿重复启动"); + return; + } + + log.info("[start] 开始启动服务"); + // 1. 启动 HTTP 服务器 - log.info("[start][开始启动]"); - server.listen(emqxProperties.getAuthPort()) + CompletableFuture httpFuture = server.listen(emqxProperties.getAuthPort()) .toCompletionStage() .toCompletableFuture() - .join(); - log.info("[start][HTTP服务器启动完成,端口({})]", this.server.actualPort()); + .thenAccept(v -> log.info("[start] HTTP服务器启动完成,端口: {}", server.actualPort())); // 2. 连接 MQTT Broker - connectMqtt(); + CompletableFuture mqttFuture = connectMqtt() + .toCompletionStage() + .toCompletableFuture() + .thenAccept(v -> { + // 3. 添加 MQTT 断开重连监听器 + client.closeHandler(closeEvent -> { + log.warn("[closeHandler] MQTT连接已断开,准备重连"); + reconnectWithDelay(); + }); - // 3. 添加 MQTT 断开重连监听器 - client.closeHandler(v -> { - log.warn("[closeHandler][MQTT 连接已断开,准备重连]"); - reconnectWithDelay(); - }); + // 4. 设置 MQTT 消息处理器 + setupMessageHandler(); + }); - // 4. 设置 MQTT 消息处理器 - setupMessageHandler(); + // 等待所有服务启动完成 + CompletableFuture.allOf(httpFuture, mqttFuture) + .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .whenComplete((result, error) -> { + if (error != null) { + log.error("[start] 服务启动失败", error); + } else { + isRunning = true; + log.info("[start] 所有服务启动完成"); + } + }); } /** @@ -86,79 +106,118 @@ public class IotDeviceUpstreamServer { */ private void setupMessageHandler() { client.publishHandler(mqttMessageHandler::handle); + log.debug("[setupMessageHandler] MQTT消息处理器设置完成"); } /** * 重连 MQTT 客户端 */ private void reconnectWithDelay() { - vertx.setTimer(RECONNECT_DELAY, id -> { - log.info("[reconnectWithDelay][开始重新连接 MQTT]"); + if (!isRunning) { + log.info("[reconnectWithDelay] 服务已停止,不再尝试重连"); + return; + } + + vertx.setTimer(RECONNECT_DELAY_MS, id -> { + log.info("[reconnectWithDelay] 开始重新连接MQTT"); connectMqtt(); }); } /** * 连接 MQTT Broker 并订阅主题 + * + * @return 连接结果的Future */ - private void connectMqtt() { - client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost()) - .onSuccess(connAck -> { - log.info("[connectMqtt][MQTT客户端连接成功]"); - subscribeToTopics(); + private Future connectMqtt() { + return client.connect(emqxProperties.getMqttPort(), emqxProperties.getMqttHost()) + .compose(connAck -> { + log.info("[connectMqtt] MQTT客户端连接成功"); + return subscribeToTopics(); }) - .onFailure(err -> { - log.error("[connectMqtt][连接 MQTT Broker 失败]", err); + .recover(err -> { + log.error("[connectMqtt] 连接MQTT Broker失败: {}", err.getMessage()); reconnectWithDelay(); + return Future.failedFuture(err); }); } /** * 订阅设备上行消息主题 + * + * @return 订阅结果的Future */ - private void subscribeToTopics() { - String[] topics = emqxProperties.getMqttTopics().split(","); - for (String topic : topics) { - client.subscribe(topic, MqttQoS.AT_LEAST_ONCE.value()) - .onSuccess(v -> log.info("[subscribeToTopics][成功订阅主题: {}]", topic)) - .onFailure(err -> log.error("[subscribeToTopics][订阅主题失败: {}]", topic, err)); + private Future subscribeToTopics() { + String topicsStr = emqxProperties.getMqttTopics(); + if (topicsStr == null || topicsStr.trim().isEmpty()) { + log.warn("[subscribeToTopics] 未配置MQTT主题,跳过订阅"); + return Future.succeededFuture(); } - log.info("[subscribeToTopics][开始订阅设备上行消息主题]"); + + log.info("[subscribeToTopics] 开始订阅设备上行消息主题"); + + String[] topics = topicsStr.split(TOPIC_SEPARATOR); + Future compositeFuture = Future.succeededFuture(); + + for (String topic : topics) { + String trimmedTopic = topic.trim(); + if (trimmedTopic.isEmpty()) { + continue; + } + + compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value()) + .map(ack -> { + log.info("[subscribeToTopics] 成功订阅主题: {}", trimmedTopic); + return null; + }) + .recover(err -> { + log.error("[subscribeToTopics] 订阅主题失败: {}, 原因: {}", + trimmedTopic, err.getMessage()); + return Future.succeededFuture(); // 继续订阅其他主题 + })); + } + + return compositeFuture; } /** - * 停止所有 + * 停止所有服务 */ public void stop() { - log.info("[stop][开始关闭]"); + if (!isRunning) { + log.warn("[stop] 服务未运行,无需停止"); + return; + } + + log.info("[stop] 开始关闭服务"); + isRunning = false; + try { - // 关闭 HTTP 服务器 - if (server != null) { - server.close() - .toCompletionStage() - .toCompletableFuture() - .join(); - } + CompletableFuture serverFuture = server != null + ? server.close().toCompletionStage().toCompletableFuture() + : CompletableFuture.completedFuture(null); - // 关闭 MQTT 客户端 - if (client != null) { - client.disconnect() - .toCompletionStage() - .toCompletableFuture() - .join(); - } + CompletableFuture clientFuture = client != null + ? client.disconnect().toCompletionStage().toCompletableFuture() + : CompletableFuture.completedFuture(null); - // 关闭 Vertx 实例 - if (vertx != null) { - vertx.close() - .toCompletionStage() - .toCompletableFuture() - .join(); - } - log.info("[stop][关闭完成]"); + CompletableFuture vertxFuture = vertx != null + ? vertx.close().toCompletionStage().toCompletableFuture() + : CompletableFuture.completedFuture(null); + + // 等待所有资源关闭 + CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture) + .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .whenComplete((result, error) -> { + if (error != null) { + log.error("[stop] 服务关闭过程中发生异常", error); + } else { + log.info("[stop] 所有服务关闭完成"); + } + }); } catch (Exception e) { - log.error("[stop][关闭异常]", e); - throw new RuntimeException(e); + log.error("[stop] 关闭服务异常", e); + throw new RuntimeException("关闭IoT设备上行服务失败", e); } } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java index 6b99d781a4..83e9d1bfd3 100644 --- a/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java +++ b/yudao-module-iot/yudao-module-iot-plugins/yudao-module-iot-plugin-emqx/src/main/java/cn/iocoder/yudao/module/iot/plugin/emqx/upstream/router/IotDeviceMqttMessageHandler.java @@ -13,13 +13,15 @@ import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; +import java.util.Arrays; /** * IoT 设备 MQTT 消息处理器 *

* 参考: *

- * "..."> + * "..."> */ @Slf4j public class IotDeviceMqttMessageHandler { @@ -35,6 +37,12 @@ public class IotDeviceMqttMessageHandler { private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post"; private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/"; private static final String EVENT_POST_TOPIC_SUFFIX = "/post"; + private static final String REPLY_SUFFIX = "_reply"; + private static final int SUCCESS_CODE = 200; + private static final String SUCCESS_MESSAGE = "success"; + private static final String PROPERTY_METHOD = "thing.event.property.post"; + private static final String EVENT_METHOD_PREFIX = "thing.event."; + private static final String EVENT_METHOD_SUFFIX = ".post"; private final IotDeviceUpstreamApi deviceUpstreamApi; private final MqttClient mqttClient; @@ -44,18 +52,33 @@ public class IotDeviceMqttMessageHandler { this.mqttClient = mqttClient; } + /** + * 处理MQTT消息 + * + * @param message MQTT发布消息 + */ public void handle(MqttPublishMessage message) { String topic = message.topicName(); String payload = message.payload().toString(); log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload); try { + if (payload == null || payload.isEmpty()) { + log.warn("[messageHandler][消息内容为空][topic: {}]", topic); + return; + } handleMessage(topic, payload); } catch (Exception e) { log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e); } } + /** + * 根据主题类型处理消息 + * + * @param topic 主题 + * @param payload 消息内容 + */ private void handleMessage(String topic, String payload) { // 校验前缀 if (!topic.startsWith(SYS_TOPIC_PREFIX)) { @@ -88,34 +111,26 @@ public class IotDeviceMqttMessageHandler { * @param payload 消息内容 */ private void handlePropertyPost(String topic, String payload) { - // 解析消息内容 - JSONObject jsonObject = JSONUtil.parseObj(payload); - String[] topicParts = topic.split("/"); + try { + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = parseTopic(topic); + if (topicParts == null) { + return; + } - // 构建设备属性上报请求对象 - IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts); + // 构建设备属性上报请求对象 + IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts); - // 调用上游 API 处理设备上报数据 - deviceUpstreamApi.reportDeviceProperty(reportReqDTO); - log.info("[handlePropertyPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", - topic, JSONUtil.toJsonStr(reportReqDTO)); + // 调用上游 API 处理设备上报数据 + deviceUpstreamApi.reportDeviceProperty(reportReqDTO); + log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic); - // 发送响应消息 - String replyTopic = topic + "_reply"; - JSONObject response = new JSONObject() - .set("id", jsonObject.getStr("id")) - .set("code", 200) - .set("data", new JSONObject()) - .set("message", "success") - .set("method", "thing.event.property.post"); - - mqttClient.publish(replyTopic, - Buffer.buffer(response.toString()), - MqttQoS.AT_LEAST_ONCE, - false, - false); - log.info("[handlePropertyPost][发送响应消息成功][topic: {}][response: {}]", - replyTopic, response.toString()); + // 发送响应消息 + sendResponse(topic, jsonObject, PROPERTY_METHOD, null); + } catch (Exception e) { + log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e); + } } /** @@ -125,35 +140,97 @@ public class IotDeviceMqttMessageHandler { * @param payload 消息内容 */ private void handleEventPost(String topic, String payload) { - // 解析消息内容 - JSONObject jsonObject = JSONUtil.parseObj(payload); + try { + // 解析消息内容 + JSONObject jsonObject = JSONUtil.parseObj(payload); + String[] topicParts = parseTopic(topic); + if (topicParts == null) { + return; + } + + // 构建设备事件上报请求对象 + IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts); + + // 调用上游 API 处理设备上报数据 + deviceUpstreamApi.reportDeviceEvent(reportReqDTO); + log.info("[handleEventPost][处理设备事件上报成功][topic: {}]", topic); + + // 从 topic 中获取事件标识符 + String eventIdentifier = getEventIdentifier(topicParts, topic); + if (eventIdentifier == null) { + return; + } + + // 发送响应消息 + String method = EVENT_METHOD_PREFIX + eventIdentifier + EVENT_METHOD_SUFFIX; + sendResponse(topic, jsonObject, method, null); + } catch (Exception e) { + log.error("[handleEventPost][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e); + } + } + + /** + * 解析主题,获取主题各部分 + * + * @param topic 主题 + * @return 主题各部分数组,如果解析失败返回null + */ + private String[] parseTopic(String topic) { String[] topicParts = topic.split("/"); + if (topicParts.length < 7) { + log.warn("[parseTopic][主题格式不正确][topic: {}]", topic); + return null; + } + return topicParts; + } - // 构建设备事件上报请求对象 - IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts); + /** + * 从主题部分中获取事件标识符 + * + * @param topicParts 主题各部分 + * @param topic 原始主题,用于日志 + * @return 事件标识符,如果获取失败返回null + */ + private String getEventIdentifier(String[] topicParts, String topic) { + try { + return topicParts[6]; + } catch (ArrayIndexOutOfBoundsException e) { + log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}][topicParts: {}]", + topic, Arrays.toString(topicParts)); + return null; + } + } - // 调用上游 API 处理设备上报数据 - deviceUpstreamApi.reportDeviceEvent(reportReqDTO); - log.info("[handleEventPost][处理设备上行消息成功][topic: {}][reportReqDTO: {}]", - topic, JSONUtil.toJsonStr(reportReqDTO)); + /** + * 发送响应消息 + * + * @param topic 原始主题 + * @param jsonObject 原始消息JSON对象 + * @param method 响应方法 + * @param customData 自定义数据,可为null + */ + private void sendResponse(String topic, JSONObject jsonObject, String method, JSONObject customData) { + String replyTopic = topic + REPLY_SUFFIX; + JSONObject data = customData != null ? customData : new JSONObject(); - // 发送响应消息 - String replyTopic = topic + "_reply"; - String eventIdentifier = topicParts[6]; // 从 topic 中获取事件标识符 JSONObject response = new JSONObject() .set("id", jsonObject.getStr("id")) - .set("code", 200) - .set("data", new JSONObject()) - .set("message", "success") - .set("method", "thing.event." + eventIdentifier + ".post"); + .set("code", SUCCESS_CODE) + .set("data", data) + .set("message", SUCCESS_MESSAGE) + .set("method", method); - mqttClient.publish(replyTopic, - Buffer.buffer(response.toString()), - MqttQoS.AT_LEAST_ONCE, - false, - false); - log.info("[handleEventPost][发送响应消息成功][topic: {}][response: {}]", - replyTopic, response.toString()); + try { + mqttClient.publish(replyTopic, + Buffer.buffer(response.toString()), + MqttQoS.AT_LEAST_ONCE, + false, + false); + log.info("[sendResponse][发送响应消息成功][topic: {}]", replyTopic); + } catch (Exception e) { + log.error("[sendResponse][发送响应消息失败][topic: {}][response: {}]", + replyTopic, response.toString(), e); + } } /** @@ -163,15 +240,15 @@ public class IotDeviceMqttMessageHandler { * @param topicParts 主题部分 * @return 设备属性上报请求对象 */ - private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, - String[] topicParts) { - return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO() - .setRequestId(jsonObject.getStr("id")) - .setProcessId(IotPluginCommonUtils.getProcessId()) - .setReportTime(LocalDateTime.now()) - .setProductKey(topicParts[2]) - .setDeviceName(topicParts[3])) - .setProperties(jsonObject.getJSONObject("params")); + private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, String[] topicParts) { + IotDevicePropertyReportReqDTO reportReqDTO = new IotDevicePropertyReportReqDTO(); + reportReqDTO.setRequestId(jsonObject.getStr("id")); + reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId()); + reportReqDTO.setReportTime(LocalDateTime.now()); + reportReqDTO.setProductKey(topicParts[2]); + reportReqDTO.setDeviceName(topicParts[3]); + reportReqDTO.setProperties(jsonObject.getJSONObject("params")); + return reportReqDTO; } /** @@ -182,13 +259,14 @@ public class IotDeviceMqttMessageHandler { * @return 设备事件上报请求对象 */ private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) { - return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO() - .setRequestId(jsonObject.getStr("id")) - .setProcessId(IotPluginCommonUtils.getProcessId()) - .setReportTime(LocalDateTime.now()) - .setProductKey(topicParts[2]) - .setDeviceName(topicParts[3])) - .setIdentifier(topicParts[4]) - .setParams(jsonObject.getJSONObject("params")); + IotDeviceEventReportReqDTO reportReqDTO = new IotDeviceEventReportReqDTO(); + reportReqDTO.setRequestId(jsonObject.getStr("id")); + reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId()); + reportReqDTO.setReportTime(LocalDateTime.now()); + reportReqDTO.setProductKey(topicParts[2]); + reportReqDTO.setDeviceName(topicParts[3]); + reportReqDTO.setIdentifier(topicParts[6]); + reportReqDTO.setParams(jsonObject.getJSONObject("params")); + return reportReqDTO; } } \ No newline at end of file