reactor:【IoT 物联网】消息处理时,异步进行时间相关的记录

This commit is contained in:
YunaiV 2025-06-11 21:31:01 +08:00
parent 33fed79820
commit c3499af524
13 changed files with 105 additions and 257 deletions

View File

@ -22,7 +22,7 @@ public interface RedisKeyConstants {
/** /**
* 设备的最后上报时间采用 ZSET 结构 * 设备的最后上报时间采用 ZSET 结构
* *
* KEY 格式{productKey},${deviceName} * KEY 格式{deviceId}
* SCORE上报时间 * SCORE上报时间
*/ */
String DEVICE_REPORT_TIMES = "iot:device_report_times"; String DEVICE_REPORT_TIMES = "iot:device_report_times";
@ -39,7 +39,7 @@ public interface RedisKeyConstants {
/** /**
* 设备信息的数据缓存使用 Spring Cache 操作忽略租户 * 设备信息的数据缓存使用 Spring Cache 操作忽略租户
* *
* KEY 格式 1device_${id} * KEY 格式 1device_${deviceId}
* KEY 格式 2device_${productKey}_${deviceName} * KEY 格式 2device_${productKey}_${deviceName}
* VALUE 数据类型String(JSON) * VALUE 数据类型String(JSON)
*/ */
@ -48,7 +48,7 @@ public interface RedisKeyConstants {
/** /**
* 产品信息的数据缓存使用 Spring Cache 操作忽略租户 * 产品信息的数据缓存使用 Spring Cache 操作忽略租户
* *
* KEY 格式product_${id} * KEY 格式product_${productId}
* VALUE 数据类型String(JSON) * VALUE 数据类型String(JSON)
*/ */
String PRODUCT = "iot:product"; String PRODUCT = "iot:product";

View File

@ -1,8 +1,6 @@
package cn.iocoder.yudao.module.iot.dal.redis.device; package cn.iocoder.yudao.module.iot.dal.redis.device;
import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.collection.CollectionUtils;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
@ -11,6 +9,8 @@ import org.springframework.stereotype.Repository;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Set; import java.util.Set;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
/** /**
* 设备的最后上报时间的 Redis DAO * 设备的最后上报时间的 Redis DAO
* *
@ -22,17 +22,15 @@ public class DeviceReportTimeRedisDAO {
@Resource @Resource
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
public void update(String productKey, String deviceName, LocalDateTime reportTime) { public void update(Long deviceId, LocalDateTime reportTime) {
String value = productKey + StrUtil.COMMA + deviceName; // 使用 , 分隔 stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIMES, String.valueOf(deviceId),
stringRedisTemplate.opsForZSet().add(RedisKeyConstants.DEVICE_REPORT_TIMES, value,
LocalDateTimeUtil.toEpochMilli(reportTime)); LocalDateTimeUtil.toEpochMilli(reportTime));
} }
public Set<String[]> range(LocalDateTime maxReportTime) { public Set<Long> range(LocalDateTime maxReportTime) {
Set<String> values = stringRedisTemplate.opsForZSet().rangeByScore(RedisKeyConstants.DEVICE_REPORT_TIMES, 0, Set<String> values = stringRedisTemplate.opsForZSet().rangeByScore(RedisKeyConstants.DEVICE_REPORT_TIMES,
LocalDateTimeUtil.toEpochMilli(maxReportTime)); 0, LocalDateTimeUtil.toEpochMilli(maxReportTime));
return CollectionUtils.convertSet(values, return convertSet(values, Long::parseLong);
value -> value.split(StrUtil.COMMA)); // 使用, 分隔
} }
} }

View File

@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.dal.redis.device; package cn.iocoder.yudao.module.iot.dal.redis.device;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants; import cn.iocoder.yudao.module.iot.dal.redis.RedisKeyConstants;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
@ -17,40 +16,15 @@ public class DeviceServerIdRedisDAO {
@Resource @Resource
private StringRedisTemplate stringRedisTemplate; private StringRedisTemplate stringRedisTemplate;
/** public void update(Long deviceId, String serverId) {
* 更新设备关联的网关 serverId stringRedisTemplate.opsForHash().put(RedisKeyConstants.DEVICE_SERVER_ID,
* String.valueOf(deviceId), serverId);
* @param productKey 产品标识
* @param deviceName 设备名称
* @param serverId 网关 serverId
*/
public void update(String productKey, String deviceName, String serverId) {
String hashKey = buildHashKey(productKey, deviceName);
stringRedisTemplate.opsForHash().put(RedisKeyConstants.DEVICE_SERVER_ID, hashKey, serverId);
} }
/** public String get(Long deviceId) {
* 获得设备关联的网关 serverId Object value = stringRedisTemplate.opsForHash().get(RedisKeyConstants.DEVICE_SERVER_ID,
* String.valueOf(deviceId));
* @param productKey 产品标识
* @param deviceName 设备名称
* @return 网关 serverId
*/
public String get(String productKey, String deviceName) {
String hashKey = buildHashKey(productKey, deviceName);
Object value = stringRedisTemplate.opsForHash().get(RedisKeyConstants.DEVICE_SERVER_ID, hashKey);
return value != null ? (String) value : null; return value != null ? (String) value : null;
} }
/**
* 构建 HASH KEY
*
* @param productKey 产品标识
* @param deviceName 设备名称
* @return HASH KEY
*/
private String buildHashKey(String productKey, String deviceName) {
return productKey + StrUtil.COMMA + deviceName;
}
} }

View File

@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.job.device; package cn.iocoder.yudao.module.iot.job.device;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler; import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
import cn.iocoder.yudao.framework.tenant.core.job.TenantJob; import cn.iocoder.yudao.framework.tenant.core.job.TenantJob;
@ -20,8 +19,6 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import static cn.iocoder.yudao.framework.common.util.collection.CollectionUtils.convertSet;
/** /**
* IoT 设备离线检查 Job * IoT 设备离线检查 Job
* *
@ -46,7 +43,6 @@ public class IotDeviceOfflineCheckJob implements JobHandler {
@Resource @Resource
private IotDeviceMessageService deviceMessageService; private IotDeviceMessageService deviceMessageService;
// TODO @芋艿需要重构下
@Override @Override
@TenantJob @TenantJob
public String execute(String param) { public String execute(String param) {
@ -56,22 +52,20 @@ public class IotDeviceOfflineCheckJob implements JobHandler {
return JsonUtils.toJsonString(Collections.emptyList()); return JsonUtils.toJsonString(Collections.emptyList());
} }
// 1.2 获取超时的设备集合 // 1.2 获取超时的设备集合
Set<String[]> timeoutDevices = devicePropertyService.getProductKeyDeviceNameListByReportTime( Set<Long> timeoutDeviceIds = devicePropertyService.getDeviceIdListByReportTime(
LocalDateTime.now().minus(OFFLINE_TIMEOUT)); LocalDateTime.now().minus(OFFLINE_TIMEOUT));
Set<String> timeoutDevices2 = convertSet(timeoutDevices, item -> item[0] + StrUtil.COMMA + item[1]);
// 2. 下线设备 // 2. 下线设备
List<String[]> offlineDeviceKeys = CollUtil.newArrayList(); List<String[]> offlineDevices = CollUtil.newArrayList();
for (IotDeviceDO device : devices) { for (IotDeviceDO device : devices) {
String timeoutDeviceKey = device.getProductKey() + StrUtil.COMMA + device.getDeviceName(); if (!timeoutDeviceIds.contains(device.getId())) {
if (!timeoutDevices2.contains(timeoutDeviceKey)) {
continue; continue;
} }
offlineDeviceKeys.add(new String[]{device.getProductKey(), device.getDeviceName()}); offlineDevices.add(new String[]{device.getProductKey(), device.getDeviceName()});
// 为什么不直接更新状态呢因为通过 IotDeviceMessage 可以经过一系列的处理例如说记录日志等等 // 为什么不直接更新状态呢因为通过 IotDeviceMessage 可以经过一系列的处理例如说记录日志等等
deviceMessageService.sendDeviceMessage(IotDeviceMessage.buildStateOffline().setDeviceId(device.getId())); deviceMessageService.sendDeviceMessage(IotDeviceMessage.buildStateOffline().setDeviceId(device.getId()));
} }
return JsonUtils.toJsonString(offlineDeviceKeys); return JsonUtils.toJsonString(offlineDevices);
} }
} }

View File

@ -65,9 +65,9 @@ public class IotDeviceMessageSubscriber implements IotMessageSubscriber<IotDevic
TenantUtils.execute(message.getTenantId(), () -> { TenantUtils.execute(message.getTenantId(), () -> {
// 1.1 更新设备的最后时间 // 1.1 更新设备的最后时间
IotDeviceDO device = deviceService.validateDeviceExistsFromCache(message.getDeviceId()); IotDeviceDO device = deviceService.validateDeviceExistsFromCache(message.getDeviceId());
devicePropertyService.updateDeviceReportTime(device.getProductKey(), device.getDeviceName(), LocalDateTime.now()); devicePropertyService.updateDeviceReportTimeAsync(device.getId(), LocalDateTime.now());
// 1.2 更新设备的连接 server // 1.2 更新设备的连接 server
devicePropertyService.updateDeviceServerId(device.getProductKey(), device.getDeviceName(), message.getServerId()); devicePropertyService.updateDeviceServerIdAsync(device.getId(), message.getServerId());
// 2. 未上线的设备强制上线 // 2. 未上线的设备强制上线
forceDeviceOnline(message, device); forceDeviceOnline(message, device);

View File

@ -1,7 +1,9 @@
package cn.iocoder.yudao.module.iot.service.device.message; package cn.iocoder.yudao.module.iot.service.device.message;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.exception.ServiceException; import cn.iocoder.yudao.framework.common.exception.ServiceException;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils; import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum; import cn.iocoder.yudao.module.iot.core.enums.IotDeviceStateEnum;
@ -16,6 +18,7 @@ import cn.iocoder.yudao.module.iot.service.device.property.IotDevicePropertyServ
import com.google.common.base.Objects; import com.google.common.base.Objects;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
@ -56,10 +59,16 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
log.info("[defineDeviceMessageStable][设备消息超级表不存在,创建成功]"); log.info("[defineDeviceMessageStable][设备消息超级表不存在,创建成功]");
} }
// TODO @芋艿要不要异步记录 @Async
private void createDeviceLog(IotDeviceMessage message) { void createDeviceLogAsync(IotDeviceMessage message) {
IotDeviceMessageDO messageDO = BeanUtils.toBean(message, IotDeviceMessageDO.class) IotDeviceMessageDO messageDO = BeanUtils.toBean(message, IotDeviceMessageDO.class)
.setUpstream(IotDeviceMessageUtils.isUpstreamMessage(message)); .setUpstream(IotDeviceMessageUtils.isUpstreamMessage(message));
if (message.getParams() != null) {
messageDO.setParams(JsonUtils.toJsonString(messageDO.getData()));
}
if (messageDO.getData() != null) {
messageDO.setData(JsonUtils.toJsonString(messageDO.getData()));
}
deviceLogMapper.insert(messageDO); deviceLogMapper.insert(messageDO);
} }
@ -72,6 +81,10 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
// TODO @芋艿针对连接网关的设备是不是 productKeydeviceName 需要调整下 // TODO @芋艿针对连接网关的设备是不是 productKeydeviceName 需要调整下
@Override @Override
public IotDeviceMessage sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) { public IotDeviceMessage sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) {
return sendDeviceMessage(message, device, null);
}
private IotDeviceMessage sendDeviceMessage(IotDeviceMessage message, IotDeviceDO device, String serverId) {
// 1. 补充信息 // 1. 补充信息
appendDeviceMessage(message, device); appendDeviceMessage(message, device);
@ -84,31 +97,31 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
// 2.2 情况二发送下行消息 // 2.2 情况二发送下行消息
// 如果是下行消息需要校验 serverId 存在 // 如果是下行消息需要校验 serverId 存在
String serverId = devicePropertyService.getDeviceServerId(device.getProductKey(), device.getDeviceName());
if (StrUtil.isEmpty(serverId)) { if (StrUtil.isEmpty(serverId)) {
throw exception(DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL); serverId = devicePropertyService.getDeviceServerId(device.getId());
if (StrUtil.isEmpty(serverId)) {
throw exception(DEVICE_DOWNSTREAM_FAILED_SERVER_ID_NULL);
}
} }
deviceMessageProducer.sendDeviceMessageToGateway(serverId, message); deviceMessageProducer.sendDeviceMessageToGateway(serverId, message);
// 特殊记录消息日志原因上行消息消费时已经会记录下行消息因为消费在 Gateway 所以需要在这里记录 // 特殊记录消息日志原因上行消息消费时已经会记录下行消息因为消费在 Gateway 所以需要在这里记录
createDeviceLog(message); getSelf().createDeviceLogAsync(message);
return message; return message;
} }
/** /**
* 补充消息的后端字段 * 补充消息的后端字段
* *
* @param message 消息 * @param message 消息
* @param device 设备信息 * @param device 设备信息
* @return 消息
*/ */
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) { private void appendDeviceMessage(IotDeviceMessage message, IotDeviceDO device) {
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now()) message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
.setDeviceId(device.getId()).setTenantId(device.getTenantId()); .setDeviceId(device.getId()).setTenantId(device.getTenantId());
// 特殊如果设备没有指定 requestId则使用 messageId // 特殊如果设备没有指定 requestId则使用 messageId
if (StrUtil.isEmpty(message.getRequestId())) { if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(message.getId()); message.setRequestId(message.getId());
} }
return message;
} }
@Override @Override
@ -120,26 +133,33 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
replyData = handleUpstreamDeviceMessage0(message, device); replyData = handleUpstreamDeviceMessage0(message, device);
} catch (ServiceException ex) { } catch (ServiceException ex) {
serviceException = ex; serviceException = ex;
log.warn("[onMessage][message({}) 业务异常]", message, serviceException); log.warn("[handleUpstreamDeviceMessage][message({}) 业务异常]", message, serviceException);
} catch (Exception ex) { } catch (Exception ex) {
log.error("[onMessage][message({}) 发生异常]", message, ex); log.error("[handleUpstreamDeviceMessage][message({}) 发生异常]", message, ex);
throw ex; throw ex;
} }
// 2. 记录消息 // 2. 记录消息
createDeviceLog(message); getSelf().createDeviceLogAsync(message);
// 3. 回复消息前提 _reply 消息并且非禁用回复的消息 // 3. 回复消息前提 _reply 消息并且非禁用回复的消息
if (IotDeviceMessageUtils.isReplyMessage(message) if (IotDeviceMessageUtils.isReplyMessage(message)
|| IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())) { || IotDeviceMessageMethodEnum.isReplyDisabled(message.getMethod())
|| StrUtil.isEmpty(message.getServerId())) {
return; return;
} }
sendDeviceMessage(IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), replyData, try {
serviceException != null ? serviceException.getCode() : null, IotDeviceMessage replyMessage = IotDeviceMessage.replyOf(message.getRequestId(), message.getMethod(), replyData,
serviceException != null ? serviceException.getMessage() : null)); serviceException != null ? serviceException.getCode() : null,
serviceException != null ? serviceException.getMessage() : null);
sendDeviceMessage(replyMessage, device, message.getServerId());
} catch (Exception ex) {
log.error("[handleUpstreamDeviceMessage][message({}) 回复消息失败]", message, ex);
}
} }
// TODO @芋艿可优化未来逻辑复杂后可以独立拆除 Processor 处理器 // TODO @芋艿可优化未来逻辑复杂后可以独立拆除 Processor 处理器
@SuppressWarnings("SameReturnValue")
private Object handleUpstreamDeviceMessage0(IotDeviceMessage message, IotDeviceDO device) { private Object handleUpstreamDeviceMessage0(IotDeviceMessage message, IotDeviceDO device) {
// 设备上线 // 设备上线
if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod())) { if (Objects.equal(message.getMethod(), IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod())) {
@ -164,4 +184,8 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
return null; return null;
} }
private IotDeviceMessageServiceImpl getSelf() {
return SpringUtil.getBean(getClass());
}
} }

View File

@ -55,38 +55,35 @@ public interface IotDevicePropertyService {
// ========== 设备时间相关操作 ========== // ========== 设备时间相关操作 ==========
/** /**
* 获得最后上报时间小于指定时间的设备标识 * 获得最后上报时间小于指定时间的设备编号集合
* *
* @param maxReportTime 最大上报时间 * @param maxReportTime 最大上报时间
* @return [productKey, deviceName] 列表 * @return 设备编号集合
*/ */
Set<String[]> getProductKeyDeviceNameListByReportTime(LocalDateTime maxReportTime); Set<Long> getDeviceIdListByReportTime(LocalDateTime maxReportTime);
/** /**
* 更新设备上报时间 * 更新设备上报时间
* *
* @param productKey 产品标识 * @param id 设备编号
* @param deviceName 设备名称
* @param reportTime 上报时间 * @param reportTime 上报时间
*/ */
void updateDeviceReportTime(String productKey, String deviceName, LocalDateTime reportTime); void updateDeviceReportTimeAsync(Long id, LocalDateTime reportTime);
/** /**
* 更新设备关联的网关 serverId * 更新设备关联的网关服务 serverId
* *
* @param productKey 产品标识 * @param id 设备编号
* @param deviceName 设备名称
* @param serverId 网关 serverId * @param serverId 网关 serverId
*/ */
void updateDeviceServerId(String productKey, String deviceName, String serverId); void updateDeviceServerIdAsync(Long id, String serverId);
/** /**
* 获得设备关联的网关 serverId * 获得设备关联的网关服务 serverId
* *
* @param productKey 产品标识 * @param id 设备编号
* @param deviceName 设备名称
* @return 网关 serverId * @return 网关 serverId
*/ */
String getDeviceServerId(String productKey, String deviceName); String getDeviceServerId(Long id);
} }

View File

@ -27,6 +27,7 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -183,26 +184,27 @@ public class IotDevicePropertyServiceImpl implements IotDevicePropertyService {
// ========== 设备时间相关操作 ========== // ========== 设备时间相关操作 ==========
@Override @Override
public Set<String[]> getProductKeyDeviceNameListByReportTime(LocalDateTime maxReportTime) { public Set<Long> getDeviceIdListByReportTime(LocalDateTime maxReportTime) {
return deviceReportTimeRedisDAO.range(maxReportTime); return deviceReportTimeRedisDAO.range(maxReportTime);
} }
@Override @Override
public void updateDeviceReportTime(String productKey, String deviceName, LocalDateTime reportTime) { @Async
deviceReportTimeRedisDAO.update(productKey, deviceName, reportTime); public void updateDeviceReportTimeAsync(Long id, LocalDateTime reportTime) {
deviceReportTimeRedisDAO.update(id, reportTime);
} }
@Override @Override
public void updateDeviceServerId(String productKey, String deviceName, String serverId) { public void updateDeviceServerIdAsync(Long id, String serverId) {
if (StrUtil.isEmpty(serverId)) { if (StrUtil.isEmpty(serverId)) {
return; return;
} }
deviceServerIdRedisDAO.update(productKey, deviceName, serverId); deviceServerIdRedisDAO.update(id, serverId);
} }
@Override @Override
public String getDeviceServerId(String productKey, String deviceName) { public String getDeviceServerId(Long id) {
return deviceServerIdRedisDAO.get(productKey, deviceName); return deviceServerIdRedisDAO.get(id);
} }
} }

View File

@ -12,11 +12,13 @@
tenant_id BIGINT, tenant_id BIGINT,
server_id NCHAR(50), server_id NCHAR(50),
upstream BOOL, upstream BOOL,
reply BOOL,
request_id NCHAR(50), request_id NCHAR(50),
method NCHAR(100), method NCHAR(100),
params NCHAR(2048), params NCHAR(2048),
data NCHAR(2048), data NCHAR(2048),
code INT code INT,
msg NCHAR(256)
) TAGS ( ) TAGS (
device_id BIGINT device_id BIGINT
) )
@ -29,21 +31,22 @@
<insert id="insert"> <insert id="insert">
INSERT INTO device_message_${deviceId} ( INSERT INTO device_message_${deviceId} (
ts, id, report_time, tenant_id, server_id, ts, id, report_time, tenant_id, server_id,
upstream, request_id, method, params, data, upstream, reply, request_id, method, params,
code data, code, msg
) )
USING device_message USING device_message
TAGS (#{deviceId}) TAGS (#{deviceId})
VALUES ( VALUES (
#{ts}, #{id}, #{reportTime}, #{tenantId}, #{serverId}, NOW, #{id}, #{reportTime}, #{tenantId}, #{serverId},
#{upstream}, #{requestId}, #{method}, #{params}, #{data}, #{upstream}, #{reply}, #{requestId}, #{method}, #{params},
#{code} #{data}, #{code}, #{msg}
) )
</insert> </insert>
<select id="selectPage" resultType="cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO"> <select id="selectPage" resultType="cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceMessageDO">
SELECT ts, id, report_time, device_id, tenant_id, server_id, upstream, SELECT ts, id, report_time, tenant_id, server_id,
request_id, method, params, data, code upstream, reply, request_id, method, params,
data, code, msg
FROM device_message_${reqVO.deviceId} FROM device_message_${reqVO.deviceId}
<where> <where>
<if test="reqVO.method != null and reqVO.method != ''"> <if test="reqVO.method != null and reqVO.method != ''">

View File

@ -39,7 +39,7 @@ public class IotHttpDownstreamSubscriber implements IotMessageSubscriber<IotDevi
@Override @Override
public void onMessage(IotDeviceMessage message) { public void onMessage(IotDeviceMessage message) {
log.error("[onMessage][IoT 网关 HTTP 协议不支持下行消息,忽略消息:{}]", message); log.info("[onMessage][IoT 网关 HTTP 协议不支持下行消息,忽略消息:{}]", message);
} }
} }

View File

@ -6,7 +6,6 @@ import cn.hutool.core.text.StrPool;
import cn.hutool.extra.spring.SpringUtil; import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol; import cn.iocoder.yudao.module.iot.gateway.protocol.http.IotHttpUpstreamProtocol;
import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService; import cn.iocoder.yudao.module.iot.gateway.service.device.message.IotDeviceMessageService;
import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.RoutingContext;
@ -26,13 +25,10 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
private final IotHttpUpstreamProtocol protocol; private final IotHttpUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
private final IotDeviceMessageService deviceMessageService; private final IotDeviceMessageService deviceMessageService;
public IotHttpUpstreamHandler(IotHttpUpstreamProtocol protocol) { public IotHttpUpstreamHandler(IotHttpUpstreamProtocol protocol) {
this.protocol = protocol; this.protocol = protocol;
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class); this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
} }
@ -46,10 +42,11 @@ public class IotHttpUpstreamHandler extends IotHttpAbstractHandler {
// 2.1 解析消息 // 2.1 解析消息
byte[] bytes = context.body().buffer().getBytes(); byte[] bytes = context.body().buffer().getBytes();
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes, IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(bytes,
productKey, deviceName, protocol.getServerId()); productKey, deviceName);
Assert.equals(method, message.getMethod(), "method 不匹配"); Assert.equals(method, message.getMethod(), "method 不匹配");
// 2.2 发送消息 // 2.2 发送消息
deviceMessageProducer.sendDeviceMessage(message); deviceMessageService.sendDeviceMessage(message,
productKey, deviceName, protocol.getServerId());
// 3. 返回结果 // 3. 返回结果
return CommonResult.success(MapUtil.of("messageId", message.getId())); return CommonResult.success(MapUtil.of("messageId", message.getId()));

View File

@ -96,20 +96,18 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
/** /**
* 补充消息的后端字段 * 补充消息的后端字段
* *
* @param message 消息 * @param message 消息
* @param device 设备信息 * @param device 设备信息
* @param serverId 设备连接的 serverId * @param serverId 设备连接的 serverId
* @return 消息
*/ */
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message, private void appendDeviceMessage(IotDeviceMessage message,
IotDeviceRespDTO device, String serverId) { IotDeviceRespDTO device, String serverId) {
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now()) message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
.setDeviceId(device.getId()).setTenantId(device.getTenantId()).setServerId(serverId); .setDeviceId(device.getId()).setTenantId(device.getTenantId()).setServerId(serverId);
// 特殊如果设备没有指定 requestId则使用 messageId // 特殊如果设备没有指定 requestId则使用 messageId
if (StrUtil.isEmpty(message.getRequestId())) { if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(message.getId()); message.setRequestId(message.getId());
} }
return message;
} }
} }

View File

@ -1,139 +0,0 @@
package cn.iocoder.yudao.module.iot.gateway.service.auth;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.time.Duration;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.when;
/**
* {@link IotDeviceTokenServiceImpl} 的单元测试
*
* @author 芋道源码
*/
@ExtendWith(MockitoExtension.class)
class IotDeviceTokenServiceImplTest {
@Mock
private IotGatewayProperties gatewayProperties;
@InjectMocks
private IotDeviceTokenServiceImpl tokenService;
private IotGatewayProperties.TokenProperties tokenProperties;
@BeforeEach
void setUp() {
// 初始化 Token 配置
tokenProperties = new IotGatewayProperties.TokenProperties();
tokenProperties.setSecret("1234567890123456789012345678901");
tokenProperties.setExpiration(Duration.ofDays(7));
when(gatewayProperties.getToken()).thenReturn(tokenProperties);
}
@Test
void testCreateToken_Success() {
// 准备参数
String productKey = "testProduct";
String deviceName = "testDevice";
// 调用方法
String token = tokenService.createToken(productKey, deviceName);
// 验证结果
assertNotNull(token);
assertFalse(token.isEmpty());
}
@Test
void testCreateToken_WithBlankParameters() {
// 测试空白参数
assertNull(tokenService.createToken("", "deviceName"));
assertNull(tokenService.createToken("productKey", ""));
assertNull(tokenService.createToken(null, "deviceName"));
assertNull(tokenService.createToken("productKey", null));
}
@Test
void testCreateToken_WithoutConfig() {
// 模拟配置为空
when(gatewayProperties.getToken()).thenReturn(null);
// 调用方法
String token = tokenService.createToken("productKey", "deviceName");
// 验证结果
assertNull(token);
}
@Test
void testVerifyToken_Success() {
// 准备参数
String productKey = "testProduct";
String deviceName = "testDevice";
// 创建 Token
String token = tokenService.createToken(productKey, deviceName);
assertNotNull(token);
// 验证 Token
IotDeviceAuthUtils.DeviceInfo deviceInfo = tokenService.verifyToken(token);
// 验证结果
assertNotNull(deviceInfo);
assertEquals(productKey, deviceInfo.getProductKey());
assertEquals(deviceName, deviceInfo.getDeviceName());
}
@Test
void testVerifyToken_WithBlankToken() {
// 测试空白 Token
assertNull(tokenService.verifyToken(""));
assertNull(tokenService.verifyToken(null));
}
@Test
void testVerifyToken_WithInvalidToken() {
// 测试无效 Token
assertNull(tokenService.verifyToken("invalid.token.here"));
}
@Test
void testVerifyToken_WithoutConfig() {
// 模拟配置为空
when(gatewayProperties.getToken()).thenReturn(null);
// 调用方法
IotDeviceAuthUtils.DeviceInfo deviceInfo = tokenService.verifyToken("any.token.here");
// 验证结果
assertNull(deviceInfo);
}
@Test
void testTokenRoundTrip() {
// 测试完整的 Token 创建和验证流程
String productKey = "myProduct";
String deviceName = "myDevice";
// 1. 创建 Token
String token = tokenService.createToken(productKey, deviceName);
assertNotNull(token);
// 2. 验证 Token
IotDeviceAuthUtils.DeviceInfo deviceInfo = tokenService.verifyToken(token);
assertNotNull(deviceInfo);
assertEquals(productKey, deviceInfo.getProductKey());
assertEquals(deviceName, deviceInfo.getDeviceName());
}
}