diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java index 9bc20f7c70..0dde58a5be 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/IotDeviceUpstreamApi.java @@ -18,59 +18,6 @@ public interface IotDeviceUpstreamApi { String PREFIX = ApiConstants.PREFIX + "/device/upstream"; - // ========== 设备相关 ========== - - /** - * 更新设备状态 - * - * @param updateReqDTO 更新设备状态 DTO - */ - @PostMapping(PREFIX + "/update-state") - CommonResult updateDeviceState(@Valid @RequestBody IotDeviceStateUpdateReqDTO updateReqDTO); - - /** - * 上报设备属性数据 - * - * @param reportReqDTO 上报设备属性数据 DTO - */ - @PostMapping(PREFIX + "/report-property") - CommonResult reportDeviceProperty(@Valid @RequestBody IotDevicePropertyReportReqDTO reportReqDTO); - - /** - * 上报设备事件数据 - * - * @param reportReqDTO 设备事件 - */ - @PostMapping(PREFIX + "/report-event") - CommonResult reportDeviceEvent(@Valid @RequestBody IotDeviceEventReportReqDTO reportReqDTO); - - // TODO @芋艿:这个需要 plugins 接入下 - /** - * 注册设备 - * - * @param registerReqDTO 注册设备 DTO - */ - @PostMapping(PREFIX + "/register") - CommonResult registerDevice(@Valid @RequestBody IotDeviceRegisterReqDTO registerReqDTO); - - // TODO @芋艿:这个需要 plugins 接入下 - /** - * 注册子设备 - * - * @param registerReqDTO 注册子设备 DTO - */ - @PostMapping(PREFIX + "/register-sub") - CommonResult registerSubDevice(@Valid @RequestBody IotDeviceRegisterSubReqDTO registerReqDTO); - - // TODO @芋艿:这个需要 plugins 接入下 - /** - * 注册设备拓扑 - * - * @param addReqDTO 注册设备拓扑 DTO - */ - @PostMapping(PREFIX + "/add-topology") - CommonResult addDeviceTopology(@Valid @RequestBody IotDeviceTopologyAddReqDTO addReqDTO); - // TODO @芋艿:考虑 http 认证 /** * 认证 Emqx 连接 diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotDeviceStateUpdateReqDTO.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotDeviceStateUpdateReqDTO.java deleted file mode 100644 index 38c479a57b..0000000000 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/device/dto/control/upstream/IotDeviceStateUpdateReqDTO.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream; - -import cn.iocoder.yudao.framework.common.validation.InEnum; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; -import jakarta.validation.constraints.NotNull; -import lombok.Data; - -/** - * IoT 设备【状态】更新 Request DTO - * - * @author 芋道源码 - */ -@Data -public class IotDeviceStateUpdateReqDTO extends IotDeviceUpstreamAbstractReqDTO { - - /** - * 设备状态 - */ - @NotNull(message = "设备状态不能为空") - @InEnum(IotDeviceStateEnum.class) // 只使用:在线、离线 - private Integer state; - -} diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java index 7247b0cb3c..e51c24b6ff 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java @@ -30,7 +30,7 @@ public interface ErrorCodeConstants { ErrorCode DEVICE_GATEWAY_NOT_EXISTS = new ErrorCode(1_050_003_004, "网关设备不存在"); ErrorCode DEVICE_NOT_GATEWAY = new ErrorCode(1_050_003_005, "设备不是网关设备"); ErrorCode DEVICE_IMPORT_LIST_IS_EMPTY = new ErrorCode(1_050_003_006, "导入设备数据不能为空!"); - ErrorCode DEVICE_DOWNSTREAM_FAILED = new ErrorCode(1_050_003_007, "执行失败,原因:{}"); + ErrorCode DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL = new ErrorCode(1_050_003_007, "下行设备消息失败,原因:设备未连接网关"); // ========== 产品分类 1-050-004-000 ========== ErrorCode PRODUCT_CATEGORY_NOT_EXISTS = new ErrorCode(1_050_004_000, "产品分类不存在"); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java index ca9641563e..31c4b69ae1 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/api/device/IoTDeviceUpstreamApiImpl.java @@ -21,44 +21,6 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi { @Resource private IotDeviceUpstreamService deviceUpstreamService; - // ========== 设备相关 ========== - - @Override - public CommonResult updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) { - deviceUpstreamService.updateDeviceState(updateReqDTO); - return success(true); - } - - @Override - public CommonResult reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) { - deviceUpstreamService.reportDeviceProperty(reportReqDTO); - return success(true); - } - - @Override - public CommonResult reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) { - deviceUpstreamService.reportDeviceEvent(reportReqDTO); - return success(true); - } - - @Override - public CommonResult registerDevice(IotDeviceRegisterReqDTO registerReqDTO) { - deviceUpstreamService.registerDevice(registerReqDTO); - return success(true); - } - - @Override - public CommonResult registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) { - deviceUpstreamService.registerSubDevice(registerReqDTO); - return success(true); - } - - @Override - public CommonResult addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) { - deviceUpstreamService.addDeviceTopology(addReqDTO); - return success(true); - } - @Override public CommonResult authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) { boolean result = deviceUpstreamService.authenticateEmqxConnection(authReqDTO); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java index 6862677328..f7d515df96 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/device/vo/device/IotDevicePageReqVO.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.device.vo.device; import cn.iocoder.yudao.framework.common.pojo.PageParam; import cn.iocoder.yudao.framework.common.validation.InEnum; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum; import io.swagger.v3.oas.annotations.media.Schema; import lombok.Data; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/statistics/IotStatisticsController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/statistics/IotStatisticsController.java index a9c195656c..0086c22943 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/statistics/IotStatisticsController.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/statistics/IotStatisticsController.java @@ -4,7 +4,7 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryRespVO; import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsReqVO; import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsSummaryRespVO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.service.device.data.IotDeviceLogService; import cn.iocoder.yudao.module.iot.service.product.IotProductCategoryService; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceDO.java index 714775ce22..3dd2dd8eb6 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/device/IotDeviceDO.java @@ -4,7 +4,7 @@ import cn.iocoder.yudao.framework.mybatis.core.type.LongSetTypeHandler; import cn.iocoder.yudao.framework.tenant.core.db.TenantBaseDO; import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO; import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import com.baomidou.mybatisplus.annotation.KeySequence; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotAlertRecordDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotAlertRecordDO.java index 840111078c..d6e002e6a7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotAlertRecordDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotAlertRecordDO.java @@ -1,14 +1,17 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule; import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.baomidou.mybatisplus.annotation.KeySequence; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; -import lombok.*; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; /** * IoT 告警记录 DO diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java index b51ea844d9..f8f3382930 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotRuleSceneDO.java @@ -145,7 +145,7 @@ public class IotRuleSceneDO extends TenantBaseDO { public static class TriggerConditionParameter { // TODO @芋艿: identifier0 存事件和服务的 identifier 属性的情况 identifier0 就为 null 解决前端回显问题 - // TODO @haohao:可以根据 TriggerCondition.type 判断,是服务、还是事件、还是属性么? + // TODO @puhui999:可以根据 TriggerCondition.type 判断,是服务、还是事件、还是属性么? /** * 标识符(事件、服务) * diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java index d31096b118..52c68c1ec0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/RedisKeyConstants.java @@ -19,11 +19,10 @@ public interface RedisKeyConstants { */ String DEVICE_PROPERTY = "iot:device_property:%s"; - // TODO @芋艿:弱化 deviceKey;使用 product_key + device_name 替代 /** * 设备的最后上报时间,采用 ZSET 结构 * - * KEY 格式:{deviceKey} + * KEY 格式:{productKey},${deviceName} * SCORE:上报时间 */ String DEVICE_REPORT_TIMES = "iot:device_report_times"; @@ -44,4 +43,13 @@ public interface RedisKeyConstants { */ String THING_MODEL_LIST = "iot:thing_model_list"; + /** + * 设备关联的网关 serverId 缓存,采用 HASH 结构 + * + * KEY 格式:device_server_id + * HASH KEY:{productKey},{deviceName} + * VALUE 数据类型:String serverId + */ + String DEVICE_SERVER_ID = "iot:device_server_id"; + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/device/DeviceReportTimeRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/device/DeviceReportTimeRedisDAO.java index d84af7543e..27089283f2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/device/DeviceReportTimeRedisDAO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/device/DeviceReportTimeRedisDAO.java @@ -1,6 +1,8 @@ package cn.iocoder.yudao.module.iot.dal.redis.device; import cn.hutool.core.date.LocalDateTimeUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils; import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; import jakarta.annotation.Resource; import org.springframework.data.redis.core.StringRedisTemplate; @@ -20,14 +22,17 @@ public class DeviceReportTimeRedisDAO { @Resource private StringRedisTemplate stringRedisTemplate; - public void update(String deviceKey, LocalDateTime reportTime) { - stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIMES, deviceKey, + public void update(String productKey, String deviceName, LocalDateTime reportTime) { + String value = productKey + StrUtil.COMMA + deviceName; // 使用 , 分隔 + stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIMES, value, LocalDateTimeUtil.toEpochMilli(reportTime)); } - public Set range(LocalDateTime maxReportTime) { - return stringRedisTemplate.opsForZSet().rangeByScore(RedisKeyConstants.DEVICE_REPORT_TIMES, 0, + public Set range(LocalDateTime maxReportTime) { + Set values = stringRedisTemplate.opsForZSet().rangeByScore(RedisKeyConstants.DEVICE_REPORT_TIMES, 0, LocalDateTimeUtil.toEpochMilli(maxReportTime)); + return CollectionUtils.convertSet(values, + value -> value.split(StrUtil.COMMA)); // 使用, 分隔 } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/device/DeviceServerIdRedisDAO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/device/DeviceServerIdRedisDAO.java new file mode 100644 index 0000000000..7bd8d03bb0 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/redis/device/DeviceServerIdRedisDAO.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.module.iot.dal.redis.device; + +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; +import jakarta.annotation.Resource; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Repository; + +/** + * 设备关联的网关 serverId 的 Redis DAO + * + * @author 芋道源码 + */ +@Repository +public class DeviceServerIdRedisDAO { + + @Resource + private StringRedisTemplate stringRedisTemplate; + + /** + * 更新设备关联的网关 serverId + * + * @param productKey 产品标识 + * @param deviceName 设备名称 + * @param serverId 网关 serverId + */ + public void update(String productKey, String deviceName, String serverId) { + String hashKey = buildHashKey(productKey, deviceName); + stringRedisTemplate.opsForHash().put(RedisKeyConstants.DEVICE_SERVER_ID, hashKey, serverId); + } + + /** + * 获得设备关联的网关 serverId + * + * @param productKey 产品标识 + * @param deviceName 设备名称 + * @return 网关 serverId + */ + public String get(String productKey, String deviceName) { + String hashKey = buildHashKey(productKey, deviceName); + Object value = stringRedisTemplate.opsForHash().get(RedisKeyConstants.DEVICE_SERVER_ID, hashKey); + return value != null ? (String) value : null; + } + + /** + * 删除设备关联的网关 serverId + * + * @param productKey 产品标识 + * @param deviceName 设备名称 + */ + public void delete(String productKey, String deviceName) { + String hashKey = buildHashKey(productKey, deviceName); + stringRedisTemplate.opsForHash().delete(RedisKeyConstants.DEVICE_SERVER_ID, hashKey); + } + + /** + * 构建 HASH KEY + * + * @param productKey 产品标识 + * @param deviceName 设备名称 + * @return HASH KEY + */ + private String buildHashKey(String productKey, String deviceName) { + return productKey + StrUtil.COMMA + deviceName; + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/device/IotDeviceOfflineCheckJob.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/device/IotDeviceOfflineCheckJob.java index 4e9e9ecff5..8451b6670e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/device/IotDeviceOfflineCheckJob.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/device/IotDeviceOfflineCheckJob.java @@ -1,16 +1,16 @@ package cn.iocoder.yudao.module.iot.job.device; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler; import cn.iocoder.yudao.framework.tenant.core.job.TenantJob; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService; -import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService; import jakarta.annotation.Resource; import org.springframework.stereotype.Component; @@ -20,10 +20,12 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet; + /** * IoT 设备离线检查 Job * - * 检测逻辑:设备最后一条 {@link cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage} 消息超过一定时间,则认为设备离线 + * 检测逻辑:设备最后一条 {@link IotDeviceMessage} 消息超过一定时间,则认为设备离线 * * @author 芋道源码 */ @@ -41,8 +43,9 @@ public class IotDeviceOfflineCheckJob implements JobHandler { private IotDeviceService deviceService; @Resource private IotDevicePropertyService devicePropertyService; + @Resource - private IotDeviceUpstreamService deviceUpstreamService; + private IotDeviceMessageProducer deviceMessageProducer; @Override @TenantJob @@ -52,22 +55,22 @@ public class IotDeviceOfflineCheckJob implements JobHandler { if (CollUtil.isEmpty(devices)) { return JsonUtils.toJsonString(Collections.emptyList()); } - // 1.2 获取超时的 deviceKey 集合 - Set timeoutDeviceKeys = devicePropertyService.getDeviceKeysByReportTime( + // 1.2 获取超时的设备集合 + Set timeoutDevices = devicePropertyService.getProductKeyDeviceNameListByReportTime( LocalDateTime.now().minus(OFFLINE_TIMEOUT)); + Set timeoutDevices2 = convertSet(timeoutDevices, item -> item[0] + StrUtil.COMMA + item[1]); // 2. 下线设备 - List offlineDeviceKeys = CollUtil.newArrayList(); + List offlineDeviceKeys = CollUtil.newArrayList(); for (IotDeviceDO device : devices) { - if (!timeoutDeviceKeys.contains(device.getDeviceKey())) { + String timeoutDeviceKey = device.getProductKey() + StrUtil.COMMA + device.getDeviceName(); + if (!timeoutDevices2.contains(timeoutDeviceKey)) { continue; } - offlineDeviceKeys.add(device.getDeviceKey()); + offlineDeviceKeys.add(new String[]{device.getProductKey(), device.getDeviceName()}); // 为什么不直接更新状态呢?因为通过 IotDeviceMessage 可以经过一系列的处理,例如说记录日志等等 - deviceUpstreamService.updateDeviceState(((IotDeviceStateUpdateReqDTO) - new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now()) - .setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName())) - .setState((IotDeviceStateEnum.OFFLINE.getState()))); + deviceMessageProducer.sendDeviceMessage(IotDeviceMessage.of(device.getProductKey(), device.getDeviceName()) + .ofStateOffline()); } return JsonUtils.toJsonString(offlineDeviceKeys); } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageSubscriber.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageSubscriber.java index ff8c220c23..37e7e698a2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageSubscriber.java @@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** - * 针对 {@link IotDeviceMessage} 的消费者,记录设备日志 + * 针对 {@link IotDeviceMessage} 的消费者:记录设备日志 * * @author 芋道源码 */ diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceOnlineMessageConsumer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceOnlineMessageConsumer.java deleted file mode 100644 index f0e49bd475..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceOnlineMessageConsumer.java +++ /dev/null @@ -1,85 +0,0 @@ -package cn.iocoder.yudao.module.iot.mq.consumer.device; - -import cn.hutool.core.util.IdUtil; -import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; -import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; -import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; - -import java.time.LocalDateTime; -import java.util.Objects; - -/** - * 针对 {@link IotDeviceMessage} 的消费者,将离线的设备,自动标记为上线 - * - * 注意:只有设备上行消息,才会触发该逻辑 - * - * @author 芋道源码 - */ -@Component -@Slf4j -public class IotDeviceOnlineMessageConsumer { - - @Resource - private IotDeviceService deviceService; - - @Resource - private IotDeviceUpstreamService deviceUpstreamService; - - @EventListener - @Async - public void onMessage(IotDeviceMessage message) { - // 1.1 只处理上行消息。因为,只有设备上行的消息,才会触发设备上线的逻辑 - if (!isUpstreamMessage(message)) { - return; - } - // 1.2 如果设备已在线,则不做处理 - log.info("[onMessage][消息内容({})]", message); - IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache( - message.getProductKey(), message.getDeviceName()); - if (device == null) { - log.error("[onMessage][消息({}) 对应的设备部存在]", message); - return; - } - if (IotDeviceStateEnum.isOnline(device.getState())) { - return; - } - - // 2. 标记设备为在线 - // 为什么不直接更新状态呢?因为通过 IotDeviceMessage 可以经过一系列的处理,例如说记录日志等等 - deviceUpstreamService.updateDeviceState(((IotDeviceStateUpdateReqDTO) - new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now()) - .setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName())) - .setState((IotDeviceStateEnum.ONLINE.getState()))); - } - - private boolean isUpstreamMessage(IotDeviceMessage message) { - // 设备属性 - if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType()) - && Objects.equals(message.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier())) { - return true; - } - // 设备事件 - if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.EVENT.getType())) { - return true; - } - // 设备服务 - // noinspection RedundantIfStatement - if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.SERVICE.getType()) - && !StrUtil.endWith(message.getIdentifier(), IotDeviceMessageIdentifierEnum.SERVICE_REPLY_SUFFIX.getIdentifier())) { - return true; - } - return false; - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDevicePropertyMessageConsumer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDevicePropertyMessageConsumer.java index bf9cc5332c..1a05233ee7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDevicePropertyMessageConsumer.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDevicePropertyMessageConsumer.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.iot.mq.consumer.device; import cn.hutool.core.util.ObjectUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceStateMessageSubscriber.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceStateMessageSubscriber.java new file mode 100644 index 0000000000..b355b985d6 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceStateMessageSubscriber.java @@ -0,0 +1,106 @@ +package cn.iocoder.yudao.module.iot.mq.consumer.device; + +import cn.hutool.core.util.ObjectUtil; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageIdentifierEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageTypeEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; +import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; +import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.Objects; + +/** + * 针对 {@link IotDeviceMessage} 的消费者:记录设备状态 + * + * 特殊:如果是离线的设备,将自动上线 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class IotDeviceStateMessageSubscriber implements IotMessageSubscriber { + + @Resource + private IotDeviceService deviceService; + @Resource + private IotDevicePropertyService devicePropertyService; + + @Resource + private IotMessageBus messageBus; + @Resource + private IotDeviceMessageProducer deviceMessageProducer; + + @PostConstruct + public void init() { + messageBus.register(this); + } + + @Override + public String getTopic() { + return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC; + } + + @Override + public String getGroup() { + return "iot_device_state_consumer"; + } + + @Override + public void onMessage(IotDeviceMessage message) { + // 1.1 只处理上行消息,或者是 STATE 相关的消息 + if (!IotDeviceMessageUtils.isUpstreamMessage(message) + && ObjectUtil.notEqual(message.getType(), IotDeviceMessageTypeEnum.STATE.getType())) { + return; + } + // 1.2 校验设备是否存在 + IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache( + message.getProductKey(), message.getDeviceName()); + if (device == null) { + log.error("[onMessage][消息({}) 对应的设备部存在]", message); + return; + } + + // 2. 处理消息 + TenantUtils.execute(device.getTenantId(), () -> onMessage(message, device)); + } + + private void onMessage(IotDeviceMessage message, IotDeviceDO device) { + // 更新设备的最后时间 + devicePropertyService.updateDeviceReportTime(device.getProductKey(), device.getDeviceName(), LocalDateTime.now()); + + // 情况一:STATE 相关的消息 + if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.STATE.getType())) { + if (Objects.equals(message.getIdentifier(), IotDeviceMessageIdentifierEnum.STATE_ONLINE.getIdentifier())) { + deviceService.updateDeviceState(device.getId(), IotDeviceStateEnum.ONLINE.getState()); + devicePropertyService.updateDeviceServerId(device.getProductKey(), device.getDeviceName(), message.getServerId()); + } else { + deviceService.updateDeviceState(device.getId(), IotDeviceStateEnum.OFFLINE.getState()); + devicePropertyService.deleteDeviceServerId(device.getProductKey(), device.getDeviceName()); + } + // TODO 芋艿:子设备的关联 + return; + } + + // 情况二:非 STATE 相关的消息 + devicePropertyService.updateDeviceServerId(device.getProductKey(), device.getDeviceName(), message.getServerId()); + // 特殊:设备非在线时,主动标记设备为在线 + // 为什么不直接更新状态呢?因为通过 IotDeviceMessage 可以经过一系列的处理,例如说记录日志等等 + if (ObjectUtil.notEqual(device.getState(), IotDeviceStateEnum.ONLINE.getState())) { + deviceMessageProducer.sendDeviceMessage(IotDeviceMessage.of(message.getProductKey(), message.getDeviceName()) + .ofStateOnline()); + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java index e6ea3e22d0..e29730bcf0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/rule/IotRuleSceneMessageHandler.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.module.iot.mq.consumer.rule; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/message/IotDeviceMessage.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/message/IotDeviceMessage.java deleted file mode 100644 index 84ae5ead80..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/message/IotDeviceMessage.java +++ /dev/null @@ -1,77 +0,0 @@ -package cn.iocoder.yudao.module.iot.mq.message; - -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; - -// TODO @芋艿:参考阿里云的物模型,优化 IoT 上下行消息的设计,尽量保持一致(渐进式,不要一口气)! -/** - * IoT 设备消息 - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -@Builder -@Deprecated -public class IotDeviceMessage { - - /** - * 请求编号 - */ - private String requestId; - - /** - * 设备信息 - */ - private String productKey; - /** - * 设备名称 - */ - private String deviceName; - /** - * 设备标识 - */ - private String deviceKey; - - /** - * 消息类型 - * - * 枚举 {@link IotDeviceMessageTypeEnum} - */ - private String type; - /** - * 标识符 - * - * 枚举 {@link IotDeviceMessageIdentifierEnum} - */ - private String identifier; - - /** - * 请求参数 - * - * 例如说:属性上报的 properties、事件上报的 params - */ - private Object data; - /** - * 响应码 - * - * 目前只有 server 下行消息给 device 设备时,才会有响应码 - */ - private Integer code; - - /** - * 上报时间 - */ - private LocalDateTime reportTime; - - /** - * 租户编号 - */ - private Long tenantId; - -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/producer/device/IotDeviceProducer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/producer/device/IotDeviceProducer.java deleted file mode 100644 index 11d5d96beb..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/producer/device/IotDeviceProducer.java +++ /dev/null @@ -1,31 +0,0 @@ -package cn.iocoder.yudao.module.iot.mq.producer.device; - -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Component; - -/** - * IoT 设备相关消息的 Producer - * - * @author alwayssuper - * @since 2024/12/17 16:35 - */ -@Slf4j -@Component -public class IotDeviceProducer { - - @Resource - private ApplicationContext applicationContext; - - /** - * 发送 {@link IotDeviceMessage} 消息 - * - * @param thingModelMessage 物模型消息 - */ - public void sendDeviceMessage(IotDeviceMessage thingModelMessage) { - applicationContext.publishEvent(thingModelMessage); - } - -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/producer/package-info.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/producer/package-info.java index 37d0ba016d..a52025050d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/producer/package-info.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/producer/package-info.java @@ -1,4 +1,4 @@ /** - * TODO 芋艿:临时占位 + * 消息队列的生产者 */ -package cn.iocoder.yudao.module.iot.mq.producer; \ No newline at end of file +package cn.iocoder.yudao.module.iot.mq.producer; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java index 561fa1fd28..f722e8e033 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java @@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.service.device; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.*; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java index 03073ad496..e29ec59355 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java @@ -17,7 +17,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceGroupDO; import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO; import cn.iocoder.yudao.module.iot.dal.mysql.device.IotDeviceMapper; import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum; import cn.iocoder.yudao.module.iot.service.product.IotProductService; import cn.iocoder.yudao.module.iot.util.MqttSignUtils; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamService.java index f09604dea2..b4d49587e6 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamService.java @@ -1,13 +1,13 @@ package cn.iocoder.yudao.module.iot.service.device.control; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import jakarta.validation.Valid; /** * IoT 设备下行 Service 接口 * - * 目的:服务端 -> 插件 -> 设备 + * 目的:服务端 -> 网关 -> 设备 * * @author 芋道源码 */ @@ -17,7 +17,7 @@ public interface IotDeviceDownstreamService { * 设备下行,可用于设备模拟 * * @param downstreamReqVO 设备下行请求 VO - * @return 下发消息 + * @return 下行消息 */ IotDeviceMessage downstreamDevice(@Valid IotDeviceDownstreamReqVO downstreamReqVO); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamServiceImpl.java index ba1112fb86..fd59f4b5c3 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceDownstreamServiceImpl.java @@ -1,34 +1,26 @@ package cn.iocoder.yudao.module.iot.service.device.control; -import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.exception.ServiceException; -import cn.iocoder.yudao.framework.common.pojo.CommonResult; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceConfigSetReqDTO; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceOtaUpgradeReqDTO; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDevicePropertyGetReqDTO; -import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceServiceInvokeReqDTO; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import java.time.LocalDateTime; -import java.util.List; import java.util.Map; import java.util.Objects; import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST; import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; -import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DEVICE_DOWNSTREAM_FAILED; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL; /** * IoT 设备下行 Service 实现类 @@ -42,255 +34,234 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic @Resource private IotDeviceService deviceService; - @Resource - private IotDeviceProducer deviceProducer; + private IotDevicePropertyService devicePropertyService; + @Resource private IotDeviceMessageProducer deviceMessageProducer; @Override public IotDeviceMessage downstreamDevice(IotDeviceDownstreamReqVO downstreamReqVO) { - // 校验设备是否存在 + // 1. 校验设备是否存在 IotDeviceDO device = deviceService.validateDeviceExists(downstreamReqVO.getId()); - // TODO @芋艿:离线设备,不允许推送 // TODO 芋艿:父设备的处理 IotDeviceDO parentDevice = null; + // 2. 构建消息 + IotDeviceMessage message = buildDownstreamDeviceMessage(downstreamReqVO, device, parentDevice); + + // 3.1 发送给网关 + String serverId = devicePropertyService.getDeviceServerId(message.getProductKey(), message.getDeviceName()); + if (StrUtil.isEmpty(serverId)) { + throw exception(DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL); + } + deviceMessageProducer.sendGatewayDeviceMessage(serverId, message); + + // 3.2 发送给服务器(用于设备日志等的记录) + deviceMessageProducer.sendDeviceMessage(message); + return message; + } + + @SuppressWarnings("unchecked") + private IotDeviceMessage buildDownstreamDeviceMessage(IotDeviceDownstreamReqVO downstreamReqVO, + IotDeviceDO device, IotDeviceDO parentDevice) { + IotDeviceMessage message = IotDeviceMessage.of(getProductKey(device, parentDevice), + getDeviceName(device, parentDevice)); // 服务调用 if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.SERVICE.getType())) { - return invokeDeviceService(downstreamReqVO, device, parentDevice); + // TODO @芋艿:待实现 +// return invokeDeviceService(downstreamReqVO, device, parentDevice); } // 属性相关 if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType())) { // 属性设置 if (Objects.equals(downstreamReqVO.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier())) { - return setDeviceProperty(downstreamReqVO, device, parentDevice); + if (!(downstreamReqVO.getData() instanceof Map)) { + throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型"); + } + return message.ofPropertySet((Map) downstreamReqVO.getData()); } - // 属性设置 + // 属性获取 if (Objects.equals(downstreamReqVO.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_GET.getIdentifier())) { - return getDeviceProperty(downstreamReqVO, device, parentDevice); + // TODO @芋艿:待实现 +// return getDeviceProperty(downstreamReqVO, device, parentDevice); } } // 配置下发 if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.CONFIG.getType()) && Objects.equals(downstreamReqVO.getIdentifier(), - IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier())) { - return setDeviceConfig(downstreamReqVO, device, parentDevice); + IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier())) { + // TODO @芋艿:待实现 +// return setDeviceConfig(downstreamReqVO, device, parentDevice); } // OTA 升级 if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.OTA.getType())) { - return otaUpgrade(downstreamReqVO, device, parentDevice); + // TODO @芋艿:待实现 +// return otaUpgrade(downstreamReqVO, device, parentDevice); } // TODO @芋艿:取消设备的网关的时,要不要下发 REGISTER_UNREGISTER_SUB ? throw new IllegalArgumentException("不支持的下行消息类型:" + downstreamReqVO); } - /** - * 调用设备服务 - * - * @param downstreamReqVO 下行请求 - * @param device 设备 - * @param parentDevice 父设备 - * @return 下发消息 - */ - @SuppressWarnings("unchecked") - private IotDeviceMessage invokeDeviceService(IotDeviceDownstreamReqVO downstreamReqVO, - IotDeviceDO device, IotDeviceDO parentDevice) { - // 1. 参数校验 - if (!(downstreamReqVO.getData() instanceof Map)) { - throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型"); - } - // TODO @super:【可优化】过滤掉不合法的服务 +// /** +// * 调用设备服务 +// * +// * @param downstreamReqVO 下行请求 +// * @param device 设备 +// * @param parentDevice 父设备 +// * @return 下发消息 +// */ +// @SuppressWarnings("unchecked") +// private IotDeviceMessage invokeDeviceService(IotDeviceDownstreamReqVO downstreamReqVO, +// IotDeviceDO device, IotDeviceDO parentDevice) { +// // 1. 参数校验 +// if (!(downstreamReqVO.getData() instanceof Map)) { +// throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型"); +// } +// // TODO @super:【可优化】过滤掉不合法的服务 +// +// // 2. 发送请求 +// String url = String.format("sys/%s/%s/thing/service/%s", +// getProductKey(device, parentDevice), getDeviceName(device, parentDevice), +// downstreamReqVO.getIdentifier()); +// IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO() +// .setParams((Map) downstreamReqVO.getData()); +//// CommonResult result = requestPlugin(url, reqDTO, device); +// CommonResult result = null; +// +// // 3. 发送设备消息 +// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) +// .setType(IotDeviceMessageTypeEnum.SERVICE.getType()).setIdentifier(reqDTO.getIdentifier()) +// .setData(reqDTO.getParams()); +// sendDeviceMessage(message, device, result.getCode()); +// +// // 4. 如果不成功,抛出异常,提示用户 +// if (result.isError()) { +// log.error("[invokeDeviceService][设备({})服务调用失败,请求参数:({}),响应结果:({})]", +// device.getDeviceKey(), reqDTO, result); +// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); +// } +// return message; +// } - // 2. 发送请求 - String url = String.format("sys/%s/%s/thing/service/%s", - getProductKey(device, parentDevice), getDeviceName(device, parentDevice), - downstreamReqVO.getIdentifier()); - IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO() - .setParams((Map) downstreamReqVO.getData()); -// CommonResult result = requestPlugin(url, reqDTO, device); - CommonResult result = null; +// /** +// * 获取设备属性 +// * +// * @param downstreamReqVO 下行请求 +// * @param device 设备 +// * @param parentDevice 父设备 +// * @return 下发消息 +// */ +// @SuppressWarnings("unchecked") +// private IotDeviceMessage getDeviceProperty(IotDeviceDownstreamReqVO downstreamReqVO, +// IotDeviceDO device, IotDeviceDO parentDevice) { +// // 1. 参数校验 +// if (!(downstreamReqVO.getData() instanceof List)) { +// throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 List 类型"); +// } +// // TODO @super:【可优化】过滤掉不合法的属性 +// +// // 2. 发送请求 +// String url = String.format("sys/%s/%s/thing/service/property/get", +// getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); +// IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO() +// .setIdentifiers((List) downstreamReqVO.getData()); +//// CommonResult result = requestPlugin(url, reqDTO, device); +// CommonResult result = null; +// +// // 3. 发送设备消息 +// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) +// .setType(IotDeviceMessageTypeEnum.PROPERTY.getType()) +// .setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier()) +// .setData(reqDTO.getIdentifiers()); +// sendDeviceMessage(message, device, result.getCode()); +// +// // 4. 如果不成功,抛出异常,提示用户 +// if (result.isError()) { +// log.error("[getDeviceProperty][设备({})属性获取失败,请求参数:({}),响应结果:({})]", +// device.getDeviceKey(), reqDTO, result); +// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); +// } +// return message; +// } - // 3. 发送设备消息 - IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) - .setType(IotDeviceMessageTypeEnum.SERVICE.getType()).setIdentifier(reqDTO.getIdentifier()) - .setData(reqDTO.getParams()); - sendDeviceMessage(message, device, result.getCode()); +// /** +// * 设置设备配置 +// * +// * @param downstreamReqVO 下行请求 +// * @param device 设备 +// * @param parentDevice 父设备 +// * @return 下发消息 +// */ +// @SuppressWarnings({ "unchecked", "unused" }) +// private IotDeviceMessage setDeviceConfig(IotDeviceDownstreamReqVO downstreamReqVO, +// IotDeviceDO device, IotDeviceDO parentDevice) { +// // 1. 参数转换,无需校验 +// Map config = JsonUtils.parseObject(device.getConfig(), Map.class); +// +// // 2. 发送请求 +// String url = String.format("sys/%s/%s/thing/service/config/set", +// getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); +// IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO() +// .setConfig(config); +//// CommonResult result = requestPlugin(url, reqDTO, device); +// CommonResult result = null; +// +// // 3. 发送设备消息 +// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) +// .setType(IotDeviceMessageTypeEnum.CONFIG.getType()) +// .setIdentifier(IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier()) +// .setData(reqDTO.getConfig()); +// sendDeviceMessage(message, device, result.getCode()); +// +// // 4. 如果不成功,抛出异常,提示用户 +// if (result.isError()) { +// log.error("[setDeviceConfig][设备({})配置下发失败,请求参数:({}),响应结果:({})]", +// device.getDeviceKey(), reqDTO, result); +// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); +// } +// return message; +// } - // 4. 如果不成功,抛出异常,提示用户 - if (result.isError()) { - log.error("[invokeDeviceService][设备({})服务调用失败,请求参数:({}),响应结果:({})]", - device.getDeviceKey(), reqDTO, result); - throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); - } - return message; - } - - /** - * 设置设备属性 - * - * @param downstreamReqVO 下行请求 - * @param device 设备 - * @param parentDevice 父设备 - * @return 下发消息 - */ - @SuppressWarnings("unchecked") - private IotDeviceMessage setDeviceProperty(IotDeviceDownstreamReqVO downstreamReqVO, - IotDeviceDO device, IotDeviceDO parentDevice) { - // 1. 参数校验 - if (!(downstreamReqVO.getData() instanceof Map)) { - throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型"); - } - // TODO @super:【可优化】过滤掉不合法的属性 - - // 2. 发送请求 - cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage message = cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage - .of(getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); - String serverId = "192_168_64_1_8092"; - deviceMessageProducer.sendGatewayDeviceMessage(serverId, message); - deviceMessageProducer.sendDeviceMessage(message); - // TODO @芋艿:后续可以清理掉 - return null; - } - - /** - * 获取设备属性 - * - * @param downstreamReqVO 下行请求 - * @param device 设备 - * @param parentDevice 父设备 - * @return 下发消息 - */ - @SuppressWarnings("unchecked") - private IotDeviceMessage getDeviceProperty(IotDeviceDownstreamReqVO downstreamReqVO, - IotDeviceDO device, IotDeviceDO parentDevice) { - // 1. 参数校验 - if (!(downstreamReqVO.getData() instanceof List)) { - throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 List 类型"); - } - // TODO @super:【可优化】过滤掉不合法的属性 - - // 2. 发送请求 - String url = String.format("sys/%s/%s/thing/service/property/get", - getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); - IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO() - .setIdentifiers((List) downstreamReqVO.getData()); -// CommonResult result = requestPlugin(url, reqDTO, device); - CommonResult result = null; - - // 3. 发送设备消息 - IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) - .setType(IotDeviceMessageTypeEnum.PROPERTY.getType()) - .setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier()) - .setData(reqDTO.getIdentifiers()); - sendDeviceMessage(message, device, result.getCode()); - - // 4. 如果不成功,抛出异常,提示用户 - if (result.isError()) { - log.error("[getDeviceProperty][设备({})属性获取失败,请求参数:({}),响应结果:({})]", - device.getDeviceKey(), reqDTO, result); - throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); - } - return message; - } - - /** - * 设置设备配置 - * - * @param downstreamReqVO 下行请求 - * @param device 设备 - * @param parentDevice 父设备 - * @return 下发消息 - */ - @SuppressWarnings({ "unchecked", "unused" }) - private IotDeviceMessage setDeviceConfig(IotDeviceDownstreamReqVO downstreamReqVO, - IotDeviceDO device, IotDeviceDO parentDevice) { - // 1. 参数转换,无需校验 - Map config = JsonUtils.parseObject(device.getConfig(), Map.class); - - // 2. 发送请求 - String url = String.format("sys/%s/%s/thing/service/config/set", - getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); - IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO() - .setConfig(config); -// CommonResult result = requestPlugin(url, reqDTO, device); - CommonResult result = null; - - // 3. 发送设备消息 - IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) - .setType(IotDeviceMessageTypeEnum.CONFIG.getType()) - .setIdentifier(IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier()) - .setData(reqDTO.getConfig()); - sendDeviceMessage(message, device, result.getCode()); - - // 4. 如果不成功,抛出异常,提示用户 - if (result.isError()) { - log.error("[setDeviceConfig][设备({})配置下发失败,请求参数:({}),响应结果:({})]", - device.getDeviceKey(), reqDTO, result); - throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); - } - return message; - } - - /** - * 设备 OTA 升级 - * - * @param downstreamReqVO 下行请求 - * @param device 设备 - * @param parentDevice 父设备 - * @return 下发消息 - */ - private IotDeviceMessage otaUpgrade(IotDeviceDownstreamReqVO downstreamReqVO, - IotDeviceDO device, IotDeviceDO parentDevice) { - // 1. 参数校验 - if (!(downstreamReqVO.getData() instanceof Map data)) { - throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型"); - } - - // 2. 发送请求 - String url = String.format("ota/%s/%s/upgrade", - getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); - IotDeviceOtaUpgradeReqDTO reqDTO = IotDeviceOtaUpgradeReqDTO.build(data); -// CommonResult result = requestPlugin(url, reqDTO, device); - CommonResult result = null; - - // 3. 发送设备消息 - IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) - .setType(IotDeviceMessageTypeEnum.OTA.getType()) - .setIdentifier(IotDeviceMessageIdentifierEnum.OTA_UPGRADE.getIdentifier()) - .setData(downstreamReqVO.getData()); - sendDeviceMessage(message, device, result.getCode()); - - // 4. 如果不成功,抛出异常,提示用户 - if (result.isError()) { - log.error("[otaUpgrade][设备({}) OTA 升级失败,请求参数:({}),响应结果:({})]", - device.getDeviceKey(), reqDTO, result); - throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); - } - return message; - } - - private void sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device, Integer code) { - // 1. 完善消息 - message.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()) - .setDeviceKey(device.getDeviceKey()) - .setTenantId(device.getTenantId()); - Assert.notNull(message.getRequestId(), "requestId 不能为空"); - if (message.getReportTime() == null) { - message.setReportTime(LocalDateTime.now()); - } - message.setCode(code); - - // 2. 发送消息 - try { - deviceProducer.sendDeviceMessage(message); - log.info("[sendDeviceMessage][message({}) 发送消息成功]", message); - } catch (Exception e) { - log.error("[sendDeviceMessage][message({}) 发送消息失败]", message, e); - } - } +// /** +// * 设备 OTA 升级 +// * +// * @param downstreamReqVO 下行请求 +// * @param device 设备 +// * @param parentDevice 父设备 +// * @return 下发消息 +// */ +// private IotDeviceMessage otaUpgrade(IotDeviceDownstreamReqVO downstreamReqVO, +// IotDeviceDO device, IotDeviceDO parentDevice) { +// // 1. 参数校验 +// if (!(downstreamReqVO.getData() instanceof Map data)) { +// throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型"); +// } +// +// // 2. 发送请求 +// String url = String.format("ota/%s/%s/upgrade", +// getProductKey(device, parentDevice), getDeviceName(device, parentDevice)); +// IotDeviceOtaUpgradeReqDTO reqDTO = IotDeviceOtaUpgradeReqDTO.build(data); +//// CommonResult result = requestPlugin(url, reqDTO, device); +// CommonResult result = null; +// +// // 3. 发送设备消息 +// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId()) +// .setType(IotDeviceMessageTypeEnum.OTA.getType()) +// .setIdentifier(IotDeviceMessageIdentifierEnum.OTA_UPGRADE.getIdentifier()) +// .setData(downstreamReqVO.getData()); +// sendDeviceMessage(message, device, result.getCode()); +// +// // 4. 如果不成功,抛出异常,提示用户 +// if (result.isError()) { +// log.error("[otaUpgrade][设备({}) OTA 升级失败,请求参数:({}),响应结果:({})]", +// device.getDeviceKey(), reqDTO, result); +// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg()); +// } +// return message; +// } private String getDeviceName(IotDeviceDO device, IotDeviceDO parentDevice) { return parentDevice != null ? parentDevice.getDeviceName() : device.getDeviceName(); diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java index dba529df2c..727a0f92ed 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamService.java @@ -7,7 +7,7 @@ import jakarta.validation.Valid; /** * IoT 设备上行 Service 接口 * - * 目的:设备 -> 插件 -> 服务端 + * 目的:设备 -> 网关 -> 服务端 * * @author 芋道源码 */ @@ -20,47 +20,33 @@ public interface IotDeviceUpstreamService { */ void upstreamDevice(@Valid IotDeviceUpstreamReqVO simulatorReqVO); - /** - * 更新设备状态 - * - * @param updateReqDTO 更新设备状态 DTO - */ - void updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO); - - /** - * 上报设备属性数据 - * - * @param reportReqDTO 上报设备属性数据 DTO - */ - void reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO); - - /** - * 上报设备事件数据 - * - * @param reportReqDTO 设备事件 - */ - void reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO); - - /** - * 注册设备 - * - * @param registerReqDTO 注册设备 DTO - */ - void registerDevice(IotDeviceRegisterReqDTO registerReqDTO); - - /** - * 注册子设备 - * - * @param registerReqDTO 注册子设备 DTO - */ - void registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO); - - /** - * 添加设备拓扑 - * - * @param addReqDTO 添加设备拓扑 DTO - */ - void addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO); +// /** +// * 上报设备事件数据 +// * +// * @param reportReqDTO 设备事件 +// */ +// void reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO); +// +// /** +// * 注册设备 +// * +// * @param registerReqDTO 注册设备 DTO +// */ +// void registerDevice(IotDeviceRegisterReqDTO registerReqDTO); +// +// /** +// * 注册子设备 +// * +// * @param registerReqDTO 注册子设备 DTO +// */ +// void registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO); +// +// /** +// * 添加设备拓扑 +// * +// * @param addReqDTO 添加设备拓扑 DTO +// */ +// void addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO); /** * Emqx 连接认证 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java index 329c98b89f..f36887905c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/control/IotDeviceUpstreamServiceImpl.java @@ -1,22 +1,15 @@ package cn.iocoder.yudao.module.iot.service.device.control; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.lang.Assert; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.ObjUtil; import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.common.util.object.BeanUtils; -import cn.iocoder.yudao.framework.common.util.object.ObjectUtils; import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceUpstreamReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; -import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService; import cn.iocoder.yudao.module.iot.util.MqttSignUtils; @@ -45,9 +38,7 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { @Resource private IotDevicePropertyService devicePropertyService; - @Resource - private IotDeviceProducer deviceProducer; - + // TODO @芋艿:需要重新实现下; @Override @SuppressWarnings("unchecked") public void upstreamDevice(IotDeviceUpstreamReqVO simulatorReqVO) { @@ -74,53 +65,13 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { } // 2.3 情况三:状态变更 if (Objects.equals(simulatorReqVO.getType(), IotDeviceMessageTypeEnum.STATE.getType())) { - updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO() - .setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now()) - .setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName())) - .setState((Integer) simulatorReqVO.getData())); + // TODO @芋艿:这里未搞完 return; } throw new IllegalArgumentException("未知的类型:" + simulatorReqVO.getType()); } - @Override - public void updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) { - Assert.isTrue(ObjectUtils.equalsAny(updateReqDTO.getState(), - IotDeviceStateEnum.ONLINE.getState(), IotDeviceStateEnum.OFFLINE.getState()), - "状态不合法"); - // 1.1 获得设备 - log.info("[updateDeviceState][更新设备状态: {}]", updateReqDTO); - IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache( - updateReqDTO.getProductKey(), updateReqDTO.getDeviceName()); - if (device == null) { - log.error("[updateDeviceState][设备({}/{}) 不存在]", - updateReqDTO.getProductKey(), updateReqDTO.getDeviceName()); - return; - } - TenantUtils.execute(device.getTenantId(), () -> { - // 1.2 记录设备的最后时间 - updateDeviceLastTime(device, updateReqDTO); - // 1.3 当前状态一致,不处理 - if (Objects.equals(device.getState(), updateReqDTO.getState())) { - return; - } - - // 2. 更新设备状态 - deviceService.updateDeviceState(device.getId(), updateReqDTO.getState()); - - // 3. TODO 芋艿:子设备的关联 - - // 4. 发送设备消息 - IotDeviceMessage message = BeanUtils.toBean(updateReqDTO, IotDeviceMessage.class) - .setType(IotDeviceMessageTypeEnum.STATE.getType()) - .setIdentifier(ObjUtil.equals(updateReqDTO.getState(), IotDeviceStateEnum.ONLINE.getState()) - ? IotDeviceMessageIdentifierEnum.STATE_ONLINE.getIdentifier() - : IotDeviceMessageIdentifierEnum.STATE_OFFLINE.getIdentifier()); - sendDeviceMessage(message, device); - }); - } - - @Override +// @Override TODO 芋艿:待重新实现 public void reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) { // 1.1 获得设备 log.info("[reportDeviceProperty][上报设备属性: {}]", reportReqDTO); @@ -131,18 +82,16 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { reportReqDTO.getProductKey(), reportReqDTO.getDeviceName()); return; } - // 1.2 记录设备的最后时间 - updateDeviceLastTime(device, reportReqDTO); // 2. 发送设备消息 - IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class) - .setType(IotDeviceMessageTypeEnum.PROPERTY.getType()) - .setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier()) - .setData(reportReqDTO.getProperties()); - sendDeviceMessage(message, device); +// IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class) +// .setType(IotDeviceMessageTypeEnum.PROPERTY.getType()) +// .setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier()) +// .setData(reportReqDTO.getProperties()); +// sendDeviceMessage(message, device); } - @Override + // @Override TODO 芋艿:待重新实现 public void reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) { // 1.1 获得设备 log.info("[reportDeviceEvent][上报设备事件: {}]", reportReqDTO); @@ -153,18 +102,16 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { reportReqDTO.getProductKey(), reportReqDTO.getDeviceName()); return; } - // 1.2 记录设备的最后时间 - updateDeviceLastTime(device, reportReqDTO); // 2. 发送设备消息 - IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class) - .setType(IotDeviceMessageTypeEnum.EVENT.getType()) - .setIdentifier(reportReqDTO.getIdentifier()) - .setData(reportReqDTO.getParams()); - sendDeviceMessage(message, device); +// IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class) +// .setType(IotDeviceMessageTypeEnum.EVENT.getType()) +// .setIdentifier(reportReqDTO.getIdentifier()) +// .setData(reportReqDTO.getParams()); +// sendDeviceMessage(message, device); } - @Override + // @Override TODO 芋艿:待重新实现 public void registerDevice(IotDeviceRegisterReqDTO registerReqDTO) { log.info("[registerDevice][注册设备: {}]", registerReqDTO); registerDevice0(registerReqDTO.getProductKey(), registerReqDTO.getDeviceName(), null, registerReqDTO); @@ -186,18 +133,18 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { registerReqDTO, productKey, device, gatewayId); } // 1.2 记录设备的最后时间 - updateDeviceLastTime(device, registerReqDTO); +// updateDeviceLastTime(device, registerReqDTO); // 2. 发送设备消息 if (registerNew) { - IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class) - .setType(IotDeviceMessageTypeEnum.REGISTER.getType()) - .setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER.getIdentifier()); - sendDeviceMessage(message, device); +// IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class) +// .setType(IotDeviceMessageTypeEnum.REGISTER.getType()) +// .setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER.getIdentifier()); +// sendDeviceMessage(message, device); } } - @Override + // @Override TODO 芋艿:待重新实现 public void registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) { // 1.1 注册子设备 log.info("[registerSubDevice][注册子设备: {}]", registerReqDTO); @@ -214,7 +161,7 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { return; } // 1.2 记录设备的最后时间 - updateDeviceLastTime(device, registerReqDTO); +// updateDeviceLastTime(device, registerReqDTO); // 2. 处理子设备 if (CollUtil.isNotEmpty(registerReqDTO.getParams())) { @@ -224,14 +171,14 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { } // 3. 发送设备消息 - IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class) - .setType(IotDeviceMessageTypeEnum.REGISTER.getType()) - .setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER_SUB.getIdentifier()) - .setData(registerReqDTO.getParams()); - sendDeviceMessage(message, device); +// IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class) +// .setType(IotDeviceMessageTypeEnum.REGISTER.getType()) +// .setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER_SUB.getIdentifier()) +// .setData(registerReqDTO.getParams()); +// sendDeviceMessage(message, device); } - @Override + // @Override TODO 芋艿:待重新实现 public void addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) { // 1.1 获得设备 log.info("[addDeviceTopology][添加设备拓扑: {}]", addReqDTO); @@ -247,8 +194,6 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { addReqDTO.getProductKey(), addReqDTO.getDeviceName(), device); return; } - // 1.2 记录设备的最后时间 - updateDeviceLastTime(device, addReqDTO); // 2. 处理拓扑 if (CollUtil.isNotEmpty(addReqDTO.getParams())) { @@ -270,11 +215,11 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { } // 3. 发送设备消息 - IotDeviceMessage message = BeanUtils.toBean(addReqDTO, IotDeviceMessage.class) - .setType(IotDeviceMessageTypeEnum.TOPOLOGY.getType()) - .setIdentifier(IotDeviceMessageIdentifierEnum.TOPOLOGY_ADD.getIdentifier()) - .setData(addReqDTO.getParams()); - sendDeviceMessage(message, device); +// IotDeviceMessage message = BeanUtils.toBean(addReqDTO, IotDeviceMessage.class) +// .setType(IotDeviceMessageTypeEnum.TOPOLOGY.getType()) +// .setIdentifier(IotDeviceMessageIdentifierEnum.TOPOLOGY_ADD.getIdentifier()) +// .setData(addReqDTO.getParams()); +// sendDeviceMessage(message, device); } // TODO @芋艿:后续需要考虑,http 的认证 @@ -310,33 +255,4 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService { return false; } - private void updateDeviceLastTime(IotDeviceDO device, IotDeviceUpstreamAbstractReqDTO reqDTO) { - // 1. 【异步】记录设备与插件实例的映射 -// pluginInstanceService.updateDevicePluginInstanceProcessIdAsync(device.getDeviceKey(), reqDTO.getProcessId()); - // TODO @芋艿:需要单独补充下; - - // 2. 【异步】更新设备的最后时间 - devicePropertyService.updateDeviceReportTimeAsync(device.getDeviceKey(), LocalDateTime.now()); - } - - private void sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) { - // 1. 完善消息 - message.setDeviceKey(device.getDeviceKey()) - .setTenantId(device.getTenantId()); - if (StrUtil.isEmpty(message.getRequestId())) { - message.setRequestId(IdUtil.fastSimpleUUID()); - } - if (message.getReportTime() == null) { - message.setReportTime(LocalDateTime.now()); - } - - // 2. 发送消息 - try { - deviceProducer.sendDeviceMessage(message); - log.info("[sendDeviceMessage][message({}) 发送消息成功]", message); - } catch (Exception e) { - log.error("[sendDeviceMessage][message({}) 发送消息失败]", message, e); - } - } - } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyService.java index 2f06268656..799523b670 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyService.java @@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.service.device.data; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyHistoryPageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyRespVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import jakarta.validation.Valid; import java.time.LocalDateTime; @@ -56,16 +56,43 @@ public interface IotDevicePropertyService { * 获得最后上报时间小于指定时间的设备标识 * * @param maxReportTime 最大上报时间 - * @return 设备标识列表 + * @return [productKey, deviceName] 列表 */ - Set getDeviceKeysByReportTime(LocalDateTime maxReportTime); + Set getProductKeyDeviceNameListByReportTime(LocalDateTime maxReportTime); /** - * 异步更新设备上报时间 + * 更新设备上报时间 * - * @param deviceKey 设备标识 + * @param productKey 产品标识 + * @param deviceName 设备名称 * @param reportTime 上报时间 */ - void updateDeviceReportTimeAsync(String deviceKey, LocalDateTime reportTime); + void updateDeviceReportTime(String productKey, String deviceName, LocalDateTime reportTime); + + /** + * 更新设备关联的网关 serverId + * + * @param productKey 产品标识 + * @param deviceName 设备名称 + * @param serverId 网关 serverId + */ + void updateDeviceServerId(String productKey, String deviceName, String serverId); + + /** + * 删除设备关联的网关 serverId + * + * @param productKey 产品标识 + * @param deviceName 设备名称 + */ + void deleteDeviceServerId(String productKey, String deviceName); + + /** + * 获得设备关联的网关 serverId + * + * @param productKey 产品标识 + * @param deviceName 设备名称 + * @return 网关 serverId + */ + String getDeviceServerId(String productKey, String deviceName); } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyServiceImpl.java index 77dde64a66..e0d610f4d7 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/data/IotDevicePropertyServiceImpl.java @@ -9,17 +9,18 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyHistoryPageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyRespVO; import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.model.dataType.ThingModelDateOrTextDataSpecs; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO; import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO; import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO; import cn.iocoder.yudao.module.iot.dal.redis.device.DevicePropertyRedisDAO; import cn.iocoder.yudao.module.iot.dal.redis.device.DeviceReportTimeRedisDAO; +import cn.iocoder.yudao.module.iot.dal.redis.device.DeviceServerIdRedisDAO; import cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyMapper; import cn.iocoder.yudao.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum; import cn.iocoder.yudao.module.iot.enums.thingmodel.IotThingModelTypeEnum; import cn.iocoder.yudao.module.iot.framework.tdengine.core.TDengineTableField; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.service.product.IotProductService; import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService; @@ -27,7 +28,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.time.LocalDateTime; @@ -70,6 +70,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { private DevicePropertyRedisDAO deviceDataRedisDAO; @Resource private DeviceReportTimeRedisDAO deviceReportTimeRedisDAO; + @Resource + private DeviceServerIdRedisDAO deviceServerIdRedisDAO; @Resource private IotDevicePropertyMapper devicePropertyMapper; @@ -153,7 +155,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { LocalDateTimeUtil.toEpochMilli(message.getReportTime())); // 3.2 保存设备属性【日志】 - deviceDataRedisDAO.putAll(message.getDeviceKey(), convertMap(properties.entrySet(), Map.Entry::getKey, + // TODO @芋艿:这里要调整下; + deviceDataRedisDAO.putAll(device.getDeviceKey(), convertMap(properties.entrySet(), Map.Entry::getKey, entry -> IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build())); } @@ -187,14 +190,31 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService { // ========== 设备时间相关操作 ========== @Override - public Set getDeviceKeysByReportTime(LocalDateTime maxReportTime) { + public Set getProductKeyDeviceNameListByReportTime(LocalDateTime maxReportTime) { return deviceReportTimeRedisDAO.range(maxReportTime); } @Override - @Async - public void updateDeviceReportTimeAsync(String deviceKey, LocalDateTime reportTime) { - deviceReportTimeRedisDAO.update(deviceKey, reportTime); + public void updateDeviceReportTime(String productKey, String deviceName, LocalDateTime reportTime) { + deviceReportTimeRedisDAO.update(productKey, deviceName, reportTime); + } + + @Override + public void updateDeviceServerId(String productKey, String deviceName, String serverId) { + if (StrUtil.isEmpty(serverId)) { + return; + } + deviceServerIdRedisDAO.update(productKey, deviceName, serverId); + } + + @Override + public void deleteDeviceServerId(String productKey, String deviceName) { + deviceServerIdRedisDAO.delete(productKey, deviceName); + } + + @Override + public String getDeviceServerId(String productKey, String deviceName) { + return deviceServerIdRedisDAO.get(productKey, deviceName); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneService.java index fcd3b4cda9..ba137bc04c 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneService.java @@ -3,9 +3,9 @@ package cn.iocoder.yudao.module.iot.service.rule; import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleScenePageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleSceneSaveReqVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import jakarta.validation.Valid; import java.util.List; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java index cefc68dd01..89396ebb3f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotRuleSceneServiceImpl.java @@ -17,6 +17,7 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore; import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleScenePageReqVO; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleSceneSaveReqVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotRuleSceneMapper; import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; @@ -26,7 +27,6 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerConditionParame import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum; import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager; import cn.iocoder.yudao.module.iot.job.rule.IotRuleSceneJob; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.action.IotRuleSceneAction; import jakarta.annotation.Resource; import lombok.SneakyThrows; @@ -240,16 +240,17 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService { @Override public void executeRuleSceneByDevice(IotDeviceMessage message) { - TenantUtils.execute(message.getTenantId(), () -> { - // 1. 获得设备匹配的规则场景 - List ruleScenes = getMatchedRuleSceneListByMessage(message); - if (CollUtil.isEmpty(ruleScenes)) { - return; - } - - // 2. 执行规则场景 - executeRuleSceneAction(message, ruleScenes); - }); + // TODO @芋艿:这里的 tenantId,通过设备获取; +// TenantUtils.execute(message.getTenantId(), () -> { +// // 1. 获得设备匹配的规则场景 +// List ruleScenes = getMatchedRuleSceneListByMessage(message); +// if (CollUtil.isEmpty(ruleScenes)) { +// return; +// } +// +// // 2. 执行规则场景 +// executeRuleSceneAction(message, ruleScenes); +// }); } @Override diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java index 202c3fb67e..d441e5b13a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import javax.annotation.Nullable; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAlertAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAlertAction.java index eadc173787..57530f90e2 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAlertAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAlertAction.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import org.springframework.stereotype.Component; import javax.annotation.Nullable; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index b38e181f93..cd1e3600c9 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.iot.service.rule.action; import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService; import cn.iocoder.yudao.module.iot.service.rule.action.databridge.IotDataBridgeExecute; import jakarta.annotation.Resource; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDeviceControlAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDeviceControlAction.java index d8fd76b5e7..3a7faee102 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDeviceControlAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDeviceControlAction.java @@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.iot.service.rule.action; import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceDownstreamService; import jakarta.annotation.Resource; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index f557e7b467..a83912dda0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.hutool.core.util.ObjUtil; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index ce3d0f1938..1251f3089d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -1,8 +1,7 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; - /** * IoT 数据桥梁的执行器 execute 接口 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java index 22b72e055e..16af0c109e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java @@ -4,8 +4,8 @@ import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.common.util.http.HttpUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeHttpConfig; +import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; -import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.http.*; @@ -16,8 +16,6 @@ import org.springframework.web.util.UriComponentsBuilder; import java.util.HashMap; import java.util.Map; -import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; - /** * Http 的 {@link IotDataBridgeExecute} 实现类 * @@ -48,7 +46,8 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) { - return deviceUpstreamApi.updateDeviceState(updateReqDTO); - } - - @Override - public CommonResult reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) { - return deviceUpstreamApi.reportDeviceEvent(reportReqDTO); - } - - @Override - public CommonResult registerDevice(IotDeviceRegisterReqDTO registerReqDTO) { - return deviceUpstreamApi.registerDevice(registerReqDTO); - } - - @Override - public CommonResult registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) { - return deviceUpstreamApi.registerSubDevice(registerReqDTO); - } - - @Override - public CommonResult addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) { - return deviceUpstreamApi.addDeviceTopology(addReqDTO); - } - @Override public CommonResult authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) { return deviceUpstreamApi.authenticateEmqxConnection(authReqDTO); } - @Override - public CommonResult reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) { - return deviceUpstreamApi.reportDeviceProperty(reportReqDTO); - } - } diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/router/IotDeviceWebhookVertxHandler.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/router/IotDeviceWebhookVertxHandler.java index 7c5ebefe4e..4c3550dbe8 100644 --- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/router/IotDeviceWebhookVertxHandler.java +++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-emqx/src/main/java/cn/iocoder/yudao/module/iot/net/component/emqx/upstream/router/IotDeviceWebhookVertxHandler.java @@ -4,7 +4,7 @@ import cn.hutool.core.util.StrUtil; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi; import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO; -import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils; import io.vertx.core.Handler; import io.vertx.core.json.JsonObject;