feat:【IoT 物联网】新增设备信息查询功能,优化 MQTT 消息处理逻辑
This commit is contained in:
parent
800a85f7bc
commit
4ea6e08f99
|
@ -34,13 +34,6 @@
|
|||
<version>${revision}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- IoT 协议模块 -->
|
||||
<dependency>
|
||||
<groupId>cn.iocoder.boot</groupId>
|
||||
<artifactId>yudao-module-iot-protocol</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.iocoder.boot</groupId>
|
||||
<artifactId>yudao-spring-boot-starter-biz-tenant</artifactId>
|
||||
|
|
|
@ -4,6 +4,9 @@ import cn.iocoder.yudao.framework.common.enums.RpcConstants;
|
|||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
|
||||
import cn.iocoder.yudao.module.iot.service.device.IotDeviceService;
|
||||
import jakarta.annotation.Resource;
|
||||
import jakarta.annotation.security.PermitAll;
|
||||
|
@ -35,4 +38,25 @@ public class IoTDeviceApiImpl implements IotDeviceCommonApi {
|
|||
return success(deviceService.authDevice(authReqDTO));
|
||||
}
|
||||
|
||||
@Override
|
||||
@PostMapping(RpcConstants.RPC_API_PREFIX + "/iot/device/info")
|
||||
@PermitAll
|
||||
public CommonResult<IotDeviceInfoRespDTO> getDeviceInfo(@RequestBody IotDeviceInfoReqDTO infoReqDTO) {
|
||||
IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceNameFromCache(
|
||||
infoReqDTO.getProductKey(), infoReqDTO.getDeviceName());
|
||||
|
||||
if (device == null) {
|
||||
return success(null);
|
||||
}
|
||||
|
||||
IotDeviceInfoRespDTO respDTO = new IotDeviceInfoRespDTO();
|
||||
respDTO.setDeviceId(device.getId());
|
||||
respDTO.setProductKey(device.getProductKey());
|
||||
respDTO.setDeviceName(device.getDeviceName());
|
||||
respDTO.setDeviceKey(device.getDeviceKey());
|
||||
respDTO.setTenantId(device.getTenantId());
|
||||
|
||||
return success(respDTO);
|
||||
}
|
||||
|
||||
}
|
|
@ -2,6 +2,8 @@ package cn.iocoder.yudao.module.iot.core.biz;
|
|||
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
|
||||
|
||||
/**
|
||||
* IoT 设备通用 API
|
||||
|
@ -10,6 +12,20 @@ import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
|||
*/
|
||||
public interface IotDeviceCommonApi {
|
||||
|
||||
/**
|
||||
* 设备认证
|
||||
*
|
||||
* @param authReqDTO 认证请求
|
||||
* @return 认证结果
|
||||
*/
|
||||
CommonResult<Boolean> authDevice(IotDeviceAuthReqDTO authReqDTO);
|
||||
|
||||
/**
|
||||
* 获取设备信息
|
||||
*
|
||||
* @param infoReqDTO 设备信息请求
|
||||
* @return 设备信息
|
||||
*/
|
||||
CommonResult<IotDeviceInfoRespDTO> getDeviceInfo(IotDeviceInfoReqDTO infoReqDTO);
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
package cn.iocoder.yudao.module.iot.core.biz.dto;
|
||||
|
||||
import jakarta.validation.constraints.NotBlank;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* IoT 设备信息查询 Request DTO
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotDeviceInfoReqDTO {
|
||||
|
||||
/**
|
||||
* 产品标识
|
||||
*/
|
||||
@NotBlank(message = "产品标识不能为空")
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
@NotBlank(message = "设备名称不能为空")
|
||||
private String deviceName;
|
||||
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package cn.iocoder.yudao.module.iot.core.biz.dto;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
* IoT 设备信息 Response DTO
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Data
|
||||
public class IotDeviceInfoRespDTO {
|
||||
|
||||
/**
|
||||
* 设备编号
|
||||
*/
|
||||
private Long deviceId;
|
||||
|
||||
/**
|
||||
* 产品标识
|
||||
*/
|
||||
private String productKey;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 设备密钥
|
||||
*/
|
||||
private String deviceKey;
|
||||
|
||||
/**
|
||||
* 租户编号
|
||||
*/
|
||||
private Long tenantId;
|
||||
|
||||
}
|
|
@ -1,14 +1,14 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.protocol.mqtt;
|
||||
|
||||
import cn.hutool.json.JSONObject;
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
|
||||
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageSubscriber;
|
||||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceCacheService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -24,6 +24,9 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotMessageBus messageBus;
|
||||
|
||||
@Resource
|
||||
private IotDeviceCacheService deviceCacheService;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
messageBus.register(this);
|
||||
|
@ -45,24 +48,24 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
log.info("[onMessage][接收到下行消息:{}]", message);
|
||||
|
||||
try {
|
||||
// 根据消息类型处理不同的下行消息
|
||||
String messageType = message.getType();
|
||||
switch (messageType) {
|
||||
case "property":
|
||||
// 根据消息方法处理不同的下行消息
|
||||
String method = message.getMethod();
|
||||
if (method == null) {
|
||||
log.warn("[onMessage][消息方法为空]");
|
||||
return;
|
||||
}
|
||||
|
||||
if (method.startsWith("thing.service.property.")) {
|
||||
handlePropertyMessage(message);
|
||||
break;
|
||||
case "service":
|
||||
} else if (method.startsWith("thing.service.") && !method.contains("property") && !method.contains("config")
|
||||
&& !method.contains("ota")) {
|
||||
handleServiceMessage(message);
|
||||
break;
|
||||
case "config":
|
||||
} else if (method.startsWith("thing.service.config.")) {
|
||||
handleConfigMessage(message);
|
||||
break;
|
||||
case "ota":
|
||||
} else if (method.startsWith("thing.service.ota.")) {
|
||||
handleOtaMessage(message);
|
||||
break;
|
||||
default:
|
||||
log.warn("[onMessage][未知的消息类型:{}]", messageType);
|
||||
break;
|
||||
} else {
|
||||
log.warn("[onMessage][未知的消息方法:{}]", method);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[onMessage][处理下行消息失败:{}]", message, e);
|
||||
|
@ -75,24 +78,32 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
* @param message 设备消息
|
||||
*/
|
||||
private void handlePropertyMessage(IotDeviceMessage message) {
|
||||
String identifier = message.getIdentifier();
|
||||
String productKey = message.getProductKey();
|
||||
String deviceName = message.getDeviceName();
|
||||
String method = message.getMethod();
|
||||
|
||||
if ("set".equals(identifier)) {
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handlePropertyMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
|
||||
if ("thing.service.property.set".equals(method)) {
|
||||
// 属性设置
|
||||
String topic = IotDeviceTopicEnum.buildPropertySetTopic(productKey, deviceName);
|
||||
JSONObject payload = buildDownstreamPayload(message, "thing.service.property.set");
|
||||
JSONObject payload = buildDownstreamPayload(message, method);
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handlePropertyMessage][发送属性设置消息][topic: {}]", topic);
|
||||
} else if ("get".equals(identifier)) {
|
||||
} else if ("thing.service.property.get".equals(method)) {
|
||||
// 属性获取
|
||||
String topic = IotDeviceTopicEnum.buildPropertyGetTopic(productKey, deviceName);
|
||||
JSONObject payload = buildDownstreamPayload(message, "thing.service.property.get");
|
||||
JSONObject payload = buildDownstreamPayload(message, method);
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handlePropertyMessage][发送属性获取消息][topic: {}]", topic);
|
||||
} else {
|
||||
log.warn("[handlePropertyMessage][未知的属性操作:{}]", identifier);
|
||||
log.warn("[handlePropertyMessage][未知的属性操作:{}]", method);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,12 +113,23 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
* @param message 设备消息
|
||||
*/
|
||||
private void handleServiceMessage(IotDeviceMessage message) {
|
||||
String identifier = message.getIdentifier();
|
||||
String productKey = message.getProductKey();
|
||||
String deviceName = message.getDeviceName();
|
||||
String method = message.getMethod();
|
||||
|
||||
String topic = IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, identifier);
|
||||
JSONObject payload = buildDownstreamPayload(message, "thing.service." + identifier);
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleServiceMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
|
||||
// 从方法中提取服务标识符
|
||||
String serviceIdentifier = method.substring("thing.service.".length());
|
||||
|
||||
String topic = IotDeviceTopicEnum.buildServiceTopic(productKey, deviceName, serviceIdentifier);
|
||||
JSONObject payload = buildDownstreamPayload(message, method);
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handleServiceMessage][发送服务调用消息][topic: {}]", topic);
|
||||
}
|
||||
|
@ -118,11 +140,18 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
* @param message 设备消息
|
||||
*/
|
||||
private void handleConfigMessage(IotDeviceMessage message) {
|
||||
String productKey = message.getProductKey();
|
||||
String deviceName = message.getDeviceName();
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleConfigMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
|
||||
String topic = IotDeviceTopicEnum.buildConfigSetTopic(productKey, deviceName);
|
||||
JSONObject payload = buildDownstreamPayload(message, "thing.service.config.set");
|
||||
JSONObject payload = buildDownstreamPayload(message, message.getMethod());
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handleConfigMessage][发送配置设置消息][topic: {}]", topic);
|
||||
}
|
||||
|
@ -133,11 +162,18 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
* @param message 设备消息
|
||||
*/
|
||||
private void handleOtaMessage(IotDeviceMessage message) {
|
||||
String productKey = message.getProductKey();
|
||||
String deviceName = message.getDeviceName();
|
||||
// 通过 deviceId 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfoById(message.getDeviceId());
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[handleOtaMessage][设备信息不存在][deviceId: {}]", message.getDeviceId());
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = deviceInfo.getProductKey();
|
||||
String deviceName = deviceInfo.getDeviceName();
|
||||
|
||||
String topic = IotDeviceTopicEnum.buildOtaUpgradeTopic(productKey, deviceName);
|
||||
JSONObject payload = buildDownstreamPayload(message, "thing.service.ota.upgrade");
|
||||
JSONObject payload = buildDownstreamPayload(message, message.getMethod());
|
||||
protocol.publishMessage(topic, payload.toString());
|
||||
log.info("[handleOtaMessage][发送 OTA 升级消息][topic: {}]", topic);
|
||||
}
|
||||
|
@ -151,7 +187,7 @@ public class IotMqttDownstreamSubscriber implements IotMessageSubscriber<IotDevi
|
|||
*/
|
||||
private JSONObject buildDownstreamPayload(IotDeviceMessage message, String method) {
|
||||
JSONObject payload = new JSONObject();
|
||||
payload.set("id", message.getMessageId());
|
||||
payload.set("id", message.getId()); // 使用正确的消息ID字段
|
||||
payload.set("version", "1.0");
|
||||
payload.set("method", method);
|
||||
payload.set("params", message.getData());
|
||||
|
|
|
@ -10,6 +10,7 @@ import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
|||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceAuthUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.auth.IotDeviceTokenService;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
|
@ -28,12 +29,14 @@ public class IotMqttAuthRouter {
|
|||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceTokenService deviceTokenService;
|
||||
private final IotDeviceCommonApi deviceCommonApi;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
public IotMqttAuthRouter(IotMqttUpstreamProtocol protocol) {
|
||||
this.protocol = protocol;
|
||||
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
|
||||
this.deviceTokenService = SpringUtil.getBean(IotDeviceTokenService.class);
|
||||
this.deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -125,15 +128,15 @@ public class IotMqttAuthRouter {
|
|||
}
|
||||
|
||||
try {
|
||||
// 发送设备状态消息
|
||||
IotDeviceMessage message = IotDeviceMessage.of(
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
||||
|
||||
// 使用 IotDeviceMessageService 构建设备状态消息
|
||||
IotDeviceMessage message;
|
||||
if (online) {
|
||||
message = message.ofStateOnline();
|
||||
message = deviceMessageService.buildDeviceMessageOfStateOnline(
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
||||
log.info("[handleDeviceStateChange][发送设备上线消息成功][username: {}]", username);
|
||||
} else {
|
||||
message = message.ofStateOffline();
|
||||
message = deviceMessageService.buildDeviceMessageOfStateOffline(
|
||||
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), protocol.getServerId());
|
||||
log.info("[handleDeviceStateChange][发送设备下线消息成功][username: {}]", username);
|
||||
}
|
||||
|
||||
|
|
|
@ -6,10 +6,11 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 事件处理器
|
||||
|
@ -24,6 +25,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@Override
|
||||
public void handle(String topic, String payload) {
|
||||
|
@ -31,7 +33,6 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
log.info("[handle][接收到设备事件上报][topic: {}]", topic);
|
||||
|
||||
// 解析消息内容
|
||||
JSONObject jsonObject = JSONUtil.parseObj(payload);
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
|
@ -45,12 +46,10 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
return;
|
||||
}
|
||||
|
||||
Map<String, Object> eventData = parseEventDataFromPayload(jsonObject);
|
||||
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId());
|
||||
// 设置事件消息类型和标识符
|
||||
message.setType("event");
|
||||
message.setIdentifier(eventIdentifier);
|
||||
message.setData(eventData);
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
@ -58,7 +57,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
|
||||
// 发送响应消息
|
||||
String method = "thing.event." + eventIdentifier + ".post";
|
||||
sendResponse(topic, jsonObject, method);
|
||||
sendResponse(topic, JSONUtil.parseObj(payload), method);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理设备事件上报失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
|
@ -80,21 +79,6 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从消息载荷解析事件数据
|
||||
*
|
||||
* @param jsonObject 消息 JSON 对象
|
||||
* @return 事件数据映射
|
||||
*/
|
||||
private Map<String, Object> parseEventDataFromPayload(JSONObject jsonObject) {
|
||||
JSONObject params = jsonObject.getJSONObject("params");
|
||||
if (params == null) {
|
||||
log.warn("[parseEventDataFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject);
|
||||
return Map.of();
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
|
@ -103,6 +87,7 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
* @param method 响应方法
|
||||
*/
|
||||
private void sendResponse(String topic, JSONObject jsonObject, String method) {
|
||||
try {
|
||||
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
|
||||
|
||||
// 构建响应消息
|
||||
|
@ -114,7 +99,10 @@ public class IotMqttEventHandler extends IotMqttAbstractHandler {
|
|||
|
||||
// 发送响应
|
||||
protocol.publishMessage(replyTopic, response.toString());
|
||||
log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic);
|
||||
log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -6,10 +6,11 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 属性处理器
|
||||
|
@ -24,6 +25,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@Override
|
||||
public void handle(String topic, String payload) {
|
||||
|
@ -51,27 +53,26 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
try {
|
||||
log.info("[handlePropertyPost][接收到设备属性上报][topic: {}]", topic);
|
||||
|
||||
// 解析消息内容
|
||||
JSONObject jsonObject = JSONUtil.parseObj(payload);
|
||||
// 解析主题获取设备信息
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 构建设备消息
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
Map<String, Object> properties = parsePropertiesFromPayload(jsonObject);
|
||||
|
||||
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId())
|
||||
.ofPropertyReport(properties);
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handlePropertyPost][处理设备属性上报成功][topic: {}]", topic);
|
||||
|
||||
// 发送响应消息
|
||||
sendResponse(topic, jsonObject, "thing.event.property.post");
|
||||
sendResponse(topic, JSONUtil.parseObj(payload), "thing.event.property.post");
|
||||
} catch (Exception e) {
|
||||
log.error("[handlePropertyPost][处理设备属性上报失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
|
@ -86,7 +87,24 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
private void handlePropertySetReply(String topic, String payload) {
|
||||
try {
|
||||
log.info("[handlePropertySetReply][接收到属性设置响应][topic: {}]", topic);
|
||||
// TODO: 处理属性设置响应逻辑
|
||||
|
||||
// 解析主题获取设备信息
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handlePropertySetReply][处理属性设置响应成功][topic: {}]", topic);
|
||||
} catch (Exception e) {
|
||||
log.error("[handlePropertySetReply][处理属性设置响应失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
|
@ -101,27 +119,29 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
private void handlePropertyGetReply(String topic, String payload) {
|
||||
try {
|
||||
log.info("[handlePropertyGetReply][接收到属性获取响应][topic: {}]", topic);
|
||||
// TODO: 处理属性获取响应逻辑
|
||||
|
||||
// 解析主题获取设备信息
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
String productKey = topicParts[2];
|
||||
String deviceName = topicParts[3];
|
||||
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
log.info("[handlePropertyGetReply][处理属性获取响应成功][topic: {}]", topic);
|
||||
} catch (Exception e) {
|
||||
log.error("[handlePropertyGetReply][处理属性获取响应失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从消息载荷解析属性
|
||||
*
|
||||
* @param jsonObject 消息 JSON 对象
|
||||
* @return 属性映射
|
||||
*/
|
||||
private Map<String, Object> parsePropertiesFromPayload(JSONObject jsonObject) {
|
||||
JSONObject params = jsonObject.getJSONObject("params");
|
||||
if (params == null) {
|
||||
log.warn("[parsePropertiesFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject);
|
||||
return Map.of();
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
|
@ -130,6 +150,7 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
* @param method 响应方法
|
||||
*/
|
||||
private void sendResponse(String topic, JSONObject jsonObject, String method) {
|
||||
try {
|
||||
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
|
||||
|
||||
// 构建响应消息
|
||||
|
@ -141,7 +162,10 @@ public class IotMqttPropertyHandler extends IotMqttAbstractHandler {
|
|||
|
||||
// 发送响应
|
||||
protocol.publishMessage(replyTopic, response.toString());
|
||||
log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic);
|
||||
log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -6,10 +6,11 @@ import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
/**
|
||||
* IoT 网关 MQTT 服务处理器
|
||||
|
@ -24,6 +25,7 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
@Override
|
||||
public void handle(String topic, String payload) {
|
||||
|
@ -31,7 +33,6 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
log.info("[handle][接收到设备服务调用响应][topic: {}]", topic);
|
||||
|
||||
// 解析消息内容
|
||||
JSONObject jsonObject = JSONUtil.parseObj(payload);
|
||||
String[] topicParts = parseTopic(topic);
|
||||
if (topicParts == null) {
|
||||
return;
|
||||
|
@ -45,12 +46,10 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
return;
|
||||
}
|
||||
|
||||
Map<String, Object> serviceData = parseServiceDataFromPayload(jsonObject);
|
||||
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, protocol.getServerId());
|
||||
// 设置服务消息类型和标识符
|
||||
message.setType("service");
|
||||
message.setIdentifier(serviceIdentifier);
|
||||
message.setData(serviceData);
|
||||
// 使用 IotDeviceMessageService 解码消息
|
||||
byte[] messageBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
IotDeviceMessage message = deviceMessageService.decodeDeviceMessage(
|
||||
messageBytes, productKey, deviceName, protocol.getServerId());
|
||||
|
||||
// 发送消息
|
||||
deviceMessageProducer.sendDeviceMessage(message);
|
||||
|
@ -58,7 +57,7 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
|
||||
// 发送响应消息
|
||||
String method = "thing.service." + serviceIdentifier;
|
||||
sendResponse(topic, jsonObject, method);
|
||||
sendResponse(topic, JSONUtil.parseObj(payload), method);
|
||||
} catch (Exception e) {
|
||||
log.error("[handle][处理设备服务调用响应失败][topic: {}][payload: {}]", topic, payload, e);
|
||||
}
|
||||
|
@ -81,21 +80,6 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从消息载荷解析服务数据
|
||||
*
|
||||
* @param jsonObject 消息 JSON 对象
|
||||
* @return 服务数据映射
|
||||
*/
|
||||
private Map<String, Object> parseServiceDataFromPayload(JSONObject jsonObject) {
|
||||
JSONObject params = jsonObject.getJSONObject("params");
|
||||
if (params == null) {
|
||||
log.warn("[parseServiceDataFromPayload][消息格式不正确,缺少 params 字段][jsonObject: {}]", jsonObject);
|
||||
return Map.of();
|
||||
}
|
||||
return params;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送响应消息
|
||||
*
|
||||
|
@ -104,6 +88,7 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
* @param method 响应方法
|
||||
*/
|
||||
private void sendResponse(String topic, JSONObject jsonObject, String method) {
|
||||
try {
|
||||
String replyTopic = IotDeviceTopicEnum.getReplyTopic(topic);
|
||||
|
||||
// 构建响应消息
|
||||
|
@ -115,7 +100,10 @@ public class IotMqttServiceHandler extends IotMqttAbstractHandler {
|
|||
|
||||
// 发送响应
|
||||
protocol.publishMessage(replyTopic, response.toString());
|
||||
log.debug("[sendResponse][发送响应消息][topic: {}]", replyTopic);
|
||||
log.debug("[sendResponse][发送响应消息成功][topic: {}]", replyTopic);
|
||||
} catch (Exception e) {
|
||||
log.error("[sendResponse][发送响应消息失败][topic: {}]", topic, e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -5,6 +5,7 @@ import cn.hutool.extra.spring.SpringUtil;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
|
||||
import cn.iocoder.yudao.module.iot.gateway.enums.IotDeviceTopicEnum;
|
||||
import cn.iocoder.yudao.module.iot.gateway.protocol.mqtt.IotMqttUpstreamProtocol;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.message.IotDeviceMessageService;
|
||||
import io.vertx.mqtt.messages.MqttPublishMessage;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
@ -22,6 +23,7 @@ public class IotMqttUpstreamRouter {
|
|||
|
||||
private final IotMqttUpstreamProtocol protocol;
|
||||
private final IotDeviceMessageProducer deviceMessageProducer;
|
||||
private final IotDeviceMessageService deviceMessageService;
|
||||
|
||||
// 处理器实例
|
||||
private IotMqttPropertyHandler propertyHandler;
|
||||
|
@ -31,10 +33,11 @@ public class IotMqttUpstreamRouter {
|
|||
public IotMqttUpstreamRouter(IotMqttUpstreamProtocol protocol) {
|
||||
this.protocol = protocol;
|
||||
this.deviceMessageProducer = SpringUtil.getBean(IotDeviceMessageProducer.class);
|
||||
this.deviceMessageService = SpringUtil.getBean(IotDeviceMessageService.class);
|
||||
// 初始化处理器
|
||||
this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer);
|
||||
this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer);
|
||||
this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer);
|
||||
this.propertyHandler = new IotMqttPropertyHandler(protocol, deviceMessageProducer, deviceMessageService);
|
||||
this.eventHandler = new IotMqttEventHandler(protocol, deviceMessageProducer, deviceMessageService);
|
||||
this.serviceHandler = new IotMqttServiceHandler(protocol, deviceMessageProducer, deviceMessageService);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.service.device;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* IoT 设备缓存 Service 接口
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface IotDeviceCacheService {
|
||||
|
||||
/**
|
||||
* 设备信息
|
||||
*/
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
class DeviceInfo {
|
||||
/**
|
||||
* 设备编号
|
||||
*/
|
||||
private Long deviceId;
|
||||
/**
|
||||
* 产品标识
|
||||
*/
|
||||
private String productKey;
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
/**
|
||||
* 设备密钥
|
||||
*/
|
||||
private String deviceKey;
|
||||
/**
|
||||
* 租户编号
|
||||
*/
|
||||
private Long tenantId;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据 productKey 和 deviceName 获取设备信息
|
||||
*
|
||||
* @param productKey 产品标识
|
||||
* @param deviceName 设备名称
|
||||
* @return 设备信息,如果不存在返回 null
|
||||
*/
|
||||
DeviceInfo getDeviceInfo(String productKey, String deviceName);
|
||||
|
||||
/**
|
||||
* 根据 deviceId 获取设备信息
|
||||
*
|
||||
* @param deviceId 设备编号
|
||||
* @return 设备信息,如果不存在返回 null
|
||||
*/
|
||||
DeviceInfo getDeviceInfoById(Long deviceId);
|
||||
|
||||
/**
|
||||
* 清除设备缓存
|
||||
*
|
||||
* @param deviceId 设备编号
|
||||
*/
|
||||
void evictDeviceCache(Long deviceId);
|
||||
|
||||
/**
|
||||
* 清除设备缓存
|
||||
*
|
||||
* @param productKey 产品标识
|
||||
* @param deviceName 设备名称
|
||||
*/
|
||||
void evictDeviceCache(String productKey, String deviceName);
|
||||
|
||||
}
|
|
@ -0,0 +1,241 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.service.device;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.extra.spring.SpringUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* IoT 设备缓存 Service 实现类
|
||||
* <p>
|
||||
* 使用本地缓存 + 远程 API 的方式获取设备信息,提高性能并避免敏感信息传输
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class IotDeviceCacheServiceImpl implements IotDeviceCacheService {
|
||||
|
||||
/**
|
||||
* 设备信息本地缓存
|
||||
* Key: deviceId
|
||||
* Value: DeviceInfo
|
||||
*/
|
||||
private final ConcurrentHashMap<Long, DeviceInfo> deviceIdCache = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 设备名称到设备ID的映射缓存
|
||||
* Key: productKey:deviceName
|
||||
* Value: deviceId
|
||||
*/
|
||||
private final ConcurrentHashMap<String, Long> deviceNameCache = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 锁对象,防止并发请求同一设备信息
|
||||
*/
|
||||
private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public DeviceInfo getDeviceInfo(String productKey, String deviceName) {
|
||||
if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) {
|
||||
log.warn("[getDeviceInfo][参数为空][productKey: {}][deviceName: {}]", productKey, deviceName);
|
||||
return null;
|
||||
}
|
||||
|
||||
String cacheKey = buildDeviceNameCacheKey(productKey, deviceName);
|
||||
|
||||
// 1. 先从缓存获取 deviceId
|
||||
Long deviceId = deviceNameCache.get(cacheKey);
|
||||
if (deviceId != null) {
|
||||
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
|
||||
if (deviceInfo != null) {
|
||||
log.debug("[getDeviceInfo][缓存命中][productKey: {}][deviceName: {}][deviceId: {}]",
|
||||
productKey, deviceName, deviceId);
|
||||
return deviceInfo;
|
||||
}
|
||||
}
|
||||
|
||||
// 2. 缓存未命中,从远程 API 获取
|
||||
return loadDeviceInfoFromApi(productKey, deviceName, cacheKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceInfo getDeviceInfoById(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
log.warn("[getDeviceInfoById][deviceId 为空]");
|
||||
return null;
|
||||
}
|
||||
|
||||
// 1. 先从缓存获取
|
||||
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
|
||||
if (deviceInfo != null) {
|
||||
log.debug("[getDeviceInfoById][缓存命中][deviceId: {}]", deviceId);
|
||||
return deviceInfo;
|
||||
}
|
||||
|
||||
// 2. 缓存未命中,从远程 API 获取
|
||||
return loadDeviceInfoByIdFromApi(deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictDeviceCache(Long deviceId) {
|
||||
if (deviceId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
DeviceInfo deviceInfo = deviceIdCache.remove(deviceId);
|
||||
if (deviceInfo != null) {
|
||||
String cacheKey = buildDeviceNameCacheKey(deviceInfo.getProductKey(), deviceInfo.getDeviceName());
|
||||
deviceNameCache.remove(cacheKey);
|
||||
log.info("[evictDeviceCache][清除设备缓存][deviceId: {}]", deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evictDeviceCache(String productKey, String deviceName) {
|
||||
if (StrUtil.isEmpty(productKey) || StrUtil.isEmpty(deviceName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
String cacheKey = buildDeviceNameCacheKey(productKey, deviceName);
|
||||
Long deviceId = deviceNameCache.remove(cacheKey);
|
||||
if (deviceId != null) {
|
||||
deviceIdCache.remove(deviceId);
|
||||
log.info("[evictDeviceCache][清除设备缓存][productKey: {}][deviceName: {}]", productKey, deviceName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从远程 API 加载设备信息
|
||||
*
|
||||
* @param productKey 产品标识
|
||||
* @param deviceName 设备名称
|
||||
* @param cacheKey 缓存键
|
||||
* @return 设备信息
|
||||
*/
|
||||
private DeviceInfo loadDeviceInfoFromApi(String productKey, String deviceName, String cacheKey) {
|
||||
// 使用锁防止并发请求同一设备信息
|
||||
ReentrantLock lock = lockMap.computeIfAbsent(cacheKey, k -> new ReentrantLock());
|
||||
lock.lock();
|
||||
try {
|
||||
// 双重检查,防止重复请求
|
||||
Long deviceId = deviceNameCache.get(cacheKey);
|
||||
if (deviceId != null) {
|
||||
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
|
||||
if (deviceInfo != null) {
|
||||
return deviceInfo;
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[loadDeviceInfoFromApi][从远程API获取设备信息][productKey: {}][deviceName: {}]",
|
||||
productKey, deviceName);
|
||||
|
||||
try {
|
||||
// 调用远程 API 获取设备信息
|
||||
IotDeviceCommonApi deviceCommonApi = SpringUtil.getBean(IotDeviceCommonApi.class);
|
||||
IotDeviceInfoReqDTO reqDTO = new IotDeviceInfoReqDTO();
|
||||
reqDTO.setProductKey(productKey);
|
||||
reqDTO.setDeviceName(deviceName);
|
||||
|
||||
CommonResult<IotDeviceInfoRespDTO> result = deviceCommonApi.getDeviceInfo(reqDTO);
|
||||
|
||||
if (result == null || !result.isSuccess() || result.getData() == null) {
|
||||
log.warn("[loadDeviceInfoFromApi][远程API调用失败][productKey: {}][deviceName: {}][result: {}]",
|
||||
productKey, deviceName, result);
|
||||
return null;
|
||||
}
|
||||
|
||||
IotDeviceInfoRespDTO respDTO = result.getData();
|
||||
DeviceInfo deviceInfo = new DeviceInfo(
|
||||
respDTO.getDeviceId(),
|
||||
respDTO.getProductKey(),
|
||||
respDTO.getDeviceName(),
|
||||
respDTO.getDeviceKey(),
|
||||
respDTO.getTenantId());
|
||||
|
||||
// 缓存设备信息
|
||||
cacheDeviceInfo(deviceInfo, cacheKey);
|
||||
|
||||
log.info("[loadDeviceInfoFromApi][设备信息获取成功并已缓存][deviceInfo: {}]", deviceInfo);
|
||||
return deviceInfo;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[loadDeviceInfoFromApi][远程API调用异常][productKey: {}][deviceName: {}]",
|
||||
productKey, deviceName, e);
|
||||
return null;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
// 清理锁对象,避免内存泄漏
|
||||
if (lockMap.size() > 1000) { // 简单的清理策略
|
||||
lockMap.entrySet().removeIf(entry -> !entry.getValue().hasQueuedThreads());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从远程 API 根据 deviceId 加载设备信息
|
||||
*
|
||||
* @param deviceId 设备编号
|
||||
* @return 设备信息
|
||||
*/
|
||||
private DeviceInfo loadDeviceInfoByIdFromApi(Long deviceId) {
|
||||
String lockKey = "deviceId:" + deviceId;
|
||||
ReentrantLock lock = lockMap.computeIfAbsent(lockKey, k -> new ReentrantLock());
|
||||
lock.lock();
|
||||
try {
|
||||
// 双重检查
|
||||
DeviceInfo deviceInfo = deviceIdCache.get(deviceId);
|
||||
if (deviceInfo != null) {
|
||||
return deviceInfo;
|
||||
}
|
||||
|
||||
log.info("[loadDeviceInfoByIdFromApi][从远程API获取设备信息][deviceId: {}]", deviceId);
|
||||
|
||||
try {
|
||||
// TODO: 这里需要添加根据 deviceId 获取设备信息的 API
|
||||
// 暂时返回 null,等待 API 完善
|
||||
log.warn("[loadDeviceInfoByIdFromApi][根据deviceId获取设备信息的API尚未实现][deviceId: {}]", deviceId);
|
||||
return null;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[loadDeviceInfoByIdFromApi][远程API调用异常][deviceId: {}]", deviceId, e);
|
||||
return null;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 缓存设备信息
|
||||
*
|
||||
* @param deviceInfo 设备信息
|
||||
* @param cacheKey 缓存键
|
||||
*/
|
||||
private void cacheDeviceInfo(DeviceInfo deviceInfo, String cacheKey) {
|
||||
if (deviceInfo != null && deviceInfo.getDeviceId() != null) {
|
||||
deviceIdCache.put(deviceInfo.getDeviceId(), deviceInfo);
|
||||
deviceNameCache.put(cacheKey, deviceInfo.getDeviceId());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建设备名称缓存键
|
||||
*
|
||||
* @param productKey 产品标识
|
||||
* @param deviceName 设备名称
|
||||
* @return 缓存键
|
||||
*/
|
||||
private String buildDeviceNameCacheKey(String productKey, String deviceName) {
|
||||
return productKey + ":" + deviceName;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,17 +1,26 @@
|
|||
package cn.iocoder.yudao.module.iot.gateway.service.device;
|
||||
|
||||
import cn.hutool.core.lang.Assert;
|
||||
import cn.hutool.core.bean.BeanUtil;
|
||||
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.IotDeviceCommonApi;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceAuthReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoReqDTO;
|
||||
import cn.iocoder.yudao.module.iot.core.biz.dto.IotDeviceInfoRespDTO;
|
||||
import cn.iocoder.yudao.module.iot.gateway.config.IotGatewayProperties;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.web.client.RestTemplateBuilder;
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
|
||||
import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeConstants.INTERNAL_SERVER_ERROR;
|
||||
|
||||
/**
|
||||
|
@ -43,6 +52,11 @@ public class IotDeviceClientServiceImpl implements IotDeviceCommonApi {
|
|||
return doPost("/auth", authReqDTO);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CommonResult<IotDeviceInfoRespDTO> getDeviceInfo(IotDeviceInfoReqDTO infoReqDTO) {
|
||||
return doPostForDeviceInfo("/info", infoReqDTO);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> CommonResult<Boolean> doPost(String url, T requestBody) {
|
||||
try {
|
||||
|
@ -57,4 +71,37 @@ public class IotDeviceClientServiceImpl implements IotDeviceCommonApi {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> CommonResult<IotDeviceInfoRespDTO> doPostForDeviceInfo(String url, T requestBody) {
|
||||
try {
|
||||
// 使用 ParameterizedTypeReference 来处理泛型类型
|
||||
ParameterizedTypeReference<CommonResult<LinkedHashMap<String, Object>>> typeRef = new ParameterizedTypeReference<CommonResult<LinkedHashMap<String, Object>>>() {
|
||||
};
|
||||
|
||||
HttpEntity<T> requestEntity = new HttpEntity<>(requestBody);
|
||||
ResponseEntity<CommonResult<LinkedHashMap<String, Object>>> response = restTemplate.exchange(url,
|
||||
HttpMethod.POST, requestEntity, typeRef);
|
||||
|
||||
CommonResult<LinkedHashMap<String, Object>> rawResult = response.getBody();
|
||||
log.info("[doPostForDeviceInfo][url({}) requestBody({}) rawResult({})]", url, requestBody, rawResult);
|
||||
Assert.notNull(rawResult, "请求结果不能为空");
|
||||
|
||||
// 手动转换数据类型
|
||||
CommonResult<IotDeviceInfoRespDTO> result = new CommonResult<>();
|
||||
result.setCode(rawResult.getCode());
|
||||
result.setMsg(rawResult.getMsg());
|
||||
|
||||
if (rawResult.isSuccess() && rawResult.getData() != null) {
|
||||
// 将 LinkedHashMap 转换为 IotDeviceInfoRespDTO
|
||||
IotDeviceInfoRespDTO deviceInfo = BeanUtil.toBean(rawResult.getData(), IotDeviceInfoRespDTO.class);
|
||||
result.setData(deviceInfo);
|
||||
}
|
||||
|
||||
return result;
|
||||
} catch (Exception e) {
|
||||
log.error("[doPostForDeviceInfo][url({}) requestBody({}) 发生异常]", url, requestBody, e);
|
||||
return CommonResult.error(INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -35,8 +35,21 @@ public interface IotDeviceMessageService {
|
|||
/**
|
||||
* 构建【设备上线】消息
|
||||
*
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @param serverId 设备连接的 serverId
|
||||
* @return 消息
|
||||
*/
|
||||
IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId);
|
||||
|
||||
/**
|
||||
* 构建【设备下线】消息
|
||||
*
|
||||
* @param productKey 产品 Key
|
||||
* @param deviceName 设备名称
|
||||
* @param serverId 设备连接的 serverId
|
||||
* @return 消息
|
||||
*/
|
||||
IotDeviceMessage buildDeviceMessageOfStateOffline(String productKey, String deviceName, String serverId);
|
||||
|
||||
}
|
||||
|
|
|
@ -6,6 +6,9 @@ import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageMethodEnum;
|
|||
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
|
||||
import cn.iocoder.yudao.module.iot.core.util.IotDeviceMessageUtils;
|
||||
import cn.iocoder.yudao.module.iot.gateway.codec.alink.IotAlinkDeviceMessageCodec;
|
||||
import cn.iocoder.yudao.module.iot.gateway.service.device.IotDeviceCacheService;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
@ -18,6 +21,7 @@ import java.util.Map;
|
|||
* @author 芋道源码
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
||||
|
||||
/**
|
||||
|
@ -25,6 +29,9 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
|||
*/
|
||||
private final Map<String, IotAlinkDeviceMessageCodec> codes;
|
||||
|
||||
@Resource
|
||||
private IotDeviceCacheService deviceCacheService;
|
||||
|
||||
public IotDeviceMessageServiceImpl(List<IotAlinkDeviceMessageCodec> codes) {
|
||||
this.codes = CollectionUtils.convertMap(codes, IotAlinkDeviceMessageCodec::type);
|
||||
}
|
||||
|
@ -32,51 +39,106 @@ public class IotDeviceMessageServiceImpl implements IotDeviceMessageService {
|
|||
@Override
|
||||
public byte[] encodeDeviceMessage(IotDeviceMessage message,
|
||||
String productKey, String deviceName) {
|
||||
// TODO @芋艿:获取设备信息
|
||||
String codecType = "alink";
|
||||
return codes.get(codecType).encode(message);
|
||||
// 获取设备信息以确定编解码类型
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[encodeDeviceMessage][设备信息不存在][productKey: {}][deviceName: {}]",
|
||||
productKey, deviceName);
|
||||
return null;
|
||||
}
|
||||
|
||||
String codecType = "alink"; // 默认使用 alink 编解码器
|
||||
IotAlinkDeviceMessageCodec codec = codes.get(codecType);
|
||||
if (codec == null) {
|
||||
log.error("[encodeDeviceMessage][编解码器不存在][codecType: {}]", codecType);
|
||||
return null;
|
||||
}
|
||||
|
||||
return codec.encode(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotDeviceMessage decodeDeviceMessage(byte[] bytes,
|
||||
String productKey, String deviceName, String serverId) {
|
||||
// TODO @芋艿:获取设备信息
|
||||
String codecType = "alink";
|
||||
IotDeviceMessage message = codes.get(codecType).decode(bytes);
|
||||
// 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[decodeDeviceMessage][设备信息不存在][productKey: {}][deviceName: {}]",
|
||||
productKey, deviceName);
|
||||
return null;
|
||||
}
|
||||
|
||||
String codecType = "alink"; // 默认使用 alink 编解码器
|
||||
IotAlinkDeviceMessageCodec codec = codes.get(codecType);
|
||||
if (codec == null) {
|
||||
log.error("[decodeDeviceMessage][编解码器不存在][codecType: {}]", codecType);
|
||||
return null;
|
||||
}
|
||||
|
||||
IotDeviceMessage message = codec.decode(bytes);
|
||||
if (message == null) {
|
||||
log.warn("[decodeDeviceMessage][消息解码失败][productKey: {}][deviceName: {}]",
|
||||
productKey, deviceName);
|
||||
return null;
|
||||
}
|
||||
|
||||
// 补充后端字段
|
||||
Long deviceId = 25L;
|
||||
Long tenantId = 1L;
|
||||
appendDeviceMessage(message, deviceId, tenantId, serverId);
|
||||
return message;
|
||||
return appendDeviceMessage(message, deviceInfo, serverId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotDeviceMessage buildDeviceMessageOfStateOnline(String productKey, String deviceName, String serverId) {
|
||||
// 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[buildDeviceMessageOfStateOnline][设备信息不存在][productKey: {}][deviceName: {}]",
|
||||
productKey, deviceName);
|
||||
return null;
|
||||
}
|
||||
|
||||
IotDeviceMessage message = IotDeviceMessage.of(null,
|
||||
IotDeviceMessageMethodEnum.STATE_ONLINE.getMethod(), null);
|
||||
// 补充后端字段
|
||||
Long deviceId = 25L;
|
||||
Long tenantId = 1L;
|
||||
return appendDeviceMessage(message, deviceId, tenantId, serverId);
|
||||
|
||||
return appendDeviceMessage(message, deviceInfo, serverId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IotDeviceMessage buildDeviceMessageOfStateOffline(String productKey, String deviceName, String serverId) {
|
||||
// 获取设备信息
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo = deviceCacheService.getDeviceInfo(productKey, deviceName);
|
||||
if (deviceInfo == null) {
|
||||
log.warn("[buildDeviceMessageOfStateOffline][设备信息不存在][productKey: {}][deviceName: {}]",
|
||||
productKey, deviceName);
|
||||
return null;
|
||||
}
|
||||
|
||||
IotDeviceMessage message = IotDeviceMessage.of(null,
|
||||
IotDeviceMessageMethodEnum.STATE_OFFLINE.getMethod(), null);
|
||||
|
||||
return appendDeviceMessage(message, deviceInfo, serverId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 补充消息的后端字段
|
||||
*
|
||||
* @param message 消息
|
||||
* @param deviceId 设备编号
|
||||
* @param tenantId 租户编号
|
||||
* @param deviceInfo 设备信息
|
||||
* @param serverId 设备连接的 serverId
|
||||
* @return 消息
|
||||
*/
|
||||
private IotDeviceMessage appendDeviceMessage(IotDeviceMessage message,
|
||||
Long deviceId, Long tenantId, String serverId) {
|
||||
IotDeviceCacheService.DeviceInfo deviceInfo, String serverId) {
|
||||
message.setId(IotDeviceMessageUtils.generateMessageId()).setReportTime(LocalDateTime.now())
|
||||
.setDeviceId(deviceId).setTenantId(tenantId).setServerId(serverId);
|
||||
.setDeviceId(deviceInfo.getDeviceId()).setTenantId(deviceInfo.getTenantId()).setServerId(serverId);
|
||||
|
||||
// 特殊:如果设备没有指定 requestId,则使用 messageId
|
||||
if (StrUtil.isEmpty(message.getRequestId())) {
|
||||
message.setRequestId(message.getId());
|
||||
}
|
||||
|
||||
log.debug("[appendDeviceMessage][消息字段补充完成][deviceId: {}][tenantId: {}]",
|
||||
deviceInfo.getDeviceId(), deviceInfo.getTenantId());
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue