parseServiceDataFromPayload(JSONObject jsonObject) {
- JSONObject params = jsonObject.getJSONObject("params");
- if (params == null) {
- log.warn("[parseServiceDataFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject);
- return Map.of();
- }
- return params;
- }
-
/**
* 发送响应消息
*
@@ -104,18 +88,22 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
* @param method 响应方法
*/
private void sendResponse(String topic, JSONObject jsonObject, String method) {
- String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
+ try {
+ String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
- // 构建响应消息
- JSONObject response = new JSONObject();
- response.set("id", jsonObject.getStr("id"));
- response.set("code", 200);
- response.set("method", method);
- response.set("data", new JSONObject());
+ // 构建响应消息
+ JSONObject response = new JSONObject();
+ response.set("id", jsonObject.getStr("id"));
+ response.set("code", 200);
+ response.set("method", method);
+ response.set("data", new JSONObject());
- // 发送响应
- protocol.publishMessage(replyTopic, response.toString());
- log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic);
+ // 发送响应
+ protocol.publishMessage(replyTopic, response.toString());
+ log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
+ } catch (Exception e) {
+ log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e);
+ }
}
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java
index c4b37ad148..70e5a31b18 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/protocol/mqtt/router/IotMqttUpstreamRouter.java
@@ -5,6 +5,7 @@ import cn.hutool.extra.spring.SpringUtil;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
+import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
import io.vertx.mqtt.messages.MqttPublishMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -22,6 +23,7 @@ public class IotMqttUpstreamRouter {
private final IotMqttUpstreamProtocol protocol;
private final IotDeviceMessageProducer deviceMessageProducer;
+ private final IotDeviceMessageService deviceMessageService;
// 处理器实例
private IotMqttPropertyHandler propertyHandler;
@@ -31,10 +33,11 @@ public class IotMqttUpstreamRouter {
public IotMqttUpstreamRouter(IotMqttUpstreamProtocol protocol) {
this.protocol = protocol;
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
+ this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
// 初始化处理器
- this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer);
- this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer);
- this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer);
+ this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer, deviceMessageService);
+ this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer, deviceMessageService);
+ this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer, deviceMessageService);
}
/**
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceCacheService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceCacheService.java
new file mode 100644
index 0000000000..efd8dc60f5
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceCacheService.java
@@ -0,0 +1,75 @@
+package cn.iocoder.yudao.module.iot.gateway.service.device;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * IoT 设备缓存 Service 接口
+ *
+ * @author 芋道源码
+ */
+public interface IotDeviceCacheService {
+
+ /**
+ * 设备信息
+ */
+ @Data
+ @AllArgsConstructor
+ @NoArgsConstructor
+ class DeviceInfo {
+ /**
+ * 设备编号
+ */
+ private Long deviceId;
+ /**
+ * 产品标识
+ */
+ private String productKey;
+ /**
+ * 设备名称
+ */
+ private String deviceName;
+ /**
+ * 设备密钥
+ */
+ private String deviceKey;
+ /**
+ * 租户编号
+ */
+ private Long tenantId;
+ }
+
+ /**
+ * 根据 productKey 和 deviceName 获取设备信息
+ *
+ * @param productKey 产品标识
+ * @param deviceName 设备名称
+ * @return 设备信息,如果不存在返回 null
+ */
+ DeviceInfo getDeviceInfo(String productKey, String deviceName);
+
+ /**
+ * 根据 deviceId 获取设备信息
+ *
+ * @param deviceId 设备编号
+ * @return 设备信息,如果不存在返回 null
+ */
+ DeviceInfo getDeviceInfoById(Long deviceId);
+
+ /**
+ * 清除设备缓存
+ *
+ * @param deviceId 设备编号
+ */
+ void evictDeviceCache(Long deviceId);
+
+ /**
+ * 清除设备缓存
+ *
+ * @param productKey 产品标识
+ * @param deviceName 设备名称
+ */
+ void evictDeviceCache(String productKey, String deviceName);
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceCacheServiceImpl.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceCacheServiceImpl.java
new file mode 100644
index 0000000000..5de9d6b719
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceCacheServiceImpl.java
@@ -0,0 +1,241 @@
+package cn.iocoder.yudao.module.iot.gateway.service.device;
+
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.extra.spring.SpringUtil;
+import cn.iocoder.yudao.framework.common.pojo.CommonResult;
+import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
+import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
+import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * IoT 设备缓存 Service 实现类
+ *
+ * 使用本地缓存 + 远程 API 的方式获取设备信息,提高性能并避免敏感信息传输
+ *
+ * @author 芋道源码
+ */
+@Service
+@Slf4j
+public class IotDeviceCacheServiceImpl implements IotDeviceCacheService {
+
+ /**
+ * 设备信息本地缓存
+ * Key: deviceId
+ * Value: DeviceInfo
+ */
+ private final ConcurrentHashMap deviceIdCache = new ConcurrentHashMap<>();
+
+ /**
+ * 设备名称到设备ID的映射缓存
+ * Key: productKey:deviceName
+ * Value: deviceId
+ */
+ private final ConcurrentHashMap deviceNameCache = new ConcurrentHashMap<>();
+
+ /**
+ * 锁对象,防止并发请求同一设备信息
+ */
+ private final ConcurrentHashMap lockMap = new ConcurrentHashMap<>();
+
+ @Override
+ public DeviceInfo getDeviceInfo(String productKey, String deviceName) {
+ if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) {
+ log.warn("[getDeviceInfo][参数为空][productKey: {}][deviceName: {}]", productKey, deviceName);
+ return null;
+ }
+
+ String cacheKey = buildDeviceNameCacheKey(productKey, deviceName);
+
+ // 1. 先从缓存获取 deviceId
+ Long deviceId = deviceNameCache.get(cacheKey);
+ if (deviceId != null) {
+ DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
+ if (deviceInfo != null) {
+ log.debug("[getDeviceInfo][缓存命中][productKey: {}][deviceName: {}][deviceId: {}]",
+ productKey, deviceName, deviceId);
+ return deviceInfo;
+ }
+ }
+
+ // 2. 缓存未命中,从远程 API 获取
+ return loadDeviceInfoFromApi(productKey, deviceName, cacheKey);
+ }
+
+ @Override
+ public DeviceInfo getDeviceInfoById(Long deviceId) {
+ if (deviceId == null) {
+ log.warn("[getDeviceInfoById][deviceId 为空]");
+ return null;
+ }
+
+ // 1. 先从缓存获取
+ DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
+ if (deviceInfo != null) {
+ log.debug("[getDeviceInfoById][缓存命中][deviceId: {}]", deviceId);
+ return deviceInfo;
+ }
+
+ // 2. 缓存未命中,从远程 API 获取
+ return loadDeviceInfoByIdFromApi(deviceId);
+ }
+
+ @Override
+ public void evictDeviceCache(Long deviceId) {
+ if (deviceId == null) {
+ return;
+ }
+
+ DeviceInfo deviceInfo = deviceIdCache.remove(deviceId);
+ if (deviceInfo != null) {
+ String cacheKey = buildDeviceNameCacheKey(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
+ deviceNameCache.remove(cacheKey);
+ log.info("[evictDeviceCache][清除设备缓存][deviceId: {}]", deviceId);
+ }
+ }
+
+ @Override
+ public void evictDeviceCache(String productKey, String deviceName) {
+ if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) {
+ return;
+ }
+
+ String cacheKey = buildDeviceNameCacheKey(productKey, deviceName);
+ Long deviceId = deviceNameCache.remove(cacheKey);
+ if (deviceId != null) {
+ deviceIdCache.remove(deviceId);
+ log.info("[evictDeviceCache][清除设备缓存][productKey: {}][deviceName: {}]", productKey, deviceName);
+ }
+ }
+
+ /**
+ * 从远程 API 加载设备信息
+ *
+ * @param productKey 产品标识
+ * @param deviceName 设备名称
+ * @param cacheKey 缓存键
+ * @return 设备信息
+ */
+ private DeviceInfo loadDeviceInfoFromApi(String productKey, String deviceName, String cacheKey) {
+ // 使用锁防止并发请求同一设备信息
+ ReentrantLock lock = lockMap.computeIfAbsent(cacheKey, k -> new ReentrantLock());
+ lock.lock();
+ try {
+ // 双重检查,防止重复请求
+ Long deviceId = deviceNameCache.get(cacheKey);
+ if (deviceId != null) {
+ DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
+ if (deviceInfo != null) {
+ return deviceInfo;
+ }
+ }
+
+ log.info("[loadDeviceInfoFromApi][从远程API获取设备信息][productKey: {}][deviceName: {}]",
+ productKey, deviceName);
+
+ try {
+ // 调用远程 API 获取设备信息
+ IotDeviceCommonApi deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class);
+ IotDeviceInfoReqDTO reqDTO = new IotDeviceInfoReqDTO();
+ reqDTO.setProductKey(productKey);
+ reqDTO.setDeviceName(deviceName);
+
+ CommonResult result = deviceCommonApi.getDeviceInfo(reqDTO);
+
+ if (result == null || !result.isSuccess() || result.getData() == null) {
+ log.warn("[loadDeviceInfoFromApi][远程API调用失败][productKey: {}][deviceName: {}][result: {}]",
+ productKey, deviceName, result);
+ return null;
+ }
+
+ IotDeviceInfoRespDTO respDTO = result.getData();
+ DeviceInfo deviceInfo = new DeviceInfo(
+ respDTO.getDeviceId(),
+ respDTO.getProductKey(),
+ respDTO.getDeviceName(),
+ respDTO.getDeviceKey(),
+ respDTO.getTenantId());
+
+ // 缓存设备信息
+ cacheDeviceInfo(deviceInfo, cacheKey);
+
+ log.info("[loadDeviceInfoFromApi][设备信息获取成功并已缓存][deviceInfo: {}]", deviceInfo);
+ return deviceInfo;
+
+ } catch (Exception e) {
+ log.error("[loadDeviceInfoFromApi][远程API调用异常][productKey: {}][deviceName: {}]",
+ productKey, deviceName, e);
+ return null;
+ }
+ } finally {
+ lock.unlock();
+ // 清理锁对象,避免内存泄漏
+ if (lockMap.size() > 1000) { // 简单的清理策略
+ lockMap.entrySet().removeIf(entry -> !entry.getValue().hasQueuedThreads());
+ }
+ }
+ }
+
+ /**
+ * 从远程 API 根据 deviceId 加载设备信息
+ *
+ * @param deviceId 设备编号
+ * @return 设备信息
+ */
+ private DeviceInfo loadDeviceInfoByIdFromApi(Long deviceId) {
+ String lockKey = "deviceId:" + deviceId;
+ ReentrantLock lock = lockMap.computeIfAbsent(lockKey, k -> new ReentrantLock());
+ lock.lock();
+ try {
+ // 双重检查
+ DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
+ if (deviceInfo != null) {
+ return deviceInfo;
+ }
+
+ log.info("[loadDeviceInfoByIdFromApi][从远程API获取设备信息][deviceId: {}]", deviceId);
+
+ try {
+ // TODO: 这里需要添加根据 deviceId 获取设备信息的 API
+ // 暂时返回 null,等待 API 完善
+ log.warn("[loadDeviceInfoByIdFromApi][根据deviceId获取设备信息的API尚未实现][deviceId: {}]", deviceId);
+ return null;
+
+ } catch (Exception e) {
+ log.error("[loadDeviceInfoByIdFromApi][远程API调用异常][deviceId: {}]", deviceId, e);
+ return null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * 缓存设备信息
+ *
+ * @param deviceInfo 设备信息
+ * @param cacheKey 缓存键
+ */
+ private void cacheDeviceInfo(DeviceInfo deviceInfo, String cacheKey) {
+ if (deviceInfo != null && deviceInfo.getDeviceId() != null) {
+ deviceIdCache.put(deviceInfo.getDeviceId(), deviceInfo);
+ deviceNameCache.put(cacheKey, deviceInfo.getDeviceId());
+ }
+ }
+
+ /**
+ * 构建设备名称缓存键
+ *
+ * @param productKey 产品标识
+ * @param deviceName 设备名称
+ * @return 缓存键
+ */
+ private String buildDeviceNameCacheKey(String productKey, String deviceName) {
+ return productKey + ":" + deviceName;
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceClientServiceImpl.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceClientServiceImpl.java
index 366d94aab1..ab499c42c7 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceClientServiceImpl.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/device/IotDeviceClientServiceImpl.java
@@ -1,17 +1,26 @@
package cn.iocoder.yudao.module.iot.gateway.service.device;
import cn.hutool.core.lang.Assert;
+import cn.hutool.core.bean.BeanUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
+import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
+import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
+import java.util.LinkedHashMap;
+
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
/**
@@ -43,6 +52,11 @@ public class IotDeviceClientServiceImpl implements IotDeviceCommonApi {
return doPost("/auth", authReqDTO);
}
+ @Override
+ public CommonResult getDeviceInfo(IotDeviceInfoReqDTO infoReqDTO) {
+ return doPostForDeviceInfo("/info", infoReqDTO);
+ }
+
@SuppressWarnings("unchecked")
private CommonResult doPost(String url, T requestBody) {
try {
@@ -57,4 +71,37 @@ public class IotDeviceClientServiceImpl implements IotDeviceCommonApi {
}
}
+ @SuppressWarnings("unchecked")
+ private CommonResult doPostForDeviceInfo(String url, T requestBody) {
+ try {
+ // 使用 ParameterizedTypeReference 来处理泛型类型
+ ParameterizedTypeReference>> typeRef = new ParameterizedTypeReference>>() {
+ };
+
+ HttpEntity requestEntity = new HttpEntity<>(requestBody);
+ ResponseEntity>> response = restTemplate.exchange(url,
+ HttpMethod.POST, requestEntity, typeRef);
+
+ CommonResult> rawResult = response.getBody();
+ log.info("[doPostForDeviceInfo][url({}) requestBody({}) rawResult({})]", url, requestBody, rawResult);
+ Assert.notNull(rawResult, "请求结果不能为空");
+
+ // 手动转换数据类型
+ CommonResult result = new CommonResult<>();
+ result.setCode(rawResult.getCode());
+ result.setMsg(rawResult.getMsg());
+
+ if (rawResult.isSuccess() && rawResult.getData() != null) {
+ // 将 LinkedHashMap 转换为 IotDeviceInfoRespDTO
+ IotDeviceInfoRespDTO deviceInfo = BeanUtil.toBean(rawResult.getData(), IotDeviceInfoRespDTO.class);
+ result.setData(deviceInfo);
+ }
+
+ return result;
+ } catch (Exception e) {
+ log.error("[doPostForDeviceInfo][url({}) requestBody({}) 发生异常]", url, requestBody, e);
+ return CommonResult.error(INTERNAL_SERVER_ERROR);
+ }
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageService.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageService.java
index 9a5c458a0d..2feea15eb2 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageService.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageService.java
@@ -12,7 +12,7 @@ public interface IotDeviceMessageService {
/**
* 编码消息
*
- * @param message 消息
+ * @param message 消息
* @param productKey 产品 Key
* @param deviceName 设备名称
* @return 编码后的消息内容
@@ -23,10 +23,10 @@ public interface IotDeviceMessageService {
/**
* 解码消息
*
- * @param bytes 消息内容
+ * @param bytes 消息内容
* @param productKey 产品 Key
* @param deviceName 设备名称
- * @param serverId 设备连接的 serverId
+ * @param serverId 设备连接的 serverId
* @return 解码后的消息内容
*/
IotDeviceMessage decodeDeviceMessage(byte[] bytes,
@@ -35,8 +35,21 @@ public interface IotDeviceMessageService {
/**
* 构建【设备上线】消息
*
+ * @param productKey 产品 Key
+ * @param deviceName 设备名称
+ * @param serverId 设备连接的 serverId
* @return 消息
*/
IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId);
+ /**
+ * 构建【设备下线】消息
+ *
+ * @param productKey 产品 Key
+ * @param deviceName 设备名称
+ * @param serverId 设备连接的 serverId
+ * @return 消息
+ */
+ IotDeviceMessage buildDeviceMessageOfStateOffline(String productKey, String deviceName, String serverId);
+
}
diff --git a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageServiceImpl.java b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageServiceImpl.java
index f39b08baf1..3c89ca7efe 100644
--- a/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageServiceImpl.java
+++ b/yudao-module-iot/yudao-module-iot-gateway/src/main/java/cn/iocoder/yudao/module/iot/gateway/service/message/IotDeviceMessageServiceImpl.java
@@ -6,6 +6,9 @@ import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
+import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceCacheService;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
@@ -18,6 +21,7 @@ import java.util.Map;
* @author 芋道源码
*/
@Service
+@Slf4j
public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
/**
@@ -25,6 +29,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
*/
private final Map codes;
+ @Resource
+ private IotDeviceCacheService deviceCacheService;
+
public IotDeviceMessageServiceImpl(List codes) {
this.codes = CollectionUtils.convertMap(codes, IotAlinkDeviceMessageCodec::type);
}
@@ -32,51 +39,106 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
@Override
public byte[] encodeDeviceMessage(IotDeviceMessage message,
String productKey, String deviceName) {
- // TODO @芋艿:获取设备信息
- String codecType = "alink";
- return codes.get(codecType).encode(message);
+ // 获取设备信息以确定编解码类型
+ IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
+ if (deviceInfo == null) {
+ log.warn("[encodeDeviceMessage][设备信息不存在][productKey: {}][deviceName: {}]",
+ productKey, deviceName);
+ return null;
+ }
+
+ String codecType = "alink"; // 默认使用 alink 编解码器
+ IotAlinkDeviceMessageCodec codec = codes.get(codecType);
+ if (codec == null) {
+ log.error("[encodeDeviceMessage][编解码器不存在][codecType: {}]", codecType);
+ return null;
+ }
+
+ return codec.encode(message);
}
@Override
public IotDeviceMessage decodeDeviceMessage(byte[] bytes,
String productKey, String deviceName, String serverId) {
- // TODO @芋艿:获取设备信息
- String codecType = "alink";
- IotDeviceMessage message = codes.get(codecType).decode(bytes);
+ // 获取设备信息
+ IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
+ if (deviceInfo == null) {
+ log.warn("[decodeDeviceMessage][设备信息不存在][productKey: {}][deviceName: {}]",
+ productKey, deviceName);
+ return null;
+ }
+
+ String codecType = "alink"; // 默认使用 alink 编解码器
+ IotAlinkDeviceMessageCodec codec = codes.get(codecType);
+ if (codec == null) {
+ log.error("[decodeDeviceMessage][编解码器不存在][codecType: {}]", codecType);
+ return null;
+ }
+
+ IotDeviceMessage message = codec.decode(bytes);
+ if (message == null) {
+ log.warn("[decodeDeviceMessage][消息解码失败][productKey: {}][deviceName: {}]",
+ productKey, deviceName);
+ return null;
+ }
+
// 补充后端字段
- Long deviceId = 25L;
- Long tenantId = 1L;
- appendDeviceMessage(message, deviceId, tenantId, serverId);
- return message;
+ return appendDeviceMessage(message, deviceInfo, serverId);
}
@Override
public IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId) {
+ // 获取设备信息
+ IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
+ if (deviceInfo == null) {
+ log.warn("[buildDeviceMessageOfStateOnline][设备信息不存在][productKey: {}][deviceName: {}]",
+ productKey, deviceName);
+ return null;
+ }
+
IotDeviceMessage message = IotDeviceMessage.of(null,
IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod(), null);
- // 补充后端字段
- Long deviceId = 25L;
- Long tenantId = 1L;
- return appendDeviceMessage(message, deviceId, tenantId, serverId);
+
+ return appendDeviceMessage(message, deviceInfo, serverId);
+ }
+
+ @Override
+ public IotDeviceMessage buildDeviceMessageOfStateOffline(String productKey, String deviceName, String serverId) {
+ // 获取设备信息
+ IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
+ if (deviceInfo == null) {
+ log.warn("[buildDeviceMessageOfStateOffline][设备信息不存在][productKey: {}][deviceName: {}]",
+ productKey, deviceName);
+ return null;
+ }
+
+ IotDeviceMessage message = IotDeviceMessage.of(null,
+ IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod(), null);
+
+ return appendDeviceMessage(message, deviceInfo, serverId);
}
/**
* 补充消息的后端字段
*
- * @param message 消息
- * @param deviceId 设备编号
- * @param tenantId 租户编号
- * @param serverId 设备连接的 serverId
+ * @param message 消息
+ * @param deviceInfo 设备信息
+ * @param serverId 设备连接的 serverId
* @return 消息
*/
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message,
- Long deviceId, Long tenantId, String serverId) {
+ IotDeviceCacheService.DeviceInfo deviceInfo, String serverId) {
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
- .setDeviceId(deviceId).setTenantId(tenantId).setServerId(serverId);
+ .setDeviceId(deviceInfo.getDeviceId()).setTenantId(deviceInfo.getTenantId()).setServerId(serverId);
+
// 特殊:如果设备没有指定 requestId,则使用 messageId
if (StrUtil.isEmpty(message.getRequestId())) {
message.setRequestId(message.getId());
}
+
+ log.debug("[appendDeviceMessage][消息字段补充完成][deviceId: {}][tenantId: {}]",
+ deviceInfo.getDeviceId(), deviceInfo.getTenantId());
+
return message;
}