【功能新增】IoT: 重构物联网模块,新增组件支持,更新依赖项,优化代码结构。具体更改包括:将插件模块替换为组件模块,添加 HTTP 和 EMQX 组件,更新相关依赖,优化心跳和下行处理逻辑,增加组件配置属性,完善文档说明。

This commit is contained in:
安浩浩 2025-04-02 21:28:56 +08:00
parent 516e3a2387
commit b5ce269fef
36 changed files with 2700 additions and 96 deletions

View File

@ -10,7 +10,8 @@
<modules>
<module>yudao-module-iot-api</module>
<module>yudao-module-iot-biz</module>
<module>yudao-module-iot-plugins</module>
<module>yudao-module-iot-components</module>
<!-- <module>yudao-module-iot-plugins</module>-->
</modules>
<modelVersion>4.0.0</modelVersion>

View File

@ -24,6 +24,16 @@
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-http</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-emqx</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
@ -70,11 +80,11 @@
</dependency>
<!-- 脚本插件相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-plugin-script</artifactId>
<version>${revision}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-plugin-script</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
<!-- 消息队列相关 -->
<dependency>

View File

@ -5,6 +5,7 @@ import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*;
import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService;
import cn.iocoder.yudao.module.iot.service.plugin.IotPluginInstanceService;
import jakarta.annotation.Resource;
import org.springframework.context.annotation.Primary;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
@ -15,6 +16,7 @@ import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
*/
@RestController
@Validated
@Primary
public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
@Resource

View File

@ -9,18 +9,13 @@ import cn.iocoder.yudao.module.iot.controller.admin.product.vo.script.IotProduct
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductScriptDO;
import cn.iocoder.yudao.module.iot.dal.mysql.product.IotProductScriptMapper;
import cn.iocoder.yudao.module.iot.plugin.script.context.PluginScriptContext;
import cn.iocoder.yudao.module.iot.plugin.script.service.ScriptService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.PRODUCT_NOT_EXISTS;
@ -43,8 +38,8 @@ public class IotProductScriptServiceImpl implements IotProductScriptService {
@Resource
private IotProductService productService;
@Resource
private ScriptService scriptService;
// @Resource
// private ScriptService scriptService;
@Override
public Long createProductScript(IotProductScriptSaveReqVO createReqVO) {
@ -121,89 +116,90 @@ public class IotProductScriptServiceImpl implements IotProductScriptService {
@Override
public IotProductScriptTestRespVO testProductScript(IotProductScriptTestReqVO testReqVO) {
long startTime = System.currentTimeMillis();
try {
// 验证产品是否存在
validateProductExists(testReqVO.getProductId());
// 根据ID获取已保存的脚本如果有
IotProductScriptDO existingScript = null;
if (testReqVO.getId() != null) {
existingScript = getProductScript(testReqVO.getId());
}
// 创建测试上下文
PluginScriptContext context = new PluginScriptContext();
IotProductDO product = productService.getProduct(testReqVO.getProductId());
// 设置设备上下文使用产品信息没有具体设备
context.withDeviceContext(product.getProductKey(), null);
// 设置输入参数
Map<String, Object> params = new HashMap<>();
params.put("input", testReqVO.getTestInput());
params.put("productKey", product.getProductKey());
params.put("scriptType", testReqVO.getScriptType());
// 根据脚本类型设置特定参数
switch (testReqVO.getScriptType()) {
case 1: // PROPERTY_PARSER
params.put("method", "property");
break;
case 2: // EVENT_PARSER
params.put("method", "event");
params.put("identifier", "default");
break;
case 3: // COMMAND_ENCODER
params.put("method", "command");
break;
default:
// 默认不添加额外参数
}
// 添加所有参数到上下文
for (Map.Entry<String, Object> entry : params.entrySet()) {
context.setParameter(entry.getKey(), entry.getValue());
}
// 执行脚本
Object result = scriptService.executeScript(
testReqVO.getScriptLanguage(),
testReqVO.getScriptContent(),
context);
// 更新测试结果如果是已保存的脚本
if (existingScript != null) {
IotProductScriptDO updateObj = new IotProductScriptDO();
updateObj.setId(existingScript.getId());
updateObj.setLastTestTime(LocalDateTime.now());
updateObj.setLastTestResult(1); // 1表示成功
productScriptMapper.updateById(updateObj);
}
long executionTime = System.currentTimeMillis() - startTime;
return IotProductScriptTestRespVO.success(result, executionTime);
} catch (Exception e) {
log.error("[testProductScript][测试脚本异常]", e);
// 如果是已保存的脚本更新测试失败状态
if (testReqVO.getId() != null) {
try {
IotProductScriptDO updateObj = new IotProductScriptDO();
updateObj.setId(testReqVO.getId());
updateObj.setLastTestTime(LocalDateTime.now());
updateObj.setLastTestResult(0); // 0表示失败
productScriptMapper.updateById(updateObj);
} catch (Exception ex) {
log.error("[testProductScript][更新脚本测试结果异常]", ex);
}
}
long executionTime = System.currentTimeMillis() - startTime;
return IotProductScriptTestRespVO.error(e.getMessage(), executionTime);
}
// long startTime = System.currentTimeMillis();
//
// try {
// // 验证产品是否存在
// validateProductExists(testReqVO.getProductId());
//
// // 根据ID获取已保存的脚本如果有
// IotProductScriptDO existingScript = null;
// if (testReqVO.getId() != null) {
// existingScript = getProductScript(testReqVO.getId());
// }
//
// // 创建测试上下文
// PluginScriptContext context = new PluginScriptContext();
// IotProductDO product = productService.getProduct(testReqVO.getProductId());
//
// // 设置设备上下文使用产品信息没有具体设备
// context.withDeviceContext(product.getProductKey(), null);
//
// // 设置输入参数
// Map<String, Object> params = new HashMap<>();
// params.put("input", testReqVO.getTestInput());
// params.put("productKey", product.getProductKey());
// params.put("scriptType", testReqVO.getScriptType());
//
// // 根据脚本类型设置特定参数
// switch (testReqVO.getScriptType()) {
// case 1: // PROPERTY_PARSER
// params.put("method", "property");
// break;
// case 2: // EVENT_PARSER
// params.put("method", "event");
// params.put("identifier", "default");
// break;
// case 3: // COMMAND_ENCODER
// params.put("method", "command");
// break;
// default:
// // 默认不添加额外参数
// }
//
// // 添加所有参数到上下文
// for (Map.Entry<String, Object> entry : params.entrySet()) {
// context.setParameter(entry.getKey(), entry.getValue());
// }
//
// // 执行脚本
// Object result = scriptService.executeScript(
// testReqVO.getScriptLanguage(),
// testReqVO.getScriptContent(),
// context);
//
// // 更新测试结果如果是已保存的脚本
// if (existingScript != null) {
// IotProductScriptDO updateObj = new IotProductScriptDO();
// updateObj.setId(existingScript.getId());
// updateObj.setLastTestTime(LocalDateTime.now());
// updateObj.setLastTestResult(1); // 1表示成功
// productScriptMapper.updateById(updateObj);
// }
//
// long executionTime = System.currentTimeMillis() - startTime;
// return IotProductScriptTestRespVO.success(result, executionTime);
//
// } catch (Exception e) {
// log.error("[testProductScript][测试脚本异常]", e);
//
// // 如果是已保存的脚本更新测试失败状态
// if (testReqVO.getId() != null) {
// try {
// IotProductScriptDO updateObj = new IotProductScriptDO();
// updateObj.setId(testReqVO.getId());
// updateObj.setLastTestTime(LocalDateTime.now());
// updateObj.setLastTestResult(0); // 0表示失败
// productScriptMapper.updateById(updateObj);
// } catch (Exception ex) {
// log.error("[testProductScript][更新脚本测试结果异常]", ex);
// }
// }
//
// long executionTime = System.currentTimeMillis() - startTime;
// return IotProductScriptTestRespVO.error(e.getMessage(), executionTime);
// }
return null;
}
@Override

View File

@ -0,0 +1,135 @@
# IOT 组件使用说明
## 组件介绍
该模块包含多个 IoT 设备连接组件,提供不同的通信协议支持:
- `yudao-module-iot-component-core`: 核心接口和通用类
- `yudao-module-iot-component-http`: 基于 HTTP 协议的设备通信组件
- `yudao-module-iot-component-emqx`: 基于 MQTT/EMQX 的设备通信组件
## 组件架构
### 架构设计
各组件采用统一的架构设计和命名规范:
- 配置类: `IotComponentXxxAutoConfiguration` - 提供Bean定义和组件初始化逻辑
- 属性类: `IotComponentXxxProperties` - 定义组件的配置属性
- 下行接口: `*DownstreamHandler` - 处理从平台到设备的下行通信
- 上行接口: `*UpstreamServer` - 处理从设备到平台的上行通信
### Bean 命名规范
为避免 Bean 冲突,各个组件中的 Bean 已添加特定前缀:
- HTTP 组件: `httpDeviceUpstreamServer`, `httpDeviceDownstreamHandler`
- EMQX 组件: `emqxDeviceUpstreamServer`, `emqxDeviceDownstreamHandler`
### 组件启用规则
现在系统支持同时使用多个组件,但有以下规则:
1. 当`yudao.iot.component.emqx.enabled=true`时核心模块将优先使用EMQX组件
2. 如果同时启用了多个组件,需要在业务代码中使用`@Qualifier`指定要使用的具体实现
> **重要提示:**
> 1. 组件库内部的默认配置文件**不会**被自动加载。必须将上述配置添加到主应用的配置文件中。
> 2. 所有配置项现在都已增加空值处理,配置缺失时将使用合理的默认值
> 3. `mqtt-host` 是唯一必须配置的参数,其他参数均有默认值
> 4. `mqtt-ssl``auth-port` 缺失时的默认值分别为 `false``8080`
> 5. `mqtt-topics` 缺失时将使用默认主题 `/device/#`
### 如何引用特定的 Bean
在其他组件中引用这些 Bean 时,需要使用 `@Qualifier` 注解指定 Bean 名称:
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
@Service
public class YourServiceClass {
// 注入 HTTP 组件的下行处理器
@Autowired
@Qualifier("httpDeviceDownstreamHandler")
private IotDeviceDownstreamHandler httpDeviceDownstreamHandler;
// 注入 EMQX 组件的下行处理器
@Autowired
@Qualifier("emqxDeviceDownstreamHandler")
private IotDeviceDownstreamHandler emqxDeviceDownstreamHandler;
// 使用示例
public void example() {
// 使用 HTTP 组件
httpDeviceDownstreamHandler.invokeDeviceService(...);
// 使用 EMQX 组件
emqxDeviceDownstreamHandler.invokeDeviceService(...);
}
}
```
### 组件选择指南
- **HTTP 组件**:适用于简单场景,设备通过 HTTP 接口与平台通信
- **EMQX 组件**:适用于实时性要求高的场景,基于 MQTT 协议,支持发布/订阅模式
## 常见问题
### 1. 配置未加载问题
如果遇到以下日志:
```
MQTT配置: host=null, port=null, username=null, ssl=null
[connectMqtt][MQTT Host为null无法连接]
```
这表明配置没有被正确加载。请确保:
1. 在主应用的配置文件中(如 `application.yml``application-dev.yml`)添加了必要的 EMQX 配置
2. 配置前缀正确:`yudao.iot.component.emqx`
3. 配置了必要的 `mqtt-host` 属性
### 2. mqttSsl 空指针异常
如果遇到以下错误:
```
Cannot invoke "java.lang.Boolean.booleanValue()" because the return value of "cn.iocoder.yudao.module.iot.component.emqx.config.IotEmqxComponentProperties.getMqttSsl()" is null
```
此问题已通过代码修复,现在会自动使用默认值 `false`。同样适用于其他配置项的空值问题。
### 3. authPort 空指针异常
如果遇到以下错误:
```
Cannot invoke "java.lang.Integer.intValue()" because the return value of "cn.iocoder.yudao.module.iot.component.emqx.config.IotEmqxComponentProperties.getAuthPort()" is null
```
此问题已通过代码修复,现在会自动使用默认值 `8080`
### 4. Bean注入问题
如果遇到以下错误:
```
Parameter 1 of method deviceDownstreamServer in IotPluginCommonAutoConfiguration required a single bean, but 2 were found
```
此问题已通过修改核心配置类来解决。现在系统会根据组件的启用状态自动选择合适的实现:
1. 优先使用EMQX组件当`yudao.iot.component.emqx.enabled=true`时)
2. 如果EMQX未启用则使用HTTP组件当`yudao.iot.component.http.enabled=true`时)
如果需要同时使用两个组件,业务代码中必须使用`@Qualifier`明确指定要使用的Bean。
### 5. 使用默认配置
组件现已加入完善的默认配置和空值处理机制,使配置更加灵活。但需要注意的是,这些默认配置值必须通过在主应用配置文件中设置相应的属性才能生效。

View File

@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-components</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>
物联网组件模块,提供与物联网设备通讯、管理的组件实现
</description>
<modules>
<module>yudao-module-iot-component-core</module>
<module>yudao-module-iot-component-http</module>
<module>yudao-module-iot-component-emqx</module>
</modules>
</project>

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-components</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-component-core</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
物联网组件核心模块
</description>
<dependencies>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<!-- 工具类相关 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<optional>true</optional>
</dependency>
<!-- 参数校验 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,50 @@
package cn.iocoder.yudao.module.iot.component.core.config;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentInstanceHeartbeatJob;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
import cn.iocoder.yudao.module.iot.component.core.upstream.IotDeviceUpstreamClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* IoT 组件的通用自动配置类
*
* @author haohao
*/
@AutoConfiguration
@EnableConfigurationProperties(IotComponentCommonProperties.class)
@EnableScheduling // 开启定时任务因为 IotComponentInstanceHeartbeatJob 是一个定时任务
public class IotComponentCommonAutoConfiguration {
/**
* 创建EMQX设备下行服务器
* 当yudao.iot.component.emqx.enabled=true时使用emqxDeviceDownstreamHandler
*/
@Bean
@ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true")
public IotDeviceDownstreamServer emqxDeviceDownstreamServer(IotComponentCommonProperties properties,
@Qualifier("emqxDeviceDownstreamHandler") IotDeviceDownstreamHandler deviceDownstreamHandler) {
return new IotDeviceDownstreamServer(properties, deviceDownstreamHandler);
}
@Bean(initMethod = "init", destroyMethod = "stop")
public IotComponentInstanceHeartbeatJob pluginInstanceHeartbeatJob(IotDeviceUpstreamApi deviceUpstreamApi,
IotDeviceDownstreamServer deviceDownstreamServer,
IotComponentCommonProperties commonProperties,
IotComponentRegistry componentRegistry) {
return new IotComponentInstanceHeartbeatJob(deviceUpstreamApi, deviceDownstreamServer, commonProperties,
componentRegistry);
}
@Bean
public IotDeviceUpstreamClient deviceUpstreamClient() {
return new IotDeviceUpstreamClient();
}
}

View File

@ -0,0 +1,24 @@
package cn.iocoder.yudao.module.iot.component.core.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* IoT 组件通用配置属性
*
* @author haohao
*/
@ConfigurationProperties(prefix = "yudao.iot.component.core")
@Validated
@Data
public class IotComponentCommonProperties {
/**
* 组件的唯一标识
* <p>
* 注意该值将在运行时由各组件设置不再从配置读取
*/
private String pluginKey;
}

View File

@ -0,0 +1,55 @@
package cn.iocoder.yudao.module.iot.component.core.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
/**
* IoT 设备下行处理器
* <p>
* 目的每个 plugin 需要实现用于处理 server 下行的指令请求从而实现从 server => plugin => device 的下行流程
*
* @author 芋道源码
*/
public interface IotDeviceDownstreamHandler {
/**
* 调用设备服务
*
* @param invokeReqDTO 调用设备服务的请求
* @return 是否成功
*/
CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO);
/**
* 获取设备属性
*
* @param getReqDTO 获取设备属性的请求
* @return 是否成功
*/
CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO);
/**
* 设置设备属性
*
* @param setReqDTO 设置设备属性的请求
* @return 是否成功
*/
CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO);
/**
* 设置设备配置
*
* @param setReqDTO 设置设备配置的请求
* @return 是否成功
*/
CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO);
/**
* 升级设备 OTA
*
* @param upgradeReqDTO 升级设备 OTA 的请求
* @return 是否成功
*/
CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO);
}

View File

@ -0,0 +1,80 @@
package cn.iocoder.yudao.module.iot.component.core.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* IoT 设备下行服务直接转发给 device 设备
*
* @author 芋道源码
*/
@Slf4j
@RequiredArgsConstructor
public class IotDeviceDownstreamServer {
private final IotComponentCommonProperties properties;
private final IotDeviceDownstreamHandler deviceDownstreamHandler;
/**
* 调用设备服务
*
* @param invokeReqDTO 调用设备服务的请求
* @return 是否成功
*/
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
return deviceDownstreamHandler.invokeDeviceService(invokeReqDTO);
}
/**
* 获取设备属性
*
* @param getReqDTO 获取设备属性的请求
* @return 是否成功
*/
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
return deviceDownstreamHandler.getDeviceProperty(getReqDTO);
}
/**
* 设置设备属性
*
* @param setReqDTO 设置设备属性的请求
* @return 是否成功
*/
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
return deviceDownstreamHandler.setDeviceProperty(setReqDTO);
}
/**
* 设置设备配置
*
* @param setReqDTO 设置设备配置的请求
* @return 是否成功
*/
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
return deviceDownstreamHandler.setDeviceConfig(setReqDTO);
}
/**
* 升级设备 OTA
*
* @param upgradeReqDTO 升级设备 OTA 的请求
* @return 是否成功
*/
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
return deviceDownstreamHandler.upgradeDeviceOta(upgradeReqDTO);
}
/**
* 获得内部组件标识
*
* @return 组件标识
*/
public String getComponentId() {
return properties.getPluginKey();
}
}

View File

@ -0,0 +1,131 @@
package cn.iocoder.yudao.module.iot.component.core.heartbeat;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotPluginInstanceHeartbeatReqDTO;
import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamServer;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry.IotComponentInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import java.lang.management.ManagementFactory;
import java.util.concurrent.TimeUnit;
/**
* IoT 组件实例心跳定时任务
* <p>
* 将组件的状态定时上报给 server 服务器
* <p>
* 用于定时发送心跳给服务端
*/
@RequiredArgsConstructor
@Slf4j
public class IotComponentInstanceHeartbeatJob {
/**
* 内嵌模式的端口值固定为0
*/
private static final Integer EMBEDDED_PORT = 0;
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotDeviceDownstreamServer deviceDownstreamServer;
private final IotComponentCommonProperties commonProperties;
private final IotComponentRegistry componentRegistry;
/**
* 初始化方法由Spring调用
* 注册当前组件并发送上线心跳
*/
public void init() {
// 将当前组件注册到注册表
String processId = getProcessId();
String hostIp = SystemUtil.getHostInfo().getAddress();
// 注册当前组件
componentRegistry.registerComponent(
commonProperties.getPluginKey(),
hostIp,
EMBEDDED_PORT,
processId);
// 发送所有组件的上线心跳
for (IotComponentInfo component : componentRegistry.getAllComponents()) {
try {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(
buildPluginInstanceHeartbeatReqDTO(component, true));
log.info("[init][组件({})上线结果:{})]", component.getPluginKey(), result);
} catch (Exception e) {
log.error("[init][组件({})上线发送异常]", component.getPluginKey(), e);
}
}
}
/**
* 停止方法由Spring调用
* 发送下线心跳并注销组件
*/
public void stop() {
// 发送所有组件的下线心跳
for (IotComponentInfo component : componentRegistry.getAllComponents()) {
try {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(
buildPluginInstanceHeartbeatReqDTO(component, false));
log.info("[stop][组件({})下线结果:{})]", component.getPluginKey(), result);
} catch (Exception e) {
log.error("[stop][组件({})下线发送异常]", component.getPluginKey(), e);
}
}
// 注销当前组件
componentRegistry.unregisterComponent(commonProperties.getPluginKey());
}
/**
* 定时发送心跳
*/
@Scheduled(initialDelay = 1, fixedRate = 1, timeUnit = TimeUnit.MINUTES) // 1 分钟执行一次
public void execute() {
// 发送所有组件的心跳
for (IotComponentInfo component : componentRegistry.getAllComponents()) {
try {
CommonResult<Boolean> result = deviceUpstreamApi.heartbeatPluginInstance(
buildPluginInstanceHeartbeatReqDTO(component, true));
log.info("[execute][组件({})心跳结果:{})]", component.getPluginKey(), result);
} catch (Exception e) {
log.error("[execute][组件({})心跳发送异常]", component.getPluginKey(), e);
}
}
}
/**
* 构建心跳DTO
*
* @param component 组件信息
* @param online 是否在线
* @return 心跳DTO
*/
private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(IotComponentInfo component,
Boolean online) {
return new IotPluginInstanceHeartbeatReqDTO()
.setPluginKey(component.getPluginKey())
.setProcessId(component.getProcessId())
.setHostIp(component.getHostIp())
.setDownstreamPort(component.getDownstreamPort())
.setOnline(online);
}
/**
* 获取当前进程ID
*
* @return 进程ID
*/
private String getProcessId() {
// 获取进程的 name
String name = ManagementFactory.getRuntimeMXBean().getName();
// 分割名称格式为 pid@hostname
return name.split("@")[0];
}
}

View File

@ -0,0 +1,92 @@
package cn.iocoder.yudao.module.iot.component.core.heartbeat;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* IoT 组件注册表
* <p>
* 用于管理多个组件的注册信息解决多组件心跳问题
*/
@Component
@Slf4j
public class IotComponentRegistry {
/**
* 组件信息
*/
@Data
@ToString
public static class IotComponentInfo {
/**
* 组件Key
*/
private final String pluginKey;
/**
* 主机IP
*/
private final String hostIp;
/**
* 下游端口
*/
private final Integer downstreamPort;
/**
* 进程ID
*/
private final String processId;
}
/**
* 组件映射表key为组件Key
*/
private final Map<String, IotComponentInfo> components = new ConcurrentHashMap<>();
/**
* 注册组件
*
* @param pluginKey 组件Key
* @param hostIp 主机IP
* @param downstreamPort 下游端口
* @param processId 进程ID
*/
public void registerComponent(String pluginKey, String hostIp, Integer downstreamPort, String processId) {
log.info("[registerComponent][注册组件, pluginKey={}, hostIp={}, downstreamPort={}, processId={}]",
pluginKey, hostIp, downstreamPort, processId);
components.put(pluginKey, new IotComponentInfo(pluginKey, hostIp, downstreamPort, processId));
}
/**
* 注销组件
*
* @param pluginKey 组件Key
*/
public void unregisterComponent(String pluginKey) {
log.info("[unregisterComponent][注销组件, pluginKey={}]", pluginKey);
components.remove(pluginKey);
}
/**
* 获取所有组件
*
* @return 所有组件集合
*/
public Collection<IotComponentInfo> getAllComponents() {
return components.values();
}
/**
* 获取指定组件
*
* @param pluginKey 组件Key
* @return 组件信息
*/
public IotComponentInfo getComponent(String pluginKey) {
return components.get(pluginKey);
}
}

View File

@ -0,0 +1,93 @@
package cn.iocoder.yudao.module.iot.component.core.pojo;
import lombok.Data;
/**
* IoT 标准协议响应实体类
* <p>
* 用于统一 MQTT HTTP 的响应格式
*
* @author haohao
*/
@Data
public class IotStandardResponse {
/**
* 消息ID
*/
private String id;
/**
* 状态码
*/
private Integer code;
/**
* 响应数据
*/
private Object data;
/**
* 响应消息
*/
private String message;
/**
* 方法名
*/
private String method;
/**
* 协议版本
*/
private String version;
/**
* 创建成功响应
*
* @param id 消息ID
* @param method 方法名
* @return 成功响应
*/
public static IotStandardResponse success(String id, String method) {
return success(id, method, null);
}
/**
* 创建成功响应
*
* @param id 消息ID
* @param method 方法名
* @param data 响应数据
* @return 成功响应
*/
public static IotStandardResponse success(String id, String method, Object data) {
return new IotStandardResponse()
.setId(id)
.setCode(200)
.setData(data)
.setMessage("success")
.setMethod(method)
.setVersion("1.0");
}
/**
* 创建错误响应
*
* @param id 消息ID
* @param method 方法名
* @param code 错误码
* @param message 错误消息
* @return 错误响应
*/
public static IotStandardResponse error(String id, String method, Integer code, String message) {
return new IotStandardResponse()
.setId(id)
.setCode(code)
.setData(null)
.setMessage(message)
.setMethod(method)
.setVersion("1.0");
}
}

View File

@ -0,0 +1,62 @@
package cn.iocoder.yudao.module.iot.component.core.upstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Resource;
/**
* 设备数据 Upstream 上行客户端
* <p>
* 直接调用 IotDeviceUpstreamApi 接口
*
* @author haohao
*/
@Slf4j
public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi {
@Resource
private IotDeviceUpstreamApi deviceUpstreamApi;
@Override
public CommonResult<Boolean> updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
return deviceUpstreamApi.updateDeviceState(updateReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) {
return deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
}
@Override
public CommonResult<Boolean> registerDevice(IotDeviceRegisterReqDTO registerReqDTO) {
return deviceUpstreamApi.registerDevice(registerReqDTO);
}
@Override
public CommonResult<Boolean> registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) {
return deviceUpstreamApi.registerSubDevice(registerReqDTO);
}
@Override
public CommonResult<Boolean> addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) {
return deviceUpstreamApi.addDeviceTopology(addReqDTO);
}
@Override
public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
return deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
return deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
}
@Override
public CommonResult<Boolean> heartbeatPluginInstance(IotPluginInstanceHeartbeatReqDTO heartbeatReqDTO) {
return deviceUpstreamApi.heartbeatPluginInstance(heartbeatReqDTO);
}
}

View File

@ -0,0 +1,76 @@
package cn.iocoder.yudao.module.iot.component.core.util;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.component.core.pojo.IotStandardResponse;
import io.vertx.core.http.HttpHeaders;
import io.vertx.ext.web.RoutingContext;
import org.springframework.http.MediaType;
/**
* IoT 插件的通用工具类
*
* @author 芋道源码
*/
public class IotPluginCommonUtils {
/**
* 流程实例的进程编号
*/
private static String processId;
public static String getProcessId() {
if (StrUtil.isEmpty(processId)) {
initProcessId();
}
return processId;
}
private synchronized static void initProcessId() {
processId = String.format("%s@%d@%s", // IP@PID@${uuid}
SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID(), IdUtil.fastSimpleUUID());
}
/**
* 将对象转换为JSON字符串后写入HTTP响应
*
* @param routingContext 路由上下文
* @param data 数据对象
*/
@SuppressWarnings("deprecation")
public static void writeJsonResponse(RoutingContext routingContext, Object data) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(data));
}
/**
* 生成标准JSON格式的响应并写入HTTP响应基于IotStandardResponse
* <p>
* 推荐使用此方法统一MQTT和HTTP的响应格式使用方式
*
* <pre>
* // 成功响应
* IotStandardResponse response = IotStandardResponse.success(requestId, method, data);
* IotPluginCommonUtils.writeJsonResponse(routingContext, response);
*
* // 错误响应
* IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, code, message);
* IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
* </pre>
*
* @param routingContext 路由上下文
* @param response IotStandardResponse响应对象
*/
@SuppressWarnings("deprecation")
public static void writeJsonResponse(RoutingContext routingContext, IotStandardResponse response) {
routingContext.response()
.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE)
.end(JsonUtils.toJsonString(response));
}
}

View File

@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.iocoder.yudao.module.iot.component.core.config.IotPluginCommonAutoConfiguration

View File

@ -0,0 +1 @@
cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonAutoConfiguration

View File

@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-components</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-component-emqx</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
物联网组件 EMQX 模块
</description>
<dependencies>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-core</artifactId>
<version>${revision}</version>
</dependency>
<!-- 工具类相关 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mqtt</artifactId>
</dependency>
<!-- 参数校验 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,121 @@
package cn.iocoder.yudao.module.iot.component.emqx.config;
import cn.hutool.core.util.IdUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
import cn.iocoder.yudao.module.iot.component.emqx.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.component.emqx.upstream.IotDeviceUpstreamServer;
import io.vertx.core.Vertx;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
import java.lang.management.ManagementFactory;
/**
* IoT 组件 EMQX 的自动配置类
*
* @author haohao
*/
@Slf4j
@AutoConfiguration
@EnableConfigurationProperties(IotComponentEmqxProperties.class)
@ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true", matchIfMissing = false)
@ComponentScan(basePackages = {
"cn.iocoder.yudao.module.iot.component.core", // 核心包
"cn.iocoder.yudao.module.iot.component.emqx" // EMQX组件包
})
public class IotComponentEmqxAutoConfiguration {
/**
* 组件key
*/
private static final String PLUGIN_KEY = "emqx";
public IotComponentEmqxAutoConfiguration() {
log.info("[IotComponentEmqxAutoConfiguration][已启动]");
}
@EventListener(ApplicationStartedEvent.class)
public void initialize(ApplicationStartedEvent event) {
// 从应用上下文中获取需要的Bean
IotComponentRegistry componentRegistry = event.getApplicationContext().getBean(IotComponentRegistry.class);
IotComponentCommonProperties commonProperties = event.getApplicationContext().getBean(IotComponentCommonProperties.class);
// 设置当前组件的核心标识
commonProperties.setPluginKey(PLUGIN_KEY);
// 将EMQX组件注册到组件注册表
componentRegistry.registerComponent(
PLUGIN_KEY,
SystemUtil.getHostInfo().getAddress(),
0, // 内嵌模式固定为0
getProcessId()
);
log.info("[initialize][IoT EMQX 组件初始化完成]");
}
@Bean
public Vertx vertx() {
return Vertx.vertx();
}
@Bean
public MqttClient mqttClient(Vertx vertx, IotComponentEmqxProperties emqxProperties) {
log.info("MQTT配置: host={}, port={}, username={}, ssl={}",
emqxProperties.getMqttHost(), emqxProperties.getMqttPort(),
emqxProperties.getMqttUsername(), emqxProperties.getMqttSsl());
MqttClientOptions options = new MqttClientOptions()
.setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID())
.setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword());
if (emqxProperties.getMqttSsl() != null) {
options.setSsl(emqxProperties.getMqttSsl());
} else {
options.setSsl(false);
log.warn("MQTT SSL配置为null默认设置为false");
}
return MqttClient.create(vertx, options);
}
@Bean(name = "emqxDeviceUpstreamServer", initMethod = "start", destroyMethod = "stop")
public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi,
IotComponentEmqxProperties emqxProperties,
Vertx vertx,
MqttClient mqttClient,
IotComponentRegistry componentRegistry) {
return new IotDeviceUpstreamServer(emqxProperties, deviceUpstreamApi, vertx, mqttClient, componentRegistry);
}
@Bean(name = "emqxDeviceDownstreamHandler")
public IotDeviceDownstreamHandler deviceDownstreamHandler(MqttClient mqttClient) {
return new IotDeviceDownstreamHandlerImpl(mqttClient);
}
/**
* 获取当前进程ID
*
* @return 进程ID
*/
private String getProcessId() {
// 获取进程的 name
String name = ManagementFactory.getRuntimeMXBean().getName();
// 分割名称格式为 pid@hostname
String pid = name.split("@")[0];
return pid;
}
}

View File

@ -0,0 +1,59 @@
package cn.iocoder.yudao.module.iot.component.emqx.config;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* IoT EMQX组件配置属性
*/
@ConfigurationProperties(prefix = "yudao.iot.component.emqx")
@Data
public class IotComponentEmqxProperties {
/**
* 是否启用EMQX组件
*/
private Boolean enabled;
/**
* 服务主机
*/
@NotBlank(message = "MQTT服务器主机不能为空")
private String mqttHost;
/**
* 服务端口
*/
@NotNull(message = "MQTT服务器端口不能为空")
private Integer mqttPort;
/**
* 服务用户名
*/
@NotBlank(message = "MQTT服务器用户名不能为空")
private String mqttUsername;
/**
* 服务密码
*/
@NotBlank(message = "MQTT服务器密码不能为空")
private String mqttPassword;
/**
* 是否启用 SSL
*/
@NotNull(message = "MQTT SSL配置不能为空")
private Boolean mqttSsl;
/**
* 订阅的主题列表
*/
@NotEmpty(message = "MQTT订阅主题不能为空")
private String[] mqttTopics;
/**
* 认证端口
*/
@NotNull(message = "认证端口不能为空")
private Integer authPort;
}

View File

@ -0,0 +1,177 @@
package cn.iocoder.yudao.module.iot.component.emqx.downstream;
import cn.hutool.core.util.IdUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.MQTT_TOPIC_ILLEGAL;
/**
* EMQX 插件的 {@link IotDeviceDownstreamHandler} 实现类
*
* @author 芋道源码
*/
@Slf4j
public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
private static final String SYS_TOPIC_PREFIX = "/sys/";
// TODO @haohao是不是可以类似 IotDeviceConfigSetVertxHandler 的建议抽到统一的枚举类
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈回复 都使用 Alink 格式方便后续扩展
// 设备服务调用 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}
// 响应Topic/sys/${productKey}/${deviceName}/thing/service/${tsl.service.identifier}_reply
private static final String SERVICE_TOPIC_PREFIX = "/thing/service/";
// 设置设备属性 标准 JSON
// 请求Topic/sys/${productKey}/${deviceName}/thing/service/property/set
// 响应Topic/sys/${productKey}/${deviceName}/thing/service/property/set_reply
private static final String PROPERTY_SET_TOPIC = "/thing/service/property/set";
private final MqttClient mqttClient;
/**
* 构造函数
*
* @param mqttClient MQTT客户端
*/
public IotDeviceDownstreamHandlerImpl(MqttClient mqttClient) {
this.mqttClient = mqttClient;
}
@Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO reqDTO) {
log.info("[invokeService][开始调用设备服务][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
// 验证参数
if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null || reqDTO.getIdentifier() == null) {
log.error("[invokeService][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
try {
// 构建请求主题
String topic = buildServiceTopic(reqDTO.getProductKey(), reqDTO.getDeviceName(), reqDTO.getIdentifier());
// 构建请求消息
String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
JSONObject request = buildServiceRequest(requestId, reqDTO.getIdentifier(), reqDTO.getParams());
// 发送消息
publishMessage(topic, request);
log.info("[invokeService][调用设备服务成功][requestId: {}][topic: {}]", requestId, topic);
return CommonResult.success(true);
} catch (Exception e) {
log.error("[invokeService][调用设备服务异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
}
@Override
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO reqDTO) {
// 验证参数
log.info("[setProperty][开始设置设备属性][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
if (reqDTO.getProductKey() == null || reqDTO.getDeviceName() == null) {
log.error("[setProperty][参数不完整][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO));
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
try {
// 构建请求主题
String topic = buildPropertySetTopic(reqDTO.getProductKey(), reqDTO.getDeviceName());
// 构建请求消息
String requestId = reqDTO.getRequestId() != null ? reqDTO.getRequestId() : generateRequestId();
JSONObject request = buildPropertySetRequest(requestId, reqDTO.getProperties());
// 发送消息
publishMessage(topic, request);
log.info("[setProperty][设置设备属性成功][requestId: {}][topic: {}]", requestId, topic);
return CommonResult.success(true);
} catch (Exception e) {
log.error("[setProperty][设置设备属性异常][reqDTO: {}]", JSONUtil.toJsonStr(reqDTO), e);
return CommonResult.error(MQTT_TOPIC_ILLEGAL.getCode(), MQTT_TOPIC_ILLEGAL.getMsg());
}
}
@Override
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
return CommonResult.success(true);
}
@Override
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
return CommonResult.success(true);
}
/**
* 构建服务调用主题
*/
private String buildServiceTopic(String productKey, String deviceName, String serviceIdentifier) {
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + SERVICE_TOPIC_PREFIX + serviceIdentifier;
}
/**
* 构建属性设置主题
*/
private String buildPropertySetTopic(String productKey, String deviceName) {
return SYS_TOPIC_PREFIX + productKey + "/" + deviceName + PROPERTY_SET_TOPIC;
}
// TODO @haohao这个后面搞个对象会不会好点哈
/**
* 构建服务调用请求
*/
private JSONObject buildServiceRequest(String requestId, String serviceIdentifier, Map<String, Object> params) {
return new JSONObject()
.set("id", requestId)
.set("version", "1.0")
.set("method", "thing.service." + serviceIdentifier)
.set("params", params != null ? params : new JSONObject());
}
/**
* 构建属性设置请求
*/
private JSONObject buildPropertySetRequest(String requestId, Map<String, Object> properties) {
return new JSONObject()
.set("id", requestId)
.set("version", "1.0")
.set("method", "thing.service.property.set")
.set("params", properties);
}
/**
* 发布 MQTT 消息
*/
private void publishMessage(String topic, JSONObject payload) {
mqttClient.publish(
topic,
Buffer.buffer(payload.toString()),
MqttQoS.AT_LEAST_ONCE,
false,
false);
log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload);
}
/**
* 生成请求 ID
*/
private String generateRequestId() {
return IdUtil.fastSimpleUUID();
}
}

View File

@ -0,0 +1,263 @@
package cn.iocoder.yudao.module.iot.component.emqx.upstream;
import cn.hutool.core.util.ArrayUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
import cn.iocoder.yudao.module.iot.component.emqx.config.IotComponentEmqxProperties;
import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceAuthVertxHandler;
import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceMqttMessageHandler;
import cn.iocoder.yudao.module.iot.component.emqx.upstream.router.IotDeviceWebhookVertxHandler;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.mqtt.MqttClient;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* IoT 设备下行服务端接收来自 device 设备的请求转发给 server 服务器
* <p>
* 协议HTTPMQTT
*
* @author haohao
*/
@Slf4j
public class IotDeviceUpstreamServer {
/**
* 重连延迟时间(毫秒)
*/
private static final int RECONNECT_DELAY_MS = 5000;
/**
* 连接超时时间(毫秒)
*/
private static final int CONNECTION_TIMEOUT_MS = 10000;
/**
* 默认 QoS 级别
*/
private static final MqttQoS DEFAULT_QOS = MqttQoS.AT_LEAST_ONCE;
private final Vertx vertx;
private final HttpServer server;
private final MqttClient client;
private final IotComponentEmqxProperties emqxProperties;
private final IotDeviceMqttMessageHandler mqttMessageHandler;
private final IotComponentRegistry componentRegistry;
/**
* 服务运行状态标志
*/
private volatile boolean isRunning = false;
public IotDeviceUpstreamServer(IotComponentEmqxProperties emqxProperties,
IotDeviceUpstreamApi deviceUpstreamApi,
Vertx vertx,
MqttClient client,
IotComponentRegistry componentRegistry) {
this.vertx = vertx;
this.emqxProperties = emqxProperties;
this.client = client;
this.componentRegistry = componentRegistry;
// 创建 Router 实例
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()); // 处理 Body
router.post(IotDeviceAuthVertxHandler.PATH)
// TODO @haohao疑问mqtt 的认证需要通过 http
// 回复MQTT 认证不必须通过 HTTP 进行 HTTP 认证是 EMQX MQTT 服务器支持的一种灵活的认证方式
.handler(new IotDeviceAuthVertxHandler(deviceUpstreamApi));
// 添加 Webhook 处理器用于处理设备连接和断开连接事件
router.post(IotDeviceWebhookVertxHandler.PATH)
.handler(new IotDeviceWebhookVertxHandler(deviceUpstreamApi));
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
this.mqttMessageHandler = new IotDeviceMqttMessageHandler(deviceUpstreamApi, client);
}
/**
* 启动 HTTP 服务器MQTT 客户端
*/
public void start() {
if (isRunning) {
log.warn("[start][服务已经在运行中,请勿重复启动]");
return;
}
log.info("[start][开始启动服务]");
// 检查authPort是否为null
Integer authPort = emqxProperties.getAuthPort();
if (authPort == null) {
log.warn("[start][authPort为null使用默认端口8080]");
authPort = 8080; // 默认端口
}
// 1. 启动 HTTP 服务器
final Integer finalAuthPort = authPort; // 为lambda表达式创建final变量
CompletableFuture<Void> httpFuture = server.listen(finalAuthPort)
.toCompletionStage()
.toCompletableFuture()
.thenAccept(v -> log.info("[start][HTTP服务器启动完成端口: {}]", server.actualPort()));
// 2. 连接 MQTT Broker
CompletableFuture<Void> mqttFuture = connectMqtt()
.toCompletionStage()
.toCompletableFuture()
.thenAccept(v -> {
// 2.1 添加 MQTT 断开重连监听器
client.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT连接已断开准备重连]");
reconnectWithDelay();
});
// 2. 设置 MQTT 消息处理器
setupMessageHandler();
});
// 3. 等待所有服务启动完成
CompletableFuture.allOf(httpFuture, mqttFuture)
.orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.whenComplete((result, error) -> {
if (error != null) {
log.error("[start][服务启动失败]", error);
} else {
isRunning = true;
log.info("[start][所有服务启动完成]");
}
});
}
/**
* 设置 MQTT 消息处理器
*/
private void setupMessageHandler() {
client.publishHandler(mqttMessageHandler::handle);
log.debug("[setupMessageHandler][MQTT消息处理器设置完成]");
}
/**
* 重连 MQTT 客户端
*/
private void reconnectWithDelay() {
if (!isRunning) {
log.info("[reconnectWithDelay][服务已停止,不再尝试重连]");
return;
}
vertx.setTimer(RECONNECT_DELAY_MS, id -> {
log.info("[reconnectWithDelay][开始重新连接 MQTT]");
connectMqtt();
});
}
/**
* 连接 MQTT Broker 并订阅主题
*
* @return 连接结果的Future
*/
private Future<Void> connectMqtt() {
// 检查必要的MQTT配置
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
if (host == null) {
String msg = "[connectMqtt][MQTT Host为null无法连接]";
log.error(msg);
return Future.failedFuture(new IllegalStateException(msg));
}
if (port == null) {
log.warn("[connectMqtt][MQTT Port为null使用默认端口1883]");
port = 1883; // 默认MQTT端口
}
final Integer finalPort = port; // 为lambda表达式创建final变量
return client.connect(finalPort, host)
.compose(connAck -> {
log.info("[connectMqtt][MQTT客户端连接成功]");
return subscribeToTopics();
})
.recover(error -> {
log.error("[connectMqtt][连接MQTT Broker失败:]", error);
reconnectWithDelay();
return Future.failedFuture(error);
});
}
/**
* 订阅设备上行消息主题
*
* @return 订阅结果的 Future
*/
private Future<Void> subscribeToTopics() {
String[] topics = emqxProperties.getMqttTopics();
if (ArrayUtil.isEmpty(topics)) {
log.warn("[subscribeToTopics][未配置MQTT主题或为null使用默认主题]");
// 默认订阅所有设备上下行主题
topics = new String[]{"/device/#"};
}
log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
Future<Void> compositeFuture = Future.succeededFuture();
for (String topic : topics) {
if (topic == null) {
continue; // 跳过null主题
}
String trimmedTopic = topic.trim();
if (trimmedTopic.isEmpty()) {
continue;
}
compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value())
.<Void>map(ack -> {
log.info("[subscribeToTopics][成功订阅主题: {}]", trimmedTopic);
return null;
})
.recover(error -> {
log.error("[subscribeToTopics][订阅主题失败: {}]", trimmedTopic, error);
return Future.<Void>succeededFuture(); // 继续订阅其他主题
}));
}
return compositeFuture;
}
/**
* 停止所有服务
*/
public void stop() {
if (!isRunning) {
log.warn("[stop][服务未运行,无需停止]");
return;
}
log.info("[stop][开始关闭服务]");
isRunning = false;
try {
CompletableFuture<Void> serverFuture = server != null
? server.close().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null);
CompletableFuture<Void> clientFuture = client != null
? client.disconnect().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null);
CompletableFuture<Void> vertxFuture = vertx != null
? vertx.close().toCompletionStage().toCompletableFuture()
: CompletableFuture.completedFuture(null);
// 等待所有资源关闭
CompletableFuture.allOf(serverFuture, clientFuture, vertxFuture)
.orTimeout(CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.whenComplete((result, error) -> {
if (error != null) {
log.error("[stop][服务关闭过程中发生异常]", error);
} else {
log.info("[stop][所有服务关闭完成]");
}
});
} catch (Exception e) {
log.error("[stop][关闭服务异常]", e);
throw new RuntimeException("关闭 IoT 设备上行服务失败", e);
}
}
}

View File

@ -0,0 +1,64 @@
package cn.iocoder.yudao.module.iot.component.emqx.upstream.router;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEmqxAuthReqDTO;
import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
/**
* IoT EMQX 连接认证的 Vert.x Handler
* <p>
* 参考<a href="https://docs.emqx.com/zh/emqx/latest/access-control/authn/http.html">EMQX HTTP</a>
* <p>
* 注意该处理器需要返回特定格式{"result": "allow"} {"result": "deny"}
* 以符合 EMQX 认证插件的要求因此不使用 IotStandardResponse 实体类
*
* @author haohao
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceAuthVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/mqtt/auth";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@Override
public void handle(RoutingContext routingContext) {
try {
// 构建认证请求 DTO
JsonObject json = routingContext.body().asJsonObject();
String clientId = json.getString("clientid");
String username = json.getString("username");
String password = json.getString("password");
IotDeviceEmqxAuthReqDTO authReqDTO = new IotDeviceEmqxAuthReqDTO()
.setClientId(clientId)
.setUsername(username)
.setPassword(password);
// 调用认证 API
CommonResult<Boolean> authResult = deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
if (authResult.getCode() != 0 || !authResult.getData()) {
// 注意这里必须返回 {"result": "deny"} 格式以符合 EMQX 认证插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "deny"));
return;
}
// 响应结果
// 注意这里必须返回 {"result": "allow"} 格式以符合 EMQX 认证插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "allow"));
} catch (Exception e) {
log.error("[handle][EMQX 认证异常]", e);
// 注意这里必须返回 {"result": "deny"} 格式以符合 EMQX 认证插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "deny"));
}
}
}

View File

@ -0,0 +1,296 @@
package cn.iocoder.yudao.module.iot.component.emqx.upstream.router;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.component.core.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* IoT 设备 MQTT 消息处理器
* <p>
* 参考<a href="https://help.aliyun.com/zh/iot/user-guide/device-properties-events-and-services">设备属性事件服务</a>
*/
@Slf4j
public class IotDeviceMqttMessageHandler {
// TODO @haohao讨论感觉 mqtt http可以做个相对统一的格式哈回复 都使用 Alink 格式方便后续扩展
// 设备上报属性 标准 JSON
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/property/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/property/post_reply
// 设备上报事件 标准 JSON
// 请求 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post
// 响应 Topic/sys/${productKey}/${deviceName}/thing/event/${tsl.event.identifier}/post_reply
private static final String SYS_TOPIC_PREFIX = "/sys/";
private static final String PROPERTY_POST_TOPIC = "/thing/event/property/post";
private static final String EVENT_POST_TOPIC_PREFIX = "/thing/event/";
private static final String EVENT_POST_TOPIC_SUFFIX = "/post";
private static final String REPLY_SUFFIX = "_reply";
private static final String PROPERTY_METHOD = "thing.event.property.post";
private static final String EVENT_METHOD_PREFIX = "thing.event.";
private static final String EVENT_METHOD_SUFFIX = ".post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final MqttClient mqttClient;
public IotDeviceMqttMessageHandler(IotDeviceUpstreamApi deviceUpstreamApi, MqttClient mqttClient) {
this.deviceUpstreamApi = deviceUpstreamApi;
this.mqttClient = mqttClient;
}
/**
* 处理MQTT消息
*
* @param message MQTT发布消息
*/
public void handle(MqttPublishMessage message) {
String topic = message.topicName();
String payload = message.payload().toString();
log.info("[messageHandler][接收到消息][topic: {}][payload: {}]", topic, payload);
try {
if (StrUtil.isEmpty(payload)) {
log.warn("[messageHandler][消息内容为空][topic: {}]", topic);
return;
}
handleMessage(topic, payload);
} catch (Exception e) {
log.error("[messageHandler][处理消息失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 根据主题类型处理消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handleMessage(String topic, String payload) {
// 校验前缀
if (!topic.startsWith(SYS_TOPIC_PREFIX)) {
log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
return;
}
// 处理设备属性上报消息
if (topic.endsWith(PROPERTY_POST_TOPIC)) {
log.info("[handleMessage][接收到设备属性上报][topic: {}]", topic);
handlePropertyPost(topic, payload);
return;
}
// 处理设备事件上报消息
if (topic.contains(EVENT_POST_TOPIC_PREFIX) && topic.endsWith(EVENT_POST_TOPIC_SUFFIX)) {
log.info("[handleMessage][接收到设备事件上报][topic: {}]", topic);
handleEventPost(topic, payload);
return;
}
// 未知消息类型
log.warn("[handleMessage][未知的消息类型][topic: {}]", topic);
}
/**
* 处理设备属性上报消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handlePropertyPost(String topic, String payload) {
try {
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = parseTopic(topic);
if (topicParts == null) {
return;
}
// 构建设备属性上报请求对象
IotDevicePropertyReportReqDTO reportReqDTO = buildPropertyReportDTO(jsonObject, topicParts);
// 调用上游 API 处理设备上报数据
deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic);
// 发送响应消息
sendResponse(topic, jsonObject, PROPERTY_METHOD, null);
} catch (Exception e) {
log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 处理设备事件上报消息
*
* @param topic 主题
* @param payload 消息内容
*/
private void handleEventPost(String topic, String payload) {
try {
// 解析消息内容
JSONObject jsonObject = JSONUtil.parseObj(payload);
String[] topicParts = parseTopic(topic);
if (topicParts == null) {
return;
}
// 构建设备事件上报请求对象
IotDeviceEventReportReqDTO reportReqDTO = buildEventReportDTO(jsonObject, topicParts);
// 调用上游 API 处理设备上报数据
deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
log.info("[handleEventPost][处理设备事件上报成功][topic: {}]", topic);
// topic 中获取事件标识符
String eventIdentifier = getEventIdentifier(topicParts, topic);
if (eventIdentifier == null) {
return;
}
// 发送响应消息
String method = EVENT_METHOD_PREFIX + eventIdentifier + EVENT_METHOD_SUFFIX;
sendResponse(topic, jsonObject, method, null);
} catch (Exception e) {
log.error("[handleEventPost][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e);
}
}
/**
* 解析主题获取主题各部分
*
* @param topic 主题
* @return 主题各部分数组如果解析失败返回null
*/
private String[] parseTopic(String topic) {
String[] topicParts = topic.split("/");
if (topicParts.length < 7) {
log.warn("[parseTopic][主题格式不正确][topic: {}]", topic);
return null;
}
return topicParts;
}
/**
* 从主题部分中获取事件标识符
*
* @param topicParts 主题各部分
* @param topic 原始主题用于日志
* @return 事件标识符如果获取失败返回null
*/
private String getEventIdentifier(String[] topicParts, String topic) {
try {
return topicParts[6];
} catch (ArrayIndexOutOfBoundsException e) {
log.warn("[getEventIdentifier][无法从主题中获取事件标识符][topic: {}][topicParts: {}]",
topic, Arrays.toString(topicParts));
return null;
}
}
/**
* 发送响应消息
*
* @param topic 原始主题
* @param jsonObject 原始消息JSON对象
* @param method 响应方法
* @param customData 自定义数据可为 null
*/
private void sendResponse(String topic, JSONObject jsonObject, String method, Object customData) {
String replyTopic = topic + REPLY_SUFFIX;
// 响应结果
IotStandardResponse response = IotStandardResponse.success(
jsonObject.getStr("id"), method, customData);
try {
mqttClient.publish(replyTopic, Buffer.buffer(JsonUtils.toJsonString(response)),
MqttQoS.AT_LEAST_ONCE, false, false);
log.info("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
} catch (Exception e) {
log.error("[sendResponse][发送响应消息失败][topic: {}][response: {}]", replyTopic, response, e);
}
}
/**
* 构建设备属性上报请求对象
*
* @param jsonObject 消息内容
* @param topicParts 主题部分
* @return 设备属性上报请求对象
*/
private IotDevicePropertyReportReqDTO buildPropertyReportDTO(JSONObject jsonObject, String[] topicParts) {
IotDevicePropertyReportReqDTO reportReqDTO = new IotDevicePropertyReportReqDTO();
reportReqDTO.setRequestId(jsonObject.getStr("id"));
reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
reportReqDTO.setReportTime(LocalDateTime.now());
reportReqDTO.setProductKey(topicParts[2]);
reportReqDTO.setDeviceName(topicParts[3]);
// 只使用标准JSON格式处理属性数据
JSONObject params = jsonObject.getJSONObject("params");
if (params == null) {
log.warn("[buildPropertyReportDTO][消息格式不正确缺少params字段][jsonObject: {}]", jsonObject);
params = new JSONObject();
}
// 将标准格式的params转换为平台需要的properties格式
Map<String, Object> properties = new HashMap<>();
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object valueObj = entry.getValue();
// 如果是复杂结构包含value和time
if (valueObj instanceof JSONObject valueJson) {
properties.put(key, valueJson.getOrDefault("value", valueObj));
} else {
properties.put(key, valueObj);
}
}
reportReqDTO.setProperties(properties);
return reportReqDTO;
}
/**
* 构建设备事件上报请求对象
*
* @param jsonObject 消息内容
* @param topicParts 主题部分
* @return 设备事件上报请求对象
*/
private IotDeviceEventReportReqDTO buildEventReportDTO(JSONObject jsonObject, String[] topicParts) {
IotDeviceEventReportReqDTO reportReqDTO = new IotDeviceEventReportReqDTO();
reportReqDTO.setRequestId(jsonObject.getStr("id"));
reportReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
reportReqDTO.setReportTime(LocalDateTime.now());
reportReqDTO.setProductKey(topicParts[2]);
reportReqDTO.setDeviceName(topicParts[3]);
reportReqDTO.setIdentifier(topicParts[6]);
// 只使用标准JSON格式处理事件参数
JSONObject params = jsonObject.getJSONObject("params");
if (params == null) {
log.warn("[buildEventReportDTO][消息格式不正确缺少params字段][jsonObject: {}]", jsonObject);
params = new JSONObject();
}
reportReqDTO.setParams(params);
return reportReqDTO;
}
}

View File

@ -0,0 +1,152 @@
package cn.iocoder.yudao.module.iot.component.emqx.upstream.router;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Collections;
/**
* IoT EMQX Webhook 事件处理的 Vert.x Handler
* <p>
* 参考<a href="https://docs.emqx.com/zh/emqx/latest/data-integration/webhook.html">EMQX Webhook</a>
* <p>
* 注意该处理器需要返回特定格式{"result": "success"} {"result": "error"}
* 以符合 EMQX Webhook 插件的要求因此不使用 IotStandardResponse 实体类
*
* @author haohao
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceWebhookVertxHandler implements Handler<RoutingContext> {
public static final String PATH = "/mqtt/webhook";
private final IotDeviceUpstreamApi deviceUpstreamApi;
@Override
public void handle(RoutingContext routingContext) {
try {
// 解析请求体
JsonObject json = routingContext.body().asJsonObject();
String event = json.getString("event");
String clientId = json.getString("clientid");
String username = json.getString("username");
// 处理不同的事件类型
switch (event) {
case "client.connected":
handleClientConnected(clientId, username);
break;
case "client.disconnected":
handleClientDisconnected(clientId, username);
break;
default:
log.info("[handle][未处理的 Webhook 事件] event={}, clientId={}, username={}", event, clientId, username);
break;
}
// 返回成功响应
// 注意这里必须返回 {"result": "success"} 格式以符合 EMQX Webhook 插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "success"));
} catch (Exception e) {
log.error("[handle][处理 Webhook 事件异常]", e);
// 注意这里必须返回 {"result": "error"} 格式以符合 EMQX Webhook 插件的要求
IotPluginCommonUtils.writeJsonResponse(routingContext, Collections.singletonMap("result", "error"));
}
}
/**
* 处理客户端连接事件
*
* @param clientId 客户端ID
* @param username 用户名
*/
private void handleClientConnected(String clientId, String username) {
// 解析产品标识和设备名称
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
log.warn("[handleClientConnected][客户端连接事件,但用户名为空] clientId={}", clientId);
return;
}
String[] parts = parseUsername(username);
if (parts == null) {
return;
}
// 更新设备状态为在线
IotDeviceStateUpdateReqDTO updateReqDTO = new IotDeviceStateUpdateReqDTO();
updateReqDTO.setProductKey(parts[1]);
updateReqDTO.setDeviceName(parts[0]);
updateReqDTO.setState(IotDeviceStateEnum.ONLINE.getState());
updateReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
updateReqDTO.setReportTime(LocalDateTime.now());
CommonResult<Boolean> result = deviceUpstreamApi.updateDeviceState(updateReqDTO);
if (result.getCode() != 0 || !result.getData()) {
log.error("[handleClientConnected][更新设备状态为在线失败] clientId={}, username={}, code={}, msg={}",
clientId, username, result.getCode(), result.getMsg());
} else {
log.info("[handleClientConnected][更新设备状态为在线成功] clientId={}, username={}", clientId, username);
}
}
/**
* 处理客户端断开连接事件
*
* @param clientId 客户端ID
* @param username 用户名
*/
private void handleClientDisconnected(String clientId, String username) {
// 解析产品标识和设备名称
if (StrUtil.isEmpty(username) || "undefined".equals(username)) {
log.warn("[handleClientDisconnected][客户端断开连接事件,但用户名为空] clientId={}", clientId);
return;
}
String[] parts = parseUsername(username);
if (parts == null) {
return;
}
// 更新设备状态为离线
IotDeviceStateUpdateReqDTO offlineReqDTO = new IotDeviceStateUpdateReqDTO();
offlineReqDTO.setProductKey(parts[1]);
offlineReqDTO.setDeviceName(parts[0]);
offlineReqDTO.setState(IotDeviceStateEnum.OFFLINE.getState());
offlineReqDTO.setProcessId(IotPluginCommonUtils.getProcessId());
offlineReqDTO.setReportTime(LocalDateTime.now());
CommonResult<Boolean> offlineResult = deviceUpstreamApi.updateDeviceState(offlineReqDTO);
if (offlineResult.getCode() != 0 || !offlineResult.getData()) {
log.error("[handleClientDisconnected][更新设备状态为离线失败] clientId={}, username={}, code={}, msg={}",
clientId, username, offlineResult.getCode(), offlineResult.getMsg());
} else {
log.info("[handleClientDisconnected][更新设备状态为离线成功] clientId={}, username={}", clientId, username);
}
}
/**
* 解析用户名格式为 deviceName&productKey
*
* @param username 用户名
* @return 解析结果[0] deviceName[1] productKey解析失败返回 null
*/
private String[] parseUsername(String username) {
if (StrUtil.isEmpty(username)) {
return null;
}
String[] parts = username.split("&");
if (parts.length != 2) {
log.warn("[parseUsername][用户名格式({})不正确,无法解析产品标识和设备名称]", username);
return null;
}
return parts;
}
}

View File

@ -0,0 +1 @@
cn.iocoder.yudao.module.iot.component.emqx.config.IotComponentEmqxAutoConfiguration

View File

@ -0,0 +1,18 @@
# EMQX组件默认配置
yudao:
iot:
component:
# 核心组件配置
core:
plugin-key: emqx # 插件的唯一标识
# EMQX组件配置
# emqx:
# enabled: true # 启用EMQX组件
# mqtt-host: 127.0.0.1 # MQTT服务器主机地址
# mqtt-port: 1883 # MQTT服务器端口
# mqtt-username: yudao # MQTT服务器用户名
# mqtt-password: 123456 # MQTT服务器密码
# mqtt-ssl: false # 是否启用SSL
# mqtt-topics: # 订阅的主题列表
# - "/sys/#"
# auth-port: 8101 # 认证端口

View File

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>yudao-module-iot-components</artifactId>
<groupId>cn.iocoder.boot</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-module-iot-component-http</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
物联网组件 HTTP 模块
</description>
<dependencies>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-core</artifactId>
<version>${revision}</version>
</dependency>
<!-- 脚本解析相关 -->
<!-- <dependency>-->
<!-- <groupId>cn.iocoder.boot</groupId>-->
<!-- <artifactId>yudao-module-iot-plugin-script</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
<!-- 工具类相关 -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<!-- 参数校验 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,91 @@
package cn.iocoder.yudao.module.iot.component.http.config;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.component.core.config.IotComponentCommonProperties;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
import cn.iocoder.yudao.module.iot.component.http.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.component.http.upstream.IotDeviceUpstreamServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;
import java.lang.management.ManagementFactory;
/**
* IoT 组件 HTTP 的自动配置类
*
* @author haohao
*/
@Slf4j
@AutoConfiguration
@EnableConfigurationProperties(IotComponentHttpProperties.class)
@ConditionalOnProperty(prefix = "yudao.iot.component.http", name = "enabled", havingValue = "true", matchIfMissing = false)
@ComponentScan(basePackages = {
"cn.iocoder.yudao.module.iot.component.core", // 核心包
"cn.iocoder.yudao.module.iot.component.http" // HTTP组件包
})
public class IotComponentHttpAutoConfiguration {
/**
* 组件key
*/
private static final String PLUGIN_KEY = "http";
public IotComponentHttpAutoConfiguration() {
log.info("[IotComponentHttpAutoConfiguration][已启动]");
}
@EventListener(ApplicationStartedEvent.class)
public void initialize(ApplicationStartedEvent event) {
// 从应用上下文中获取需要的Bean
IotComponentRegistry componentRegistry = event.getApplicationContext().getBean(IotComponentRegistry.class);
IotComponentCommonProperties commonProperties = event.getApplicationContext()
.getBean(IotComponentCommonProperties.class);
// 设置当前组件的核心标识
commonProperties.setPluginKey(PLUGIN_KEY);
// 将HTTP组件注册到组件注册表
componentRegistry.registerComponent(
PLUGIN_KEY,
SystemUtil.getHostInfo().getAddress(),
0, // 内嵌模式固定为0
getProcessId());
log.info("[initialize][IoT HTTP 组件初始化完成]");
}
@Bean(name = "httpDeviceUpstreamServer", initMethod = "start", destroyMethod = "stop")
public IotDeviceUpstreamServer deviceUpstreamServer(IotDeviceUpstreamApi deviceUpstreamApi,
IotComponentHttpProperties properties,
ApplicationContext applicationContext,
IotComponentRegistry componentRegistry) {
return new IotDeviceUpstreamServer(properties, deviceUpstreamApi, applicationContext, componentRegistry);
}
@Bean(name = "httpDeviceDownstreamHandler")
public IotDeviceDownstreamHandler deviceDownstreamHandler() {
return new IotDeviceDownstreamHandlerImpl();
}
/**
* 获取当前进程ID
*
* @return 进程ID
*/
private String getProcessId() {
// 获取进程的 name
String name = ManagementFactory.getRuntimeMXBean().getName();
// 分割名称格式为 pid@hostname
String pid = name.split("@")[0];
return pid;
}
}

View File

@ -0,0 +1,25 @@
package cn.iocoder.yudao.module.iot.component.http.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* IoT HTTP组件配置属性
*/
@ConfigurationProperties(prefix = "yudao.iot.component.http")
@Validated
@Data
public class IotComponentHttpProperties {
/**
* 是否启用
*/
private Boolean enabled;
/**
* HTTP 服务端口
*/
private Integer serverPort;
}

View File

@ -0,0 +1,44 @@
package cn.iocoder.yudao.module.iot.component.http.downstream;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.*;
import cn.iocoder.yudao.module.iot.component.core.downstream.IotDeviceDownstreamHandler;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.NOT_IMPLEMENTED;
/**
* HTTP 插件的 {@link IotDeviceDownstreamHandler} 实现类
* <p>
* 但是由于设备通过 HTTP 短链接接入导致其实无法下行指导给 device 设备所以基本都是直接返回失败
* 类似 MQTTWebSocketTCP 插件是可以实现下行指令的
*
* @author 芋道源码
*/
public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandler {
@Override
public CommonResult<Boolean> invokeDeviceService(IotDeviceServiceInvokeReqDTO invokeReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持调用设备服务");
}
@Override
public CommonResult<Boolean> getDeviceProperty(IotDevicePropertyGetReqDTO getReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持获取设备属性");
}
@Override
public CommonResult<Boolean> setDeviceProperty(IotDevicePropertySetReqDTO setReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持设置设备属性");
}
@Override
public CommonResult<Boolean> setDeviceConfig(IotDeviceConfigSetReqDTO setReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持设置设备属性");
}
@Override
public CommonResult<Boolean> upgradeDeviceOta(IotDeviceOtaUpgradeReqDTO upgradeReqDTO) {
return CommonResult.error(NOT_IMPLEMENTED.getCode(), "HTTP 不支持设置设备属性");
}
}

View File

@ -0,0 +1,91 @@
package cn.iocoder.yudao.module.iot.component.http.upstream;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
import cn.iocoder.yudao.module.iot.component.http.config.IotComponentHttpProperties;
import cn.iocoder.yudao.module.iot.component.http.upstream.router.IotDeviceUpstreamVertxHandler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
/**
* IoT 设备下行服务端接收来自 device 设备的请求转发给 server 服务器
* <p>
* 协议HTTP
*
* @author haohao
*/
@Slf4j
public class IotDeviceUpstreamServer {
private final Vertx vertx;
private final HttpServer server;
private final IotComponentHttpProperties properties;
private final IotComponentRegistry componentRegistry;
public IotDeviceUpstreamServer(IotComponentHttpProperties properties,
IotDeviceUpstreamApi deviceUpstreamApi,
ApplicationContext applicationContext,
IotComponentRegistry componentRegistry) {
this.properties = properties;
this.componentRegistry = componentRegistry;
// 创建 Vertx 实例
this.vertx = Vertx.vertx();
// 创建 Router 实例
Router router = Router.router(vertx);
router.route().handler(BodyHandler.create()); // 处理 Body
// 使用统一的 Handler 处理所有上行请求
IotDeviceUpstreamVertxHandler upstreamHandler = new IotDeviceUpstreamVertxHandler(deviceUpstreamApi,
applicationContext);
router.post(IotDeviceUpstreamVertxHandler.PROPERTY_PATH).handler(upstreamHandler);
router.post(IotDeviceUpstreamVertxHandler.EVENT_PATH).handler(upstreamHandler);
// 创建 HttpServer 实例
this.server = vertx.createHttpServer().requestHandler(router);
}
/**
* 启动 HTTP 服务器
*/
public void start() {
log.info("[start][开始启动]");
server.listen(properties.getServerPort())
.toCompletionStage()
.toCompletableFuture()
.join();
log.info("[start][启动完成,端口({})]", this.server.actualPort());
}
/**
* 停止所有
*/
public void stop() {
log.info("[stop][开始关闭]");
try {
// 关闭 HTTP 服务器
if (server != null) {
server.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
// 关闭 Vertx 实例
if (vertx != null) {
vertx.close()
.toCompletionStage()
.toCompletableFuture()
.join();
}
log.info("[stop][关闭完成]");
} catch (Exception e) {
log.error("[stop][关闭异常]", e);
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,212 @@
package cn.iocoder.yudao.module.iot.component.http.upstream.router;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.component.core.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.component.core.util.IotPluginCommonUtils;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
* IoT 设备上行统一处理的 Vert.x Handler
* <p>
* 统一处理设备属性上报和事件上报的请求
*
* @author haohao
*/
@Slf4j
public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
/**
* 属性上报路径
*/
public static final String PROPERTY_PATH = "/sys/:productKey/:deviceName/thing/event/property/post";
/**
* 事件上报路径
*/
public static final String EVENT_PATH = "/sys/:productKey/:deviceName/thing/event/:identifier/post";
private static final String PROPERTY_METHOD = "thing.event.property.post";
private static final String EVENT_METHOD_PREFIX = "thing.event.";
private static final String EVENT_METHOD_SUFFIX = ".post";
private final IotDeviceUpstreamApi deviceUpstreamApi;
// private final HttpScriptService scriptService;
public IotDeviceUpstreamVertxHandler(IotDeviceUpstreamApi deviceUpstreamApi,
ApplicationContext applicationContext) {
this.deviceUpstreamApi = deviceUpstreamApi;
// this.scriptService = applicationContext.getBean(HttpScriptService.class);
}
@Override
public void handle(RoutingContext routingContext) {
String path = routingContext.request().path();
String requestId = IdUtil.fastSimpleUUID();
try {
// 1. 解析通用参数
String productKey = routingContext.pathParam("productKey");
String deviceName = routingContext.pathParam("deviceName");
JsonObject body = routingContext.body().asJsonObject();
requestId = ObjUtil.defaultIfBlank(body.getString("id"), requestId);
// 2. 根据路径模式处理不同类型的请求
CommonResult<Boolean> result;
String method;
if (path.matches(".*/thing/event/property/post")) {
// 处理属性上报
IotDevicePropertyReportReqDTO reportReqDTO = parsePropertyReportRequest(productKey, deviceName,
requestId, body);
// 设备上线
updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
// 属性上报
result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
method = PROPERTY_METHOD;
} else if (path.matches(".*/thing/event/.+/post")) {
// 处理事件上报
String identifier = routingContext.pathParam("identifier");
IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier,
requestId, body);
// 设备上线
updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
// 事件上报
result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
method = EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX;
} else {
// 不支持的请求路径
IotStandardResponse errorResponse = IotStandardResponse.error(requestId, "unknown",
BAD_REQUEST.getCode(), "不支持的请求路径");
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
return;
}
// 3. 返回标准响应
IotStandardResponse response;
if (result.isSuccess()) {
response = IotStandardResponse.success(requestId, method, result.getData());
} else {
response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
}
IotPluginCommonUtils.writeJsonResponse(routingContext, response);
} catch (Exception e) {
log.error("[handle][处理上行请求异常] path={}", path, e);
String method = path.contains("/property/") ? PROPERTY_METHOD
: EVENT_METHOD_PREFIX + (routingContext.pathParams().containsKey("identifier")
? routingContext.pathParam("identifier")
: "unknown") + EVENT_METHOD_SUFFIX;
IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method,
INTERNAL_SERVER_ERROR.getCode(), INTERNAL_SERVER_ERROR.getMsg());
IotPluginCommonUtils.writeJsonResponse(routingContext, errorResponse);
}
}
/**
* 更新设备状态
*
* @param productKey 产品 Key
* @param deviceName 设备名称
*/
private void updateDeviceState(String productKey, String deviceName) {
deviceUpstreamApi.updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO()
.setRequestId(IdUtil.fastSimpleUUID()).setProcessId(IotPluginCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState()));
}
/**
* 解析属性上报请求
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param body 请求体
* @return 属性上报请求 DTO
*/
private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName,
String requestId, JsonObject body) {
// 使用脚本解析数据
// Map<String, Object> properties = scriptService.parsePropertyData(productKey, deviceName, body);
// 如果脚本解析结果为空使用默认解析逻辑
// TODO @芋艿注释说明一下为什么要这么处理
// if (CollUtil.isNotEmpty(properties)) {
Map<String, Object> properties = new HashMap<>();
Map<String, Object> params = body.getJsonObject("params") != null ?
body.getJsonObject("params").getMap() : null;
if (params != null) {
// 将标准格式的 params 转换为平台需要的 properties 格式
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object valueObj = entry.getValue();
// 如果是复杂结构包含 value time
if (valueObj instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> valueMap = (Map<String, Object>) valueObj;
properties.put(key, valueMap.getOrDefault("value", valueObj));
} else {
properties.put(key, valueObj);
}
}
}
// }
// 构建属性上报请求 DTO
return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO().setRequestId(requestId)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName)).setProperties(properties);
}
/**
* 解析事件上报请求
*
* @param productKey 产品K ey
* @param deviceName 设备名称
* @param identifier 事件标识符
* @param requestId 请求 ID
* @param body 请求体
* @return 事件上报请求 DTO
*/
private IotDeviceEventReportReqDTO parseEventReportRequest(String productKey, String deviceName, String identifier,
String requestId, JsonObject body) {
// 使用脚本解析事件数据
// Map<String, Object> params = scriptService.parseEventData(productKey, deviceName, identifier, body);
Map<String, Object> params = null;
// 如果脚本解析结果为空使用默认解析逻辑
// if (CollUtil.isNotEmpty(params)) {
if (body.containsKey("params")) {
params = body.getJsonObject("params").getMap();
} else {
// 兼容旧格式
params = new HashMap<>();
}
// }
// 构建事件上报请求 DTO
return ((IotDeviceEventReportReqDTO) new IotDeviceEventReportReqDTO().setRequestId(requestId)
.setProcessId(IotPluginCommonUtils.getProcessId()).setReportTime(LocalDateTime.now())
.setProductKey(productKey).setDeviceName(deviceName)).setIdentifier(identifier).setParams(params);
}
}

View File

@ -0,0 +1 @@
cn.iocoder.yudao.module.iot.component.http.config.IotComponentHttpAutoConfiguration

View File

@ -0,0 +1,10 @@
# HTTP组件默认配置
yudao:
iot:
component:
core:
plugin-key: http # 插件的唯一标识
# http:
# enabled: true # 是否启用HTTP组件默认启用
# server-port: 8092