feat:【IoT 物联网】消息下行时,增加 serverId 的接入

This commit is contained in:
YunaiV 2025-06-02 11:13:44 +08:00
parent ac624b7495
commit 0bb01eaeeb
52 changed files with 647 additions and 845 deletions

View File

@ -18,59 +18,6 @@ public interface IotDeviceUpstreamApi {
String PREFIX = ApiConstants.PREFIX + "/device/upstream";
// ========== 设备相关 ==========
/**
* 更新设备状态
*
* @param updateReqDTO 更新设备状态 DTO
*/
@PostMapping(PREFIX + "/update-state")
CommonResult<Boolean> updateDeviceState(@Valid @RequestBody IotDeviceStateUpdateReqDTO updateReqDTO);
/**
* 上报设备属性数据
*
* @param reportReqDTO 上报设备属性数据 DTO
*/
@PostMapping(PREFIX + "/report-property")
CommonResult<Boolean> reportDeviceProperty(@Valid @RequestBody IotDevicePropertyReportReqDTO reportReqDTO);
/**
* 上报设备事件数据
*
* @param reportReqDTO 设备事件
*/
@PostMapping(PREFIX + "/report-event")
CommonResult<Boolean> reportDeviceEvent(@Valid @RequestBody IotDeviceEventReportReqDTO reportReqDTO);
// TODO @芋艿这个需要 plugins 接入下
/**
* 注册设备
*
* @param registerReqDTO 注册设备 DTO
*/
@PostMapping(PREFIX + "/register")
CommonResult<Boolean> registerDevice(@Valid @RequestBody IotDeviceRegisterReqDTO registerReqDTO);
// TODO @芋艿这个需要 plugins 接入下
/**
* 注册子设备
*
* @param registerReqDTO 注册子设备 DTO
*/
@PostMapping(PREFIX + "/register-sub")
CommonResult<Boolean> registerSubDevice(@Valid @RequestBody IotDeviceRegisterSubReqDTO registerReqDTO);
// TODO @芋艿这个需要 plugins 接入下
/**
* 注册设备拓扑
*
* @param addReqDTO 注册设备拓扑 DTO
*/
@PostMapping(PREFIX + "/add-topology")
CommonResult<Boolean> addDeviceTopology(@Valid @RequestBody IotDeviceTopologyAddReqDTO addReqDTO);
// TODO @芋艿考虑 http 认证
/**
* 认证 Emqx 连接

View File

@ -1,23 +0,0 @@
package cn.iocoder.yudao.module.iot.api.device.dto.control.upstream;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
/**
* IoT 设备状态更新 Request DTO
*
* @author 芋道源码
*/
@Data
public class IotDeviceStateUpdateReqDTO extends IotDeviceUpstreamAbstractReqDTO {
/**
* 设备状态
*/
@NotNull(message = "设备状态不能为空")
@InEnum(IotDeviceStateEnum.class) // 只使用在线离线
private Integer state;
}

View File

@ -30,7 +30,7 @@ public interface ErrorCodeConstants {
ErrorCode DEVICE_GATEWAY_NOT_EXISTS = new ErrorCode(1_050_003_004, "网关设备不存在");
ErrorCode DEVICE_NOT_GATEWAY = new ErrorCode(1_050_003_005, "设备不是网关设备");
ErrorCode DEVICE_IMPORT_LIST_IS_EMPTY = new ErrorCode(1_050_003_006, "导入设备数据不能为空!");
ErrorCode DEVICE_DOWNSTREAM_FAILED = new ErrorCode(1_050_003_007, "执行失败,原因:{}");
ErrorCode DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL = new ErrorCode(1_050_003_007, "下行设备消息失败,原因:设备未连接网关");
// ========== 产品分类 1-050-004-000 ==========
ErrorCode PRODUCT_CATEGORY_NOT_EXISTS = new ErrorCode(1_050_004_000, "产品分类不存在");

View File

@ -21,44 +21,6 @@ public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
@Resource
private IotDeviceUpstreamService deviceUpstreamService;
// ========== 设备相关 ==========
@Override
public CommonResult<Boolean> updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
deviceUpstreamService.updateDeviceState(updateReqDTO);
return success(true);
}
@Override
public CommonResult<Boolean> reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
deviceUpstreamService.reportDeviceProperty(reportReqDTO);
return success(true);
}
@Override
public CommonResult<Boolean> reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) {
deviceUpstreamService.reportDeviceEvent(reportReqDTO);
return success(true);
}
@Override
public CommonResult<Boolean> registerDevice(IotDeviceRegisterReqDTO registerReqDTO) {
deviceUpstreamService.registerDevice(registerReqDTO);
return success(true);
}
@Override
public CommonResult<Boolean> registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) {
deviceUpstreamService.registerSubDevice(registerReqDTO);
return success(true);
}
@Override
public CommonResult<Boolean> addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) {
deviceUpstreamService.addDeviceTopology(addReqDTO);
return success(true);
}
@Override
public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
boolean result = deviceUpstreamService.authenticateEmqxConnection(authReqDTO);

View File

@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.controller.admin.device.vo.device;
import cn.iocoder.yudao.framework.common.pojo.PageParam;
import cn.iocoder.yudao.framework.common.validation.InEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

View File

@ -4,7 +4,7 @@ import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsDeviceMessageSummaryRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.statistics.vo.IotStatisticsSummaryRespVO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDeviceLogService;
import cn.iocoder.yudao.module.iot.service.product.IotProductCategoryService;

View File

@ -4,7 +4,7 @@ import cn.iocoder.yudao.framework.mybatis.core.type.LongSetTypeHandler;
import cn.iocoder.yudao.framework.tenant.core.db.TenantBaseDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.ota.IotOtaFirmwareDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;

View File

@ -1,14 +1,17 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.rule;
import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.baomidou.mybatisplus.annotation.KeySequence;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* IoT 告警记录 DO

View File

@ -145,7 +145,7 @@ public class IotRuleSceneDO extends TenantBaseDO {
public static class TriggerConditionParameter {
// TODO @芋艿: identifier0 存事件和服务的 identifier 属性的情况 identifier0 就为 null 解决前端回显问题
// TODO @haohao可以根据 TriggerCondition.type 判断是服务还是事件还是属性么
// TODO @puhui999可以根据 TriggerCondition.type 判断是服务还是事件还是属性么
/**
* 标识符事件服务
*

View File

@ -19,11 +19,10 @@ public interface RedisKeyConstants {
*/
String DEVICE_PROPERTY = "iot:device_property:%s";
// TODO @芋艿弱化 deviceKey使用 product_key + device_name 替代
/**
* 设备的最后上报时间采用 ZSET 结构
*
* KEY 格式{deviceKey}
* KEY 格式{productKey},${deviceName}
* SCORE上报时间
*/
String DEVICE_REPORT_TIMES = "iot:device_report_times";
@ -44,4 +43,13 @@ public interface RedisKeyConstants {
*/
String THING_MODEL_LIST = "iot:thing_model_list";
/**
* 设备关联的网关 serverId 缓存采用 HASH 结构
*
* KEY 格式device_server_id
* HASH KEY{productKey},{deviceName}
* VALUE 数据类型String serverId
*/
String DEVICE_SERVER_ID = "iot:device_server_id";
}

View File

@ -1,6 +1,8 @@
package cn.iocoder.yudao.module.iot.dal.redis.device;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
@ -20,14 +22,17 @@ public class DeviceReportTimeRedisDAO {
@Resource
private StringRedisTemplate stringRedisTemplate;
public void update(String deviceKey, LocalDateTime reportTime) {
stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIMES, deviceKey,
public void update(String productKey, String deviceName, LocalDateTime reportTime) {
String value = productKey + StrUtil.COMMA + deviceName; // 使用 , 分隔
stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIMES, value,
LocalDateTimeUtil.toEpochMilli(reportTime));
}
public Set<String> range(LocalDateTime maxReportTime) {
return stringRedisTemplate.opsForZSet().rangeByScore(RedisKeyConstants.DEVICE_REPORT_TIMES, 0,
public Set<String[]> range(LocalDateTime maxReportTime) {
Set<String> values = stringRedisTemplate.opsForZSet().rangeByScore(RedisKeyConstants.DEVICE_REPORT_TIMES, 0,
LocalDateTimeUtil.toEpochMilli(maxReportTime));
return CollectionUtils.convertSet(values,
value -> value.split(StrUtil.COMMA)); // 使用, 分隔
}
}

View File

@ -0,0 +1,67 @@
package cn.iocoder.yudao.module.iot.dal.redis.device;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
/**
* 设备关联的网关 serverId Redis DAO
*
* @author 芋道源码
*/
@Repository
public class DeviceServerIdRedisDAO {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 更新设备关联的网关 serverId
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @param serverId 网关 serverId
*/
public void update(String productKey, String deviceName, String serverId) {
String hashKey = buildHashKey(productKey, deviceName);
stringRedisTemplate.opsForHash().put(RedisKeyConstants.DEVICE_SERVER_ID, hashKey, serverId);
}
/**
* 获得设备关联的网关 serverId
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 网关 serverId
*/
public String get(String productKey, String deviceName) {
String hashKey = buildHashKey(productKey, deviceName);
Object value = stringRedisTemplate.opsForHash().get(RedisKeyConstants.DEVICE_SERVER_ID, hashKey);
return value != null ? (String) value : null;
}
/**
* 删除设备关联的网关 serverId
*
* @param productKey 产品标识
* @param deviceName 设备名称
*/
public void delete(String productKey, String deviceName) {
String hashKey = buildHashKey(productKey, deviceName);
stringRedisTemplate.opsForHash().delete(RedisKeyConstants.DEVICE_SERVER_ID, hashKey);
}
/**
* 构建 HASH KEY
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @return HASH KEY
*/
private String buildHashKey(String productKey, String deviceName) {
return productKey + StrUtil.COMMA + deviceName;
}
}

View File

@ -1,16 +1,16 @@
package cn.iocoder.yudao.module.iot.job.device;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Component;
@ -20,10 +20,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
/**
* IoT 设备离线检查 Job
*
* 检测逻辑设备最后一条 {@link cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage} 消息超过一定时间则认为设备离线
* 检测逻辑设备最后一条 {@link IotDeviceMessage} 消息超过一定时间则认为设备离线
*
* @author 芋道源码
*/
@ -41,8 +43,9 @@ public class IotDeviceOfflineCheckJob implements JobHandler {
private IotDeviceService deviceService;
@Resource
private IotDevicePropertyService devicePropertyService;
@Resource
private IotDeviceUpstreamService deviceUpstreamService;
private IotDeviceMessageProducer deviceMessageProducer;
@Override
@TenantJob
@ -52,22 +55,22 @@ public class IotDeviceOfflineCheckJob implements JobHandler {
if (CollUtil.isEmpty(devices)) {
return JsonUtils.toJsonString(Collections.emptyList());
}
// 1.2 获取超时的 deviceKey 集合
Set<String> timeoutDeviceKeys = devicePropertyService.getDeviceKeysByReportTime(
// 1.2 获取超时的设备集合
Set<String[]> timeoutDevices = devicePropertyService.getProductKeyDeviceNameListByReportTime(
LocalDateTime.now().minus(OFFLINE_TIMEOUT));
Set<String> timeoutDevices2 = convertSet(timeoutDevices, item -> item[0] + StrUtil.COMMA + item[1]);
// 2. 下线设备
List<String> offlineDeviceKeys = CollUtil.newArrayList();
List<String[]> offlineDeviceKeys = CollUtil.newArrayList();
for (IotDeviceDO device : devices) {
if (!timeoutDeviceKeys.contains(device.getDeviceKey())) {
String timeoutDeviceKey = device.getProductKey() + StrUtil.COMMA + device.getDeviceName();
if (!timeoutDevices2.contains(timeoutDeviceKey)) {
continue;
}
offlineDeviceKeys.add(device.getDeviceKey());
offlineDeviceKeys.add(new String[]{device.getProductKey(), device.getDeviceName()});
// 为什么不直接更新状态呢因为通过 IotDeviceMessage 可以经过一系列的处理例如说记录日志等等
deviceUpstreamService.updateDeviceState(((IotDeviceStateUpdateReqDTO)
new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setState((IotDeviceStateEnum.OFFLINE.getState())));
deviceMessageProducer.sendDeviceMessage(IotDeviceMessage.of(device.getProductKey(), device.getDeviceName())
.ofStateOffline());
}
return JsonUtils.toJsonString(offlineDeviceKeys);
}

View File

@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 针对 {@link IotDeviceMessage} 的消费者记录设备日志
* 针对 {@link IotDeviceMessage} 的消费者记录设备日志
*
* @author 芋道源码
*/

View File

@ -1,85 +0,0 @@
package cn.iocoder.yudao.module.iot.mq.consumer.device;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceUpstreamService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* 针对 {@link IotDeviceMessage} 的消费者将离线的设备自动标记为上线
*
* 注意只有设备上行消息才会触发该逻辑
*
* @author 芋道源码
*/
@Component
@Slf4j
public class IotDeviceOnlineMessageConsumer {
@Resource
private IotDeviceService deviceService;
@Resource
private IotDeviceUpstreamService deviceUpstreamService;
@EventListener
@Async
public void onMessage(IotDeviceMessage message) {
// 1.1 只处理上行消息因为只有设备上行的消息才会触发设备上线的逻辑
if (!isUpstreamMessage(message)) {
return;
}
// 1.2 如果设备已在线则不做处理
log.info("[onMessage][消息内容({})]", message);
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(
message.getProductKey(), message.getDeviceName());
if (device == null) {
log.error("[onMessage][消息({}) 对应的设备部存在]", message);
return;
}
if (IotDeviceStateEnum.isOnline(device.getState())) {
return;
}
// 2. 标记设备为在线
// 为什么不直接更新状态呢因为通过 IotDeviceMessage 可以经过一系列的处理例如说记录日志等等
deviceUpstreamService.updateDeviceState(((IotDeviceStateUpdateReqDTO)
new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setState((IotDeviceStateEnum.ONLINE.getState())));
}
private boolean isUpstreamMessage(IotDeviceMessage message) {
// 设备属性
if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType())
&& Objects.equals(message.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier())) {
return true;
}
// 设备事件
if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.EVENT.getType())) {
return true;
}
// 设备服务
// noinspection RedundantIfStatement
if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.SERVICE.getType())
&& !StrUtil.endWith(message.getIdentifier(), IotDeviceMessageIdentifierEnum.SERVICE_REPLY_SUFFIX.getIdentifier())) {
return true;
}
return false;
}
}

View File

@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.mq.consumer.device;
import cn.hutool.core.util.ObjectUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;

View File

@ -0,0 +1,106 @@
package cn.iocoder.yudao.module.iot.mq.consumer.device;
import cn.hutool.core.util.ObjectUtil;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* 针对 {@link IotDeviceMessage} 的消费者记录设备状态
*
* 特殊如果是离线的设备将自动上线
*
* @author 芋道源码
*/
@Component
@Slf4j
public class IotDeviceStateMessageSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
@Resource
private IotDeviceService deviceService;
@Resource
private IotDevicePropertyService devicePropertyService;
@Resource
private IotMessageBus messageBus;
@Resource
private IotDeviceMessageProducer deviceMessageProducer;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
}
@Override
public String getGroup() {
return "iot_device_state_consumer";
}
@Override
public void onMessage(IotDeviceMessage message) {
// 1.1 只处理上行消息或者是 STATE 相关的消息
if (!IotDeviceMessageUtils.isUpstreamMessage(message)
&& ObjectUtil.notEqual(message.getType(), IotDeviceMessageTypeEnum.STATE.getType())) {
return;
}
// 1.2 校验设备是否存在
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(
message.getProductKey(), message.getDeviceName());
if (device == null) {
log.error("[onMessage][消息({}) 对应的设备部存在]", message);
return;
}
// 2. 处理消息
TenantUtils.execute(device.getTenantId(), () -> onMessage(message, device));
}
private void onMessage(IotDeviceMessage message, IotDeviceDO device) {
// 更新设备的最后时间
devicePropertyService.updateDeviceReportTime(device.getProductKey(), device.getDeviceName(), LocalDateTime.now());
// 情况一STATE 相关的消息
if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.STATE.getType())) {
if (Objects.equals(message.getIdentifier(), IotDeviceMessageIdentifierEnum.STATE_ONLINE.getIdentifier())) {
deviceService.updateDeviceState(device.getId(), IotDeviceStateEnum.ONLINE.getState());
devicePropertyService.updateDeviceServerId(device.getProductKey(), device.getDeviceName(), message.getServerId());
} else {
deviceService.updateDeviceState(device.getId(), IotDeviceStateEnum.OFFLINE.getState());
devicePropertyService.deleteDeviceServerId(device.getProductKey(), device.getDeviceName());
}
// TODO 芋艿子设备的关联
return;
}
// 情况二 STATE 相关的消息
devicePropertyService.updateDeviceServerId(device.getProductKey(), device.getDeviceName(), message.getServerId());
// 特殊设备非在线时主动标记设备为在线
// 为什么不直接更新状态呢因为通过 IotDeviceMessage 可以经过一系列的处理例如说记录日志等等
if (ObjectUtil.notEqual(device.getState(), IotDeviceStateEnum.ONLINE.getState())) {
deviceMessageProducer.sendDeviceMessage(IotDeviceMessage.of(message.getProductKey(), message.getDeviceName())
.ofStateOnline());
}
}
}

View File

@ -1,6 +1,6 @@
package cn.iocoder.yudao.module.iot.mq.consumer.rule;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,77 +0,0 @@
package cn.iocoder.yudao.module.iot.mq.message;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
// TODO @芋艿参考阿里云的物模型优化 IoT 上下行消息的设计尽量保持一致渐进式不要一口气
/**
* IoT 设备消息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Deprecated
public class IotDeviceMessage {
/**
* 请求编号
*/
private String requestId;
/**
* 设备信息
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 设备标识
*/
private String deviceKey;
/**
* 消息类型
*
* 枚举 {@link IotDeviceMessageTypeEnum}
*/
private String type;
/**
* 标识符
*
* 枚举 {@link IotDeviceMessageIdentifierEnum}
*/
private String identifier;
/**
* 请求参数
*
* 例如说属性上报的 properties事件上报的 params
*/
private Object data;
/**
* 响应码
*
* 目前只有 server 下行消息给 device 设备时才会有响应码
*/
private Integer code;
/**
* 上报时间
*/
private LocalDateTime reportTime;
/**
* 租户编号
*/
private Long tenantId;
}

View File

@ -1,31 +0,0 @@
package cn.iocoder.yudao.module.iot.mq.producer.device;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
/**
* IoT 设备相关消息的 Producer
*
* @author alwayssuper
* @since 2024/12/17 16:35
*/
@Slf4j
@Component
public class IotDeviceProducer {
@Resource
private ApplicationContext applicationContext;
/**
* 发送 {@link IotDeviceMessage} 消息
*
* @param thingModelMessage 物模型消息
*/
public void sendDeviceMessage(IotDeviceMessage thingModelMessage) {
applicationContext.publishEvent(thingModelMessage);
}
}

View File

@ -1,4 +1,4 @@
/**
* TODO 芋艿临时占位
* 消息队列的生产者
*/
package cn.iocoder.yudao.module.iot.mq.producer;
package cn.iocoder.yudao.module.iot.mq.producer;

View File

@ -3,7 +3,7 @@ package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.device.*;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;

View File

@ -17,7 +17,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceGroupDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.mysql.device.IotDeviceMapper;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.util.MqttSignUtils;

View File

@ -1,13 +1,13 @@
package cn.iocoder.yudao.module.iot.service.device.control;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import jakarta.validation.Valid;
/**
* IoT 设备下行 Service 接口
*
* 目的服务端 -> 插件 -> 设备
* 目的服务端 -> 网关 -> 设备
*
* @author 芋道源码
*/
@ -17,7 +17,7 @@ public interface IotDeviceDownstreamService {
* 设备下行可用于设备模拟
*
* @param downstreamReqVO 设备下行请求 VO
* @return 消息
* @return 消息
*/
IotDeviceMessage downstreamDevice(@Valid IotDeviceDownstreamReqVO downstreamReqVO);

View File

@ -1,34 +1,26 @@
package cn.iocoder.yudao.module.iot.service.device.control;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceConfigSetReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceOtaUpgradeReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDevicePropertyGetReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.downstream.IotDeviceServiceInvokeReqDTO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DEVICE_DOWNSTREAM_FAILED;
import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL;
/**
* IoT 设备下行 Service 实现类
@ -42,255 +34,234 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic
@Resource
private IotDeviceService deviceService;
@Resource
private IotDeviceProducer deviceProducer;
private IotDevicePropertyService devicePropertyService;
@Resource
private IotDeviceMessageProducer deviceMessageProducer;
@Override
public IotDeviceMessage downstreamDevice(IotDeviceDownstreamReqVO downstreamReqVO) {
// 校验设备是否存在
// 1. 校验设备是否存在
IotDeviceDO device = deviceService.validateDeviceExists(downstreamReqVO.getId());
// TODO @芋艿离线设备不允许推送
// TODO 芋艿父设备的处理
IotDeviceDO parentDevice = null;
// 2. 构建消息
IotDeviceMessage message = buildDownstreamDeviceMessage(downstreamReqVO, device, parentDevice);
// 3.1 发送给网关
String serverId = devicePropertyService.getDeviceServerId(message.getProductKey(), message.getDeviceName());
if (StrUtil.isEmpty(serverId)) {
throw exception(DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL);
}
deviceMessageProducer.sendGatewayDeviceMessage(serverId, message);
// 3.2 发送给服务器用于设备日志等的记录
deviceMessageProducer.sendDeviceMessage(message);
return message;
}
@SuppressWarnings("unchecked")
private IotDeviceMessage buildDownstreamDeviceMessage(IotDeviceDownstreamReqVO downstreamReqVO,
IotDeviceDO device, IotDeviceDO parentDevice) {
IotDeviceMessage message = IotDeviceMessage.of(getProductKey(device, parentDevice),
getDeviceName(device, parentDevice));
// 服务调用
if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.SERVICE.getType())) {
return invokeDeviceService(downstreamReqVO, device, parentDevice);
// TODO @芋艿待实现
// return invokeDeviceService(downstreamReqVO, device, parentDevice);
}
// 属性相关
if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType())) {
// 属性设置
if (Objects.equals(downstreamReqVO.getIdentifier(),
IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier())) {
return setDeviceProperty(downstreamReqVO, device, parentDevice);
if (!(downstreamReqVO.getData() instanceof Map<?, ?>)) {
throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型");
}
return message.ofPropertySet((Map<String, Object>) downstreamReqVO.getData());
}
// 属性设置
// 属性获取
if (Objects.equals(downstreamReqVO.getIdentifier(),
IotDeviceMessageIdentifierEnum.PROPERTY_GET.getIdentifier())) {
return getDeviceProperty(downstreamReqVO, device, parentDevice);
// TODO @芋艿待实现
// return getDeviceProperty(downstreamReqVO, device, parentDevice);
}
}
// 配置下发
if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.CONFIG.getType())
&& Objects.equals(downstreamReqVO.getIdentifier(),
IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier())) {
return setDeviceConfig(downstreamReqVO, device, parentDevice);
IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier())) {
// TODO @芋艿待实现
// return setDeviceConfig(downstreamReqVO, device, parentDevice);
}
// OTA 升级
if (Objects.equals(downstreamReqVO.getType(), IotDeviceMessageTypeEnum.OTA.getType())) {
return otaUpgrade(downstreamReqVO, device, parentDevice);
// TODO @芋艿待实现
// return otaUpgrade(downstreamReqVO, device, parentDevice);
}
// TODO @芋艿取消设备的网关的时要不要下发 REGISTER_UNREGISTER_SUB
throw new IllegalArgumentException("不支持的下行消息类型:" + downstreamReqVO);
}
/**
* 调用设备服务
*
* @param downstreamReqVO 下行请求
* @param device 设备
* @param parentDevice 父设备
* @return 下发消息
*/
@SuppressWarnings("unchecked")
private IotDeviceMessage invokeDeviceService(IotDeviceDownstreamReqVO downstreamReqVO,
IotDeviceDO device, IotDeviceDO parentDevice) {
// 1. 参数校验
if (!(downstreamReqVO.getData() instanceof Map<?, ?>)) {
throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型");
}
// TODO @super可优化过滤掉不合法的服务
// /**
// * 调用设备服务
// *
// * @param downstreamReqVO 下行请求
// * @param device 设备
// * @param parentDevice 父设备
// * @return 下发消息
// */
// @SuppressWarnings("unchecked")
// private IotDeviceMessage invokeDeviceService(IotDeviceDownstreamReqVO downstreamReqVO,
// IotDeviceDO device, IotDeviceDO parentDevice) {
// // 1. 参数校验
// if (!(downstreamReqVO.getData() instanceof Map<?, ?>)) {
// throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型");
// }
// // TODO @super可优化过滤掉不合法的服务
//
// // 2. 发送请求
// String url = String.format("sys/%s/%s/thing/service/%s",
// getProductKey(device, parentDevice), getDeviceName(device, parentDevice),
// downstreamReqVO.getIdentifier());
// IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO()
// .setParams((Map<String, Object>) downstreamReqVO.getData());
//// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
// CommonResult<Boolean> result = null;
//
// // 3. 发送设备消息
// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
// .setType(IotDeviceMessageTypeEnum.SERVICE.getType()).setIdentifier(reqDTO.getIdentifier())
// .setData(reqDTO.getParams());
// sendDeviceMessage(message, device, result.getCode());
//
// // 4. 如果不成功抛出异常提示用户
// if (result.isError()) {
// log.error("[invokeDeviceService][设备({})服务调用失败,请求参数:({}),响应结果:({})]",
// device.getDeviceKey(), reqDTO, result);
// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
// }
// return message;
// }
// 2. 发送请求
String url = String.format("sys/%s/%s/thing/service/%s",
getProductKey(device, parentDevice), getDeviceName(device, parentDevice),
downstreamReqVO.getIdentifier());
IotDeviceServiceInvokeReqDTO reqDTO = new IotDeviceServiceInvokeReqDTO()
.setParams((Map<String, Object>) downstreamReqVO.getData());
// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
CommonResult<Boolean> result = null;
// /**
// * 获取设备属性
// *
// * @param downstreamReqVO 下行请求
// * @param device 设备
// * @param parentDevice 父设备
// * @return 下发消息
// */
// @SuppressWarnings("unchecked")
// private IotDeviceMessage getDeviceProperty(IotDeviceDownstreamReqVO downstreamReqVO,
// IotDeviceDO device, IotDeviceDO parentDevice) {
// // 1. 参数校验
// if (!(downstreamReqVO.getData() instanceof List<?>)) {
// throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 List 类型");
// }
// // TODO @super可优化过滤掉不合法的属性
//
// // 2. 发送请求
// String url = String.format("sys/%s/%s/thing/service/property/get",
// getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
// IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO()
// .setIdentifiers((List<String>) downstreamReqVO.getData());
//// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
// CommonResult<Boolean> result = null;
//
// // 3. 发送设备消息
// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
// .setType(IotDeviceMessageTypeEnum.PROPERTY.getType())
// .setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier())
// .setData(reqDTO.getIdentifiers());
// sendDeviceMessage(message, device, result.getCode());
//
// // 4. 如果不成功抛出异常提示用户
// if (result.isError()) {
// log.error("[getDeviceProperty][设备({})属性获取失败,请求参数:({}),响应结果:({})]",
// device.getDeviceKey(), reqDTO, result);
// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
// }
// return message;
// }
// 3. 发送设备消息
IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
.setType(IotDeviceMessageTypeEnum.SERVICE.getType()).setIdentifier(reqDTO.getIdentifier())
.setData(reqDTO.getParams());
sendDeviceMessage(message, device, result.getCode());
// /**
// * 设置设备配置
// *
// * @param downstreamReqVO 下行请求
// * @param device 设备
// * @param parentDevice 父设备
// * @return 下发消息
// */
// @SuppressWarnings({ "unchecked", "unused" })
// private IotDeviceMessage setDeviceConfig(IotDeviceDownstreamReqVO downstreamReqVO,
// IotDeviceDO device, IotDeviceDO parentDevice) {
// // 1. 参数转换无需校验
// Map<String, Object> config = JsonUtils.parseObject(device.getConfig(), Map.class);
//
// // 2. 发送请求
// String url = String.format("sys/%s/%s/thing/service/config/set",
// getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
// IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO()
// .setConfig(config);
//// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
// CommonResult<Boolean> result = null;
//
// // 3. 发送设备消息
// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
// .setType(IotDeviceMessageTypeEnum.CONFIG.getType())
// .setIdentifier(IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier())
// .setData(reqDTO.getConfig());
// sendDeviceMessage(message, device, result.getCode());
//
// // 4. 如果不成功抛出异常提示用户
// if (result.isError()) {
// log.error("[setDeviceConfig][设备({})配置下发失败,请求参数:({}),响应结果:({})]",
// device.getDeviceKey(), reqDTO, result);
// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
// }
// return message;
// }
// 4. 如果不成功抛出异常提示用户
if (result.isError()) {
log.error("[invokeDeviceService][设备({})服务调用失败,请求参数:({}),响应结果:({})]",
device.getDeviceKey(), reqDTO, result);
throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
}
return message;
}
/**
* 设置设备属性
*
* @param downstreamReqVO 下行请求
* @param device 设备
* @param parentDevice 父设备
* @return 下发消息
*/
@SuppressWarnings("unchecked")
private IotDeviceMessage setDeviceProperty(IotDeviceDownstreamReqVO downstreamReqVO,
IotDeviceDO device, IotDeviceDO parentDevice) {
// 1. 参数校验
if (!(downstreamReqVO.getData() instanceof Map<?, ?>)) {
throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型");
}
// TODO @super可优化过滤掉不合法的属性
// 2. 发送请求
cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage message = cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage
.of(getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
String serverId = "192_168_64_1_8092";
deviceMessageProducer.sendGatewayDeviceMessage(serverId, message);
deviceMessageProducer.sendDeviceMessage(message);
// TODO @芋艿后续可以清理掉
return null;
}
/**
* 获取设备属性
*
* @param downstreamReqVO 下行请求
* @param device 设备
* @param parentDevice 父设备
* @return 下发消息
*/
@SuppressWarnings("unchecked")
private IotDeviceMessage getDeviceProperty(IotDeviceDownstreamReqVO downstreamReqVO,
IotDeviceDO device, IotDeviceDO parentDevice) {
// 1. 参数校验
if (!(downstreamReqVO.getData() instanceof List<?>)) {
throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 List 类型");
}
// TODO @super可优化过滤掉不合法的属性
// 2. 发送请求
String url = String.format("sys/%s/%s/thing/service/property/get",
getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
IotDevicePropertyGetReqDTO reqDTO = new IotDevicePropertyGetReqDTO()
.setIdentifiers((List<String>) downstreamReqVO.getData());
// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
CommonResult<Boolean> result = null;
// 3. 发送设备消息
IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
.setType(IotDeviceMessageTypeEnum.PROPERTY.getType())
.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier())
.setData(reqDTO.getIdentifiers());
sendDeviceMessage(message, device, result.getCode());
// 4. 如果不成功抛出异常提示用户
if (result.isError()) {
log.error("[getDeviceProperty][设备({})属性获取失败,请求参数:({}),响应结果:({})]",
device.getDeviceKey(), reqDTO, result);
throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
}
return message;
}
/**
* 设置设备配置
*
* @param downstreamReqVO 下行请求
* @param device 设备
* @param parentDevice 父设备
* @return 下发消息
*/
@SuppressWarnings({ "unchecked", "unused" })
private IotDeviceMessage setDeviceConfig(IotDeviceDownstreamReqVO downstreamReqVO,
IotDeviceDO device, IotDeviceDO parentDevice) {
// 1. 参数转换无需校验
Map<String, Object> config = JsonUtils.parseObject(device.getConfig(), Map.class);
// 2. 发送请求
String url = String.format("sys/%s/%s/thing/service/config/set",
getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
IotDeviceConfigSetReqDTO reqDTO = new IotDeviceConfigSetReqDTO()
.setConfig(config);
// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
CommonResult<Boolean> result = null;
// 3. 发送设备消息
IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
.setType(IotDeviceMessageTypeEnum.CONFIG.getType())
.setIdentifier(IotDeviceMessageIdentifierEnum.CONFIG_SET.getIdentifier())
.setData(reqDTO.getConfig());
sendDeviceMessage(message, device, result.getCode());
// 4. 如果不成功抛出异常提示用户
if (result.isError()) {
log.error("[setDeviceConfig][设备({})配置下发失败,请求参数:({}),响应结果:({})]",
device.getDeviceKey(), reqDTO, result);
throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
}
return message;
}
/**
* 设备 OTA 升级
*
* @param downstreamReqVO 下行请求
* @param device 设备
* @param parentDevice 父设备
* @return 下发消息
*/
private IotDeviceMessage otaUpgrade(IotDeviceDownstreamReqVO downstreamReqVO,
IotDeviceDO device, IotDeviceDO parentDevice) {
// 1. 参数校验
if (!(downstreamReqVO.getData() instanceof Map<?, ?> data)) {
throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型");
}
// 2. 发送请求
String url = String.format("ota/%s/%s/upgrade",
getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
IotDeviceOtaUpgradeReqDTO reqDTO = IotDeviceOtaUpgradeReqDTO.build(data);
// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
CommonResult<Boolean> result = null;
// 3. 发送设备消息
IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
.setType(IotDeviceMessageTypeEnum.OTA.getType())
.setIdentifier(IotDeviceMessageIdentifierEnum.OTA_UPGRADE.getIdentifier())
.setData(downstreamReqVO.getData());
sendDeviceMessage(message, device, result.getCode());
// 4. 如果不成功抛出异常提示用户
if (result.isError()) {
log.error("[otaUpgrade][设备({}) OTA 升级失败,请求参数:({}),响应结果:({})]",
device.getDeviceKey(), reqDTO, result);
throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
}
return message;
}
private void sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device, Integer code) {
// 1. 完善消息
message.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName())
.setDeviceKey(device.getDeviceKey())
.setTenantId(device.getTenantId());
Assert.notNull(message.getRequestId(), "requestId 不能为空");
if (message.getReportTime() == null) {
message.setReportTime(LocalDateTime.now());
}
message.setCode(code);
// 2. 发送消息
try {
deviceProducer.sendDeviceMessage(message);
log.info("[sendDeviceMessage][message({}) 发送消息成功]", message);
} catch (Exception e) {
log.error("[sendDeviceMessage][message({}) 发送消息失败]", message, e);
}
}
// /**
// * 设备 OTA 升级
// *
// * @param downstreamReqVO 下行请求
// * @param device 设备
// * @param parentDevice 父设备
// * @return 下发消息
// */
// private IotDeviceMessage otaUpgrade(IotDeviceDownstreamReqVO downstreamReqVO,
// IotDeviceDO device, IotDeviceDO parentDevice) {
// // 1. 参数校验
// if (!(downstreamReqVO.getData() instanceof Map<?, ?> data)) {
// throw new ServiceException(BAD_REQUEST.getCode(), "data 不是 Map 类型");
// }
//
// // 2. 发送请求
// String url = String.format("ota/%s/%s/upgrade",
// getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
// IotDeviceOtaUpgradeReqDTO reqDTO = IotDeviceOtaUpgradeReqDTO.build(data);
//// CommonResult<Boolean> result = requestPlugin(url, reqDTO, device);
// CommonResult<Boolean> result = null;
//
// // 3. 发送设备消息
// IotDeviceMessage message = new IotDeviceMessage().setRequestId(reqDTO.getRequestId())
// .setType(IotDeviceMessageTypeEnum.OTA.getType())
// .setIdentifier(IotDeviceMessageIdentifierEnum.OTA_UPGRADE.getIdentifier())
// .setData(downstreamReqVO.getData());
// sendDeviceMessage(message, device, result.getCode());
//
// // 4. 如果不成功抛出异常提示用户
// if (result.isError()) {
// log.error("[otaUpgrade][设备({}) OTA 升级失败,请求参数:({}),响应结果:({})]",
// device.getDeviceKey(), reqDTO, result);
// throw exception(DEVICE_DOWNSTREAM_FAILED, result.getMsg());
// }
// return message;
// }
private String getDeviceName(IotDeviceDO device, IotDeviceDO parentDevice) {
return parentDevice != null ? parentDevice.getDeviceName() : device.getDeviceName();

View File

@ -7,7 +7,7 @@ import jakarta.validation.Valid;
/**
* IoT 设备上行 Service 接口
*
* 目的设备 -> 插件 -> 服务端
* 目的设备 -> 网关 -> 服务端
*
* @author 芋道源码
*/
@ -20,47 +20,33 @@ public interface IotDeviceUpstreamService {
*/
void upstreamDevice(@Valid IotDeviceUpstreamReqVO simulatorReqVO);
/**
* 更新设备状态
*
* @param updateReqDTO 更新设备状态 DTO
*/
void updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO);
/**
* 上报设备属性数据
*
* @param reportReqDTO 上报设备属性数据 DTO
*/
void reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO);
/**
* 上报设备事件数据
*
* @param reportReqDTO 设备事件
*/
void reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO);
/**
* 注册设备
*
* @param registerReqDTO 注册设备 DTO
*/
void registerDevice(IotDeviceRegisterReqDTO registerReqDTO);
/**
* 注册子设备
*
* @param registerReqDTO 注册子设备 DTO
*/
void registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO);
/**
* 添加设备拓扑
*
* @param addReqDTO 添加设备拓扑 DTO
*/
void addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO);
// /**
// * 上报设备事件数据
// *
// * @param reportReqDTO 设备事件
// */
// void reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO);
//
// /**
// * 注册设备
// *
// * @param registerReqDTO 注册设备 DTO
// */
// void registerDevice(IotDeviceRegisterReqDTO registerReqDTO);
//
// /**
// * 注册子设备
// *
// * @param registerReqDTO 注册子设备 DTO
// */
// void registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO);
//
// /**
// * 添加设备拓扑
// *
// * @param addReqDTO 添加设备拓扑 DTO
// */
// void addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO);
/**
* Emqx 连接认证

View File

@ -1,22 +1,15 @@
package cn.iocoder.yudao.module.iot.service.device.control;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.*;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceUpstreamReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.enums.product.IotProductDeviceTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import cn.iocoder.yudao.module.iot.util.MqttSignUtils;
@ -45,9 +38,7 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
@Resource
private IotDevicePropertyService devicePropertyService;
@Resource
private IotDeviceProducer deviceProducer;
// TODO @芋艿需要重新实现下
@Override
@SuppressWarnings("unchecked")
public void upstreamDevice(IotDeviceUpstreamReqVO simulatorReqVO) {
@ -74,53 +65,13 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
}
// 2.3 情况三状态变更
if (Objects.equals(simulatorReqVO.getType(), IotDeviceMessageTypeEnum.STATE.getType())) {
updateDeviceState(((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO()
.setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setState((Integer) simulatorReqVO.getData()));
// TODO @芋艿这里未搞完
return;
}
throw new IllegalArgumentException("未知的类型:" + simulatorReqVO.getType());
}
@Override
public void updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
Assert.isTrue(ObjectUtils.equalsAny(updateReqDTO.getState(),
IotDeviceStateEnum.ONLINE.getState(), IotDeviceStateEnum.OFFLINE.getState()),
"状态不合法");
// 1.1 获得设备
log.info("[updateDeviceState][更新设备状态: {}]", updateReqDTO);
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(
updateReqDTO.getProductKey(), updateReqDTO.getDeviceName());
if (device == null) {
log.error("[updateDeviceState][设备({}/{}) 不存在]",
updateReqDTO.getProductKey(), updateReqDTO.getDeviceName());
return;
}
TenantUtils.execute(device.getTenantId(), () -> {
// 1.2 记录设备的最后时间
updateDeviceLastTime(device, updateReqDTO);
// 1.3 当前状态一致不处理
if (Objects.equals(device.getState(), updateReqDTO.getState())) {
return;
}
// 2. 更新设备状态
deviceService.updateDeviceState(device.getId(), updateReqDTO.getState());
// 3. TODO 芋艿子设备的关联
// 4. 发送设备消息
IotDeviceMessage message = BeanUtils.toBean(updateReqDTO, IotDeviceMessage.class)
.setType(IotDeviceMessageTypeEnum.STATE.getType())
.setIdentifier(ObjUtil.equals(updateReqDTO.getState(), IotDeviceStateEnum.ONLINE.getState())
? IotDeviceMessageIdentifierEnum.STATE_ONLINE.getIdentifier()
: IotDeviceMessageIdentifierEnum.STATE_OFFLINE.getIdentifier());
sendDeviceMessage(message, device);
});
}
@Override
// @Override TODO 芋艿待重新实现
public void reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
// 1.1 获得设备
log.info("[reportDeviceProperty][上报设备属性: {}]", reportReqDTO);
@ -131,18 +82,16 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
return;
}
// 1.2 记录设备的最后时间
updateDeviceLastTime(device, reportReqDTO);
// 2. 发送设备消息
IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class)
.setType(IotDeviceMessageTypeEnum.PROPERTY.getType())
.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier())
.setData(reportReqDTO.getProperties());
sendDeviceMessage(message, device);
// IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class)
// .setType(IotDeviceMessageTypeEnum.PROPERTY.getType())
// .setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier())
// .setData(reportReqDTO.getProperties());
// sendDeviceMessage(message, device);
}
@Override
// @Override TODO 芋艿待重新实现
public void reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) {
// 1.1 获得设备
log.info("[reportDeviceEvent][上报设备事件: {}]", reportReqDTO);
@ -153,18 +102,16 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
return;
}
// 1.2 记录设备的最后时间
updateDeviceLastTime(device, reportReqDTO);
// 2. 发送设备消息
IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class)
.setType(IotDeviceMessageTypeEnum.EVENT.getType())
.setIdentifier(reportReqDTO.getIdentifier())
.setData(reportReqDTO.getParams());
sendDeviceMessage(message, device);
// IotDeviceMessage message = BeanUtils.toBean(reportReqDTO, IotDeviceMessage.class)
// .setType(IotDeviceMessageTypeEnum.EVENT.getType())
// .setIdentifier(reportReqDTO.getIdentifier())
// .setData(reportReqDTO.getParams());
// sendDeviceMessage(message, device);
}
@Override
// @Override TODO 芋艿待重新实现
public void registerDevice(IotDeviceRegisterReqDTO registerReqDTO) {
log.info("[registerDevice][注册设备: {}]", registerReqDTO);
registerDevice0(registerReqDTO.getProductKey(), registerReqDTO.getDeviceName(), null, registerReqDTO);
@ -186,18 +133,18 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
registerReqDTO, productKey, device, gatewayId);
}
// 1.2 记录设备的最后时间
updateDeviceLastTime(device, registerReqDTO);
// updateDeviceLastTime(device, registerReqDTO);
// 2. 发送设备消息
if (registerNew) {
IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class)
.setType(IotDeviceMessageTypeEnum.REGISTER.getType())
.setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER.getIdentifier());
sendDeviceMessage(message, device);
// IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class)
// .setType(IotDeviceMessageTypeEnum.REGISTER.getType())
// .setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER.getIdentifier());
// sendDeviceMessage(message, device);
}
}
@Override
// @Override TODO 芋艿待重新实现
public void registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) {
// 1.1 注册子设备
log.info("[registerSubDevice][注册子设备: {}]", registerReqDTO);
@ -214,7 +161,7 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
return;
}
// 1.2 记录设备的最后时间
updateDeviceLastTime(device, registerReqDTO);
// updateDeviceLastTime(device, registerReqDTO);
// 2. 处理子设备
if (CollUtil.isNotEmpty(registerReqDTO.getParams())) {
@ -224,14 +171,14 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
}
// 3. 发送设备消息
IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class)
.setType(IotDeviceMessageTypeEnum.REGISTER.getType())
.setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER_SUB.getIdentifier())
.setData(registerReqDTO.getParams());
sendDeviceMessage(message, device);
// IotDeviceMessage message = BeanUtils.toBean(registerReqDTO, IotDeviceMessage.class)
// .setType(IotDeviceMessageTypeEnum.REGISTER.getType())
// .setIdentifier(IotDeviceMessageIdentifierEnum.REGISTER_REGISTER_SUB.getIdentifier())
// .setData(registerReqDTO.getParams());
// sendDeviceMessage(message, device);
}
@Override
// @Override TODO 芋艿待重新实现
public void addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) {
// 1.1 获得设备
log.info("[addDeviceTopology][添加设备拓扑: {}]", addReqDTO);
@ -247,8 +194,6 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
addReqDTO.getProductKey(), addReqDTO.getDeviceName(), device);
return;
}
// 1.2 记录设备的最后时间
updateDeviceLastTime(device, addReqDTO);
// 2. 处理拓扑
if (CollUtil.isNotEmpty(addReqDTO.getParams())) {
@ -270,11 +215,11 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
}
// 3. 发送设备消息
IotDeviceMessage message = BeanUtils.toBean(addReqDTO, IotDeviceMessage.class)
.setType(IotDeviceMessageTypeEnum.TOPOLOGY.getType())
.setIdentifier(IotDeviceMessageIdentifierEnum.TOPOLOGY_ADD.getIdentifier())
.setData(addReqDTO.getParams());
sendDeviceMessage(message, device);
// IotDeviceMessage message = BeanUtils.toBean(addReqDTO, IotDeviceMessage.class)
// .setType(IotDeviceMessageTypeEnum.TOPOLOGY.getType())
// .setIdentifier(IotDeviceMessageIdentifierEnum.TOPOLOGY_ADD.getIdentifier())
// .setData(addReqDTO.getParams());
// sendDeviceMessage(message, device);
}
// TODO @芋艿后续需要考虑http 的认证
@ -310,33 +255,4 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
return false;
}
private void updateDeviceLastTime(IotDeviceDO device, IotDeviceUpstreamAbstractReqDTO reqDTO) {
// 1. 异步记录设备与插件实例的映射
// pluginInstanceService.updateDevicePluginInstanceProcessIdAsync(device.getDeviceKey(), reqDTO.getProcessId());
// TODO @芋艿需要单独补充下
// 2. 异步更新设备的最后时间
devicePropertyService.updateDeviceReportTimeAsync(device.getDeviceKey(), LocalDateTime.now());
}
private void sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) {
// 1. 完善消息
message.setDeviceKey(device.getDeviceKey())
.setTenantId(device.getTenantId());
if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(IdUtil.fastSimpleUUID());
}
if (message.getReportTime() == null) {
message.setReportTime(LocalDateTime.now());
}
// 2. 发送消息
try {
deviceProducer.sendDeviceMessage(message);
log.info("[sendDeviceMessage][message({}) 发送消息成功]", message);
} catch (Exception e) {
log.error("[sendDeviceMessage][message({}) 发送消息失败]", message, e);
}
}
}

View File

@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.service.device.data;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyHistoryPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyRespVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import jakarta.validation.Valid;
import java.time.LocalDateTime;
@ -56,16 +56,43 @@ public interface IotDevicePropertyService {
* 获得最后上报时间小于指定时间的设备标识
*
* @param maxReportTime 最大上报时间
* @return 设备标识列表
* @return [productKey, deviceName] 列表
*/
Set<String> getDeviceKeysByReportTime(LocalDateTime maxReportTime);
Set<String[]> getProductKeyDeviceNameListByReportTime(LocalDateTime maxReportTime);
/**
* 异步更新设备上报时间
* 更新设备上报时间
*
* @param deviceKey 设备标识
* @param productKey 产品标识
* @param deviceName 设备名称
* @param reportTime 上报时间
*/
void updateDeviceReportTimeAsync(String deviceKey, LocalDateTime reportTime);
void updateDeviceReportTime(String productKey, String deviceName, LocalDateTime reportTime);
/**
* 更新设备关联的网关 serverId
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @param serverId 网关 serverId
*/
void updateDeviceServerId(String productKey, String deviceName, String serverId);
/**
* 删除设备关联的网关 serverId
*
* @param productKey 产品标识
* @param deviceName 设备名称
*/
void deleteDeviceServerId(String productKey, String deviceName);
/**
* 获得设备关联的网关 serverId
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 网关 serverId
*/
String getDeviceServerId(String productKey, String deviceName);
}

View File

@ -9,17 +9,18 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyHistoryPageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyRespVO;
import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.model.dataType.ThingModelDateOrTextDataSpecs;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.dal.redis.device.DevicePropertyRedisDAO;
import cn.iocoder.yudao.module.iot.dal.redis.device.DeviceReportTimeRedisDAO;
import cn.iocoder.yudao.module.iot.dal.redis.device.DeviceServerIdRedisDAO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyMapper;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotThingModelTypeEnum;
import cn.iocoder.yudao.module.iot.framework.tdengine.core.TDengineTableField;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
import cn.iocoder.yudao.module.iot.service.thingmodel.IotThingModelService;
@ -27,7 +28,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@ -70,6 +70,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
private DevicePropertyRedisDAO deviceDataRedisDAO;
@Resource
private DeviceReportTimeRedisDAO deviceReportTimeRedisDAO;
@Resource
private DeviceServerIdRedisDAO deviceServerIdRedisDAO;
@Resource
private IotDevicePropertyMapper devicePropertyMapper;
@ -153,7 +155,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
LocalDateTimeUtil.toEpochMilli(message.getReportTime()));
// 3.2 保存设备属性日志
deviceDataRedisDAO.putAll(message.getDeviceKey(), convertMap(properties.entrySet(), Map.Entry::getKey,
// TODO @芋艿这里要调整下
deviceDataRedisDAO.putAll(device.getDeviceKey(), convertMap(properties.entrySet(), Map.Entry::getKey,
entry -> IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build()));
}
@ -187,14 +190,31 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
// ========== 设备时间相关操作 ==========
@Override
public Set<String> getDeviceKeysByReportTime(LocalDateTime maxReportTime) {
public Set<String[]> getProductKeyDeviceNameListByReportTime(LocalDateTime maxReportTime) {
return deviceReportTimeRedisDAO.range(maxReportTime);
}
@Override
@Async
public void updateDeviceReportTimeAsync(String deviceKey, LocalDateTime reportTime) {
deviceReportTimeRedisDAO.update(deviceKey, reportTime);
public void updateDeviceReportTime(String productKey, String deviceName, LocalDateTime reportTime) {
deviceReportTimeRedisDAO.update(productKey, deviceName, reportTime);
}
@Override
public void updateDeviceServerId(String productKey, String deviceName, String serverId) {
if (StrUtil.isEmpty(serverId)) {
return;
}
deviceServerIdRedisDAO.update(productKey, deviceName, serverId);
}
@Override
public void deleteDeviceServerId(String productKey, String deviceName) {
deviceServerIdRedisDAO.delete(productKey, deviceName);
}
@Override
public String getDeviceServerId(String productKey, String deviceName) {
return deviceServerIdRedisDAO.get(productKey, deviceName);
}
}

View File

@ -3,9 +3,9 @@ package cn.iocoder.yudao.module.iot.service.rule;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleScenePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleSceneSaveReqVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import jakarta.validation.Valid;
import java.util.List;

View File

@ -17,6 +17,7 @@ import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleScenePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.scene.IotRuleSceneSaveReqVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotRuleSceneMapper;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
@ -26,7 +27,6 @@ import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerConditionParame
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager;
import cn.iocoder.yudao.module.iot.job.rule.IotRuleSceneJob;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.rule.action.IotRuleSceneAction;
import jakarta.annotation.Resource;
import lombok.SneakyThrows;
@ -240,16 +240,17 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService {
@Override
public void executeRuleSceneByDevice(IotDeviceMessage message) {
TenantUtils.execute(message.getTenantId(), () -> {
// 1. 获得设备匹配的规则场景
List<IotRuleSceneDO> ruleScenes = getMatchedRuleSceneListByMessage(message);
if (CollUtil.isEmpty(ruleScenes)) {
return;
}
// 2. 执行规则场景
executeRuleSceneAction(message, ruleScenes);
});
// TODO @芋艿这里的 tenantId通过设备获取
// TenantUtils.execute(message.getTenantId(), () -> {
// // 1. 获得设备匹配的规则场景
// List<IotRuleSceneDO> ruleScenes = getMatchedRuleSceneListByMessage(message);
// if (CollUtil.isEmpty(ruleScenes)) {
// return;
// }
//
// // 2. 执行规则场景
// executeRuleSceneAction(message, ruleScenes);
// });
}
@Override

View File

@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import javax.annotation.Nullable;

View File

@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import org.springframework.stereotype.Component;
import javax.annotation.Nullable;

View File

@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.iot.service.rule.action;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService;
import cn.iocoder.yudao.module.iot.service.rule.action.databridge.IotDataBridgeExecute;
import jakarta.annotation.Resource;

View File

@ -2,10 +2,10 @@ package cn.iocoder.yudao.module.iot.service.rule.action;
import cn.hutool.core.lang.Assert;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.control.IotDeviceDownstreamReqVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.control.IotDeviceDownstreamService;
import jakarta.annotation.Resource;

View File

@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.util.ObjUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

View File

@ -1,8 +1,7 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
/**
* IoT 数据桥梁的执行器 execute 接口

View File

@ -4,8 +4,8 @@ import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.http.HttpUtils;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeHttpConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.*;
@ -16,8 +16,6 @@ import org.springframework.web.util.UriComponentsBuilder;
import java.util.HashMap;
import java.util.Map;
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
/**
* Http {@link IotDataBridgeExecute} 实现类
*
@ -48,7 +46,8 @@ public class IotHttpDataBridgeExecute implements IotDataBridgeExecute<IotDataBri
if (CollUtil.isNotEmpty(config.getHeaders())) {
config.getHeaders().putAll(config.getHeaders());
}
headers.add(HEADER_TENANT_ID, message.getTenantId().toString());
// TODO @puhui999@yunai可能需要通过设备查询到租户然后 set
// headers.add(HEADER_TENANT_ID, message.getTenantId().toString());
// 1.2 构建 URL
UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(config.getUrl());
if (CollUtil.isNotEmpty(config.getQuery())) {

View File

@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeKafkaMQConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

View File

@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRabbitMQConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

View File

@ -2,8 +2,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;

View File

@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRocketMQConfig;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;

View File

@ -2,8 +2,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.*;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
@ -41,8 +41,8 @@ public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
@BeforeEach
public void setUp() {
// 创建共享的测试消息
message = IotDeviceMessage.builder().requestId("TEST-001").reportTime(LocalDateTime.now()).tenantId(1L)
.productKey("testProduct").deviceName("testDevice").deviceKey("testDeviceKey")
message = IotDeviceMessage.builder().messageId("TEST-001").reportTime(LocalDateTime.now())
.productKey("testProduct").deviceName("testDevice")
.type("property").identifier("temperature").data("{\"value\": 60}")
.build();
}

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.iot.enums.device;
package cn.iocoder.yudao.module.iot.core.enums;
import cn.iocoder.yudao.framework.common.core.ArrayValuable;
import lombok.Getter;

View File

@ -2,7 +2,7 @@ package cn.iocoder.yudao.module.iot.core.mq.message;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.core.util.IotCoreUtils;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -79,7 +79,7 @@ public class IotDeviceMessage {
private LocalDateTime reportTime;
/**
* 服务编号该消息由哪个消息发送
* 服务编号该消息由哪个 server 服务进行消费
*/
private String serverId;
@ -97,6 +97,18 @@ public class IotDeviceMessage {
return this;
}
public IotDeviceMessage ofStateOnline() {
this.setType(IotDeviceMessageTypeEnum.STATE.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.STATE_ONLINE.getIdentifier());
return this;
}
public IotDeviceMessage ofStateOffline() {
this.setType(IotDeviceMessageTypeEnum.STATE.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.STATE_OFFLINE.getIdentifier());
return this;
}
public static IotDeviceMessage of(String productKey, String deviceName) {
return of(productKey, deviceName,
null, null);
@ -113,17 +125,11 @@ public class IotDeviceMessage {
if (reportTime == null) {
reportTime = LocalDateTime.now();
}
String messageId = IotCoreUtils.generateMessageId();
String messageId = IotDeviceMessageUtils.generateMessageId();
return IotDeviceMessage.builder()
.messageId(messageId).reportTime(reportTime)
.productKey(productKey).deviceName(deviceName)
.serverId(serverId).build();
}
// ========== Topic 相关 ==========
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
return String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
}
}

View File

@ -2,6 +2,7 @@ package cn.iocoder.yudao.module.iot.core.mq.producer;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import lombok.RequiredArgsConstructor;
/**
@ -30,7 +31,7 @@ public class IotDeviceMessageProducer {
* @param message 设备消息
*/
public void sendGatewayDeviceMessage(String serverId, Object message) {
messageBus.post(IotDeviceMessage.buildMessageBusGatewayDeviceMessageTopic(serverId), message);
messageBus.post(IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(serverId), message);
}
}

View File

@ -1,29 +0,0 @@
package cn.iocoder.yudao.module.iot.core.util;
import cn.hutool.core.util.IdUtil;
import cn.hutool.system.SystemUtil;
/**
* IoT 核心模块的工具类
*
* @author 芋道源码
*/
public class IotCoreUtils {
/**
* 生成服务器编号
*
* @param serverPort 服务器端口
* @return 服务器编号
*/
public static String generateServerId(Integer serverPort) {
String serverId = String.format("%s.%d", SystemUtil.getHostInfo().getAddress(), serverPort);
// 避免一些场景无法使用 . 符号例如说 RocketMQ Topic
return serverId.replaceAll("\\.", "_");
}
public static String generateMessageId() {
return IdUtil.fastSimpleUUID();
}
}

View File

@ -0,0 +1,49 @@
package cn.iocoder.yudao.module.iot.core.util;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
/**
* IoT 设备消息的工具类
*
* @author 芋道源码
*/
public class IotDeviceMessageUtils {
// ========== Message 相关 ==========
public static String generateMessageId() {
return IdUtil.fastSimpleUUID();
}
/**
* 是否是上行消息由设备发送
*
* @param message 消息
* @return 是否
*/
public static boolean isUpstreamMessage(IotDeviceMessage message) {
return StrUtil.isNotEmpty(message.getServerId());
}
// ========== Topic 相关 ==========
public static String buildMessageBusGatewayDeviceMessageTopic(String serverId) {
return String.format(IotDeviceMessage.MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
}
/**
* 生成服务器编号
*
* @param serverPort 服务器端口
* @return 服务器编号
*/
public static String generateServerId(Integer serverPort) {
String serverId = String.format("%s.%d", SystemUtil.getHostInfo().getAddress(), serverPort);
// 避免一些场景无法使用 . 符号例如说 RocketMQ Topic
return serverId.replaceAll("\\.", "_");
}
}

View File

@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -27,7 +28,7 @@ public class IotHttpDownstreamSubscriber implements IotMessageSubscriber<IotDevi
@Override
public String getTopic() {
return IotDeviceMessage.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
return IotDeviceMessageUtils.buildMessageBusGatewayDeviceMessageTopic(protocol.getServerId());
}
@Override

View File

@ -1,7 +1,7 @@
package cn.iocoder.yudao.module.iot.gateway.protocol.http;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.core.util.IotCoreUtils;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.router.IotHttpUpstreamHandler;
import io.vertx.core.AbstractVerticle;
@ -70,7 +70,7 @@ public class IotHttpUpstreamProtocol extends AbstractVerticle {
}
public String getServerId() {
return IotCoreUtils.generateServerId(httpProperties.getServerPort());
return IotDeviceMessageUtils.generateServerId(httpProperties.getServerPort());
}
}

View File

@ -18,39 +18,9 @@ public class IotDeviceUpstreamClient implements IotDeviceUpstreamApi {
@Resource
private IotDeviceUpstreamApi deviceUpstreamApi;
@Override
public CommonResult<Boolean> updateDeviceState(IotDeviceStateUpdateReqDTO updateReqDTO) {
return deviceUpstreamApi.updateDeviceState(updateReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceEvent(IotDeviceEventReportReqDTO reportReqDTO) {
return deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
}
@Override
public CommonResult<Boolean> registerDevice(IotDeviceRegisterReqDTO registerReqDTO) {
return deviceUpstreamApi.registerDevice(registerReqDTO);
}
@Override
public CommonResult<Boolean> registerSubDevice(IotDeviceRegisterSubReqDTO registerReqDTO) {
return deviceUpstreamApi.registerSubDevice(registerReqDTO);
}
@Override
public CommonResult<Boolean> addDeviceTopology(IotDeviceTopologyAddReqDTO addReqDTO) {
return deviceUpstreamApi.addDeviceTopology(addReqDTO);
}
@Override
public CommonResult<Boolean> authenticateEmqxConnection(IotDeviceEmqxAuthReqDTO authReqDTO) {
return deviceUpstreamApi.authenticateEmqxConnection(authReqDTO);
}
@Override
public CommonResult<Boolean> reportDeviceProperty(IotDevicePropertyReportReqDTO reportReqDTO) {
return deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
}
}

View File

@ -4,7 +4,7 @@ import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;