feat:【IoT 物联网】增加(重构) IotDevicePropertyMessageSubscriber 记录设备属性

This commit is contained in:
YunaiV 2025-06-02 15:10:26 +08:00
parent 0bb01eaeeb
commit 1498389d26
9 changed files with 83 additions and 63 deletions

View File

@ -20,8 +20,11 @@ public class IotDevicePropertyHistoryPageReqVO extends PageParam {
@NotNull(message = "设备编号不能为空") @NotNull(message = "设备编号不能为空")
private Long deviceId; private Long deviceId;
@Schema(description = "设备 Key", hidden = true) @Schema(description = "产品 Key", hidden = true)
private String deviceKey; // 非前端传递后端自己查询设置 private String productKey; // 非前端传递后端自己查询设置
@Schema(description = "设备名称", hidden = true)
private String deviceName; // 非前端传递后端自己查询设置
@Schema(description = "属性标识符", requiredMode = Schema.RequiredMode.REQUIRED) @Schema(description = "属性标识符", requiredMode = Schema.RequiredMode.REQUIRED)
@NotEmpty(message = "属性标识符不能为空") @NotEmpty(message = "属性标识符不能为空")

View File

@ -9,15 +9,14 @@ import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDevicePropertyDO;
*/ */
public interface RedisKeyConstants { public interface RedisKeyConstants {
// TODO @芋艿弱化 deviceKey使用 product_key + device_name 替代
/** /**
* 设备属性的数据缓存采用 HASH 结构 * 设备属性的数据缓存采用 HASH 结构
* <p> * <p>
* KEY 格式device_property:{deviceKey} * KEY 格式device_property:{productKey},${deviceName}
* HASH KEYidentifier 属性标识 * HASH KEYidentifier 属性标识
* VALUE 数据类型String(JSON) {@link IotDevicePropertyDO} * VALUE 数据类型String(JSON) {@link IotDevicePropertyDO}
*/ */
String DEVICE_PROPERTY = "iot:device_property:%s"; String DEVICE_PROPERTY = "iot:device_property:%s,%s";
/** /**
* 设备的最后上报时间采用 ZSET 结构 * 设备的最后上报时间采用 ZSET 结构

View File

@ -22,8 +22,8 @@ public class DevicePropertyRedisDAO {
@Resource @Resource
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
public Map<String, IotDevicePropertyDO> get(String deviceKey) { public Map<String, IotDevicePropertyDO> get(String productKey, String deviceName) {
String redisKey = formatKey(deviceKey); String redisKey = formatKey(productKey, deviceName);
Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(redisKey); Map<Object, Object> entries = stringRedisTemplate.opsForHash().entries(redisKey);
if (CollUtil.isEmpty(entries)) { if (CollUtil.isEmpty(entries)) {
return Collections.emptyMap(); return Collections.emptyMap();
@ -33,18 +33,18 @@ public class DevicePropertyRedisDAO {
entry -> JsonUtils.parseObject((String) entry.getValue(), IotDevicePropertyDO.class)); entry -> JsonUtils.parseObject((String) entry.getValue(), IotDevicePropertyDO.class));
} }
public void putAll(String deviceKey, Map<String, IotDevicePropertyDO> properties) { public void putAll(String productKey, String deviceName, Map<String, IotDevicePropertyDO> properties) {
if (CollUtil.isEmpty(properties)) { if (CollUtil.isEmpty(properties)) {
return; return;
} }
String redisKey = formatKey(deviceKey); String redisKey = formatKey(productKey, deviceName);
stringRedisTemplate.opsForHash().putAll(redisKey, convertMap(properties.entrySet(), stringRedisTemplate.opsForHash().putAll(redisKey, convertMap(properties.entrySet(),
Map.Entry::getKey, Map.Entry::getKey,
entry -> JsonUtils.toJsonString(entry.getValue()))); entry -> JsonUtils.toJsonString(entry.getValue())));
} }
private static String formatKey(String deviceKey) { private static String formatKey(String productKey, String deviceName) {
return String.format(DEVICE_PROPERTY, deviceKey); return String.format(DEVICE_PROPERTY, productKey, deviceName);
} }
} }

View File

@ -33,7 +33,7 @@ public interface IotDevicePropertyMapper {
List<TDengineTableField> oldFields, List<TDengineTableField> oldFields,
List<TDengineTableField> newFields) { List<TDengineTableField> newFields) {
oldFields.removeIf(field -> StrUtil.equalsAny(field.getField(), oldFields.removeIf(field -> StrUtil.equalsAny(field.getField(),
TDengineTableField.FIELD_TS, "report_time", "device_key")); TDengineTableField.FIELD_TS, "report_time", "device_name"));
List<TDengineTableField> addFields = newFields.stream().filter( // 新增的字段 List<TDengineTableField> addFields = newFields.stream().filter( // 新增的字段
newField -> oldFields.stream().noneMatch(oldField -> oldField.getField().equals(newField.getField()))) newField -> oldFields.stream().noneMatch(oldField -> oldField.getField().equals(newField.getField())))
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@ -1,40 +0,0 @@
package cn.iocoder.yudao.module.iot.mq.consumer.device;
import cn.hutool.core.util.ObjectUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
/**
* 针对 {@link IotDeviceMessage} 的消费者记录设备属性
*
* @author alwayssuper
*/
@Component
@Slf4j
public class IotDevicePropertyMessageConsumer {
@Resource
private IotDevicePropertyService deviceDataService;
@EventListener
@Async
public void onMessage(IotDeviceMessage message) {
if (ObjectUtil.notEqual(message.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType())
|| ObjectUtil.notEqual(message.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier())) {
return;
}
log.info("[onMessage][消息内容({})]", message);
// 保存设备属性
deviceDataService.saveDeviceProperty(message);
}
}

View File

@ -0,0 +1,54 @@
package cn.iocoder.yudao.module.iot.mq.consumer.device;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.service.device.data.IotDevicePropertyService;
import com.google.common.base.Objects;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 针对 {@link IotDeviceMessage} 的消费者记录设备属性
*
* @author alwayssuper
*/
@Component
@Slf4j
public class IotDevicePropertyMessageSubscriber implements IotMessageSubscriber<IotDeviceMessage> {
@Resource
private IotDevicePropertyService deviceDataService;
@Resource
private IotMessageBus messageBus;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
}
@Override
public String getGroup() {
return "iot_device_property_consumer";
}
@Override
public void onMessage(IotDeviceMessage message) {
if (Objects.equal(message.getType(), IotDeviceMessageTypeEnum.PROPERTY.getType())
&& Objects.equal(message.getIdentifier(), IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier())) {
// 保存设备属性
deviceDataService.saveDeviceProperty(message);
}
}
}

View File

@ -8,6 +8,7 @@ import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
// TODO @puhui999后面重构哈
/** /**
* 针对 {@link IotDeviceMessage} 的消费者处理规则场景 * 针对 {@link IotDeviceMessage} 的消费者处理规则场景
* *

View File

@ -128,7 +128,8 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
return; return;
} }
// 1. 获得设备信息 // 1. 获得设备信息
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(message.getProductKey(), message.getDeviceName()); IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(
message.getProductKey(), message.getDeviceName());
if (device == null) { if (device == null) {
log.error("[saveDeviceProperty][消息({}) 对应的设备不存在]", message); log.error("[saveDeviceProperty][消息({}) 对应的设备不存在]", message);
return; return;
@ -155,9 +156,9 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
LocalDateTimeUtil.toEpochMilli(message.getReportTime())); LocalDateTimeUtil.toEpochMilli(message.getReportTime()));
// 3.2 保存设备属性日志 // 3.2 保存设备属性日志
// TODO @芋艿这里要调整下 Map<String, IotDevicePropertyDO> properties2 = convertMap(properties.entrySet(), Map.Entry::getKey, entry ->
deviceDataRedisDAO.putAll(device.getDeviceKey(), convertMap(properties.entrySet(), Map.Entry::getKey, IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build());
entry -> IotDevicePropertyDO.builder().value(entry.getValue()).updateTime(message.getReportTime()).build())); deviceDataRedisDAO.putAll(device.getProductKey(), device.getDeviceName(), properties2);
} }
@Override @Override
@ -166,15 +167,16 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
IotDeviceDO device = deviceService.validateDeviceExists(deviceId); IotDeviceDO device = deviceService.validateDeviceExists(deviceId);
// 获得设备属性 // 获得设备属性
return deviceDataRedisDAO.get(device.getDeviceKey()); return deviceDataRedisDAO.get(device.getProductKey(), device.getDeviceName());
} }
@Override @Override
public PageResult<IotDevicePropertyRespVO> getHistoryDevicePropertyPage(IotDevicePropertyHistoryPageReqVO pageReqVO) { public PageResult<IotDevicePropertyRespVO> getHistoryDevicePropertyPage(IotDevicePropertyHistoryPageReqVO pageReqVO) {
// 获取设备信息 // 获取设备信息
IotDeviceDO device = deviceService.validateDeviceExists(pageReqVO.getDeviceId()); IotDeviceDO device = deviceService.validateDeviceExists(pageReqVO.getDeviceId());
pageReqVO.setDeviceKey(device.getDeviceKey()); pageReqVO.setProductKey(device.getProductKey()).setDeviceName(device.getDeviceName());
// 分页查询
try { try {
IPage<IotDevicePropertyRespVO> page = devicePropertyMapper.selectPageByHistory( IPage<IotDevicePropertyRespVO> page = devicePropertyMapper.selectPageByHistory(
new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()), pageReqVO); new Page<>(pageReqVO.getPageNo(), pageReqVO.getPageSize()), pageReqVO);

View File

@ -20,7 +20,7 @@
</foreach> </foreach>
) )
TAGS ( TAGS (
device_key NCHAR(50) device_name NCHAR(50)
) )
</update> </update>
@ -46,9 +46,9 @@
</update> </update>
<insert id="insert"> <insert id="insert">
INSERT INTO device_property_${device.deviceKey} INSERT INTO device_property_${device.productKey}_${device.deviceName}
USING product_property_${device.productKey} USING product_property_${device.productKey}
TAGS ('${device.deviceKey}') TAGS ('${device.deviceName}')
(ts, report_time, (ts, report_time,
<foreach item="key" collection="properties.keys" separator=","> <foreach item="key" collection="properties.keys" separator=",">
${@cn.hutool.core.util.StrUtil@toUnderlineCase(key)} ${@cn.hutool.core.util.StrUtil@toUnderlineCase(key)}
@ -66,9 +66,10 @@
DESCRIBE product_property_${productKey} DESCRIBE product_property_${productKey}
</select> </select>
<select id="selectPageByHistory" resultType="cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyRespVO"> <select id="selectPageByHistory"
resultType="cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDevicePropertyRespVO">
SELECT ${@cn.hutool.core.util.StrUtil@toUnderlineCase(reqVO.identifier)} AS `value`, ts AS update_time SELECT ${@cn.hutool.core.util.StrUtil@toUnderlineCase(reqVO.identifier)} AS `value`, ts AS update_time
FROM device_property_${reqVO.deviceKey} FROM device_property_${reqVO.productKey}_${reqVO.deviceName}
WHERE ${@cn.hutool.core.util.StrUtil@toUnderlineCase(reqVO.identifier)} IS NOT NULL WHERE ${@cn.hutool.core.util.StrUtil@toUnderlineCase(reqVO.identifier)} IS NOT NULL
AND ts BETWEEN ${@cn.hutool.core.date.LocalDateTimeUtil@toEpochMilli(reqVO.times[0])} AND ts BETWEEN ${@cn.hutool.core.date.LocalDateTimeUtil@toEpochMilli(reqVO.times[0])}
AND ${@cn.hutool.core.date.LocalDateTimeUtil@toEpochMilli(reqVO.times[1])} AND ${@cn.hutool.core.date.LocalDateTimeUtil@toEpochMilli(reqVO.times[1])}