【功能新增】IoT:增加 IotDeviceOfflineCheckJob,处理设备超时下线

This commit is contained in:
YunaiV 2025-01-29 21:18:38 +08:00
parent eb74f753a8
commit 39aaeaa298
10 changed files with 135 additions and 19 deletions

View File

@ -122,7 +122,7 @@ public class IotDeviceController {
@Parameter(name = "deviceType", description = "设备类型", example = "1")
public CommonResult<List<IotDeviceRespVO>> getSimpleDeviceList(
@RequestParam(value = "deviceType", required = false) Integer deviceType) {
List<IotDeviceDO> list = deviceService.getDeviceList(deviceType);
List<IotDeviceDO> list = deviceService.getDeviceListByDeviceType(deviceType);
return success(convertList(list, device -> // 只返回 idname 字段
new IotDeviceRespVO().setId(device.getId()).setDeviceName(device.getDeviceName())));
}

View File

@ -52,10 +52,14 @@ public interface IotDeviceMapper extends BaseMapperX<IotDeviceDO> {
.apply("LOWER(device_key) = {0}", deviceKey.toLowerCase()));
}
default List<IotDeviceDO> selectList(Integer deviceType) {
default List<IotDeviceDO> selectListByDeviceType(Integer deviceType) {
return selectList(IotDeviceDO::getDeviceType, deviceType);
}
default List<IotDeviceDO> selectListByState(Integer state) {
return selectList(IotDeviceDO::getState, state);
}
default Long selectCountByGroupId(Long groupId) {
return selectCount(new LambdaQueryWrapperX<IotDeviceDO>()
.apply("FIND_IN_SET(" + groupId + ",group_ids) > 0")

View File

@ -24,7 +24,7 @@ public interface RedisKeyConstants {
* KEY 格式{deviceKey}
* SCORE上报时间
*/
String DEVICE_REPORT_TIME = "device_report_time";
String DEVICE_REPORT_TIMES = "device_report_times";
/**
* 设备信息的数据缓存使用 Spring Cache 操作
@ -32,7 +32,7 @@ public interface RedisKeyConstants {
* KEY 格式device_${productKey}_${deviceKey}
* VALUE 数据类型String(JSON)
*/
String DEVICE = "device";
String DEVICE = "device";
/**
* 物模型的数据缓存使用 Spring Cache 操作

View File

@ -7,6 +7,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;
import java.time.LocalDateTime;
import java.util.Set;
/**
* 设备的最后上报时间的 Redis DAO
@ -20,8 +21,13 @@ public class DeviceReportTimeRedisDAO {
private StringRedisTemplate stringRedisTemplate;
public void update(String deviceKey, LocalDateTime reportTime) {
stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIME, deviceKey,
stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIMES, deviceKey,
LocalDateTimeUtil.toEpochMilli(reportTime));
}
public Set<String> range(LocalDateTime maxReportTime) {
return stringRedisTemplate.opsForZSet().rangeByScore(RedisKeyConstants.DEVICE_REPORT_TIMES, 0,
LocalDateTimeUtil.toEpochMilli(maxReportTime));
}
}

View File

@ -1,10 +1,25 @@
package cn.iocoder.yudao.module.iot.job.device;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
import cn.iocoder.yudao.module.iot.api.device.dto.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import cn.iocoder.yudao.module.iot.service.device.upstream.IotDeviceUpstreamService;
import jakarta.annotation.Resource;
import org.springframework.stereotype.Component;
// TODO @芋艿待实现
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
* IoT 设备离线检查 Job
*
@ -15,10 +30,46 @@ import org.springframework.stereotype.Component;
@Component
public class IotDeviceOfflineCheckJob implements JobHandler {
/**
* 设备离线超时时间
*
* TODO 芋艿暂定 10 分钟后续看看要不要基于设备或者全局有配置文件
*/
public static final Duration OFFLINE_TIMEOUT = Duration.ofMinutes(10);
@Resource
private IotDeviceService deviceService;
@Resource
private IotDevicePropertyService devicePropertyService;
@Resource
private IotDeviceUpstreamService deviceUpstreamService;
@Override
@TenantJob
public String execute(String param) {
return "";
// 1.1 获得在线设备列表
List<IotDeviceDO> devices = deviceService.getDeviceListByState(IotDeviceStateEnum.ONLINE.getState());
if (CollUtil.isEmpty(devices)) {
return JsonUtils.toJsonString(Collections.emptyList());
}
// 1.2 获取超时的 deviceKey 集合
Set<String> timeoutDeviceKeys = devicePropertyService.getDeviceKeysByReportTime(
LocalDateTime.now().minus(OFFLINE_TIMEOUT));
// 2. 下线设备
List<String> offlineDeviceKeys = CollUtil.newArrayList();
for (IotDeviceDO device : devices) {
if (!timeoutDeviceKeys.contains(device.getDeviceKey())) {
continue;
}
offlineDeviceKeys.add(device.getDeviceKey());
// 为什么不直接更新状态呢因为通过 IotDeviceMessage 可以经过一系列的处理例如说记录日志等等
deviceUpstreamService.updateDeviceState(((IotDeviceStateUpdateReqDTO)
new IotDeviceStateUpdateReqDTO().setRequestId(IdUtil.fastSimpleUUID()).setReportTime(LocalDateTime.now())
.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName()))
.setState((IotDeviceStateEnum.OFFLINE.getState())));
}
return JsonUtils.toJsonString(offlineDeviceKeys);
}
}

View File

@ -93,15 +93,23 @@ public interface IotDeviceService {
PageResult<IotDeviceDO> getDevicePage(IotDevicePageReqVO pageReqVO);
/**
* 获得设备列表
* 基于设备类型获得设备列表
*
* @param deviceType 设备类型
* @return 设备列表
*/
List<IotDeviceDO> getDeviceList(@Nullable Integer deviceType);
List<IotDeviceDO> getDeviceListByDeviceType(@Nullable Integer deviceType);
/**
* 获得设备数量
* 获得状态获得设备列表
*
* @param state 状态
* @return 设备列表
*/
List<IotDeviceDO> getDeviceListByState(Integer state);
/**
* 基于产品编号获得设备数量
*
* @param productId 产品编号
* @return 设备数量

View File

@ -226,8 +226,13 @@ public class IotDeviceServiceImpl implements IotDeviceService {
}
@Override
public List<IotDeviceDO> getDeviceList(@Nullable Integer deviceType) {
return deviceMapper.selectList(deviceType);
public List<IotDeviceDO> getDeviceListByDeviceType(@Nullable Integer deviceType) {
return deviceMapper.selectListByDeviceType(deviceType);
}
@Override
public List<IotDeviceDO> getDeviceListByState(Integer state) {
return deviceMapper.selectListByState(state);
}
@Override

View File

@ -7,7 +7,9 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import jakarta.validation.Valid;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.Set;
/**
* IoT 设备属性数据 Service 接口
@ -16,6 +18,8 @@ import java.util.Map;
*/
public interface IotDevicePropertyService {
// ========== 设备属性相关操作 ==========
/**
* 定义设备属性数据的结构
*
@ -46,4 +50,22 @@ public interface IotDevicePropertyService {
*/
PageResult<IotDevicePropertyRespVO> getHistoryDevicePropertyPage(@Valid IotDevicePropertyHistoryPageReqVO pageReqVO);
// ========== 设备时间相关操作 ==========
/**
* 获得最后上报时间小于指定时间的设备标识
*
* @param maxReportTime 最大上报时间
* @return 设备标识列表
*/
Set<String> getDeviceKeysByReportTime(LocalDateTime maxReportTime);
/**
* 更新设备上报时间
*
* @param deviceKey 设备标识
* @param reportTime 上报时间
*/
void updateDeviceReportTime(String deviceKey, LocalDateTime reportTime);
}

View File

@ -14,6 +14,7 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotThingModelDO;
import cn.iocoder.yudao.module.iot.dal.redis.device.DevicePropertyRedisDAO;
import cn.iocoder.yudao.module.iot.dal.redis.device.DeviceReportTimeRedisDAO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyMapper;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum;
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotThingModelTypeEnum;
@ -28,10 +29,8 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.time.LocalDateTime;
import java.util.*;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.*;
@ -68,10 +67,14 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
@Resource
private DevicePropertyRedisDAO deviceDataRedisDAO;
@Resource
private DeviceReportTimeRedisDAO deviceReportTimeRedisDAO;
@Resource
private IotDevicePropertyMapper devicePropertyMapper;
// ========== 设备属性相关操作 ==========
@Override
public void defineDevicePropertyData(Long productId) {
// 1.1 查询产品和物模型
@ -179,4 +182,16 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
}
}
// ========== 设备时间相关操作 ==========
@Override
public Set<String> getDeviceKeysByReportTime(LocalDateTime maxReportTime) {
return deviceReportTimeRedisDAO.range(maxReportTime);
}
@Override
public void updateDeviceReportTime(String deviceKey, LocalDateTime reportTime) {
deviceReportTimeRedisDAO.update(deviceKey, reportTime);
}
}

View File

@ -18,6 +18,7 @@ import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.mq.producer.device.IotDeviceProducer;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@ -38,6 +39,8 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
@Resource
private IotDeviceService deviceService;
@Resource
private IotDevicePropertyService devicePropertyService;
@Resource
private IotDeviceProducer deviceProducer;
@ -107,9 +110,11 @@ public class IotDeviceUpstreamServiceImpl implements IotDeviceUpstreamService {
// TODO 芋艿待实现
}
private void updateDeviceLastTime(IotDeviceDO deviceDO, IotDeviceUpstreamAbstractReqDTO reqDTO) {
// TODO 芋艿插件状态
// TODO 芋艿操作时间
private void updateDeviceLastTime(IotDeviceDO device, IotDeviceUpstreamAbstractReqDTO reqDTO) {
// 1. TODO 芋艿插件状态
// 2. 更新设备的最后时间
devicePropertyService.updateDeviceReportTime(device.getDeviceKey(), LocalDateTime.now());
}
private void sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) {