【新增功能】 mqtt 数据接收

This commit is contained in:
安浩浩 2024-10-27 21:26:35 +08:00
parent 88088c7987
commit 8c84ac9d8a
13 changed files with 224 additions and 23 deletions

View File

@ -25,6 +25,11 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-spring-boot-starter-biz-tenant</artifactId>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>

View File

@ -0,0 +1,70 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.tdengine;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* 物模型消息
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ThingModelMessage {
/**
* 消息ID
*/
private String id;
/**
* 扩展功能的参数
*/
private Object sys;
/**
* 请求方法 例如thing.event.property.post
*/
private String method;
/**
* 请求参数
*/
private Object params;
/**
* 属性上报时间戳
*/
private Long time;
/**
* 设备信息
*/
private String productKey;
/**
* 设备名称
*/
private String deviceName;
/**
* 设备 key
*/
private String deviceKey;
/**
* 转换为 Map 类型
*/
public Map<String, Object> dataToMap() {
Map<String, Object> mapData = new HashMap<>();
if (params instanceof Map) {
((Map<?, ?>) params).forEach((key, value) -> mapData.put(key.toString(), value));
}
return mapData;
}
}

View File

@ -1,5 +1,8 @@
package cn.iocoder.yudao.module.iot.emq.service;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceDataService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@ -16,13 +19,20 @@ import org.springframework.stereotype.Service;
@Service
public class EmqxServiceImpl implements EmqxService {
@Resource
private IotDeviceDataService iotDeviceDataService;
// TODO 多线程处理消息
@Override
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload()));
// 根据不同的主题处理不同的业务逻辑
if (topic.contains("/property/post")) {
// 设备上报数据
// 设备上报数据 topic /sys/f13f57c63e9/dianbiao1/thing/event/property/post
String productKey = topic.split("/")[2];
String deviceName = topic.split("/")[3];
String message = new String(mqttMessage.getPayload());
iotDeviceDataService.saveDeviceData(productKey, deviceName, message);
}
}
@ -30,7 +40,7 @@ public class EmqxServiceImpl implements EmqxService {
public void subscribe(MqttClient client) {
try {
// 订阅默认主题可以根据需要修改
// client.subscribe("$share/yudao/+/+/#", 1);
client.subscribe("/sys/+/+/#", 1);
log.info("订阅默认主题成功");
} catch (Exception e) {
log.error("订阅默认主题失败", e);

View File

@ -0,0 +1,26 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDevicePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceSaveReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceStatusUpdateReqVO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import jakarta.validation.Valid;
/**
* IoT 设备数据 Service 接口
*
* @author 芋道源码
*/
public interface IotDeviceDataService {
/**
* 保存设备数据
*
* @param productKey 产品 key
* @param deviceName 设备名称
* @param message 消息
*/
void saveDeviceData(String productKey, String deviceName, String message);
}

View File

@ -0,0 +1,39 @@
package cn.iocoder.yudao.module.iot.service.device;
import cn.hutool.json.JSONObject;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
import cn.iocoder.yudao.module.iot.service.tdengine.IotThingModelMessageService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class IotDeviceDataServiceImpl implements IotDeviceDataService {
@Resource
private IotDeviceService deviceService;
@Resource
private IotThingModelMessageService thingModelMessageService;
@Override
public void saveDeviceData(String productKey, String deviceName, String message) {
// 1. 根据产品 key 和设备名称获得设备信息
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceName(productKey, deviceName);
// 2. 解析消息保存数据
JSONObject jsonObject = new JSONObject(message);
log.info("[saveDeviceData][productKey({}) deviceName({}) data({})]", productKey, deviceName, jsonObject);
ThingModelMessage thingModelMessage = ThingModelMessage.builder()
.id(jsonObject.getStr("id"))
.sys(jsonObject.get("sys"))
.method(jsonObject.getStr("method"))
.params(jsonObject.get("params"))
.time(jsonObject.getLong("time") == null ? System.currentTimeMillis() : jsonObject.getLong("time"))
.productKey(productKey)
.deviceName(deviceName)
.deviceKey(device.getDeviceKey())
.build();
thingModelMessageService.saveThingModelMessage(thingModelMessage);
}
}

View File

@ -64,4 +64,13 @@ public interface IotDeviceService {
* @return 设备数量
*/
Long getDeviceCountByProductId(Long productId);
/**
* 根据产品 key 和设备名称获得设备信息
*
* @param productKey 产品 key
* @param deviceName 设备名称
* @return 设备信息
*/
IotDeviceDO getDeviceByProductKeyAndDeviceName(String productKey, String deviceName);
}

View File

@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDevicePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceSaveReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceStatusUpdateReqVO;
@ -32,7 +33,7 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*;
@Service
@Validated
@Slf4j
public class DeviceServiceImpl implements IotDeviceService {
public class IotDeviceServiceImpl implements IotDeviceService {
@Resource
private IotDeviceMapper deviceMapper;
@ -224,4 +225,10 @@ public class DeviceServiceImpl implements IotDeviceService {
return deviceMapper.selectCountByProductId(productId);
}
@Override
@TenantIgnore
public IotDeviceDO getDeviceByProductKeyAndDeviceName(String productKey, String deviceName) {
return deviceMapper.selectByProductKeyAndDeviceName(productKey, deviceName);
}
}

View File

@ -25,7 +25,7 @@ import java.util.stream.Collectors;
public class IotDbStructureDataServiceImpl implements IotDbStructureDataService {
@Resource
private TdEngineService tdEngineService;
private IotTdEngineService iotTdEngineService;
@Resource
private TdRestApi tdRestApi;
@ -87,7 +87,7 @@ public class IotDbStructureDataServiceImpl implements IotDbStructureDataService
// 5. 创建超级表
String dataBaseName = url.substring(url.lastIndexOf("/") + 1);
tdEngineService.createSuperTable(schemaFields, tagsFields, dataBaseName, superTableName);
iotTdEngineService.createSuperTable(schemaFields, tagsFields, dataBaseName, superTableName);
}
@Override
@ -107,7 +107,7 @@ public class IotDbStructureDataServiceImpl implements IotDbStructureDataService
private List<TdFieldDO> getTableFields(String tableName) {
List<TdFieldDO> fields = new ArrayList<>();
// 获取超级表的描述信息
List<Map<String, Object>> maps = tdEngineService.describeSuperTable(url.substring(url.lastIndexOf("/") + 1), tableName);
List<Map<String, Object>> maps = iotTdEngineService.describeSuperTable(url.substring(url.lastIndexOf("/") + 1), tableName);
if (maps != null) {
// 过滤掉 note 字段为 TAG 的记录
maps = maps.stream().filter(map -> !"TAG".equals(map.get("note"))).toList();
@ -133,16 +133,16 @@ public class IotDbStructureDataServiceImpl implements IotDbStructureDataService
String dataBaseName = url.substring(url.lastIndexOf("/") + 1);
// 添加新增字段
if (CollUtil.isNotEmpty(addFields)) {
tdEngineService.addColumnForSuperTable(dataBaseName,tableName, addFields);
iotTdEngineService.addColumnForSuperTable(dataBaseName, tableName, addFields);
}
// 删除旧字段
if (CollUtil.isNotEmpty(dropFields)) {
tdEngineService.dropColumnForSuperTable(dataBaseName,tableName, dropFields);
iotTdEngineService.dropColumnForSuperTable(dataBaseName, tableName, dropFields);
}
// 修改字段先删除再添加
if (CollUtil.isNotEmpty(modifyFields)) {
tdEngineService.dropColumnForSuperTable(dataBaseName,tableName, modifyFields);
tdEngineService.addColumnForSuperTable(dataBaseName,tableName, modifyFields);
iotTdEngineService.dropColumnForSuperTable(dataBaseName, tableName, modifyFields);
iotTdEngineService.addColumnForSuperTable(dataBaseName, tableName, modifyFields);
}
}
@ -181,7 +181,7 @@ public class IotDbStructureDataServiceImpl implements IotDbStructureDataService
String superTableName = getProductPropertySTableName(product.getDeviceType(), product.getProductKey());
String dataBaseName = url.substring(url.lastIndexOf("/") + 1);
Integer tableExists = tdEngineService.checkSuperTableExists(dataBaseName, superTableName);
Integer tableExists = iotTdEngineService.checkSuperTableExists(dataBaseName, superTableName);
if (tableExists != null && tableExists > 0) {
updateSuperTable(thingModel, product.getDeviceType());

View File

@ -12,7 +12,7 @@ import java.util.Map;
/**
* TdEngineService
*/
public interface TdEngineService {
public interface IotTdEngineService {
/**
* 创建数据库
@ -49,7 +49,7 @@ public interface TdEngineService {
* @param superTableName 超级表名称
* @param fieldsVo 字段信息
*/
void addColumnForSuperTable(String dataBaseName,String superTableName, List<TdFieldDO> fieldsVo);
void addColumnForSuperTable(String dataBaseName, String superTableName, List<TdFieldDO> fieldsVo);
/**
* 为超级表删除列
@ -58,7 +58,7 @@ public interface TdEngineService {
* @param superTableName 超级表名称
* @param fieldsVo 字段信息
*/
void dropColumnForSuperTable(String dataBaseName,String superTableName, List<TdFieldDO> fieldsVo);
void dropColumnForSuperTable(String dataBaseName, String superTableName, List<TdFieldDO> fieldsVo);
/**
* 为超级表添加tag

View File

@ -15,7 +15,7 @@ import java.util.Map;
@Service
@Slf4j
public class TdEngineServiceImpl implements TdEngineService {
public class IotTdEngineServiceImpl implements IotTdEngineService {
@Resource
private TdEngineMapper tdEngineMapper;

View File

@ -0,0 +1,16 @@
package cn.iocoder.yudao.module.iot.service.tdengine;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
/**
* 物模型消息 Service
*/
public interface IotThingModelMessageService {
/**
* 保存物模型消息
*
* @param thingModelMessage 物模型消息
*/
void saveThingModelMessage(ThingModelMessage thingModelMessage);
}

View File

@ -0,0 +1,13 @@
package cn.iocoder.yudao.module.iot.service.tdengine;
import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
import org.springframework.stereotype.Service;
@Service
public class IotThingModelMessageServiceImpl implements IotThingModelMessageService {
@Override
public void saveThingModelMessage(ThingModelMessage thingModelMessage) {
// TODO 芋艿后续实现
}
}

View File

@ -2,9 +2,10 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.iocoder.yudao.module.iot.dal.tdengine.TdEngineMapper">
<!-- TODO 对 $ 符号有安全要求的话,后期改为接口方式 -->
<update id="createDatabase" parameterType="String">
create database if not exists #{dataBaseName}
CREATE DATABASE IF NOT EXISTS ${dataBaseName}
</update>
<update id="createSuperTable">
@ -271,7 +272,8 @@
<select id="getCountByTimestamp" parameterType="cn.iocoder.yudao.module.iot.domain.SelectDto"
resultType="java.util.Map">
SELECT count(0) AS count
FROM #{dataBaseName}.#{tableName} WHERE ${fieldName} BETWEEN #{startTime} AND #{endTime}
FROM #{dataBaseName}.#{tableName}
WHERE ${fieldName} BETWEEN #{startTime} AND #{endTime}
</select>
<select id="showSuperTables" resultType="java.util.Map">
@ -287,26 +289,30 @@
<select id="getLastDataByTags" parameterType="cn.iocoder.yudao.module.iot.domain.TagsSelectDao"
resultType="Map">
select last(*)
from #{dataBaseName}.#{stableName} group by ${tagsName}
from #{dataBaseName}.#{stableName}
group by ${tagsName}
</select>
<select id="getHistoryData" resultType="java.util.Map"
parameterType="cn.iocoder.yudao.module.iot.domain.visual.SelectVisualDto">
SELECT #{fieldName}, ts
FROM #{dataBaseName}.#{tableName} WHERE ts BETWEEN #{startTime} AND #{endTime}
FROM #{dataBaseName}.#{tableName}
WHERE ts BETWEEN #{startTime} AND #{endTime}
LIMIT #{num}
</select>
<select id="getRealtimeData" resultType="java.util.Map"
parameterType="cn.iocoder.yudao.module.iot.domain.visual.SelectVisualDto">
SELECT #{fieldName}, ts
FROM #{dataBaseName}.#{tableName} LIMIT #{num}
FROM #{dataBaseName}.#{tableName}
LIMIT #{num}
</select>
<select id="getAggregateData" resultType="java.util.Map"
parameterType="cn.iocoder.yudao.module.iot.domain.visual.SelectVisualDto">
SELECT #{aggregate}(${fieldName})
FROM #{dataBaseName}.#{tableName} WHERE ts BETWEEN #{startTime} AND #{endTime} interval (${interval})
FROM #{dataBaseName}.#{tableName}
WHERE ts BETWEEN #{startTime} AND #{endTime} interval (${interval})
LIMIT #{num}
</select>