entry : params.entrySet()) {
+// context.setParameter(entry.getKey(), entry.getValue());
+// }
+//
+// // 执行脚本
+// Object result = scriptService.executeScript(
+// testReqVO.getScriptLanguage(),
+// testReqVO.getScriptContent(),
+// context);
+//
+// // 更新测试结果(如果是已保存的脚本)
+// if (existingScript != null) {
+// IotProductScriptDO updateObj = new IotProductScriptDO();
+// updateObj.setId(existingScript.getId());
+// updateObj.setLastTestTime(LocalDateTime.now());
+// updateObj.setLastTestResult(1); // 1表示成功
+// productScriptMapper.updateById(updateObj);
+// }
+//
+// long executionTime = System.currentTimeMillis() - startTime;
+// return IotProductScriptTestRespVO.success(result, executionTime);
+//
+// } catch (Exception e) {
+// log.error("[testProductScript][测试脚本异常]", e);
+//
+// // 如果是已保存的脚本,更新测试失败状态
+// if (testReqVO.getId() != null) {
+// try {
+// IotProductScriptDO updateObj = new IotProductScriptDO();
+// updateObj.setId(testReqVO.getId());
+// updateObj.setLastTestTime(LocalDateTime.now());
+// updateObj.setLastTestResult(0); // 0表示失败
+// productScriptMapper.updateById(updateObj);
+// } catch (Exception ex) {
+// log.error("[testProductScript][更新脚本测试结果异常]", ex);
+// }
+// }
+//
+// long executionTime = System.currentTimeMillis() - startTime;
+// return IotProductScriptTestRespVO.error(e.getMessage(), executionTime);
+// }
+ return null;
}
@Override
diff --git a/yudao-module-iot/yudao-module-iot-components/README.md b/yudao-module-iot/yudao-module-iot-components/README.md
new file mode 100644
index 0000000000..88b368854d
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/README.md
@@ -0,0 +1,135 @@
+# IOT 组件使用说明
+
+## 组件介绍
+
+该模块包含多个 IoT 设备连接组件,提供不同的通信协议支持:
+
+- `yudao-module-iot-component-core`: 核心接口和通用类
+- `yudao-module-iot-component-http`: 基于 HTTP 协议的设备通信组件
+- `yudao-module-iot-component-emqx`: 基于 MQTT/EMQX 的设备通信组件
+
+## 组件架构
+
+### 架构设计
+
+各组件采用统一的架构设计和命名规范:
+
+- 配置类: `IotComponentXxxAutoConfiguration` - 提供Bean定义和组件初始化逻辑
+- 属性类: `IotComponentXxxProperties` - 定义组件的配置属性
+- 下行接口: `*DownstreamHandler` - 处理从平台到设备的下行通信
+- 上行接口: `*UpstreamServer` - 处理从设备到平台的上行通信
+
+### Bean 命名规范
+
+为避免 Bean 冲突,各个组件中的 Bean 已添加特定前缀:
+
+- HTTP 组件: `httpDeviceUpstreamServer`, `httpDeviceDownstreamHandler`
+- EMQX 组件: `emqxDeviceUpstreamServer`, `emqxDeviceDownstreamHandler`
+
+### 组件启用规则
+
+现在系统支持同时使用多个组件,但有以下规则:
+
+1. 当`yudao.iot.component.emqx.enabled=true`时,核心模块将优先使用EMQX组件
+2. 如果同时启用了多个组件,需要在业务代码中使用`@Qualifier`指定要使用的具体实现
+
+> **重要提示:**
+> 1. 组件库内部的默认配置文件**不会**被自动加载。必须将上述配置添加到主应用的配置文件中。
+> 2. 所有配置项现在都已增加空值处理,配置缺失时将使用合理的默认值
+> 3. `mqtt-host` 是唯一必须配置的参数,其他参数均有默认值
+> 4. `mqtt-ssl` 和 `auth-port` 缺失时的默认值分别为 `false` 和 `8080`
+> 5. `mqtt-topics` 缺失时将使用默认主题 `/device/#`
+
+### 如何引用特定的 Bean
+
+在其他组件中引用这些 Bean 时,需要使用 `@Qualifier` 注解指定 Bean 名称:
+
+```java
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
+
+@Service
+public class YourServiceClass {
+
+ // 注入 HTTP 组件的下行处理器
+ @Autowired
+ @Qualifier("httpDeviceDownstreamHandler")
+ private IotDeviceDownstreamHandler httpDeviceDownstreamHandler;
+
+ // 注入 EMQX 组件的下行处理器
+ @Autowired
+ @Qualifier("emqxDeviceDownstreamHandler")
+ private IotDeviceDownstreamHandler emqxDeviceDownstreamHandler;
+
+ // 使用示例
+ public void example() {
+ // 使用 HTTP 组件
+ httpDeviceDownstreamHandler.invokeDeviceService(...);
+
+ // 使用 EMQX 组件
+ emqxDeviceDownstreamHandler.invokeDeviceService(...);
+ }
+}
+```
+
+### 组件选择指南
+
+- **HTTP 组件**:适用于简单场景,设备通过 HTTP 接口与平台通信
+- **EMQX 组件**:适用于实时性要求高的场景,基于 MQTT 协议,支持发布/订阅模式
+
+## 常见问题
+
+### 1. 配置未加载问题
+
+如果遇到以下日志:
+
+```
+MQTT配置: host=null, port=null, username=null, ssl=null
+[connectMqtt][MQTT Host为null,无法连接]
+```
+
+这表明配置没有被正确加载。请确保:
+
+1. 在主应用的配置文件中(如 `application.yml` 或 `application-dev.yml`)添加了必要的 EMQX 配置
+2. 配置前缀正确:`yudao.iot.component.emqx`
+3. 配置了必要的 `mqtt-host` 属性
+
+### 2. mqttSsl 空指针异常
+
+如果遇到以下错误:
+
+```
+Cannot invoke "java.lang.Boolean.booleanValue()" because the return value of "cn.iocoder.yudao.module.iot.component.emqx.config.IotEmqxComponentProperties.getMqttSsl()" is null
+```
+
+此问题已通过代码修复,现在会自动使用默认值 `false`。同样适用于其他配置项的空值问题。
+
+### 3. authPort 空指针异常
+
+如果遇到以下错误:
+
+```
+Cannot invoke "java.lang.Integer.intValue()" because the return value of "cn.iocoder.yudao.module.iot.component.emqx.config.IotEmqxComponentProperties.getAuthPort()" is null
+```
+
+此问题已通过代码修复,现在会自动使用默认值 `8080`。
+
+### 4. Bean注入问题
+
+如果遇到以下错误:
+
+```
+Parameter 1 of method deviceDownstreamServer in IotPluginCommonAutoConfiguration required a single bean, but 2 were found
+```
+
+此问题已通过修改核心配置类来解决。现在系统会根据组件的启用状态自动选择合适的实现:
+
+1. 优先使用EMQX组件(当`yudao.iot.component.emqx.enabled=true`时)
+2. 如果EMQX未启用,则使用HTTP组件(当`yudao.iot.component.http.enabled=true`时)
+
+如果需要同时使用两个组件,业务代码中必须使用`@Qualifier`明确指定要使用的Bean。
+
+### 5. 使用默认配置
+
+组件现已加入完善的默认配置和空值处理机制,使配置更加灵活。但需要注意的是,这些默认配置值必须通过在主应用配置文件中设置相应的属性才能生效。
diff --git a/yudao-module-iot/yudao-module-iot-components/pom.xml b/yudao-module-iot/yudao-module-iot-components/pom.xml
new file mode 100644
index 0000000000..297761f9c3
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/pom.xml
@@ -0,0 +1,26 @@
+
+
+
+ yudao-module-iot
+ cn.iocoder.boot
+ ${revision}
+
+ 4.0.0
+
+ yudao-module-iot-components
+ pom
+
+ ${project.artifactId}
+
+ 物联网组件模块,提供与物联网设备通讯、管理的组件实现
+
+
+
+ yudao-module-iot-component-core
+ yudao-module-iot-component-http
+ yudao-module-iot-component-emqx
+
+
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/pom.xml b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/pom.xml
new file mode 100644
index 0000000000..adb6255278
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/pom.xml
@@ -0,0 +1,52 @@
+
+
+
+ yudao-module-iot-components
+ cn.iocoder.boot
+ ${revision}
+
+ 4.0.0
+
+ yudao-module-iot-component-core
+ jar
+
+ ${project.artifactId}
+
+ 物联网组件核心模块
+
+
+
+
+ cn.iocoder.boot
+ yudao-module-iot-api
+ ${revision}
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+
+ org.springframework
+ spring-web
+
+
+
+
+ io.vertx
+ vertx-web
+ true
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+ true
+
+
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/config/IotComponentCommonAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/config/IotComponentCommonAutoConfiguration.java
new file mode 100644
index 0000000000..f80c969bb0
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/config/IotComponentCommonAutoConfiguration.java
@@ -0,0 +1,50 @@
+package cn.iocoder.yudao.module.iot.component.core.config;
+
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamServer;
+import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentInstanceHeartbeatJob;
+import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
+import cn.iocoder.yudao.module.iot.component.core.upstream.IotDeviceUpstreamClient;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * IoT 组件的通用自动配置类
+ *
+ * @author haohao
+ */
+@AutoConfiguration
+@EnableConfigurationProperties(IotComponentCommonProperties.class)
+@EnableScheduling // 开启定时任务,因为 IotComponentInstanceHeartbeatJob 是一个定时任务
+public class IotComponentCommonAutoConfiguration {
+
+ /**
+ * 创建EMQX设备下行服务器
+ * 当yudao.iot.component.emqx.enabled=true时,使用emqxDeviceDownstreamHandler
+ */
+ @Bean
+ @ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true")
+ public IotDeviceDownstreamServer emqxDeviceDownstreamServer(IotComponentCommonProperties properties,
+ @Qualifier("emqxDeviceDownstreamHandler") IotDeviceDownstreamHandler deviceDownstreamHandler) {
+ return new IotDeviceDownstreamServer(properties, deviceDownstreamHandler);
+ }
+
+ @Bean(initMethod = "init", destroyMethod = "stop")
+ public IotComponentInstanceHeartbeatJob pluginInstanceHeartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi,
+ IotDeviceDownstreamServer deviceDownstreamServer,
+ IotComponentCommonProperties commonProperties,
+ IotComponentRegistry componentRegistry) {
+ return new IotComponentInstanceHeartbeatJob(deviceUpstreamApi, deviceDownstreamServer, commonProperties,
+ componentRegistry);
+ }
+
+ @Bean
+ public IotDeviceUpstreamClient deviceUpstreamClient() {
+ return new IotDeviceUpstreamClient();
+ }
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/config/IotComponentCommonProperties.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/config/IotComponentCommonProperties.java
new file mode 100644
index 0000000000..43eec749e4
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/config/IotComponentCommonProperties.java
@@ -0,0 +1,24 @@
+package cn.iocoder.yudao.module.iot.component.core.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+/**
+ * IoT 组件通用配置属性
+ *
+ * @author haohao
+ */
+@ConfigurationProperties(prefix = "yudao.iot.component.core")
+@Validated
+@Data
+public class IotComponentCommonProperties {
+
+ /**
+ * 组件的唯一标识
+ *
+ * 注意:该值将在运行时由各组件设置,不再从配置读取
+ */
+ private String pluginKey;
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/downstream/IotDeviceDownstreamHandler.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/downstream/IotDeviceDownstreamHandler.java
new file mode 100644
index 0000000000..d3fefde970
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/downstream/IotDeviceDownstreamHandler.java
@@ -0,0 +1,55 @@
+package cn.iocoder.yudao.module.iot.component.core.downstream;
+
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
+
+/**
+ * IoT 设备下行处理器
+ *
+ * 目的:每个 plugin 需要实现,用于处理 server 下行的指令(请求),从而实现从 server => plugin => device 的下行流程
+ *
+ * @author 芋道源码
+ */
+public interface IotDeviceDownstreamHandler {
+
+ /**
+ * 调用设备服务
+ *
+ * @param invokeReqDTO 调用设备服务的请求
+ * @return 是否成功
+ */
+ CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO);
+
+ /**
+ * 获取设备属性
+ *
+ * @param getReqDTO 获取设备属性的请求
+ * @return 是否成功
+ */
+ CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO);
+
+ /**
+ * 设置设备属性
+ *
+ * @param setReqDTO 设置设备属性的请求
+ * @return 是否成功
+ */
+ CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO);
+
+ /**
+ * 设置设备配置
+ *
+ * @param setReqDTO 设置设备配置的请求
+ * @return 是否成功
+ */
+ CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO);
+
+ /**
+ * 升级设备 OTA
+ *
+ * @param upgradeReqDTO 升级设备 OTA 的请求
+ * @return 是否成功
+ */
+ CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO);
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/downstream/IotDeviceDownstreamServer.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/downstream/IotDeviceDownstreamServer.java
new file mode 100644
index 0000000000..dfff2b1b3e
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/downstream/IotDeviceDownstreamServer.java
@@ -0,0 +1,80 @@
+package cn.iocoder.yudao.module.iot.component.core.downstream;
+
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
+import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * IoT 设备下行服务,直接转发给 device 设备
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+@RequiredArgsConstructor
+public class IotDeviceDownstreamServer {
+
+ private final IotComponentCommonProperties properties;
+ private final IotDeviceDownstreamHandler deviceDownstreamHandler;
+
+ /**
+ * 调用设备服务
+ *
+ * @param invokeReqDTO 调用设备服务的请求
+ * @return 是否成功
+ */
+ public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
+ return deviceDownstreamHandler.invokeDeviceService(invokeReqDTO);
+ }
+
+ /**
+ * 获取设备属性
+ *
+ * @param getReqDTO 获取设备属性的请求
+ * @return 是否成功
+ */
+ public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
+ return deviceDownstreamHandler.getDeviceProperty(getReqDTO);
+ }
+
+ /**
+ * 设置设备属性
+ *
+ * @param setReqDTO 设置设备属性的请求
+ * @return 是否成功
+ */
+ public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
+ return deviceDownstreamHandler.setDeviceProperty(setReqDTO);
+ }
+
+ /**
+ * 设置设备配置
+ *
+ * @param setReqDTO 设置设备配置的请求
+ * @return 是否成功
+ */
+ public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
+ return deviceDownstreamHandler.setDeviceConfig(setReqDTO);
+ }
+
+ /**
+ * 升级设备 OTA
+ *
+ * @param upgradeReqDTO 升级设备 OTA 的请求
+ * @return 是否成功
+ */
+ public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
+ return deviceDownstreamHandler.upgradeDeviceOta(upgradeReqDTO);
+ }
+
+ /**
+ * 获得内部组件标识
+ *
+ * @return 组件标识
+ */
+ public String getComponentId() {
+ return properties.getPluginKey();
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/heartbeat/IotComponentInstanceHeartbeatJob.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/heartbeat/IotComponentInstanceHeartbeatJob.java
new file mode 100644
index 0000000000..acec908495
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/heartbeat/IotComponentInstanceHeartbeatJob.java
@@ -0,0 +1,131 @@
+package cn.iocoder.yudao.module.iot.component.core.heartbeat;
+
+import cn.hutool.system.SystemUtil;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
+import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamServer;
+import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry.IotComponentInfo;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Scheduled;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * IoT 组件实例心跳定时任务
+ *
+ * 将组件的状态定时上报给 server 服务器
+ *
+ * 用于定时发送心跳给服务端
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class IotComponentInstanceHeartbeatJob {
+
+ /**
+ * 内嵌模式的端口值(固定为0)
+ */
+ private static final Integer EMBEDDED_PORT = 0;
+
+ private final IotDeviceUpstreamApi deviceUpstreamApi;
+ private final IotDeviceDownstreamServer deviceDownstreamServer;
+ private final IotComponentCommonProperties commonProperties;
+ private final IotComponentRegistry componentRegistry;
+
+ /**
+ * 初始化方法由Spring调用
+ * 注册当前组件并发送上线心跳
+ */
+ public void init() {
+ // 将当前组件注册到注册表
+ String processId = getProcessId();
+ String hostIp = SystemUtil.getHostInfo().getAddress();
+
+ // 注册当前组件
+ componentRegistry.registerComponent(
+ commonProperties.getPluginKey(),
+ hostIp,
+ EMBEDDED_PORT,
+ processId);
+
+ // 发送所有组件的上线心跳
+ for (IotComponentInfo component : componentRegistry.getAllComponents()) {
+ try {
+ CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(
+ buildPluginInstanceHeartbeatReqDTO(component, true));
+ log.info("[init][组件({})上线结果:{})]", component.getPluginKey(), result);
+ } catch (Exception e) {
+ log.error("[init][组件({})上线发送异常]", component.getPluginKey(), e);
+ }
+ }
+ }
+
+ /**
+ * 停止方法由Spring调用
+ * 发送下线心跳并注销组件
+ */
+ public void stop() {
+ // 发送所有组件的下线心跳
+ for (IotComponentInfo component : componentRegistry.getAllComponents()) {
+ try {
+ CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(
+ buildPluginInstanceHeartbeatReqDTO(component, false));
+ log.info("[stop][组件({})下线结果:{})]", component.getPluginKey(), result);
+ } catch (Exception e) {
+ log.error("[stop][组件({})下线发送异常]", component.getPluginKey(), e);
+ }
+ }
+
+ // 注销当前组件
+ componentRegistry.unregisterComponent(commonProperties.getPluginKey());
+ }
+
+ /**
+ * 定时发送心跳
+ */
+ @Scheduled(initialDelay = 1, fixedRate = 1, timeUnit = TimeUnit.MINUTES) // 1 分钟执行一次
+ public void execute() {
+ // 发送所有组件的心跳
+ for (IotComponentInfo component : componentRegistry.getAllComponents()) {
+ try {
+ CommonResult result = deviceUpstreamApi.heartbeatPluginInstance(
+ buildPluginInstanceHeartbeatReqDTO(component, true));
+ log.info("[execute][组件({})心跳结果:{})]", component.getPluginKey(), result);
+ } catch (Exception e) {
+ log.error("[execute][组件({})心跳发送异常]", component.getPluginKey(), e);
+ }
+ }
+ }
+
+ /**
+ * 构建心跳DTO
+ *
+ * @param component 组件信息
+ * @param online 是否在线
+ * @return 心跳DTO
+ */
+ private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(IotComponentInfo component,
+ Boolean online) {
+ return new IotPluginInstanceHeartbeatReqDTO()
+ .setPluginKey(component.getPluginKey())
+ .setProcessId(component.getProcessId())
+ .setHostIp(component.getHostIp())
+ .setDownstreamPort(component.getDownstreamPort())
+ .setOnline(online);
+ }
+
+ /**
+ * 获取当前进程ID
+ *
+ * @return 进程ID
+ */
+ private String getProcessId() {
+ // 获取进程的 name
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ // 分割名称,格式为 pid@hostname
+ return name.split("@")[0];
+ }
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/heartbeat/IotComponentRegistry.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/heartbeat/IotComponentRegistry.java
new file mode 100644
index 0000000000..9913f02825
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/heartbeat/IotComponentRegistry.java
@@ -0,0 +1,92 @@
+package cn.iocoder.yudao.module.iot.component.core.heartbeat;
+
+import lombok.Data;
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * IoT 组件注册表
+ *
+ * 用于管理多个组件的注册信息,解决多组件心跳问题
+ */
+@Component
+@Slf4j
+public class IotComponentRegistry {
+
+ /**
+ * 组件信息
+ */
+ @Data
+ @ToString
+ public static class IotComponentInfo {
+ /**
+ * 组件Key
+ */
+ private final String pluginKey;
+ /**
+ * 主机IP
+ */
+ private final String hostIp;
+ /**
+ * 下游端口
+ */
+ private final Integer downstreamPort;
+ /**
+ * 进程ID
+ */
+ private final String processId;
+ }
+
+ /**
+ * 组件映射表,key为组件Key
+ */
+ private final Map components = new ConcurrentHashMap<>();
+
+ /**
+ * 注册组件
+ *
+ * @param pluginKey 组件Key
+ * @param hostIp 主机IP
+ * @param downstreamPort 下游端口
+ * @param processId 进程ID
+ */
+ public void registerComponent(String pluginKey, String hostIp, Integer downstreamPort, String processId) {
+ log.info("[registerComponent][注册组件, pluginKey={}, hostIp={}, downstreamPort={}, processId={}]",
+ pluginKey, hostIp, downstreamPort, processId);
+ components.put(pluginKey, new IotComponentInfo(pluginKey, hostIp, downstreamPort, processId));
+ }
+
+ /**
+ * 注销组件
+ *
+ * @param pluginKey 组件Key
+ */
+ public void unregisterComponent(String pluginKey) {
+ log.info("[unregisterComponent][注销组件, pluginKey={}]", pluginKey);
+ components.remove(pluginKey);
+ }
+
+ /**
+ * 获取所有组件
+ *
+ * @return 所有组件集合
+ */
+ public Collection getAllComponents() {
+ return components.values();
+ }
+
+ /**
+ * 获取指定组件
+ *
+ * @param pluginKey 组件Key
+ * @return 组件信息
+ */
+ public IotComponentInfo getComponent(String pluginKey) {
+ return components.get(pluginKey);
+ }
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/pojo/IotStandardResponse.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/pojo/IotStandardResponse.java
new file mode 100644
index 0000000000..dbbbe73abf
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/pojo/IotStandardResponse.java
@@ -0,0 +1,93 @@
+package cn.iocoder.yudao.module.iot.component.core.pojo;
+
+import lombok.Data;
+
+/**
+ * IoT 标准协议响应实体类
+ *
+ * 用于统一 MQTT 和 HTTP 的响应格式
+ *
+ * @author haohao
+ */
+@Data
+public class IotStandardResponse {
+
+ /**
+ * 消息ID
+ */
+ private String id;
+
+ /**
+ * 状态码
+ */
+ private Integer code;
+
+ /**
+ * 响应数据
+ */
+ private Object data;
+
+ /**
+ * 响应消息
+ */
+ private String message;
+
+ /**
+ * 方法名
+ */
+ private String method;
+
+ /**
+ * 协议版本
+ */
+ private String version;
+
+ /**
+ * 创建成功响应
+ *
+ * @param id 消息ID
+ * @param method 方法名
+ * @return 成功响应
+ */
+ public static IotStandardResponse success(String id, String method) {
+ return success(id, method, null);
+ }
+
+ /**
+ * 创建成功响应
+ *
+ * @param id 消息ID
+ * @param method 方法名
+ * @param data 响应数据
+ * @return 成功响应
+ */
+ public static IotStandardResponse success(String id, String method, Object data) {
+ return new IotStandardResponse()
+ .setId(id)
+ .setCode(200)
+ .setData(data)
+ .setMessage("success")
+ .setMethod(method)
+ .setVersion("1.0");
+ }
+
+ /**
+ * 创建错误响应
+ *
+ * @param id 消息ID
+ * @param method 方法名
+ * @param code 错误码
+ * @param message 错误消息
+ * @return 错误响应
+ */
+ public static IotStandardResponse error(String id, String method, Integer code, String message) {
+ return new IotStandardResponse()
+ .setId(id)
+ .setCode(code)
+ .setData(null)
+ .setMessage(message)
+ .setMethod(method)
+ .setVersion("1.0");
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/upstream/IotDeviceUpstreamClient.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/upstream/IotDeviceUpstreamClient.java
new file mode 100644
index 0000000000..1cec3ee0f1
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/upstream/IotDeviceUpstreamClient.java
@@ -0,0 +1,62 @@
+package cn.iocoder.yudao.module.iot.component.core.upstream;
+
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Resource;
+
+/**
+ * 设备数据 Upstream 上行客户端
+ *
+ * 直接调用 IotDeviceUpstreamApi 接口
+ *
+ * @author haohao
+ */
+@Slf4j
+public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi {
+
+ @Resource
+ private IotDeviceUpstreamApi deviceUpstreamApi;
+
+ @Override
+ public CommonResult updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
+ return deviceUpstreamApi.updateDeviceState(updateReqDTO);
+ }
+
+ @Override
+ public CommonResult reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) {
+ return deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
+ }
+
+ @Override
+ public CommonResult registerDevice(IotDeviceRegisterReqDTO registerReqDTO) {
+ return deviceUpstreamApi.registerDevice(registerReqDTO);
+ }
+
+ @Override
+ public CommonResult registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) {
+ return deviceUpstreamApi.registerSubDevice(registerReqDTO);
+ }
+
+ @Override
+ public CommonResult addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) {
+ return deviceUpstreamApi.addDeviceTopology(addReqDTO);
+ }
+
+ @Override
+ public CommonResult authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
+ return deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
+ }
+
+ @Override
+ public CommonResult reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
+ return deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
+ }
+
+ @Override
+ public CommonResult heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
+ return deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO);
+ }
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/util/IotPluginCommonUtils.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/util/IotPluginCommonUtils.java
new file mode 100644
index 0000000000..5fc1df120d
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/java/cn/iocoder/yudao/module/iot/component/core/util/IotPluginCommonUtils.java
@@ -0,0 +1,76 @@
+package cn.iocoder.yudao.module.iot.component.core.util;
+
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.system.SystemUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.module.iot.component.core.pojo.IotStandardResponse;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.ext.web.RoutingContext;
+import org.springframework.http.MediaType;
+
+/**
+ * IoT 插件的通用工具类
+ *
+ * @author 芋道源码
+ */
+public class IotPluginCommonUtils {
+
+ /**
+ * 流程实例的进程编号
+ */
+ private static String processId;
+
+ public static String getProcessId() {
+ if (StrUtil.isEmpty(processId)) {
+ initProcessId();
+ }
+ return processId;
+ }
+
+ private synchronized static void initProcessId() {
+ processId = String.format("%s@%d@%s", // IP@PID@${uuid}
+ SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID(), IdUtil.fastSimpleUUID());
+ }
+
+ /**
+ * 将对象转换为JSON字符串后写入HTTP响应
+ *
+ * @param routingContext 路由上下文
+ * @param data 数据对象
+ */
+ @SuppressWarnings("deprecation")
+ public static void writeJsonResponse(RoutingContext routingContext, Object data) {
+ routingContext.response()
+ .setStatusCode(200)
+ .putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
+ .end(JsonUtils.toJsonString(data));
+ }
+
+ /**
+ * 生成标准JSON格式的响应并写入HTTP响应(基于IotStandardResponse)
+ *
+ * 推荐使用此方法,统一MQTT和HTTP的响应格式。使用方式:
+ *
+ *
+ * // 成功响应
+ * IotStandardResponse response = IotStandardResponse.success(requestId, method, data);
+ * IotPluginCommonUtils.writeJsonResponse(routingContext, response);
+ *
+ * // 错误响应
+ * IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, code, message);
+ * IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
+ *
+ *
+ * @param routingContext 路由上下文
+ * @param response IotStandardResponse响应对象
+ */
+ @SuppressWarnings("deprecation")
+ public static void writeJsonResponse(RoutingContext routingContext, IotStandardResponse response) {
+ routingContext.response()
+ .setStatusCode(200)
+ .putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
+ .end(JsonUtils.toJsonString(response));
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/resources/META-INF/spring.factories b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000000..7f075529e5
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+ cn.iocoder.yudao.module.iot.component.core.config.IotPluginCommonAutoConfiguration
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000000..e7b9b8ba6e
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonAutoConfiguration
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/pom.xml b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/pom.xml
new file mode 100644
index 0000000000..977fcc5014
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/pom.xml
@@ -0,0 +1,44 @@
+
+
+
+ yudao-module-iot-components
+ cn.iocoder.boot
+ ${revision}
+
+ 4.0.0
+ yudao-module-iot-component-emqx
+ jar
+
+ ${project.artifactId}
+
+ 物联网组件 EMQX 模块
+
+
+
+
+ cn.iocoder.boot
+ yudao-module-iot-component-core
+ ${revision}
+
+
+
+
+ io.vertx
+ vertx-web
+
+
+ io.vertx
+ vertx-mqtt
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+ true
+
+
+
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/config/IotComponentEmqxAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/config/IotComponentEmqxAutoConfiguration.java
new file mode 100644
index 0000000000..69bce3f88e
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/config/IotComponentEmqxAutoConfiguration.java
@@ -0,0 +1,121 @@
+package cn.iocoder.yudao.module.iot.component.emqx.config;
+
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.system.SystemUtil;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
+import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
+import cn.iocoder.yudao.module.iot.component.emqx.downstream.IotDeviceDownstreamHandlerImpl;
+import cn.iocoder.yudao.module.iot.component.emqx.upstream.IotDeviceUpstreamServer;
+import io.vertx.core.Vertx;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.MqttClientOptions;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.event.EventListener;
+
+import java.lang.management.ManagementFactory;
+
+/**
+ * IoT 组件 EMQX 的自动配置类
+ *
+ * @author haohao
+ */
+@Slf4j
+@AutoConfiguration
+@EnableConfigurationProperties(IotComponentEmqxProperties.class)
+@ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true", matchIfMissing = false)
+@ComponentScan(basePackages = {
+ "cn.iocoder.yudao.module.iot.component.core", // 核心包
+ "cn.iocoder.yudao.module.iot.component.emqx" // EMQX组件包
+})
+public class IotComponentEmqxAutoConfiguration {
+
+ /**
+ * 组件key
+ */
+ private static final String PLUGIN_KEY = "emqx";
+
+ public IotComponentEmqxAutoConfiguration() {
+ log.info("[IotComponentEmqxAutoConfiguration][已启动]");
+ }
+
+ @EventListener(ApplicationStartedEvent.class)
+ public void initialize(ApplicationStartedEvent event) {
+ // 从应用上下文中获取需要的Bean
+ IotComponentRegistry componentRegistry = event.getApplicationContext().getBean(IotComponentRegistry.class);
+ IotComponentCommonProperties commonProperties = event.getApplicationContext().getBean(IotComponentCommonProperties.class);
+
+ // 设置当前组件的核心标识
+ commonProperties.setPluginKey(PLUGIN_KEY);
+
+ // 将EMQX组件注册到组件注册表
+ componentRegistry.registerComponent(
+ PLUGIN_KEY,
+ SystemUtil.getHostInfo().getAddress(),
+ 0, // 内嵌模式固定为0
+ getProcessId()
+ );
+
+ log.info("[initialize][IoT EMQX 组件初始化完成]");
+ }
+
+ @Bean
+ public Vertx vertx() {
+ return Vertx.vertx();
+ }
+
+ @Bean
+ public MqttClient mqttClient(Vertx vertx, IotComponentEmqxProperties emqxProperties) {
+ log.info("MQTT配置: host={}, port={}, username={}, ssl={}",
+ emqxProperties.getMqttHost(), emqxProperties.getMqttPort(),
+ emqxProperties.getMqttUsername(), emqxProperties.getMqttSsl());
+
+ MqttClientOptions options = new MqttClientOptions()
+ .setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID())
+ .setUsername(emqxProperties.getMqttUsername())
+ .setPassword(emqxProperties.getMqttPassword());
+
+ if (emqxProperties.getMqttSsl() != null) {
+ options.setSsl(emqxProperties.getMqttSsl());
+ } else {
+ options.setSsl(false);
+ log.warn("MQTT SSL配置为null,默认设置为false");
+ }
+
+ return MqttClient.create(vertx, options);
+ }
+
+ @Bean(name = "emqxDeviceUpstreamServer", initMethod = "start", destroyMethod = "stop")
+ public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi,
+ IotComponentEmqxProperties emqxProperties,
+ Vertx vertx,
+ MqttClient mqttClient,
+ IotComponentRegistry componentRegistry) {
+ return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient, componentRegistry);
+ }
+
+ @Bean(name = "emqxDeviceDownstreamHandler")
+ public IotDeviceDownstreamHandler deviceDownstreamHandler(MqttClient mqttClient) {
+ return new IotDeviceDownstreamHandlerImpl(mqttClient);
+ }
+
+ /**
+ * 获取当前进程ID
+ *
+ * @return 进程ID
+ */
+ private String getProcessId() {
+ // 获取进程的 name
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ // 分割名称,格式为 pid@hostname
+ String pid = name.split("@")[0];
+ return pid;
+ }
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/config/IotComponentEmqxProperties.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/config/IotComponentEmqxProperties.java
new file mode 100644
index 0000000000..ff8dc48323
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/config/IotComponentEmqxProperties.java
@@ -0,0 +1,59 @@
+package cn.iocoder.yudao.module.iot.component.emqx.config;
+
+import jakarta.validation.constraints.NotBlank;
+import jakarta.validation.constraints.NotEmpty;
+import jakarta.validation.constraints.NotNull;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * IoT EMQX组件配置属性
+ */
+@ConfigurationProperties(prefix = "yudao.iot.component.emqx")
+@Data
+public class IotComponentEmqxProperties {
+
+ /**
+ * 是否启用EMQX组件
+ */
+ private Boolean enabled;
+
+ /**
+ * 服务主机
+ */
+ @NotBlank(message = "MQTT服务器主机不能为空")
+ private String mqttHost;
+ /**
+ * 服务端口
+ */
+ @NotNull(message = "MQTT服务器端口不能为空")
+ private Integer mqttPort;
+ /**
+ * 服务用户名
+ */
+ @NotBlank(message = "MQTT服务器用户名不能为空")
+ private String mqttUsername;
+ /**
+ * 服务密码
+ */
+ @NotBlank(message = "MQTT服务器密码不能为空")
+ private String mqttPassword;
+ /**
+ * 是否启用 SSL
+ */
+ @NotNull(message = "MQTT SSL配置不能为空")
+ private Boolean mqttSsl;
+
+ /**
+ * 订阅的主题列表
+ */
+ @NotEmpty(message = "MQTT订阅主题不能为空")
+ private String[] mqttTopics;
+
+ /**
+ * 认证端口
+ */
+ @NotNull(message = "认证端口不能为空")
+ private Integer authPort;
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/downstream/IotDeviceDownstreamHandlerImpl.java
new file mode 100644
index 0000000000..c05ef0d2f8
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/downstream/IotDeviceDownstreamHandlerImpl.java
@@ -0,0 +1,177 @@
+package cn.iocoder.yudao.module.iot.component.emqx.downstream;
+
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.mqtt.MqttClient;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+
+import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.MQTT_TOPIC_ILLEGAL;
+
+/**
+ * EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类
+ *
+ * @author 芋道源码
+ */
+@Slf4j
+public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
+
+ private static final String SYS_TOPIC_PREFIX = "/sys/";
+
+ // TODO @haohao:是不是可以类似 IotDeviceConfigSetVertxHandler 的建议,抽到统一的枚举类
+ // TODO @haohao:讨论,感觉 mqtt 和 http,可以做个相对统一的格式哈。;回复 都使用 Alink 格式,方便后续扩展。
+ // 设备服务调用 标准 JSON
+ // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
+ // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply
+ private static final String SERVICE_TOPIC_PREFIX = "/thing/service/";
+
+ // 设置设备属性 标准 JSON
+ // 请求Topic:/sys/${productKey}/${deviceName}/thing/service/property/set
+ // 响应Topic:/sys/${productKey}/${deviceName}/thing/service/property/set_reply
+ private static final String PROPERTY_SET_TOPIC = "/thing/service/property/set";
+
+ private final MqttClient mqttClient;
+
+ /**
+ * 构造函数
+ *
+ * @param mqttClient MQTT客户端
+ */
+ public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) {
+ this.mqttClient = mqttClient;
+ }
+
+ @Override
+ public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) {
+ log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+
+ // 验证参数
+ if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) {
+ log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+ return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+ }
+
+ try {
+ // 构建请求主题
+ String topic = buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(), reqDTO.getIdentifier());
+ // 构建请求消息
+ String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
+ JSONObject request = buildServiceRequest(requestId, reqDTO.getIdentifier(), reqDTO.getParams());
+ // 发送消息
+ publishMessage(topic, request);
+
+ log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic);
+ return CommonResult.success(true);
+ } catch (Exception e) {
+ log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
+ return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+ }
+ }
+
+ @Override
+ public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
+ return CommonResult.success(true);
+ }
+
+ @Override
+ public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) {
+ // 验证参数
+ log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+ if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) {
+ log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
+ return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+ }
+
+ try {
+ // 构建请求主题
+ String topic = buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName());
+ // 构建请求消息
+ String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
+ JSONObject request = buildPropertySetRequest(requestId, reqDTO.getProperties());
+ // 发送消息
+ publishMessage(topic, request);
+
+ log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic);
+ return CommonResult.success(true);
+ } catch (Exception e) {
+ log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
+ return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
+ }
+ }
+
+ @Override
+ public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
+ return CommonResult.success(true);
+ }
+
+ @Override
+ public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
+ return CommonResult.success(true);
+ }
+
+ /**
+ * 构建服务调用主题
+ */
+ private String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
+ return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + SERVICE_TOPIC_PREFIX + serviceIdentifier;
+ }
+
+ /**
+ * 构建属性设置主题
+ */
+ private String buildPropertySetTopic(String productKey, String deviceName) {
+ return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + PROPERTY_SET_TOPIC;
+ }
+
+ // TODO @haohao:这个,后面搞个对象,会不会好点哈?
+
+ /**
+ * 构建服务调用请求
+ */
+ private JSONObject buildServiceRequest(String requestId, String serviceIdentifier, Map params) {
+ return new JSONObject()
+ .set("id", requestId)
+ .set("version", "1.0")
+ .set("method", "thing.service." + serviceIdentifier)
+ .set("params", params != null ? params : new JSONObject());
+ }
+
+ /**
+ * 构建属性设置请求
+ */
+ private JSONObject buildPropertySetRequest(String requestId, Map properties) {
+ return new JSONObject()
+ .set("id", requestId)
+ .set("version", "1.0")
+ .set("method", "thing.service.property.set")
+ .set("params", properties);
+ }
+
+ /**
+ * 发布 MQTT 消息
+ */
+ private void publishMessage(String topic, JSONObject payload) {
+ mqttClient.publish(
+ topic,
+ Buffer.buffer(payload.toString()),
+ MqttQoS.AT_LEAST_ONCE,
+ false,
+ false);
+ log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload);
+ }
+
+ /**
+ * 生成请求 ID
+ */
+ private String generateRequestId() {
+ return IdUtil.fastSimpleUUID();
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/IotDeviceUpstreamServer.java
new file mode 100644
index 0000000000..2e17ae1266
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/IotDeviceUpstreamServer.java
@@ -0,0 +1,263 @@
+package cn.iocoder.yudao.module.iot.component.emqx.upstream;
+
+import cn.hutool.core.util.ArrayUtil;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
+import cn.iocoder.yudao.module.iot.component.emqx.config.IotComponentEmqxProperties;
+import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceAuthVertxHandler;
+import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceMqttMessageHandler;
+import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceWebhookVertxHandler;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.handler.BodyHandler;
+import io.vertx.mqtt.MqttClient;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
+ *
+ * 协议:HTTP、MQTT
+ *
+ * @author haohao
+ */
+@Slf4j
+public class IotDeviceUpstreamServer {
+
+ /**
+ * 重连延迟时间(毫秒)
+ */
+ private static final int RECONNECT_DELAY_MS = 5000;
+ /**
+ * 连接超时时间(毫秒)
+ */
+ private static final int CONNECTION_TIMEOUT_MS = 10000;
+ /**
+ * 默认 QoS 级别
+ */
+ private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;
+
+ private final Vertx vertx;
+ private final HttpServer server;
+ private final MqttClient client;
+ private final IotComponentEmqxProperties emqxProperties;
+ private final IotDeviceMqttMessageHandler mqttMessageHandler;
+ private final IotComponentRegistry componentRegistry;
+
+ /**
+ * 服务运行状态标志
+ */
+ private volatile boolean isRunning = false;
+
+ public IotDeviceUpstreamServer(IotComponentEmqxProperties emqxProperties,
+ IotDeviceUpstreamApi deviceUpstreamApi,
+ Vertx vertx,
+ MqttClient client,
+ IotComponentRegistry componentRegistry) {
+ this.vertx = vertx;
+ this.emqxProperties = emqxProperties;
+ this.client = client;
+ this.componentRegistry = componentRegistry;
+
+ // 创建 Router 实例
+ Router router = Router.router(vertx);
+ router.route().handler(BodyHandler.create()); // 处理 Body
+ router.post(IotDeviceAuthVertxHandler.PATH)
+ // TODO @haohao:疑问,mqtt 的认证,需要通过 http 呀?
+ // 回复:MQTT 认证不必须通过 HTTP 进行,但 HTTP 认证是 EMQX 等 MQTT 服务器支持的一种灵活的认证方式
+ .handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
+ // 添加 Webhook 处理器,用于处理设备连接和断开连接事件
+ router.post(IotDeviceWebhookVertxHandler.PATH)
+ .handler(new IotDeviceWebhookVertxHandler(deviceUpstreamApi));
+ // 创建 HttpServer 实例
+ this.server = vertx.createHttpServer().requestHandler(router);
+ this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client);
+ }
+
+ /**
+ * 启动 HTTP 服务器、MQTT 客户端
+ */
+ public void start() {
+ if (isRunning) {
+ log.warn("[start][服务已经在运行中,请勿重复启动]");
+ return;
+ }
+ log.info("[start][开始启动服务]");
+
+ // 检查authPort是否为null
+ Integer authPort = emqxProperties.getAuthPort();
+ if (authPort == null) {
+ log.warn("[start][authPort为null,使用默认端口8080]");
+ authPort = 8080; // 默认端口
+ }
+
+ // 1. 启动 HTTP 服务器
+ final Integer finalAuthPort = authPort; // 为lambda表达式创建final变量
+ CompletableFuture httpFuture = server.listen(finalAuthPort)
+ .toCompletionStage()
+ .toCompletableFuture()
+ .thenAccept(v -> log.info("[start][HTTP服务器启动完成,端口: {}]", server.actualPort()));
+
+ // 2. 连接 MQTT Broker
+ CompletableFuture mqttFuture = connectMqtt()
+ .toCompletionStage()
+ .toCompletableFuture()
+ .thenAccept(v -> {
+ // 2.1 添加 MQTT 断开重连监听器
+ client.closeHandler(closeEvent -> {
+ log.warn("[closeHandler][MQTT连接已断开,准备重连]");
+ reconnectWithDelay();
+ });
+ // 2. 设置 MQTT 消息处理器
+ setupMessageHandler();
+ });
+
+ // 3. 等待所有服务启动完成
+ CompletableFuture.allOf(httpFuture, mqttFuture)
+ .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("[start][服务启动失败]", error);
+ } else {
+ isRunning = true;
+ log.info("[start][所有服务启动完成]");
+ }
+ });
+ }
+
+ /**
+ * 设置 MQTT 消息处理器
+ */
+ private void setupMessageHandler() {
+ client.publishHandler(mqttMessageHandler::handle);
+ log.debug("[setupMessageHandler][MQTT消息处理器设置完成]");
+ }
+
+ /**
+ * 重连 MQTT 客户端
+ */
+ private void reconnectWithDelay() {
+ if (!isRunning) {
+ log.info("[reconnectWithDelay][服务已停止,不再尝试重连]");
+ return;
+ }
+
+ vertx.setTimer(RECONNECT_DELAY_MS, id -> {
+ log.info("[reconnectWithDelay][开始重新连接 MQTT]");
+ connectMqtt();
+ });
+ }
+
+ /**
+ * 连接 MQTT Broker 并订阅主题
+ *
+ * @return 连接结果的Future
+ */
+ private Future connectMqtt() {
+ // 检查必要的MQTT配置
+ String host = emqxProperties.getMqttHost();
+ Integer port = emqxProperties.getMqttPort();
+
+ if (host == null) {
+ String msg = "[connectMqtt][MQTT Host为null,无法连接]";
+ log.error(msg);
+ return Future.failedFuture(new IllegalStateException(msg));
+ }
+
+ if (port == null) {
+ log.warn("[connectMqtt][MQTT Port为null,使用默认端口1883]");
+ port = 1883; // 默认MQTT端口
+ }
+
+ final Integer finalPort = port; // 为lambda表达式创建final变量
+ return client.connect(finalPort, host)
+ .compose(connAck -> {
+ log.info("[connectMqtt][MQTT客户端连接成功]");
+ return subscribeToTopics();
+ })
+ .recover(error -> {
+ log.error("[connectMqtt][连接MQTT Broker失败:]", error);
+ reconnectWithDelay();
+ return Future.failedFuture(error);
+ });
+ }
+
+ /**
+ * 订阅设备上行消息主题
+ *
+ * @return 订阅结果的 Future
+ */
+ private Future subscribeToTopics() {
+ String[] topics = emqxProperties.getMqttTopics();
+ if (ArrayUtil.isEmpty(topics)) {
+ log.warn("[subscribeToTopics][未配置MQTT主题或为null,使用默认主题]");
+ // 默认订阅所有设备上下行主题
+ topics = new String[]{"/device/#"};
+ }
+ log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
+
+ Future compositeFuture = Future.succeededFuture();
+ for (String topic : topics) {
+ if (topic == null) {
+ continue; // 跳过null主题
+ }
+ String trimmedTopic = topic.trim();
+ if (trimmedTopic.isEmpty()) {
+ continue;
+ }
+ compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value())
+ .map(ack -> {
+ log.info("[subscribeToTopics][成功订阅主题: {}]", trimmedTopic);
+ return null;
+ })
+ .recover(error -> {
+ log.error("[subscribeToTopics][订阅主题失败: {}]", trimmedTopic, error);
+ return Future.succeededFuture(); // 继续订阅其他主题
+ }));
+ }
+ return compositeFuture;
+ }
+
+ /**
+ * 停止所有服务
+ */
+ public void stop() {
+ if (!isRunning) {
+ log.warn("[stop][服务未运行,无需停止]");
+ return;
+ }
+ log.info("[stop][开始关闭服务]");
+ isRunning = false;
+
+ try {
+ CompletableFuture serverFuture = server != null
+ ? server.close().toCompletionStage().toCompletableFuture()
+ : CompletableFuture.completedFuture(null);
+ CompletableFuture clientFuture = client != null
+ ? client.disconnect().toCompletionStage().toCompletableFuture()
+ : CompletableFuture.completedFuture(null);
+ CompletableFuture vertxFuture = vertx != null
+ ? vertx.close().toCompletionStage().toCompletableFuture()
+ : CompletableFuture.completedFuture(null);
+
+ // 等待所有资源关闭
+ CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture)
+ .orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("[stop][服务关闭过程中发生异常]", error);
+ } else {
+ log.info("[stop][所有服务关闭完成]");
+ }
+ });
+ } catch (Exception e) {
+ log.error("[stop][关闭服务异常]", e);
+ throw new RuntimeException("关闭 IoT 设备上行服务失败", e);
+ }
+ }
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceAuthVertxHandler.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceAuthVertxHandler.java
new file mode 100644
index 0000000000..5b7f92845d
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceAuthVertxHandler.java
@@ -0,0 +1,64 @@
+package cn.iocoder.yudao.module.iot.component.emqx.upstream.router;
+
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEmqxAuthReqDTO;
+import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
+import io.vertx.core.Handler;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Collections;
+
+/**
+ * IoT EMQX 连接认证的 Vert.x Handler
+ *
+ * 参考:EMQX HTTP
+ *
+ * 注意:该处理器需要返回特定格式:{"result": "allow"} 或 {"result": "deny"},
+ * 以符合 EMQX 认证插件的要求,因此不使用 IotStandardResponse 实体类
+ *
+ * @author haohao
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class IotDeviceAuthVertxHandler implements Handler {
+
+ public static final String PATH = "/mqtt/auth";
+
+ private final IotDeviceUpstreamApi deviceUpstreamApi;
+
+ @Override
+ public void handle(RoutingContext routingContext) {
+ try {
+ // 构建认证请求 DTO
+ JsonObject json = routingContext.body().asJsonObject();
+ String clientId = json.getString("clientid");
+ String username = json.getString("username");
+ String password = json.getString("password");
+ IotDeviceEmqxAuthReqDTO authReqDTO = new IotDeviceEmqxAuthReqDTO()
+ .setClientId(clientId)
+ .setUsername(username)
+ .setPassword(password);
+
+ // 调用认证 API
+ CommonResult authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
+ if (authResult.getCode() != 0 || !authResult.getData()) {
+ // 注意:这里必须返回 {"result": "deny"} 格式,以符合 EMQX 认证插件的要求
+ IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "deny"));
+ return;
+ }
+
+ // 响应结果
+ // 注意:这里必须返回 {"result": "allow"} 格式,以符合 EMQX 认证插件的要求
+ IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "allow"));
+ } catch (Exception e) {
+ log.error("[handle][EMQX 认证异常]", e);
+ // 注意:这里必须返回 {"result": "deny"} 格式,以符合 EMQX 认证插件的要求
+ IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "deny"));
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceMqttMessageHandler.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceMqttMessageHandler.java
new file mode 100644
index 0000000000..19463d6a13
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceMqttMessageHandler.java
@@ -0,0 +1,296 @@
+package cn.iocoder.yudao.module.iot.component.emqx.upstream.router;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
+import cn.iocoder.yudao.module.iot.component.core.pojo.IotStandardResponse;
+import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.messages.MqttPublishMessage;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * IoT 设备 MQTT 消息处理器
+ *
+ * 参考:设备属性、事件、服务
+ */
+@Slf4j
+public class IotDeviceMqttMessageHandler {
+
+ // TODO @haohao:讨论,感觉 mqtt 和 http,可以做个相对统一的格式哈;回复 都使用 Alink 格式,方便后续扩展。
+ // 设备上报属性 标准 JSON
+ // 请求 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post
+ // 响应 Topic:/sys/${productKey}/${deviceName}/thing/event/property/post_reply
+
+ // 设备上报事件 标准 JSON
+ // 请求 Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
+ // 响应 Topic:/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
+
+ private static final String SYS_TOPIC_PREFIX = "/sys/";
+ private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
+ private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/";
+ private static final String EVENT_POST_TOPIC_SUFFIX = "/post";
+ private static final String REPLY_SUFFIX = "_reply";
+ private static final String PROPERTY_METHOD = "thing.event.property.post";
+ private static final String EVENT_METHOD_PREFIX = "thing.event.";
+ private static final String EVENT_METHOD_SUFFIX = ".post";
+
+ private final IotDeviceUpstreamApi deviceUpstreamApi;
+ private final MqttClient mqttClient;
+
+ public IotDeviceMqttMessageHandler(IotDeviceUpstreamApi deviceUpstreamApi, MqttClient mqttClient) {
+ this.deviceUpstreamApi = deviceUpstreamApi;
+ this.mqttClient = mqttClient;
+ }
+
+ /**
+ * 处理MQTT消息
+ *
+ * @param message MQTT发布消息
+ */
+ public void handle(MqttPublishMessage message) {
+ String topic = message.topicName();
+ String payload = message.payload().toString();
+ log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
+
+ try {
+ if (StrUtil.isEmpty(payload)) {
+ log.warn("[messageHandler][消息内容为空][topic: {}]", topic);
+ return;
+ }
+ handleMessage(topic, payload);
+ } catch (Exception e) {
+ log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
+ }
+ }
+
+ /**
+ * 根据主题类型处理消息
+ *
+ * @param topic 主题
+ * @param payload 消息内容
+ */
+ private void handleMessage(String topic, String payload) {
+ // 校验前缀
+ if (!topic.startsWith(SYS_TOPIC_PREFIX)) {
+ log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
+ return;
+ }
+
+ // 处理设备属性上报消息
+ if (topic.endsWith(PROPERTY_POST_TOPIC)) {
+ log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic);
+ handlePropertyPost(topic, payload);
+ return;
+ }
+
+ // 处理设备事件上报消息
+ if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) {
+ log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic);
+ handleEventPost(topic, payload);
+ return;
+ }
+
+ // 未知消息类型
+ log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
+ }
+
+ /**
+ * 处理设备属性上报消息
+ *
+ * @param topic 主题
+ * @param payload 消息内容
+ */
+ private void handlePropertyPost(String topic, String payload) {
+ try {
+ // 解析消息内容
+ JSONObject jsonObject = JSONUtil.parseObj(payload);
+ String[] topicParts = parseTopic(topic);
+ if (topicParts == null) {
+ return;
+ }
+
+ // 构建设备属性上报请求对象
+ IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts);
+
+ // 调用上游 API 处理设备上报数据
+ deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
+ log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic);
+
+ // 发送响应消息
+ sendResponse(topic, jsonObject, PROPERTY_METHOD, null);
+ } catch (Exception e) {
+ log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e);
+ }
+ }
+
+ /**
+ * 处理设备事件上报消息
+ *
+ * @param topic 主题
+ * @param payload 消息内容
+ */
+ private void handleEventPost(String topic, String payload) {
+ try {
+ // 解析消息内容
+ JSONObject jsonObject = JSONUtil.parseObj(payload);
+ String[] topicParts = parseTopic(topic);
+ if (topicParts == null) {
+ return;
+ }
+
+ // 构建设备事件上报请求对象
+ IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts);
+
+ // 调用上游 API 处理设备上报数据
+ deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
+ log.info("[handleEventPost][处理设备事件上报成功][topic: {}]", topic);
+
+ // 从 topic 中获取事件标识符
+ String eventIdentifier = getEventIdentifier(topicParts, topic);
+ if (eventIdentifier == null) {
+ return;
+ }
+
+ // 发送响应消息
+ String method = EVENT_METHOD_PREFIX + eventIdentifier + EVENT_METHOD_SUFFIX;
+ sendResponse(topic, jsonObject, method, null);
+ } catch (Exception e) {
+ log.error("[handleEventPost][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e);
+ }
+ }
+
+ /**
+ * 解析主题,获取主题各部分
+ *
+ * @param topic 主题
+ * @return 主题各部分数组,如果解析失败返回null
+ */
+ private String[] parseTopic(String topic) {
+ String[] topicParts = topic.split("/");
+ if (topicParts.length < 7) {
+ log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
+ return null;
+ }
+ return topicParts;
+ }
+
+ /**
+ * 从主题部分中获取事件标识符
+ *
+ * @param topicParts 主题各部分
+ * @param topic 原始主题,用于日志
+ * @return 事件标识符,如果获取失败返回null
+ */
+ private String getEventIdentifier(String[] topicParts, String topic) {
+ try {
+ return topicParts[6];
+ } catch (ArrayIndexOutOfBoundsException e) {
+ log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}][topicParts: {}]",
+ topic, Arrays.toString(topicParts));
+ return null;
+ }
+ }
+
+ /**
+ * 发送响应消息
+ *
+ * @param topic 原始主题
+ * @param jsonObject 原始消息JSON对象
+ * @param method 响应方法
+ * @param customData 自定义数据,可为 null
+ */
+ private void sendResponse(String topic, JSONObject jsonObject, String method, Object customData) {
+ String replyTopic = topic + REPLY_SUFFIX;
+
+ // 响应结果
+ IotStandardResponse response = IotStandardResponse.success(
+ jsonObject.getStr("id"), method, customData);
+ try {
+ mqttClient.publish(replyTopic, Buffer.buffer(JsonUtils.toJsonString(response)),
+ MqttQoS.AT_LEAST_ONCE, false, false);
+ log.info("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
+ } catch (Exception e) {
+ log.error("[sendResponse][发送响应消息失败][topic: {}][response: {}]", replyTopic, response, e);
+ }
+ }
+
+ /**
+ * 构建设备属性上报请求对象
+ *
+ * @param jsonObject 消息内容
+ * @param topicParts 主题部分
+ * @return 设备属性上报请求对象
+ */
+ private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, String[] topicParts) {
+ IotDevicePropertyReportReqDTO reportReqDTO = new IotDevicePropertyReportReqDTO();
+ reportReqDTO.setRequestId(jsonObject.getStr("id"));
+ reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+ reportReqDTO.setReportTime(LocalDateTime.now());
+ reportReqDTO.setProductKey(topicParts[2]);
+ reportReqDTO.setDeviceName(topicParts[3]);
+
+ // 只使用标准JSON格式处理属性数据
+ JSONObject params = jsonObject.getJSONObject("params");
+ if (params == null) {
+ log.warn("[buildPropertyReportDTO][消息格式不正确,缺少params字段][jsonObject: {}]", jsonObject);
+ params = new JSONObject();
+ }
+
+ // 将标准格式的params转换为平台需要的properties格式
+ Map properties = new HashMap<>();
+ for (Map.Entry entry : params.entrySet()) {
+ String key = entry.getKey();
+ Object valueObj = entry.getValue();
+
+ // 如果是复杂结构(包含value和time)
+ if (valueObj instanceof JSONObject valueJson) {
+ properties.put(key, valueJson.getOrDefault("value", valueObj));
+ } else {
+ properties.put(key, valueObj);
+ }
+ }
+ reportReqDTO.setProperties(properties);
+
+ return reportReqDTO;
+ }
+
+ /**
+ * 构建设备事件上报请求对象
+ *
+ * @param jsonObject 消息内容
+ * @param topicParts 主题部分
+ * @return 设备事件上报请求对象
+ */
+ private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) {
+ IotDeviceEventReportReqDTO reportReqDTO = new IotDeviceEventReportReqDTO();
+ reportReqDTO.setRequestId(jsonObject.getStr("id"));
+ reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+ reportReqDTO.setReportTime(LocalDateTime.now());
+ reportReqDTO.setProductKey(topicParts[2]);
+ reportReqDTO.setDeviceName(topicParts[3]);
+ reportReqDTO.setIdentifier(topicParts[6]);
+
+ // 只使用标准JSON格式处理事件参数
+ JSONObject params = jsonObject.getJSONObject("params");
+ if (params == null) {
+ log.warn("[buildEventReportDTO][消息格式不正确,缺少params字段][jsonObject: {}]", jsonObject);
+ params = new JSONObject();
+ }
+ reportReqDTO.setParams(params);
+
+ return reportReqDTO;
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceWebhookVertxHandler.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceWebhookVertxHandler.java
new file mode 100644
index 0000000000..7efd0b9343
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/component/emqx/upstream/router/IotDeviceWebhookVertxHandler.java
@@ -0,0 +1,152 @@
+package cn.iocoder.yudao.module.iot.component.emqx.upstream.router;
+
+import cn.hutool.core.util.StrUtil;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
+import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
+import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
+import io.vertx.core.Handler;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+import java.util.Collections;
+
+/**
+ * IoT EMQX Webhook 事件处理的 Vert.x Handler
+ *
+ * 参考:EMQX Webhook
+ *
+ * 注意:该处理器需要返回特定格式:{"result": "success"} 或 {"result": "error"},
+ * 以符合 EMQX Webhook 插件的要求,因此不使用 IotStandardResponse 实体类。
+ *
+ * @author haohao
+ */
+@RequiredArgsConstructor
+@Slf4j
+public class IotDeviceWebhookVertxHandler implements Handler {
+
+ public static final String PATH = "/mqtt/webhook";
+
+ private final IotDeviceUpstreamApi deviceUpstreamApi;
+
+ @Override
+ public void handle(RoutingContext routingContext) {
+ try {
+ // 解析请求体
+ JsonObject json = routingContext.body().asJsonObject();
+ String event = json.getString("event");
+ String clientId = json.getString("clientid");
+ String username = json.getString("username");
+
+ // 处理不同的事件类型
+ switch (event) {
+ case "client.connected":
+ handleClientConnected(clientId, username);
+ break;
+ case "client.disconnected":
+ handleClientDisconnected(clientId, username);
+ break;
+ default:
+ log.info("[handle][未处理的 Webhook 事件] event={}, clientId={}, username={}", event, clientId, username);
+ break;
+ }
+
+ // 返回成功响应
+ // 注意:这里必须返回 {"result": "success"} 格式,以符合 EMQX Webhook 插件的要求
+ IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "success"));
+ } catch (Exception e) {
+ log.error("[handle][处理 Webhook 事件异常]", e);
+ // 注意:这里必须返回 {"result": "error"} 格式,以符合 EMQX Webhook 插件的要求
+ IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "error"));
+ }
+ }
+
+ /**
+ * 处理客户端连接事件
+ *
+ * @param clientId 客户端ID
+ * @param username 用户名
+ */
+ private void handleClientConnected(String clientId, String username) {
+ // 解析产品标识和设备名称
+ if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
+ log.warn("[handleClientConnected][客户端连接事件,但用户名为空] clientId={}", clientId);
+ return;
+ }
+ String[] parts = parseUsername(username);
+ if (parts == null) {
+ return;
+ }
+
+ // 更新设备状态为在线
+ IotDeviceStateUpdateReqDTO updateReqDTO = new IotDeviceStateUpdateReqDTO();
+ updateReqDTO.setProductKey(parts[1]);
+ updateReqDTO.setDeviceName(parts[0]);
+ updateReqDTO.setState(IotDeviceStateEnum.ONLINE.getState());
+ updateReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+ updateReqDTO.setReportTime(LocalDateTime.now());
+ CommonResult result = deviceUpstreamApi.updateDeviceState(updateReqDTO);
+ if (result.getCode() != 0 || !result.getData()) {
+ log.error("[handleClientConnected][更新设备状态为在线失败] clientId={}, username={}, code={}, msg={}",
+ clientId, username, result.getCode(), result.getMsg());
+ } else {
+ log.info("[handleClientConnected][更新设备状态为在线成功] clientId={}, username={}", clientId, username);
+ }
+ }
+
+ /**
+ * 处理客户端断开连接事件
+ *
+ * @param clientId 客户端ID
+ * @param username 用户名
+ */
+ private void handleClientDisconnected(String clientId, String username) {
+ // 解析产品标识和设备名称
+ if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
+ log.warn("[handleClientDisconnected][客户端断开连接事件,但用户名为空] clientId={}", clientId);
+ return;
+ }
+ String[] parts = parseUsername(username);
+ if (parts == null) {
+ return;
+ }
+
+ // 更新设备状态为离线
+ IotDeviceStateUpdateReqDTO offlineReqDTO = new IotDeviceStateUpdateReqDTO();
+ offlineReqDTO.setProductKey(parts[1]);
+ offlineReqDTO.setDeviceName(parts[0]);
+ offlineReqDTO.setState(IotDeviceStateEnum.OFFLINE.getState());
+ offlineReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
+ offlineReqDTO.setReportTime(LocalDateTime.now());
+ CommonResult offlineResult = deviceUpstreamApi.updateDeviceState(offlineReqDTO);
+ if (offlineResult.getCode() != 0 || !offlineResult.getData()) {
+ log.error("[handleClientDisconnected][更新设备状态为离线失败] clientId={}, username={}, code={}, msg={}",
+ clientId, username, offlineResult.getCode(), offlineResult.getMsg());
+ } else {
+ log.info("[handleClientDisconnected][更新设备状态为离线成功] clientId={}, username={}", clientId, username);
+ }
+ }
+
+ /**
+ * 解析用户名,格式为 deviceName&productKey
+ *
+ * @param username 用户名
+ * @return 解析结果,[0] 为 deviceName,[1] 为 productKey,解析失败返回 null
+ */
+ private String[] parseUsername(String username) {
+ if (StrUtil.isEmpty(username)) {
+ return null;
+ }
+ String[] parts = username.split("&");
+ if (parts.length != 2) {
+ log.warn("[parseUsername][用户名格式({})不正确,无法解析产品标识和设备名称]", username);
+ return null;
+ }
+ return parts;
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000000..bf8624f153
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+cn.iocoder.yudao.module.iot.component.emqx.config.IotComponentEmqxAutoConfiguration
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/resources/application.yml
new file mode 100644
index 0000000000..01002c653a
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-emqx/src/main/resources/application.yml
@@ -0,0 +1,18 @@
+# EMQX组件默认配置
+yudao:
+ iot:
+ component:
+ # 核心组件配置
+ core:
+ plugin-key: emqx # 插件的唯一标识
+ # EMQX组件配置
+# emqx:
+# enabled: true # 启用EMQX组件
+# mqtt-host: 127.0.0.1 # MQTT服务器主机地址
+# mqtt-port: 1883 # MQTT服务器端口
+# mqtt-username: yudao # MQTT服务器用户名
+# mqtt-password: 123456 # MQTT服务器密码
+# mqtt-ssl: false # 是否启用SSL
+# mqtt-topics: # 订阅的主题列表
+# - "/sys/#"
+# auth-port: 8101 # 认证端口
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/pom.xml b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/pom.xml
new file mode 100644
index 0000000000..cd40c99bcc
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/pom.xml
@@ -0,0 +1,47 @@
+
+
+
+ yudao-module-iot-components
+ cn.iocoder.boot
+ ${revision}
+
+ 4.0.0
+ yudao-module-iot-component-http
+ jar
+
+ ${project.artifactId}
+
+ 物联网组件 HTTP 模块
+
+
+
+
+ cn.iocoder.boot
+ yudao-module-iot-component-core
+ ${revision}
+
+
+
+
+
+
+
+
+
+
+
+ io.vertx
+ vertx-web
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+ true
+
+
+
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/config/IotComponentHttpAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/config/IotComponentHttpAutoConfiguration.java
new file mode 100644
index 0000000000..ec5f70dbe2
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/config/IotComponentHttpAutoConfiguration.java
@@ -0,0 +1,91 @@
+package cn.iocoder.yudao.module.iot.component.http.config;
+
+import cn.hutool.system.SystemUtil;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
+import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
+import cn.iocoder.yudao.module.iot.component.http.downstream.IotDeviceDownstreamHandlerImpl;
+import cn.iocoder.yudao.module.iot.component.http.upstream.IotDeviceUpstreamServer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.event.ApplicationStartedEvent;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.event.EventListener;
+
+import java.lang.management.ManagementFactory;
+
+/**
+ * IoT 组件 HTTP 的自动配置类
+ *
+ * @author haohao
+ */
+@Slf4j
+@AutoConfiguration
+@EnableConfigurationProperties(IotComponentHttpProperties.class)
+@ConditionalOnProperty(prefix = "yudao.iot.component.http", name = "enabled", havingValue = "true", matchIfMissing = false)
+@ComponentScan(basePackages = {
+ "cn.iocoder.yudao.module.iot.component.core", // 核心包
+ "cn.iocoder.yudao.module.iot.component.http" // HTTP组件包
+})
+public class IotComponentHttpAutoConfiguration {
+
+ /**
+ * 组件key
+ */
+ private static final String PLUGIN_KEY = "http";
+
+ public IotComponentHttpAutoConfiguration() {
+ log.info("[IotComponentHttpAutoConfiguration][已启动]");
+ }
+
+ @EventListener(ApplicationStartedEvent.class)
+ public void initialize(ApplicationStartedEvent event) {
+ // 从应用上下文中获取需要的Bean
+ IotComponentRegistry componentRegistry = event.getApplicationContext().getBean(IotComponentRegistry.class);
+ IotComponentCommonProperties commonProperties = event.getApplicationContext()
+ .getBean(IotComponentCommonProperties.class);
+
+ // 设置当前组件的核心标识
+ commonProperties.setPluginKey(PLUGIN_KEY);
+
+ // 将HTTP组件注册到组件注册表
+ componentRegistry.registerComponent(
+ PLUGIN_KEY,
+ SystemUtil.getHostInfo().getAddress(),
+ 0, // 内嵌模式固定为0
+ getProcessId());
+
+ log.info("[initialize][IoT HTTP 组件初始化完成]");
+ }
+
+ @Bean(name = "httpDeviceUpstreamServer", initMethod = "start", destroyMethod = "stop")
+ public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi,
+ IotComponentHttpProperties properties,
+ ApplicationContext applicationContext,
+ IotComponentRegistry componentRegistry) {
+ return new IotDeviceUpstreamServer(properties, deviceUpstreamApi, applicationContext, componentRegistry);
+ }
+
+ @Bean(name = "httpDeviceDownstreamHandler")
+ public IotDeviceDownstreamHandler deviceDownstreamHandler() {
+ return new IotDeviceDownstreamHandlerImpl();
+ }
+
+ /**
+ * 获取当前进程ID
+ *
+ * @return 进程ID
+ */
+ private String getProcessId() {
+ // 获取进程的 name
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ // 分割名称,格式为 pid@hostname
+ String pid = name.split("@")[0];
+ return pid;
+ }
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/config/IotComponentHttpProperties.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/config/IotComponentHttpProperties.java
new file mode 100644
index 0000000000..dd3fecf759
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/config/IotComponentHttpProperties.java
@@ -0,0 +1,25 @@
+package cn.iocoder.yudao.module.iot.component.http.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+/**
+ * IoT HTTP组件配置属性
+ */
+@ConfigurationProperties(prefix = "yudao.iot.component.http")
+@Validated
+@Data
+public class IotComponentHttpProperties {
+
+ /**
+ * 是否启用
+ */
+ private Boolean enabled;
+
+ /**
+ * HTTP 服务端口
+ */
+ private Integer serverPort;
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/downstream/IotDeviceDownstreamHandlerImpl.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/downstream/IotDeviceDownstreamHandlerImpl.java
new file mode 100644
index 0000000000..4519bda1bf
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/downstream/IotDeviceDownstreamHandlerImpl.java
@@ -0,0 +1,44 @@
+package cn.iocoder.yudao.module.iot.component.http.downstream;
+
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
+import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
+
+import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.NOT_IMPLEMENTED;
+
+/**
+ * HTTP 插件的 {@link IotDeviceDownstreamHandler} 实现类
+ *
+ * 但是:由于设备通过 HTTP 短链接接入,导致其实无法下行指导给 device 设备,所以基本都是直接返回失败!!!
+ * 类似 MQTT、WebSocket、TCP 插件,是可以实现下行指令的。
+ *
+ * @author 芋道源码
+ */
+public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
+
+ @Override
+ public CommonResult invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
+ return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持调用设备服务");
+ }
+
+ @Override
+ public CommonResult getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
+ return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持获取设备属性");
+ }
+
+ @Override
+ public CommonResult setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
+ return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持设置设备属性");
+ }
+
+ @Override
+ public CommonResult setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
+ return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持设置设备属性");
+ }
+
+ @Override
+ public CommonResult upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
+ return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持设置设备属性");
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/upstream/IotDeviceUpstreamServer.java
new file mode 100644
index 0000000000..ff570f1867
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/upstream/IotDeviceUpstreamServer.java
@@ -0,0 +1,91 @@
+package cn.iocoder.yudao.module.iot.component.http.upstream;
+
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
+import cn.iocoder.yudao.module.iot.component.http.config.IotComponentHttpProperties;
+import cn.iocoder.yudao.module.iot.component.http.upstream.router.IotDeviceUpstreamVertxHandler;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpServer;
+import io.vertx.ext.web.Router;
+import io.vertx.ext.web.handler.BodyHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+
+/**
+ * IoT 设备下行服务端,接收来自 device 设备的请求,转发给 server 服务器
+ *
+ * 协议:HTTP
+ *
+ * @author haohao
+ */
+@Slf4j
+public class IotDeviceUpstreamServer {
+
+ private final Vertx vertx;
+ private final HttpServer server;
+ private final IotComponentHttpProperties properties;
+ private final IotComponentRegistry componentRegistry;
+
+ public IotDeviceUpstreamServer(IotComponentHttpProperties properties,
+ IotDeviceUpstreamApi deviceUpstreamApi,
+ ApplicationContext applicationContext,
+ IotComponentRegistry componentRegistry) {
+ this.properties = properties;
+ this.componentRegistry = componentRegistry;
+
+ // 创建 Vertx 实例
+ this.vertx = Vertx.vertx();
+ // 创建 Router 实例
+ Router router = Router.router(vertx);
+ router.route().handler(BodyHandler.create()); // 处理 Body
+
+ // 使用统一的 Handler 处理所有上行请求
+ IotDeviceUpstreamVertxHandler upstreamHandler = new IotDeviceUpstreamVertxHandler(deviceUpstreamApi,
+ applicationContext);
+ router.post(IotDeviceUpstreamVertxHandler.PROPERTY_PATH).handler(upstreamHandler);
+ router.post(IotDeviceUpstreamVertxHandler.EVENT_PATH).handler(upstreamHandler);
+
+ // 创建 HttpServer 实例
+ this.server = vertx.createHttpServer().requestHandler(router);
+ }
+
+ /**
+ * 启动 HTTP 服务器
+ */
+ public void start() {
+ log.info("[start][开始启动]");
+ server.listen(properties.getServerPort())
+ .toCompletionStage()
+ .toCompletableFuture()
+ .join();
+ log.info("[start][启动完成,端口({})]", this.server.actualPort());
+ }
+
+ /**
+ * 停止所有
+ */
+ public void stop() {
+ log.info("[stop][开始关闭]");
+ try {
+ // 关闭 HTTP 服务器
+ if (server != null) {
+ server.close()
+ .toCompletionStage()
+ .toCompletableFuture()
+ .join();
+ }
+
+ // 关闭 Vertx 实例
+ if (vertx != null) {
+ vertx.close()
+ .toCompletionStage()
+ .toCompletableFuture()
+ .join();
+ }
+ log.info("[stop][关闭完成]");
+ } catch (Exception e) {
+ log.error("[stop][关闭异常]", e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/upstream/router/IotDeviceUpstreamVertxHandler.java b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/upstream/router/IotDeviceUpstreamVertxHandler.java
new file mode 100644
index 0000000000..d1d30575a7
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/java/cn/iocoder/yudao/module/iot/component/http/upstream/router/IotDeviceUpstreamVertxHandler.java
@@ -0,0 +1,212 @@
+package cn.iocoder.yudao.module.iot.component.http.upstream.router;
+
+import cn.hutool.core.util.IdUtil;
+import cn.hutool.core.util.ObjUtil;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
+import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
+import cn.iocoder.yudao.module.iot.component.core.pojo.IotStandardResponse;
+import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
+import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
+import io.vertx.core.Handler;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.RoutingContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.ApplicationContext;
+
+import java.time.LocalDateTime;
+import java.util.HashMap;
+import java.util.Map;
+
+import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
+import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
+
+/**
+ * IoT 设备上行统一处理的 Vert.x Handler
+ *
+ * 统一处理设备属性上报和事件上报的请求
+ *
+ * @author haohao
+ */
+@Slf4j
+public class IotDeviceUpstreamVertxHandler implements Handler {
+
+ /**
+ * 属性上报路径
+ */
+ public static final String PROPERTY_PATH = "/sys/:productKey/:deviceName/thing/event/property/post";
+ /**
+ * 事件上报路径
+ */
+ public static final String EVENT_PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post";
+
+ private static final String PROPERTY_METHOD = "thing.event.property.post";
+ private static final String EVENT_METHOD_PREFIX = "thing.event.";
+ private static final String EVENT_METHOD_SUFFIX = ".post";
+
+ private final IotDeviceUpstreamApi deviceUpstreamApi;
+// private final HttpScriptService scriptService;
+
+ public IotDeviceUpstreamVertxHandler(IotDeviceUpstreamApi deviceUpstreamApi,
+ ApplicationContext applicationContext) {
+ this.deviceUpstreamApi = deviceUpstreamApi;
+// this.scriptService = applicationContext.getBean(HttpScriptService.class);
+ }
+
+ @Override
+ public void handle(RoutingContext routingContext) {
+ String path = routingContext.request().path();
+ String requestId = IdUtil.fastSimpleUUID();
+
+ try {
+ // 1. 解析通用参数
+ String productKey = routingContext.pathParam("productKey");
+ String deviceName = routingContext.pathParam("deviceName");
+ JsonObject body = routingContext.body().asJsonObject();
+ requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId);
+
+ // 2. 根据路径模式处理不同类型的请求
+ CommonResult result;
+ String method;
+ if (path.matches(".*/thing/event/property/post")) {
+ // 处理属性上报
+ IotDevicePropertyReportReqDTO reportReqDTO = parsePropertyReportRequest(productKey, deviceName,
+ requestId, body);
+
+ // 设备上线
+ updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
+
+ // 属性上报
+ result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
+ method = PROPERTY_METHOD;
+ } else if (path.matches(".*/thing/event/.+/post")) {
+ // 处理事件上报
+ String identifier = routingContext.pathParam("identifier");
+ IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier,
+ requestId, body);
+
+ // 设备上线
+ updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
+
+ // 事件上报
+ result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
+ method = EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX;
+ } else {
+ // 不支持的请求路径
+ IotStandardResponse errorResponse = IotStandardResponse.error(requestId, "unknown",
+ BAD_REQUEST.getCode(), "不支持的请求路径");
+ IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
+ return;
+ }
+
+ // 3. 返回标准响应
+ IotStandardResponse response;
+ if (result.isSuccess()) {
+ response = IotStandardResponse.success(requestId, method, result.getData());
+ } else {
+ response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
+ }
+ IotPluginCommonUtils.writeJsonResponse(routingContext, response);
+ } catch (Exception e) {
+ log.error("[handle][处理上行请求异常] path={}", path, e);
+ String method = path.contains("/property/") ? PROPERTY_METHOD
+ : EVENT_METHOD_PREFIX + (routingContext.pathParams().containsKey("identifier")
+ ? routingContext.pathParam("identifier")
+ : "unknown") + EVENT_METHOD_SUFFIX;
+ IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method,
+ INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
+ IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
+ }
+ }
+
+ /**
+ * 更新设备状态
+ *
+ * @param productKey 产品 Key
+ * @param deviceName 设备名称
+ */
+ private void updateDeviceState(String productKey, String deviceName) {
+ deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO()
+ .setRequestId(IdUtil.fastSimpleUUID()).setProcessId(IotPluginCommonUtils.getProcessId())
+ .setReportTime(LocalDateTime.now())
+ .setProductKey(productKey).setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState()));
+ }
+
+ /**
+ * 解析属性上报请求
+ *
+ * @param productKey 产品 Key
+ * @param deviceName 设备名称
+ * @param requestId 请求 ID
+ * @param body 请求体
+ * @return 属性上报请求 DTO
+ */
+ private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName,
+ String requestId, JsonObject body) {
+ // 使用脚本解析数据
+// Map properties = scriptService.parsePropertyData(productKey, deviceName, body);
+
+
+ // 如果脚本解析结果为空,使用默认解析逻辑
+ // TODO @芋艿:注释说明一下,为什么要这么处理?
+// if (CollUtil.isNotEmpty(properties)) {
+ Map properties = new HashMap<>();
+ Map params = body.getJsonObject("params") != null ?
+ body.getJsonObject("params").getMap() : null;
+ if (params != null) {
+ // 将标准格式的 params 转换为平台需要的 properties 格式
+ for (Map.Entry entry : params.entrySet()) {
+ String key = entry.getKey();
+ Object valueObj = entry.getValue();
+ // 如果是复杂结构(包含 value 和 time)
+ if (valueObj instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map valueMap = (Map) valueObj;
+ properties.put(key, valueMap.getOrDefault("value", valueObj));
+ } else {
+ properties.put(key, valueObj);
+ }
+ }
+ }
+// }
+
+ // 构建属性上报请求 DTO
+ return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId)
+ .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
+ .setProductKey(productKey).setDeviceName(deviceName)).setProperties(properties);
+ }
+
+ /**
+ * 解析事件上报请求
+ *
+ * @param productKey 产品K ey
+ * @param deviceName 设备名称
+ * @param identifier 事件标识符
+ * @param requestId 请求 ID
+ * @param body 请求体
+ * @return 事件上报请求 DTO
+ */
+ private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier,
+ String requestId, JsonObject body) {
+ // 使用脚本解析事件数据
+// Map params = scriptService.parseEventData(productKey, deviceName, identifier, body);
+ Map params = null;
+
+ // 如果脚本解析结果为空,使用默认解析逻辑
+// if (CollUtil.isNotEmpty(params)) {
+ if (body.containsKey("params")) {
+ params = body.getJsonObject("params").getMap();
+ } else {
+ // 兼容旧格式
+ params = new HashMap<>();
+ }
+// }
+
+ // 构建事件上报请求 DTO
+ return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId)
+ .setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
+ .setProductKey(productKey).setDeviceName(deviceName)).setIdentifier(identifier).setParams(params);
+ }
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000000..f735566c97
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+cn.iocoder.yudao.module.iot.component.http.config.IotComponentHttpAutoConfiguration
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/resources/application.yml
new file mode 100644
index 0000000000..bdb6b74970
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-components/yudao-module-iot-component-http/src/main/resources/application.yml
@@ -0,0 +1,10 @@
+# HTTP组件默认配置
+yudao:
+ iot:
+ component:
+ core:
+ plugin-key: http # 插件的唯一标识
+# http:
+# enabled: true # 是否启用HTTP组件,默认启用
+# server-port: 8092
+