diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java index 5289575b02..d85799be02 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageIdentifierEnum.java @@ -15,7 +15,9 @@ public enum IotDeviceMessageIdentifierEnum { PROPERTY_REPORT("report"), STATE_ONLINE("online"), - STATE_OFFLINE("offline"); + STATE_OFFLINE("offline"), + + SERVICE_REPLY_SUFFIX("_reply"); /** diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageTypeEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageTypeEnum.java index 20140ead2e..473b542ecd 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceMessageTypeEnum.java @@ -15,7 +15,8 @@ public enum IotDeviceMessageTypeEnum implements ArrayValuable { STATE("state"), // 设备状态 PROPERTY("property"), // 设备属性 - EVENT("event"); // 设备事件 + EVENT("event"), // 设备事件 + SERVICE("service"); // 设备服务 public static final String[] ARRAYS = Arrays.stream(values()).map(IotDeviceMessageTypeEnum::getType).toArray(String[]::new); diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceStateEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceStateEnum.java index a1e528ed82..6ce2677dbe 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceStateEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/device/IotDeviceStateEnum.java @@ -35,4 +35,8 @@ public enum IotDeviceStateEnum implements ArrayValuable { return ARRAYS; } + public static boolean isOnline(Integer state) { + return ONLINE.getState().equals(state); + } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/device/IotDeviceOfflineCheckJob.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/device/IotDeviceOfflineCheckJob.java new file mode 100644 index 0000000000..96f33e48c7 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/job/device/IotDeviceOfflineCheckJob.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.module.iot.job.device; + +import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler; +import cn.iocoder.yudao.framework.tenant.core.job.TenantJob; +import org.springframework.stereotype.Component; + +// TODO @芋艿:待实现 +/** + * IoT 设备离线检查 Job + * + * 检测逻辑:设备最后一条 {@link cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage} 消息超过一定时间,则认为设备离线 + * + * @author 芋道源码 + */ +@Component +public class IotDeviceOfflineCheckJob implements JobHandler { + + @Override + @TenantJob + public String execute(String param) { + return ""; + } + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceOnlineMessageConsumer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceOnlineMessageConsumer.java new file mode 100644 index 0000000000..83caea1902 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceOnlineMessageConsumer.java @@ -0,0 +1,85 @@ +package cn.iocoder.yudao.module.iot.mq.consumer.device; + +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.api.device.dto.IotDeviceStateUpdateReqDTO; +import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO; +import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum; +import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum; +import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import cn.iocoder.yudao.module.iot.service.device.IotDeviceService; +import cn.iocoder.yudao.module.iot.service.device.upstream.IotDeviceUpstreamService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; +import java.util.Objects; + +/** + * 针对 {@link IotDeviceMessage} 的消费者,将离线的设备,自动标记为上线 + * + * 注意:只有设备上行消息,才会触发该逻辑 + * + * @author 芋道源码 + */ +@Component +@Slf4j +public class IotDeviceOnlineMessageConsumer { + + @Resource + private IotDeviceService deviceService; + + @Resource + private IotDeviceUpstreamService deviceUpstreamService; + + @EventListener + @Async + public void onMessage(IotDeviceMessage message) { + // 1.1 只处理上行消息。因为,只有设备上行的消息,才会触发设备上线的逻辑 + if (!isUpstreamMessage(message)) { + return; + } + // 1.2 如果设备已在线,则不做处理 + log.info("[onMessage][消息内容({})]", message); + IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache( + message.getProductKey(), message.getDeviceName()); + if (device == null) { + log.error("[onMessage][消息({}) 对应的设备部存在]", message); + return; + } + if (IotDeviceStateEnum.isOnline(device.getState())) { + return; + } + + // 2. 标记设备为在线 + // 为什么不直接更新状态呢?因为通过 IotDeviceMessage 可以经过一系列的处理,例如说记录日志等等 + deviceUpstreamService.updateDeviceState(((IotDeviceStateUpdateReqDTO) + new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now()) + .setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName())) + .setState((IotDeviceStateEnum.ONLINE.getState()))); + } + + private boolean isUpstreamMessage(IotDeviceMessage message) { + // 设备属性 + if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType()) + && Objects.equals(message.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier())) { + return true; + } + // 设备事件 + if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.EVENT.getType())) { + return true; + } + // 设备服务 + // noinspection RedundantIfStatement + if (Objects.equals(message.getType(), IotDeviceMessageTypeEnum.SERVICE.getType()) + && !StrUtil.endWith(message.getIdentifier(), IotDeviceMessageIdentifierEnum.SERVICE_REPLY_SUFFIX.getIdentifier())) { + return true; + } + return false; + } + +} \ No newline at end of file