【代码重构】IoT:弱化 TdEngineDDLMapper 封装,由每个业务独立实现
This commit is contained in:
parent
39896555f0
commit
064b3381df
|
@ -0,0 +1,27 @@
|
|||
package cn.iocoder.yudao.module.iot.enums.thingmodel;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
|
||||
/**
|
||||
* IoT 数据定义的数据类型枚举类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
public enum IotDataSpecsDataTypeEnum {
|
||||
|
||||
INT("int"),
|
||||
FLOAT("float"),
|
||||
DOUBLE("double"),
|
||||
ENUM("enum"),
|
||||
BOOL("bool"),
|
||||
TEXT("text"),
|
||||
DATE("date"),
|
||||
STRUCT("struct"),
|
||||
ARRAY("array");
|
||||
|
||||
private final String dataType;
|
||||
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
package cn.iocoder.yudao.module.iot.api.device;
|
||||
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceDataService;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
|
@ -14,7 +14,7 @@ import javax.annotation.Resource;
|
|||
public class DeviceDataApiImpl implements DeviceDataApi {
|
||||
|
||||
@Resource
|
||||
private IotDeviceDataService deviceDataService;
|
||||
private IotDevicePropertyDataService deviceDataService;
|
||||
|
||||
@Override
|
||||
public void saveDeviceData(String productKey, String deviceName, String message) {
|
||||
|
|
|
@ -7,7 +7,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDevi
|
|||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataRespVO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDataDO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotTimeDataRespVO;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceDataService;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import jakarta.annotation.Resource;
|
||||
|
@ -29,7 +29,7 @@ import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
|
|||
public class IotDeviceDataController {
|
||||
|
||||
@Resource
|
||||
private IotDeviceDataService deviceDataService;
|
||||
private IotDevicePropertyDataService deviceDataService;
|
||||
|
||||
// TODO @浩浩:这里的 /latest-list,包括方法名。
|
||||
@GetMapping("/latest")
|
||||
|
|
|
@ -30,7 +30,8 @@ public class ThingModelProperty {
|
|||
private String description;
|
||||
/**
|
||||
* 云端可以对该属性进行的操作类型
|
||||
* 关联枚举 {@link IotProductThingModelAccessModeEnum}
|
||||
*
|
||||
* 枚举 {@link IotProductThingModelAccessModeEnum}
|
||||
*/
|
||||
private String accessMode;
|
||||
/**
|
||||
|
@ -42,6 +43,8 @@ public class ThingModelProperty {
|
|||
private Boolean required;
|
||||
/**
|
||||
* 数据类型,与 dataSpecs 的 dataType 保持一致
|
||||
*
|
||||
* 枚举 {@link cn.iocoder.yudao.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum}
|
||||
*/
|
||||
private String dataType;
|
||||
/**
|
||||
|
|
|
@ -20,7 +20,7 @@ public class ThingModelDateOrTextDataSpecs extends ThingModelDataSpecs {
|
|||
* 数据长度,单位为字节。取值不能超过 2048。
|
||||
* 当 dataType 为 text 时,需传入该参数。
|
||||
*/
|
||||
private Long length;
|
||||
private Integer length;
|
||||
/**
|
||||
* 默认值,可选参数,用于存储默认值。
|
||||
*/
|
||||
|
|
|
@ -16,6 +16,7 @@ public class FieldParser {
|
|||
/**
|
||||
* 物模型到td数据类型映射
|
||||
*/
|
||||
@Deprecated
|
||||
private static final HashMap<String, String> TYPE_MAPPING = new HashMap<>() {
|
||||
{
|
||||
put("INT", "INT");
|
||||
|
@ -74,6 +75,7 @@ public class FieldParser {
|
|||
/**
|
||||
* 获取字段字义
|
||||
*/
|
||||
@Deprecated
|
||||
public static String getFieldDefine(TdFieldDO field) {
|
||||
return "`" + field.getFieldName() + "`" + " "
|
||||
+ (field.getDataLength() > 0 ? String.format("%s(%d)", field.getDataType(), field.getDataLength())
|
||||
|
|
|
@ -2,13 +2,12 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.tdengine;
|
|||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
// TODO @haohao:类似这个,其实可以参考 mybatis plus,querywrapper,搞个 TdEngineQueryWrapper。这样看起来会更好懂。
|
||||
/**
|
||||
* 查询DO
|
||||
*/
|
||||
@Data
|
||||
@Deprecated
|
||||
public class SelectDO {
|
||||
|
||||
// TODO @haoha:database 是个单词
|
||||
|
|
|
@ -6,6 +6,7 @@ import java.util.Map;
|
|||
|
||||
// TODO @haohao:类似 SelectDO 的想法,只是它是返回。ps:貌似可以在 tdengine 里面,创建一个 query 包,放这种比较特殊的查询和结果对象。dataobject 更多还是实际存储的结构化的 do
|
||||
@Data
|
||||
@Deprecated
|
||||
public class SelectVisualDO {
|
||||
|
||||
/**
|
||||
|
|
|
@ -6,6 +6,7 @@ import lombok.Data;
|
|||
* tags查询DO
|
||||
*/
|
||||
@Data
|
||||
@Deprecated
|
||||
public class TagsSelectDO {
|
||||
|
||||
/**
|
||||
|
|
|
@ -5,6 +5,7 @@ import lombok.Builder;
|
|||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
// TODO 芋艿:看看是不是后续简化掉。
|
||||
/**
|
||||
* TD 引擎的字段
|
||||
*/
|
||||
|
@ -12,6 +13,7 @@ import lombok.NoArgsConstructor;
|
|||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
@Deprecated
|
||||
public class TdFieldDO {
|
||||
|
||||
/**
|
||||
|
|
|
@ -12,6 +12,7 @@ import java.util.List;
|
|||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Deprecated
|
||||
public class TdResponse {
|
||||
|
||||
public static final int CODE_SUCCESS = 0;
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.springframework.stereotype.Service;
|
|||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@Deprecated // TODO 芋艿:貌似没用到
|
||||
public class TdRestApi {
|
||||
|
||||
@Value("${spring.datasource.dynamic.datasource.tdengine.url}")
|
||||
|
|
|
@ -7,6 +7,7 @@ import lombok.NoArgsConstructor;
|
|||
|
||||
import java.util.List;
|
||||
|
||||
@Deprecated
|
||||
/**
|
||||
* TD 引擎的数据库
|
||||
*/
|
||||
|
|
|
@ -16,6 +16,7 @@ import lombok.Builder;
|
|||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
// TODO @huihui:IotProductThingModelDO => IotThingModelDO
|
||||
/**
|
||||
* IoT 产品物模型功能 DO
|
||||
* <p>
|
||||
|
|
|
@ -0,0 +1,55 @@
|
|||
package cn.iocoder.yudao.module.iot.dal.tdengine;
|
||||
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.iocoder.yudao.module.iot.framework.tdengine.core.TDengineTableField;
|
||||
import cn.iocoder.yudao.module.iot.framework.tdengine.core.annotation.TDengineDS;
|
||||
import com.baomidou.mybatisplus.annotation.InterceptorIgnore;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Mapper
|
||||
@TDengineDS
|
||||
@InterceptorIgnore(tenantLine = "true") // 避免 SQL 解析,因为 JSqlParser 对 TDengine 的 SQL 解析会报错
|
||||
public interface IotDevicePropertyDataMapper {
|
||||
|
||||
List<TDengineTableField> getProductPropertySTableFieldList(@Param("productKey") String productKey);
|
||||
|
||||
void createProductPropertySTable(@Param("productKey") String productKey,
|
||||
@Param("fields") List<TDengineTableField> fields);
|
||||
|
||||
@SuppressWarnings("SimplifyStreamApiCallChains") // 保持 JDK8 兼容性
|
||||
default void alterProductPropertySTable(String productKey,
|
||||
List<TDengineTableField> oldFields,
|
||||
List<TDengineTableField> newFields) {
|
||||
oldFields.removeIf(field -> TDengineTableField.FIELD_TS.equals(field.getField())
|
||||
|| TDengineTableField.FIELD_DEVICE_KEY.equals(field.getField()));
|
||||
List<TDengineTableField> addFields = newFields.stream().filter( // 新增的字段
|
||||
newField -> oldFields.stream().noneMatch(oldField -> oldField.getField().equals(newField.getField())))
|
||||
.collect(Collectors.toList());
|
||||
List<TDengineTableField> modifyFields = newFields.stream().filter( // 更新的字段
|
||||
newField -> oldFields.stream().anyMatch(oldField -> oldField.getField().equals(newField.getField())
|
||||
&& (ObjectUtil.notEqual(oldField.getType(), newField.getType())
|
||||
|| (newField.getLength() != null && ObjectUtil.notEqual(oldField.getLength(), newField.getLength())))))
|
||||
.collect(Collectors.toList());
|
||||
List<TDengineTableField> dropFields = oldFields.stream().filter( // 删除的字段
|
||||
oldField -> newFields.stream().noneMatch(n -> n.getField().equals(oldField.getField())))
|
||||
.collect(Collectors.toList());
|
||||
addFields.forEach(field -> alterProductPropertySTableAddField(productKey, field));
|
||||
// TODO 芋艿:tdengine 只允许 modify 长度;如果 type 变化,只能 drop + add
|
||||
modifyFields.forEach(field -> alterProductPropertySTableModifyField(productKey, field));
|
||||
dropFields.forEach(field -> alterProductPropertySTableDropField(productKey, field));
|
||||
}
|
||||
|
||||
void alterProductPropertySTableAddField(@Param("productKey") String productKey,
|
||||
@Param("field") TDengineTableField field);
|
||||
|
||||
void alterProductPropertySTableModifyField(@Param("productKey") String productKey,
|
||||
@Param("field") TDengineTableField field);
|
||||
|
||||
void alterProductPropertySTableDropField(@Param("productKey") String productKey,
|
||||
@Param("field") TDengineTableField field);
|
||||
|
||||
}
|
|
@ -96,7 +96,6 @@ public interface TdEngineDDLMapper {
|
|||
@TenantIgnore
|
||||
void modifyColumnWidthForSuperTable(TdTableDO superTable);
|
||||
|
||||
|
||||
/**
|
||||
* 修改超级表 - 为超级表添加标签
|
||||
*
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package cn.iocoder.yudao.module.iot.emq.service;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceDataService;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
||||
|
@ -21,7 +20,7 @@ import org.springframework.stereotype.Service;
|
|||
public class EmqxServiceImpl implements EmqxService {
|
||||
|
||||
@Resource
|
||||
private IotDeviceDataService iotDeviceDataService;
|
||||
private IotDevicePropertyDataService iotDeviceDataService;
|
||||
|
||||
// TODO 多线程处理消息
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
package cn.iocoder.yudao.module.iot.framework.tdengine.core;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* TDEngine 表字段
|
||||
*/
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class TDengineTableField {
|
||||
|
||||
/**
|
||||
* 字段名 - TDengine 默认 ts 字段,默认会被 TDengine 创建
|
||||
*/
|
||||
public static final String FIELD_TS = "ts";
|
||||
|
||||
/**
|
||||
* 字段名 - 我们系统定义的 device_key 字段,非 TDengine 默认字段
|
||||
*/
|
||||
public static final String FIELD_DEVICE_KEY = "device_key";
|
||||
|
||||
public static final String TYPE_TINYINT = "TINYINT";
|
||||
public static final String TYPE_INT = "INT";
|
||||
public static final String TYPE_FLOAT = "FLOAT";
|
||||
public static final String TYPE_DOUBLE = "DOUBLE";
|
||||
public static final String TYPE_BOOL = "BOOL";
|
||||
public static final String TYPE_NCHAR = "NCHAR";
|
||||
public static final String TYPE_TIMESTAMP = "TIMESTAMP";
|
||||
|
||||
/**
|
||||
* 注释 - TAG 字段
|
||||
*/
|
||||
public static final String NOTE_TAG = "TAG";
|
||||
|
||||
/**
|
||||
* 字段名
|
||||
*/
|
||||
private String field;
|
||||
|
||||
/**
|
||||
* 字段类型
|
||||
*/
|
||||
private String type;
|
||||
|
||||
/**
|
||||
* 字段长度
|
||||
*/
|
||||
private Integer length;
|
||||
|
||||
/**
|
||||
* 注释
|
||||
*/
|
||||
private String note;
|
||||
|
||||
public TDengineTableField(String field, String type) {
|
||||
this.field = field;
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
package cn.iocoder.yudao.module.iot.framework.tdengine.core.annotation;
|
||||
|
||||
import com.baomidou.dynamic.datasource.annotation.DS;
|
||||
|
||||
import java.lang.annotation.*;
|
||||
|
||||
/**
|
||||
* TDEngine 数据源
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Target({ElementType.TYPE, ElementType.METHOD})
|
||||
@Retention(RetentionPolicy.RUNTIME)
|
||||
@Documented
|
||||
@DS("tdengine")
|
||||
public @interface TDengineDS {
|
||||
}
|
|
@ -0,0 +1,4 @@
|
|||
/**
|
||||
* iot 模块的 tdengine 拓展封装
|
||||
*/
|
||||
package cn.iocoder.yudao.module.iot.framework.tdengine;
|
|
@ -9,11 +9,18 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* IoT 设备数据 Service 接口
|
||||
* IoT 设备【属性】数据 Service 接口
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface IotDeviceDataService {
|
||||
public interface IotDevicePropertyDataService {
|
||||
|
||||
/**
|
||||
* 定义设备属性数据的结构
|
||||
*
|
||||
* @param productId 产品编号
|
||||
*/
|
||||
void defineDevicePropertyData(Long productId);
|
||||
|
||||
/**
|
||||
* 保存设备数据
|
||||
|
@ -40,4 +47,5 @@ public interface IotDeviceDataService {
|
|||
* @return 设备属性历史数据
|
||||
*/
|
||||
PageResult<Map<String, Object>> getHistoryDeviceProperties(@Valid IotDeviceDataPageReqVO deviceDataReqVO);
|
||||
|
||||
}
|
|
@ -1,19 +1,27 @@
|
|||
package cn.iocoder.yudao.module.iot.service.device;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.iocoder.yudao.framework.common.pojo.PageResult;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.deviceData.IotDeviceDataPageReqVO;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.model.dataType.ThingModelDateOrTextDataSpecs;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDataDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.SelectVisualDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotProductThingModelDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyDataMapper;
|
||||
import cn.iocoder.yudao.module.iot.dal.redis.deviceData.DeviceDataRedisDAO;
|
||||
import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineDMLMapper;
|
||||
import cn.iocoder.yudao.module.iot.enums.IotConstants;
|
||||
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotDataSpecsDataTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotProductThingModelTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.framework.tdengine.core.TDengineTableField;
|
||||
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
|
||||
import cn.iocoder.yudao.module.iot.service.tdengine.IotThingModelMessageService;
|
||||
import cn.iocoder.yudao.module.iot.service.thingmodel.IotProductThingModelService;
|
||||
import jakarta.annotation.Resource;
|
||||
|
@ -28,11 +36,32 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertList;
|
||||
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.filterList;
|
||||
|
||||
@Slf4j
|
||||
/**
|
||||
* IoT 设备【属性】数据 Service 实现类
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Service
|
||||
public class IotDeviceDataServiceImpl implements IotDeviceDataService {
|
||||
@Slf4j
|
||||
public class IotDevicePropertyDataServiceImpl implements IotDevicePropertyDataService {
|
||||
|
||||
/**
|
||||
* 物模型的数据类型,与 TDengine 数据类型的映射关系
|
||||
*/
|
||||
private static final Map<String, String> TYPE_MAPPING = MapUtil.<String, String>builder()
|
||||
.put(IotDataSpecsDataTypeEnum.INT.getDataType(), TDengineTableField.TYPE_INT)
|
||||
.put(IotDataSpecsDataTypeEnum.FLOAT.getDataType(), TDengineTableField.TYPE_FLOAT)
|
||||
.put(IotDataSpecsDataTypeEnum.DOUBLE.getDataType(), TDengineTableField.TYPE_DOUBLE)
|
||||
.put(IotDataSpecsDataTypeEnum.ENUM.getDataType(), TDengineTableField.TYPE_TINYINT) // TODO 芋艿:为什么要映射为 TINYINT 的说明?
|
||||
.put( IotDataSpecsDataTypeEnum.BOOL.getDataType(), TDengineTableField.TYPE_TINYINT) // TODO 芋艿:为什么要映射为 TINYINT 的说明?
|
||||
.put(IotDataSpecsDataTypeEnum.TEXT.getDataType(), TDengineTableField.TYPE_NCHAR)
|
||||
.put(IotDataSpecsDataTypeEnum.DATE.getDataType(), TDengineTableField.TYPE_TIMESTAMP)
|
||||
.put(IotDataSpecsDataTypeEnum.STRUCT.getDataType(), TDengineTableField.TYPE_NCHAR) // TODO 芋艿:怎么映射!!!!
|
||||
.put(IotDataSpecsDataTypeEnum.ARRAY.getDataType(), TDengineTableField.TYPE_NCHAR) // TODO 芋艿:怎么映射!!!!
|
||||
.build();
|
||||
|
||||
@Value("${spring.datasource.dynamic.datasource.tdengine.url}")
|
||||
private String url;
|
||||
|
@ -43,12 +72,61 @@ public class IotDeviceDataServiceImpl implements IotDeviceDataService {
|
|||
private IotThingModelMessageService thingModelMessageService;
|
||||
@Resource
|
||||
private IotProductThingModelService thingModelService;
|
||||
@Resource
|
||||
private IotProductService productService;
|
||||
|
||||
@Resource
|
||||
private TdEngineDMLMapper tdEngineDMLMapper;
|
||||
|
||||
@Resource
|
||||
private DeviceDataRedisDAO deviceDataRedisDAO;
|
||||
|
||||
@Resource
|
||||
private IotDevicePropertyDataMapper devicePropertyDataMapper;
|
||||
|
||||
@Override
|
||||
public void defineDevicePropertyData(Long productId) {
|
||||
// 1.1 查询产品和物模型
|
||||
IotProductDO product = productService.validateProductExists(productId);
|
||||
List<IotProductThingModelDO> thingModels = filterList(thingModelService.getProductThingModelListByProductId(productId),
|
||||
thingModel -> IotProductThingModelTypeEnum.PROPERTY.getType().equals(thingModel.getType()));
|
||||
// 1.2 解析 DB 里的字段
|
||||
List<TDengineTableField> oldFields = new ArrayList<>();
|
||||
try {
|
||||
oldFields.addAll(devicePropertyDataMapper.getProductPropertySTableFieldList(product.getProductKey()));
|
||||
} catch (Exception e) {
|
||||
if (!e.getMessage().contains("Table does not exist")) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
// 2.1 情况一:如果是新增的时候,需要创建表
|
||||
List<TDengineTableField> newFields = buildTableFieldList(thingModels);
|
||||
if (CollUtil.isEmpty(oldFields)) {
|
||||
if (CollUtil.isEmpty(newFields)) {
|
||||
log.info("[defineDevicePropertyData][productId({}) 没有需要定义的属性]", productId);
|
||||
return;
|
||||
}
|
||||
newFields.add(0, new TDengineTableField(TDengineTableField.FIELD_TS, TDengineTableField.TYPE_TIMESTAMP));
|
||||
devicePropertyDataMapper.createProductPropertySTable(product.getProductKey(), newFields);
|
||||
return;
|
||||
}
|
||||
// 2.2 情况二:如果是修改的时候,需要更新表
|
||||
devicePropertyDataMapper.alterProductPropertySTable(product.getProductKey(), oldFields, newFields);
|
||||
}
|
||||
|
||||
private List<TDengineTableField> buildTableFieldList(List<IotProductThingModelDO> thingModels) {
|
||||
return convertList(thingModels, thingModel -> {
|
||||
TDengineTableField field = new TDengineTableField(
|
||||
thingModel.getIdentifier().toLowerCase(), // TODO 芋艿:为什么要转成小写?
|
||||
TYPE_MAPPING.get(thingModel.getProperty().getDataType()));
|
||||
if (thingModel.getProperty().getDataType().equals(IotDataSpecsDataTypeEnum.TEXT.getDataType())) {
|
||||
field.setLength(((ThingModelDateOrTextDataSpecs) thingModel.getProperty().getDataSpecs()).getLength());
|
||||
}
|
||||
return field;
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveDeviceData(String productKey, String deviceName, String message) {
|
||||
// 1. 根据产品 key 和设备名称,获得设备信息
|
|
@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.iot.controller.admin.product.vo.product.IotProduc
|
|||
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.mysql.product.IotProductMapper;
|
||||
import cn.iocoder.yudao.module.iot.enums.product.IotProductStatusEnum;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDevicePropertyDataService;
|
||||
import cn.iocoder.yudao.module.iot.service.tdengine.IotThingModelMessageService;
|
||||
import cn.iocoder.yudao.module.iot.service.thingmodel.IotProductThingModelService;
|
||||
import com.baomidou.dynamic.datasource.annotation.DSTransactional;
|
||||
|
@ -39,6 +40,9 @@ public class IotProductServiceImpl implements IotProductService {
|
|||
@Resource
|
||||
@Lazy // 延迟加载,解决循环依赖
|
||||
private IotThingModelMessageService thingModelMessageService;
|
||||
@Resource
|
||||
@Lazy // 延迟加载,解决循环依赖
|
||||
private IotDevicePropertyDataService devicePropertyDataService;
|
||||
|
||||
@Override
|
||||
public Long createProduct(IotProductSaveReqVO createReqVO) {
|
||||
|
@ -119,8 +123,8 @@ public class IotProductServiceImpl implements IotProductService {
|
|||
// 3. 产品是发布状态
|
||||
if (Objects.equals(status, IotProductStatusEnum.PUBLISHED.getStatus())) {
|
||||
// 3.1 创建产品超级表数据模型
|
||||
thingModelFunctionService.createSuperTableDataModel(id);
|
||||
// 3.2 创建物模型日志超级表数据模型
|
||||
devicePropertyDataService.defineDevicePropertyData(id);
|
||||
// 3.2 创建物模型日志超级表数据模型 TODO 待定:message 要不要分;
|
||||
thingModelMessageService.createSuperTable(id);
|
||||
}
|
||||
productMapper.updateById(updateObj);
|
||||
|
|
|
@ -1,19 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.service.tdengine;
|
||||
|
||||
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotProductThingModelDO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* IoT 超级表服务,负责根据物模型创建和更新超级表,以及创建超级表的子表等操作。
|
||||
*/
|
||||
public interface IotSuperTableService {
|
||||
|
||||
/**
|
||||
* 创建超级表数据模型
|
||||
*/
|
||||
void createSuperTableDataModel(IotProductDO product, List<IotProductThingModelDO> thingModelList);
|
||||
|
||||
}
|
|
@ -1,260 +0,0 @@
|
|||
package cn.iocoder.yudao.module.iot.service.tdengine;
|
||||
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.model.ThingModelProperty;
|
||||
import cn.iocoder.yudao.module.iot.controller.admin.thingmodel.model.ThingModelRespVO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.FieldParser;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdFieldDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdTableDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.thingmodel.IotProductThingModelDO;
|
||||
import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineDDLMapper;
|
||||
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotProductThingModelTypeEnum;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* IoT 超级表服务实现类,负责根据物模型创建和更新超级表,以及创建超级表的子表等操作。
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class IotSuperTableServiceImpl implements IotSuperTableService {
|
||||
|
||||
@Resource
|
||||
private TdEngineDDLMapper tdEngineDDLMapper;
|
||||
|
||||
@Value("${spring.datasource.dynamic.datasource.tdengine.url}")
|
||||
private String url;
|
||||
|
||||
@Override
|
||||
public void createSuperTableDataModel(IotProductDO product, List<IotProductThingModelDO> thingModelList) {
|
||||
ThingModelRespVO thingModel = buildThingModel(product, thingModelList);
|
||||
|
||||
if (thingModel.getModel() == null || CollUtil.isEmpty(thingModel.getModel().getProperties())) {
|
||||
log.warn("物模型属性列表为空,不创建超级表");
|
||||
return;
|
||||
}
|
||||
|
||||
String superTableName = getSuperTableName(product.getDeviceType(), product.getProductKey());
|
||||
String databaseName = getDatabaseName();
|
||||
|
||||
List<Map<String, Object>> results = tdEngineDDLMapper.showSuperTables(new TdTableDO(databaseName, superTableName));
|
||||
int tableExists = results == null || results.isEmpty() ? 0 : results.size();
|
||||
if (tableExists > 0) {
|
||||
updateSuperTable(thingModel, product.getDeviceType());
|
||||
} else {
|
||||
createSuperTable(thingModel, product.getDeviceType());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建超级表
|
||||
*/
|
||||
private void createSuperTable(ThingModelRespVO thingModel, Integer deviceType) {
|
||||
// 解析物模型,获取字段列表
|
||||
List<TdFieldDO> schemaFields = new ArrayList<>();
|
||||
schemaFields.add(TdFieldDO.builder()
|
||||
.fieldName("time")
|
||||
.dataType("TIMESTAMP")
|
||||
.build());
|
||||
schemaFields.addAll(FieldParser.parse(thingModel));
|
||||
|
||||
// 设置超级表的标签
|
||||
List<TdFieldDO> tagsFields = List.of(
|
||||
TdFieldDO.builder().fieldName("product_key").dataType("NCHAR").dataLength(64).build(),
|
||||
TdFieldDO.builder().fieldName("device_key").dataType("NCHAR").dataLength(64).build(),
|
||||
TdFieldDO.builder().fieldName("device_name").dataType("NCHAR").dataLength(64).build(),
|
||||
TdFieldDO.builder().fieldName("device_type").dataType("INT").build()
|
||||
);
|
||||
|
||||
// 获取超级表的名称和数据库名称
|
||||
String superTableName = getSuperTableName(deviceType, thingModel.getProductKey());
|
||||
String databaseName = getDatabaseName();
|
||||
|
||||
// 创建超级表
|
||||
tdEngineDDLMapper.createSuperTable(new TdTableDO(databaseName, superTableName, schemaFields, tagsFields));
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新超级表
|
||||
*/
|
||||
private void updateSuperTable(ThingModelRespVO thingModel, Integer deviceType) {
|
||||
String superTableName = getSuperTableName(deviceType, thingModel.getProductKey());
|
||||
try {
|
||||
List<TdFieldDO> oldFields = getTableFields(superTableName);
|
||||
List<TdFieldDO> newFields = FieldParser.parse(thingModel);
|
||||
|
||||
updateTableFields(superTableName, oldFields, newFields);
|
||||
} catch (Exception e) {
|
||||
log.error("更新物模型超级表失败: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取表的字段信息
|
||||
*/
|
||||
private List<TdFieldDO> getTableFields(String tableName) {
|
||||
List<Map<String, Object>> tableDescription = tdEngineDDLMapper.describeSuperTable(new TdTableDO(getDatabaseName(), tableName));
|
||||
if (CollUtil.isEmpty(tableDescription)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return tableDescription.stream()
|
||||
.filter(map -> !"TAG".equals(map.get("note")))
|
||||
.filter(map -> !"time".equals(map.get("field")))
|
||||
.map(map -> TdFieldDO.builder()
|
||||
.fieldName((String) map.get("field"))
|
||||
.dataType((String) map.get("type"))
|
||||
.dataLength((Integer) map.get("length"))
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新表的字段,包括新增、修改和删除字段
|
||||
*/
|
||||
private void updateTableFields(String tableName, List<TdFieldDO> oldFields, List<TdFieldDO> newFields) {
|
||||
String databaseName = getDatabaseName();
|
||||
|
||||
// 获取新增、修改、删除的字段
|
||||
List<TdFieldDO> addFields = getAddFields(oldFields, newFields);
|
||||
List<TdFieldDO> modifyFields = getModifyFields(oldFields, newFields);
|
||||
List<TdFieldDO> dropFields = getDropFields(oldFields, newFields);
|
||||
|
||||
// 添加新增字段
|
||||
if (CollUtil.isNotEmpty(addFields)) {
|
||||
for (TdFieldDO addField : addFields) {
|
||||
tdEngineDDLMapper.addColumnForSuperTable(TdTableDO.builder()
|
||||
.dataBaseName(databaseName)
|
||||
.superTableName(tableName)
|
||||
.column(addField)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
// 删除旧字段
|
||||
if (CollUtil.isNotEmpty(dropFields)) {
|
||||
for (TdFieldDO dropField : dropFields) {
|
||||
tdEngineDDLMapper.dropColumnForSuperTable(TdTableDO.builder()
|
||||
.dataBaseName(databaseName)
|
||||
.superTableName(tableName)
|
||||
.column(dropField)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
// 修改字段(先删除再添加)
|
||||
if (CollUtil.isNotEmpty(modifyFields)) {
|
||||
for (TdFieldDO modifyField : modifyFields) {
|
||||
tdEngineDDLMapper.dropColumnForSuperTable(TdTableDO.builder()
|
||||
.dataBaseName(databaseName)
|
||||
.superTableName(tableName)
|
||||
.column(modifyField)
|
||||
.build());
|
||||
tdEngineDDLMapper.addColumnForSuperTable(TdTableDO.builder()
|
||||
.dataBaseName(databaseName)
|
||||
.superTableName(tableName)
|
||||
.column(modifyField)
|
||||
.build());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取需要新增的字段
|
||||
*/
|
||||
private List<TdFieldDO> getAddFields(List<TdFieldDO> oldFields, List<TdFieldDO> newFields) {
|
||||
Set<String> oldFieldNames = oldFields.stream()
|
||||
.map(TdFieldDO::getFieldName)
|
||||
.collect(Collectors.toSet());
|
||||
return newFields.stream()
|
||||
.filter(f -> !oldFieldNames.contains(f.getFieldName()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取需要修改的字段
|
||||
*/
|
||||
private List<TdFieldDO> getModifyFields(List<TdFieldDO> oldFields, List<TdFieldDO> newFields) {
|
||||
Map<String, TdFieldDO> oldFieldMap = oldFields.stream()
|
||||
.collect(Collectors.toMap(TdFieldDO::getFieldName, f -> f));
|
||||
|
||||
return newFields.stream()
|
||||
.filter(f -> {
|
||||
TdFieldDO oldField = oldFieldMap.get(f.getFieldName());
|
||||
return oldField != null && (
|
||||
!oldField.getDataType().equals(f.getDataType()) ||
|
||||
!Objects.equals(oldField.getDataLength(), f.getDataLength())
|
||||
);
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取需要删除的字段
|
||||
*/
|
||||
private List<TdFieldDO> getDropFields(List<TdFieldDO> oldFields, List<TdFieldDO> newFields) {
|
||||
Set<String> newFieldNames = newFields.stream()
|
||||
.map(TdFieldDO::getFieldName)
|
||||
.collect(Collectors.toSet());
|
||||
return oldFields.stream()
|
||||
.filter(f -> !"time".equals(f.getFieldName()))
|
||||
.filter(f -> !newFieldNames.contains(f.getFieldName()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建物模型
|
||||
*/
|
||||
private ThingModelRespVO buildThingModel(IotProductDO product, List<IotProductThingModelDO> thingModelList) {
|
||||
ThingModelRespVO thingModel = new ThingModelRespVO();
|
||||
thingModel.setId(product.getId());
|
||||
thingModel.setProductKey(product.getProductKey());
|
||||
|
||||
List<ThingModelProperty> properties = thingModelList.stream()
|
||||
.filter(item -> IotProductThingModelTypeEnum.PROPERTY.equals(
|
||||
IotProductThingModelTypeEnum.valueOfType(item.getType())))
|
||||
.map(this::buildThingModelProperty)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
ThingModelRespVO.Model model = new ThingModelRespVO.Model();
|
||||
model.setProperties(properties);
|
||||
thingModel.setModel(model);
|
||||
|
||||
return thingModel;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建物模型属性
|
||||
*/
|
||||
private ThingModelProperty buildThingModelProperty(IotProductThingModelDO thingModel) {
|
||||
ThingModelProperty property = BeanUtil.copyProperties(thingModel, ThingModelProperty.class);
|
||||
property.setDataType(thingModel.getProperty().getDataType());
|
||||
return property;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取数据库名称
|
||||
*/
|
||||
private String getDatabaseName() {
|
||||
int index = url.lastIndexOf("/");
|
||||
return index != -1 ? url.substring(index + 1) : url;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取超级表名称
|
||||
*/
|
||||
private String getSuperTableName(Integer deviceType, String productKey) {
|
||||
String prefix = switch (deviceType) {
|
||||
case 1 -> "gateway_sub_";
|
||||
case 2 -> "gateway_";
|
||||
default -> "device_";
|
||||
};
|
||||
return (prefix + productKey).toLowerCase();
|
||||
}
|
||||
|
||||
}
|
|
@ -61,13 +61,6 @@ public interface IotProductThingModelService {
|
|||
*/
|
||||
PageResult<IotProductThingModelDO> getProductThingModelPage(IotProductThingModelPageReqVO pageReqVO);
|
||||
|
||||
/**
|
||||
* 创建超级表数据模型
|
||||
*
|
||||
* @param productId 产品编号
|
||||
*/
|
||||
void createSuperTableDataModel(Long productId);
|
||||
|
||||
/**
|
||||
* 获得产品物模型列表
|
||||
*
|
||||
|
|
|
@ -21,7 +21,6 @@ import cn.iocoder.yudao.module.iot.enums.product.IotProductStatusEnum;
|
|||
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotProductThingModelAccessModeEnum;
|
||||
import cn.iocoder.yudao.module.iot.enums.thingmodel.IotProductThingModelTypeEnum;
|
||||
import cn.iocoder.yudao.module.iot.service.product.IotProductService;
|
||||
import cn.iocoder.yudao.module.iot.service.tdengine.IotSuperTableService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
@ -50,8 +49,6 @@ public class IotProductThingModelServiceImpl implements IotProductThingModelServ
|
|||
|
||||
@Resource
|
||||
private IotProductService productService;
|
||||
@Resource
|
||||
private IotSuperTableService dbStructureDataService;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
|
@ -183,18 +180,6 @@ public class IotProductThingModelServiceImpl implements IotProductThingModelServ
|
|||
return productThingModelMapper.selectPage(pageReqVO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createSuperTableDataModel(Long productId) {
|
||||
// 1. 查询产品
|
||||
IotProductDO product = productService.getProduct(productId);
|
||||
|
||||
// 2. 查询产品的物模型功能列表
|
||||
List<IotProductThingModelDO> thingModelList = productThingModelMapper.selectListByProductId(productId);
|
||||
|
||||
// 3. 生成 TDengine 的数据模型
|
||||
dbStructureDataService.createSuperTableDataModel(product, thingModelList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<IotProductThingModelDO> getProductThingModelListByProductKey(String productKey) {
|
||||
return productThingModelMapper.selectListByProductKey(productKey);
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!DOCTYPE mapper
|
||||
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
|
||||
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
|
||||
<mapper namespace="cn.iocoder.yudao.module.iot.dal.tdengine.IotDevicePropertyDataMapper">
|
||||
|
||||
<select id="getProductPropertySTableFieldList" resultType="cn.iocoder.yudao.module.iot.framework.tdengine.core.TDengineTableField">
|
||||
DESCRIBE product_property_${productKey}
|
||||
</select>
|
||||
|
||||
<update id="createProductPropertySTable">
|
||||
CREATE STABLE product_property_${productKey}
|
||||
<foreach item="field" collection="fields" separator="," open="(" close=")">
|
||||
${field.field} ${field.type}
|
||||
<if test="field.length != null and field.length > 0">
|
||||
(${field.length})
|
||||
</if>
|
||||
</foreach>
|
||||
TAGS (
|
||||
device_key NCHAR(50)
|
||||
)
|
||||
</update>
|
||||
|
||||
<update id="alterProductPropertySTableAddField">
|
||||
ALTER STABLE product_property_${productKey}
|
||||
ADD COLUMN ${field.field} ${field.type}
|
||||
<if test="field.length != null and field.length > 0">
|
||||
(${field.length})
|
||||
</if>
|
||||
</update>
|
||||
|
||||
<update id="alterProductPropertySTableModifyField">
|
||||
ALTER STABLE product_property_${productKey}
|
||||
MODIFY COLUMN ${field.field} ${field.type}
|
||||
<if test="field.length != null and field.length > 0">
|
||||
(${field.length})
|
||||
</if>
|
||||
</update>
|
||||
|
||||
<update id="alterProductPropertySTableDropField">
|
||||
ALTER STABLE product_property_${productKey}
|
||||
DROP COLUMN ${field.field}
|
||||
</update>
|
||||
|
||||
</mapper>
|
Loading…
Reference in New Issue