feat:【IoT 物联网】重新实现 IotDeviceLogMessageSubscriber 的日志记录

This commit is contained in:
YunaiV 2025-06-01 10:51:55 +08:00
parent c3485a3f3d
commit ac624b7495
12 changed files with 91 additions and 109 deletions

View File

@ -11,7 +11,7 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public enum IotDeviceMessageIdentifierEnum {
PROPERTY_GET("get"), // 下行 TODO 芋艿讨论貌似这个上行更合理device 主动拉取配置 IotDevicePropertyGetReqDTO 一样的配置
PROPERTY_GET("get"), // 下行
PROPERTY_SET("set"), // 下行
PROPERTY_REPORT("report"), // 上行

View File

@ -9,6 +9,7 @@ import lombok.Data;
@Data
public class IotDeviceLogPageReqVO extends PageParam {
// TODO @芋艿优先级改成通过 deviceId 查询然后转换下
@Schema(description = "设备标识", requiredMode = Schema.RequiredMode.REQUIRED, example = "device123")
@NotEmpty(message = "设备标识不能为空")
private String deviceKey;

View File

@ -1,10 +1,10 @@
package cn.iocoder.yudao.module.iot.dal.dataobject.device;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.product.IotProductDO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@ -31,11 +31,11 @@ public class IotDeviceLogDO {
private String id;
/**
* 请求编号
* 消息编号
*
* 对应 {@link IotDeviceMessage#getRequestId()} 字段
* 对应 {@link IotDeviceMessage#getMessageId()} 字段
*/
private String requestId;
private String messageId;
/**
* 产品标识
@ -50,11 +50,11 @@ public class IotDeviceLogDO {
*/
private String deviceName;
/**
* 设备标识
* <p>
* 关联 {@link IotDeviceDO#getDeviceKey()}}
* 设备编号
*
* 关联 {@link IotDeviceDO#getId()}
*/
private String deviceKey; // 非存储字段用于 TDengine TAG
private Long deviceId;
/**
* 日志类型
@ -87,6 +87,11 @@ public class IotDeviceLogDO {
*/
private Long reportTime;
/**
* 租户编号
*/
private Long tenantId;
/**
* 时序时间
*/

View File

@ -39,11 +39,10 @@ public class IotDeviceLogMessageSubscriber implements IotMessageSubscriber<IotDe
return "iot_device_log_consumer";
}
// TODO @芋艿后续再对接这个细节逻辑
@Override
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][消息内容({})]", message);
// deviceLogService.createDeviceLog(message);
deviceLogService.createDeviceLog(message);
}
}

View File

@ -17,6 +17,7 @@ import java.time.LocalDateTime;
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Deprecated
public class IotDeviceMessage {
/**

View File

@ -146,14 +146,11 @@ public class IotDeviceDownstreamServiceImpl implements IotDeviceDownstreamServic
// TODO @super可优化过滤掉不合法的属性
// 2. 发送请求
// TODO @芋艿deviceName 的设置
String deviceName = "xx";
Long tenantId = 1L;
cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage message = cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage
.of(getProductKey(device, parentDevice), getDeviceName(device, parentDevice), deviceName,
null, tenantId);
.of(getProductKey(device, parentDevice), getDeviceName(device, parentDevice));
String serverId = "192_168_64_1_8092";
deviceMessageProducer.sendGatewayDeviceMessage(serverId, message);
deviceMessageProducer.sendDeviceMessage(message);
// TODO @芋艿后续可以清理掉
return null;
}

View File

@ -2,8 +2,8 @@ package cn.iocoder.yudao.module.iot.service.device.data;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDeviceLogPageReqVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import javax.annotation.Nullable;
import java.time.LocalDateTime;

View File

@ -8,9 +8,11 @@ import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.data.IotDeviceLogPageReqVO;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO;
import cn.iocoder.yudao.module.iot.dal.tdengine.IotDeviceLogMapper;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import jakarta.annotation.Resource;
@ -34,6 +36,9 @@ import java.util.stream.Collectors;
@Validated
public class IotDeviceLogServiceImpl implements IotDeviceLogService {
@Resource
private IotDeviceService deviceService;
@Resource
private IotDeviceLogMapper deviceLogMapper;
@ -51,9 +56,16 @@ public class IotDeviceLogServiceImpl implements IotDeviceLogService {
@Override
public void createDeviceLog(IotDeviceMessage message) {
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(
message.getProductKey(), message.getDeviceName());
if (device == null) {
log.error("[createDeviceLog][设备({}/{}) 不存在]", message.getProductKey(), message.getDeviceName());
return;
}
IotDeviceLogDO log = BeanUtils.toBean(message, IotDeviceLogDO.class)
.setId(IdUtil.fastSimpleUUID())
.setContent(JsonUtils.toJsonString(message.getData()));
.setContent(JsonUtils.toJsonString(message.getData()))
.setTenantId(device.getTenantId());
deviceLogMapper.insert(log);
}

View File

@ -8,15 +8,16 @@
CREATE STABLE IF NOT EXISTS device_log (
ts TIMESTAMP,
id NCHAR(50),
product_key NCHAR(50),
device_name NCHAR(50),
message_id NCHAR(50),
type NCHAR(50),
identifier NCHAR(255),
content NCHAR(1024),
code INT,
report_time TIMESTAMP
report_time TIMESTAMP,
tenant_id BIGINT
) TAGS (
device_key NCHAR(50)
product_key NCHAR(50),
device_name NCHAR(50)
)
</update>
@ -25,25 +26,21 @@
</select>
<insert id="insert">
INSERT INTO device_log_${deviceKey} (ts, id, product_key, device_name, type, identifier, content, code, report_time)
INSERT INTO device_log_${productKey}_${deviceName} (
ts, id, message_id, type, identifier,
content, code, report_time, tenant_id
)
USING device_log
TAGS ('${deviceKey}')
TAGS ('${productKey}', '${deviceName}')
VALUES (
NOW,
#{id},
#{productKey},
#{deviceName},
#{type},
#{identifier},
#{content},
#{code},
#{reportTime}
NOW, #{id}, #{messageId}, #{type}, #{identifier},
#{content}, #{code}, #{reportTime}, #{tenantId}
)
</insert>
<select id="selectPage" resultType="cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceLogDO">
SELECT ts, id, device_key, product_key, type, identifier, content, report_time
FROM device_log_${reqVO.deviceKey}
FROM device_log_${productKey}_${deviceName}
<where>
<if test="reqVO.type != null and reqVO.type != ''">
AND type = #{reqVO.type}

View File

@ -11,8 +11,6 @@ import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.Map;
// TODO @芋艿参考阿里云的物模型优化 IoT 上下行消息的设计尽量保持一致渐进式不要一口气
/**
* IoT 设备消息
*/
@ -35,9 +33,9 @@ public class IotDeviceMessage {
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "_%s";
/**
* 请求编号
* 消息编号
*/
private String requestId;
private String messageId;
/**
* 设备信息
@ -47,10 +45,6 @@ public class IotDeviceMessage {
* 设备名称
*/
private String deviceName;
/**
* 设备标识
*/
private String deviceKey;
/**
* 消息类型
@ -89,11 +83,6 @@ public class IotDeviceMessage {
*/
private String serverId;
/**
* 租户编号
*/
private Long tenantId;
public IotDeviceMessage ofPropertyReport(Map<String, Object> properties) {
this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier());
@ -108,26 +97,27 @@ public class IotDeviceMessage {
return this;
}
public static IotDeviceMessage of(String productKey, String deviceName, String deviceKey,
String serverId, Long tenantId) {
return of(productKey, deviceName, deviceKey,
null, null,
serverId, tenantId);
public static IotDeviceMessage of(String productKey, String deviceName) {
return of(productKey, deviceName,
null, null);
}
public static IotDeviceMessage of(String productKey, String deviceName, String deviceKey,
String requestId, LocalDateTime reportTime,
String serverId, Long tenantId) {
if (requestId == null) {
requestId = IotCoreUtils.generateRequestId();
public static IotDeviceMessage of(String productKey, String deviceName,
String serverId) {
return of(productKey, deviceName,
null, serverId);
}
public static IotDeviceMessage of(String productKey, String deviceName,
LocalDateTime reportTime, String serverId) {
if (reportTime == null) {
reportTime = LocalDateTime.now();
}
String messageId = IotCoreUtils.generateMessageId();
return IotDeviceMessage.builder()
.requestId(requestId).reportTime(reportTime)
.productKey(productKey).deviceName(deviceName).deviceKey(deviceKey)
.serverId(serverId).tenantId(tenantId).build();
.messageId(messageId).reportTime(reportTime)
.productKey(productKey).deviceName(deviceName)
.serverId(serverId).build();
}
// ========== Topic 相关 ==========

View File

@ -22,7 +22,7 @@ public class IotCoreUtils {
return serverId.replaceAll("\\.", "_");
}
public static String generateRequestId() {
public static String generateMessageId() {
return IdUtil.fastSimpleUUID();
}

View File

@ -2,8 +2,6 @@ package cn.iocoder.yudao.module.iot.gateway.protocol.http.router;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
@ -16,7 +14,6 @@ import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.time.LocalDateTime;
import java.util.Map;
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.BAD_REQUEST;
@ -69,36 +66,34 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
@Override
public void handle(RoutingContext routingContext) {
String path = routingContext.request().path();
String requestId = IdUtil.fastSimpleUUID();
try {
// 1. 解析通用参数
Map<String, String> params = parseCommonParams(routingContext, requestId);
Map<String, String> params = parseCommonParams(routingContext);
String productKey = params.get("productKey");
String deviceName = params.get("deviceName");
JsonObject body = routingContext.body().asJsonObject();
requestId = params.get("requestId");
// 2. 根据路径模式处理不同类型的请求
if (isPropertyPostPath(path)) {
// 处理属性上报
handlePropertyPost(routingContext, productKey, deviceName, requestId, body);
handlePropertyPost(routingContext, productKey, deviceName, body);
return;
}
if (isEventPostPath(path)) {
// 处理事件上报
String identifier = routingContext.pathParam("identifier");
handleEventPost(routingContext, productKey, deviceName, identifier, requestId, body);
handleEventPost(routingContext, productKey, deviceName, identifier, body);
return;
}
// 不支持的请求路径
sendErrorResponse(routingContext, requestId, "unknown", BAD_REQUEST.getCode(), "不支持的请求路径");
sendErrorResponse(routingContext, "unknown", BAD_REQUEST.getCode(), "不支持的请求路径");
} catch (Exception e) {
log.error("[handle][处理上行请求异常] path={}", path, e);
String method = determineMethodFromPath(path, routingContext);
sendErrorResponse(routingContext, requestId, method, INTERNAL_SERVER_ERROR.getCode(),
sendErrorResponse(routingContext, method, INTERNAL_SERVER_ERROR.getCode(),
INTERNAL_SERVER_ERROR.getMsg());
}
}
@ -107,18 +102,12 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
* 解析通用参数
*
* @param routingContext 路由上下文
* @param defaultRequestId 默认请求 ID
* @return 参数映射
*/
private Map<String, String> parseCommonParams(RoutingContext routingContext, String defaultRequestId) {
private Map<String, String> parseCommonParams(RoutingContext routingContext) {
Map<String, String> params = MapUtil.newHashMap();
params.put("productKey", routingContext.pathParam("productKey"));
params.put("deviceName", routingContext.pathParam("deviceName"));
JsonObject body = routingContext.body().asJsonObject();
String requestId = ObjUtil.defaultIfNull(body.getString("id"), defaultRequestId);
params.put("requestId", requestId);
return params;
}
@ -149,23 +138,18 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
* @param routingContext 路由上下文
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param body 请求体
*/
private void handlePropertyPost(RoutingContext routingContext, String productKey, String deviceName,
String requestId, JsonObject body) {
JsonObject body) {
// 1.1 构建设备消息
String deviceKey = "xxx"; // TODO @芋艿待支持
Long tenantId = 1L; // TODO @芋艿待支持
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, deviceKey,
requestId, LocalDateTime.now(),
protocol.getServerId(), tenantId)
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId())
.ofPropertyReport(parsePropertiesFromBody(body));
// 1.2 发送消息
deviceMessageProducer.sendDeviceMessage(message);
// 2. 返回响应
sendResponse(routingContext, requestId, null, null);
sendResponse(routingContext, null);
}
/**
@ -175,11 +159,10 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param identifier 事件标识符
* @param requestId 请求 ID
* @param body 请求体
*/
private void handleEventPost(RoutingContext routingContext, String productKey, String deviceName,
String identifier, String requestId, JsonObject body) {
String identifier, JsonObject body) {
// // 处理事件上报
// IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier,
// requestId, body);
@ -196,11 +179,9 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
* 发送响应
*
* @param routingContext 路由上下文
* @param requestId 请求 ID
* @param method 方法名
* @param result 结果
*/
private void sendResponse(RoutingContext routingContext, String requestId, String method,
private void sendResponse(RoutingContext routingContext,
CommonResult<Boolean> result) {
// // TODO @芋艿后续再优化
// IotStandardResponse response;
@ -218,12 +199,11 @@ public class IotHttpUpstreamHandler implements Handler<RoutingContext> {
* 发送错误响应
*
* @param routingContext 路由上下文
* @param requestId 请求 ID
* @param method 方法名
* @param code 错误代码
* @param message 错误消息
*/
private void sendErrorResponse(RoutingContext routingContext, String requestId, String method, Integer code,
private void sendErrorResponse(RoutingContext routingContext, String method, Integer code,
String message) {
// IotStandardResponse errorResponse = IotStandardResponse.error(requestId, method, code, message);
// IotNetComponentCommonUtils.writeJsonResponse(routingContext, errorResponse);