【新增功能】 数据接收,JSON 标准格式数据接收

This commit is contained in:
安浩浩 2024-10-31 21:47:54 +08:00
parent 8c84ac9d8a
commit 3dafd31da6
16 changed files with 247 additions and 55 deletions

View File

@ -0,0 +1,27 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.tdengine;
import cn.iocoder.yudao.module.iot.domain.BaseEntity;
import lombok.Data;
import java.util.List;
@Data
public class TableDO extends BaseEntity {
/**
* 超级表普通列字段的值
* 值需要与创建超级表时普通列字段的数据类型对应上
*/
private List<TdFieldDO> schemaFieldValues;
/**
* 超级表标签字段的值
* 值需要与创建超级表时标签字段的数据类型对应上
*/
private List<TdFieldDO> tagsFieldValues;
/**
* 表名称
*/
private String tableName;
}

View File

@ -15,18 +15,23 @@ public class TableData {
*/
private List<String> schemaFieldList;
/**
* 超级表标签字段的值
* 值需要与创建超级表时标签字段的数据类型对应上
*/
private List<Object> tagsValueList;
/**
* 超级表普通列字段的值
* 值需要与创建超级表时普通列字段的数据类型对应上
*/
private List<Object> schemaValueList;
/**
* 超级表标签字段的名称
*/
private List<String> tagsFieldList;
/**
* 超级表标签字段的值
* 值需要与创建超级表时标签字段的数据类型对应上
*/
private List<Object> tagsValueList;
/**
* 表名称
*/

View File

@ -19,6 +19,11 @@ public class TdFieldDO {
*/
private String fieldName;
/**
* 字段值
*/
private Object fieldValue;
/**
* 字段类型
*/
@ -28,4 +33,15 @@ public class TdFieldDO {
* 字段长度
*/
private Integer dataLength = 0;
public TdFieldDO(String fieldName, String dataType, Integer dataLength) {
this.fieldName = fieldName;
this.dataType = dataType;
this.dataLength = dataLength;
}
public TdFieldDO(String fieldName, Object fieldValue) {
this.fieldName = fieldName;
this.fieldValue = fieldValue;
}
}

View File

@ -55,4 +55,7 @@ public interface IotThinkModelFunctionMapper extends BaseMapperX<IotThinkModelFu
IotThinkModelFunctionDO::getName, name);
}
default List<IotThinkModelFunctionDO> selectListByProductKey(String productKey){
return selectList(IotThinkModelFunctionDO::getProductKey, productKey);
}
}

View File

@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.dal.tdengine;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TableDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdFieldDO;
import cn.iocoder.yudao.module.iot.domain.FieldsVo;
import cn.iocoder.yudao.module.iot.domain.SelectDto;
import cn.iocoder.yudao.module.iot.domain.TableDto;
import cn.iocoder.yudao.module.iot.domain.TagsSelectDao;
import cn.iocoder.yudao.module.iot.domain.visual.SelectVisualDto;
import com.baomidou.dynamic.datasource.annotation.DS;
@ -86,9 +86,20 @@ public interface TdEngineMapper {
@Param("superTableName") String superTableName,
@Param("field") TdFieldDO field);
void createTable(TableDto tableDto);
/**
* 创建表 - 创建超级表的子表
* @param tableDO 表信息
*/
@InterceptorIgnore(tenantLine = "true")
void createTable(TableDO tableDO);
void insertData(TableDto tableDto);
/**
* 插入数据 - 指定列插入数据
*
* @param tableDto 数据
*/
@InterceptorIgnore(tenantLine = "true")
void insertData(TableDO tableDto);
List<Map<String, Object>> selectByTimestamp(SelectDto selectDto);

View File

@ -6,6 +6,7 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
// TODO @芋艿在瞅瞅
@ -24,6 +25,7 @@ public class EmqxServiceImpl implements EmqxService {
// TODO 多线程处理消息
@Override
@Async
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload()));
// 根据不同的主题处理不同的业务逻辑

View File

@ -34,6 +34,6 @@ public class IotDeviceDataServiceImpl implements IotDeviceDataService {
.deviceName(deviceName)
.deviceKey(device.getDeviceKey())
.build();
thingModelMessageService.saveThingModelMessage(thingModelMessage);
thingModelMessageService.saveThingModelMessage(device,thingModelMessage);
}
}

View File

@ -20,6 +20,7 @@ import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;
import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception;
@ -213,10 +214,30 @@ public class IotDeviceServiceImpl implements IotDeviceService {
@Override
public void updateDeviceStatus(IotDeviceStatusUpdateReqVO updateReqVO) {
// 校验存在
validateDeviceExists(updateReqVO.getId());
IotDeviceDO iotDeviceDO = validateDeviceExists(updateReqVO.getId());
// 更新状态和更新时间
IotDeviceDO updateObj = BeanUtils.toBean(updateReqVO, IotDeviceDO.class);
// 以前是未激活现在是上线设置设备激活时间
if (Objects.equals(iotDeviceDO.getStatus(), IotDeviceStatusEnum.INACTIVE.getStatus())
&& Objects.equals(updateObj.getStatus(), IotDeviceStatusEnum.ONLINE.getStatus())) {
updateObj.setActiveTime(LocalDateTime.now());
}
// 如果是上线设置上线时间
if (Objects.equals(updateObj.getStatus(), IotDeviceStatusEnum.ONLINE.getStatus())) {
updateObj.setLastOnlineTime(LocalDateTime.now());
}
// 如果是离线设置离线时间
if (Objects.equals(updateObj.getStatus(), IotDeviceStatusEnum.OFFLINE.getStatus())) {
updateObj.setLastOfflineTime(LocalDateTime.now());
}
// 设置状态更新时间
updateObj.setStatusLastUpdateTime(LocalDateTime.now());
deviceMapper.updateById(updateObj);
}

View File

@ -60,27 +60,10 @@ public class IotDbStructureDataServiceImpl implements IotDbStructureDataService
dataType("NCHAR").
dataLength(64).
build());
//
tagsFields.add(TdFieldDO.builder().
fieldName("year").
fieldName("device_type").
dataType("INT").
build());
//
tagsFields.add(TdFieldDO.builder().
fieldName("month").
dataType("INT").
build());
//
tagsFields.add(TdFieldDO.builder().
fieldName("day").
dataType("INT").
build());
//
tagsFields.add(TdFieldDO.builder().
fieldName("hour").
dataType("INT").
build());
// 4. 获取超级表的名称
String superTableName = getProductPropertySTableName(deviceType, thingModel.getProductKey());

View File

@ -1,8 +1,8 @@
package cn.iocoder.yudao.module.iot.service.tdengine;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TableDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdFieldDO;
import cn.iocoder.yudao.module.iot.domain.SelectDto;
import cn.iocoder.yudao.module.iot.domain.TableDto;
import cn.iocoder.yudao.module.iot.domain.TagsSelectDao;
import cn.iocoder.yudao.module.iot.domain.visual.SelectVisualDto;
@ -70,17 +70,15 @@ public interface IotTdEngineService {
* 创建表
*
* @param tableDto 表信息
* @throws Exception 异常
*/
void createTable(TableDto tableDto) throws Exception;
void createTable(TableDO tableDto);
/**
* 插入数据
*
* @param tableDto 表信息
* @throws Exception 异常
*/
void insertData(TableDto tableDto) throws Exception;
void insertData(TableDO tableDto);
/**
* 根据时间戳查询数据

View File

@ -1,9 +1,9 @@
package cn.iocoder.yudao.module.iot.service.tdengine;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TableDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdFieldDO;
import cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineMapper;
import cn.iocoder.yudao.module.iot.domain.SelectDto;
import cn.iocoder.yudao.module.iot.domain.TableDto;
import cn.iocoder.yudao.module.iot.domain.TagsSelectDao;
import cn.iocoder.yudao.module.iot.domain.visual.SelectVisualDto;
import jakarta.annotation.Resource;
@ -31,12 +31,12 @@ public class IotTdEngineServiceImpl implements IotTdEngineService {
}
@Override
public void createTable(TableDto tableDto) {
public void createTable(TableDO tableDto) {
tdEngineMapper.createTable(tableDto);
}
@Override
public void insertData(TableDto tableDto) {
public void insertData(TableDO tableDto) {
tdEngineMapper.insertData(tableDto);
}

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.service.tdengine;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
/**
@ -10,7 +11,8 @@ public interface IotThingModelMessageService {
/**
* 保存物模型消息
*
* @param device 设备
* @param thingModelMessage 物模型消息
*/
void saveThingModelMessage(ThingModelMessage thingModelMessage);
void saveThingModelMessage(IotDeviceDO device,ThingModelMessage thingModelMessage);
}

View File

@ -1,13 +1,127 @@
package cn.iocoder.yudao.module.iot.service.tdengine;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceStatusUpdateReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.FieldParser;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TableDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.TdFieldDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.thinkmodelfunction.IotThinkModelFunctionDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStatusEnum;
import cn.iocoder.yudao.module.iot.enums.product.IotProductFunctionTypeEnum;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import cn.iocoder.yudao.module.iot.service.thinkmodelfunction.IotThinkModelFunctionService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
@Service
public class IotThingModelMessageServiceImpl implements IotThingModelMessageService {
@Value("${spring.datasource.dynamic.datasource.tdengine.url}")
private String url;
@Resource
private IotThinkModelFunctionService iotThinkModelFunctionService;
@Resource
private IotDeviceService iotDeviceService;
@Resource
private IotTdEngineService iotTdEngineService;
@Override
public void saveThingModelMessage(ThingModelMessage thingModelMessage) {
// TODO 芋艿后续实现
@TenantIgnore
public void saveThingModelMessage(IotDeviceDO device, ThingModelMessage thingModelMessage) {
// 判断设备状态如果为未激活状态创建数据表
if (device.getStatus().equals(0)) {
// 创建设备数据表
createDeviceTable(device.getDeviceType(), device.getProductKey(), device.getDeviceName(), device.getDeviceKey());
// 更新设备状态
IotDeviceStatusUpdateReqVO updateReqVO = new IotDeviceStatusUpdateReqVO();
updateReqVO.setId(device.getId());
updateReqVO.setStatus(IotDeviceStatusEnum.ONLINE.getStatus());
iotDeviceService.updateDeviceStatus(updateReqVO);
}
// 1. 获取设备属性
Map<String, Object> params = thingModelMessage.dataToMap();
// 2. 物模型校验过滤非物模型属性
List<IotThinkModelFunctionDO> thinkModelFunctionListByProductKey = iotThinkModelFunctionService.getThinkModelFunctionListByProductKey(thingModelMessage.getProductKey());
// 2.1 筛选是属性 IotProductFunctionTypeEnum
thinkModelFunctionListByProductKey.removeIf(iotThinkModelFunctionDO -> !iotThinkModelFunctionDO.getType().equals(IotProductFunctionTypeEnum.PROPERTY.getType()));
if (thinkModelFunctionListByProductKey.isEmpty()) {
return;
}
// 2.2 获取属性名称
Map<String, String> thingModelProperties = thinkModelFunctionListByProductKey.stream().collect(Collectors.toMap(IotThinkModelFunctionDO::getIdentifier, IotThinkModelFunctionDO::getName));
// 4. 保存属性记录
List<TdFieldDO> schemaFieldValues = new ArrayList<>();
// 1. 设置字段名
schemaFieldValues.add(new TdFieldDO("time", thingModelMessage.getTime()));
// 2. 遍历新属性
params.forEach((key, val) -> {
if (thingModelProperties.containsKey(key)) {
schemaFieldValues.add(new TdFieldDO(key.toLowerCase(), val));
}
});
// 3. 保存设备属性
TableDO tableData = new TableDO();
tableData.setDataBaseName(url.substring(url.lastIndexOf("/") + 1));
tableData.setSuperTableName(getProductPropertySTableName(device.getDeviceType(), device.getProductKey()));
tableData.setTableName("device_" + device.getProductKey().toLowerCase() + "_" + device.getDeviceName().toLowerCase());
tableData.setSchemaFieldValues(schemaFieldValues);
// 4. 保存数据
iotTdEngineService.insertData(tableData);
}
private void createDeviceTable(Integer deviceType, String productKey, String deviceName, String deviceKey) {
List<TdFieldDO> tagsFieldValues = new ArrayList<>();
String SuperTableName = getProductPropertySTableName(deviceType, productKey);
List<Map<String, Object>> maps = iotTdEngineService.describeSuperTable(url.substring(url.lastIndexOf("/") + 1), SuperTableName);
if (maps != null) {
List<Map<String, Object>> taggedNotesList = maps.stream().filter(map -> "TAG".equals(map.get("note"))).toList();
tagsFieldValues = FieldParser.parse(taggedNotesList.stream()
.map(map -> List.of(map.get("field"), map.get("type"), map.get("length")))
.collect(Collectors.toList()));
for (TdFieldDO tagsFieldValue : tagsFieldValues) {
switch (tagsFieldValue.getFieldName()) {
case "product_key" -> tagsFieldValue.setFieldValue(productKey);
case "device_key" -> tagsFieldValue.setFieldValue(deviceKey);
case "device_name" -> tagsFieldValue.setFieldValue(deviceName);
case "device_type" -> tagsFieldValue.setFieldValue(deviceType);
}
}
}
// 1. 创建设备数据表
String tableName = "device_" + productKey.toLowerCase() + "_" + deviceName.toLowerCase();
TableDO tableDto = new TableDO();
tableDto.setDataBaseName(url.substring(url.lastIndexOf("/") + 1));
tableDto.setSuperTableName(SuperTableName);
tableDto.setTableName(tableName);
tableDto.setTagsFieldValues(tagsFieldValues);
iotTdEngineService.createTable(tableDto);
}
static String getProductPropertySTableName(Integer deviceType, String productKey) {
return switch (deviceType) {
case 1 -> String.format("gateway_sub_%s", productKey).toLowerCase();
case 2 -> String.format("gateway_%s", productKey).toLowerCase();
default -> String.format("device_%s", productKey).toLowerCase();
};
}
}

View File

@ -68,4 +68,12 @@ public interface IotThinkModelFunctionService {
* @param productId 产品编号
*/
void createSuperTableDataModel(Long productId);
/**
* 获得产品物模型列表
*
* @param productKey 产品 Key
* @return 产品物模型列表
*/
List<IotThinkModelFunctionDO> getThinkModelFunctionListByProductKey(String productKey);
}

View File

@ -199,6 +199,11 @@ public class IotThinkModelFunctionServiceImpl implements IotThinkModelFunctionSe
dbStructureDataService.createSuperTableDataModel(product, functionList);
}
@Override
public List<IotThinkModelFunctionDO> getThinkModelFunctionListByProductKey(String productKey) {
return thinkModelFunctionMapper.selectListByProductKey(productKey);
}
/**
* 创建默认的事件和服务
*/

View File

@ -110,30 +110,27 @@
</foreach>
</update>
<!-- CREATE TABLE [IF NOT EXISTS] tb_name USING stb_name TAGS (tag_value1, ...);-->
<update id="createTable">
create table
if not exists #{dataBaseName}.#{tableName}
using #{dataBaseName}.#{superTableName}
tags
<foreach item="item" collection="tagsFieldValues" separator=","
open="(" close=")" index="">
CREATE TABLE IF NOT EXISTS ${dataBaseName}.${tableName}
USING ${dataBaseName}.${superTableName}
<foreach item="item" collection="tagsFieldValues" separator="," open="(" close=")">
${item.fieldName}
</foreach>
TAGS
<foreach item="item" collection="tagsFieldValues" separator="," open="(" close=")">
#{item.fieldValue}
</foreach>
</update>
<!-- insert into d1004 (ts, voltage, phase) values("2018-10-04 14:38:06", 223, 0.29) -->
<insert id="insertData">
insert into #{dataBaseName}.#{tableName}
INSERT INTO ${dataBaseName}.${tableName}
<foreach item="item" collection="schemaFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldName}
${item.fieldName}
</foreach>
using #{dataBaseName}.#{superTableName}
tags
<foreach item="item" collection="tagsFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldValue}
</foreach>
values
VALUES
<foreach item="item" collection="schemaFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldValue}