From 8c84ac9d8a88dd73d03746274f4f6e2de7004668 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com>
Date: Sun, 27 Oct 2024 21:26:35 +0800
Subject: [PATCH] =?UTF-8?q?=E3=80=90=E6=96=B0=E5=A2=9E=E5=8A=9F=E8=83=BD?=
=?UTF-8?q?=E3=80=91=20mqtt=20=E6=95=B0=E6=8D=AE=E6=8E=A5=E6=94=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
yudao-module-iot/yudao-module-iot-biz/pom.xml | 5 ++
.../tdengine/ThingModelMessage.java | 70 +++++++++++++++++++
.../iot/emq/service/EmqxServiceImpl.java | 14 +++-
.../service/device/IotDeviceDataService.java | 26 +++++++
.../device/IotDeviceDataServiceImpl.java | 39 +++++++++++
.../iot/service/device/IotDeviceService.java | 9 +++
...iceImpl.java => IotDeviceServiceImpl.java} | 9 ++-
.../IotDbStructureDataServiceImpl.java | 16 ++---
...neService.java => IotTdEngineService.java} | 10 +--
...eImpl.java => IotTdEngineServiceImpl.java} | 2 +-
.../tdengine/IotThingModelMessageService.java | 16 +++++
.../IotThingModelMessageServiceImpl.java | 13 ++++
.../mapper/tdengine/TdEngineMapper.xml | 18 +++--
13 files changed, 224 insertions(+), 23 deletions(-)
create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/tdengine/ThingModelMessage.java
create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataService.java
create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataServiceImpl.java
rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/{DeviceServiceImpl.java => IotDeviceServiceImpl.java} (95%)
rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/{TdEngineService.java => IotTdEngineService.java} (91%)
rename yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/{TdEngineServiceImpl.java => IotTdEngineServiceImpl.java} (97%)
create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/IotThingModelMessageService.java
create mode 100644 yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/IotThingModelMessageServiceImpl.java
diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml
index cb2fd818f9..013287d2b9 100644
--- a/yudao-module-iot/yudao-module-iot-biz/pom.xml
+++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml
@@ -25,6 +25,11 @@
${revision}
+
+ cn.iocoder.boot
+ yudao-spring-boot-starter-biz-tenant
+
+
cn.iocoder.boot
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/tdengine/ThingModelMessage.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/tdengine/ThingModelMessage.java
new file mode 100644
index 0000000000..d5009dc244
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/tdengine/ThingModelMessage.java
@@ -0,0 +1,70 @@
+package cn.iocoder.yudao.module.iot.dal.dataobject.tdengine;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 物模型消息
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+public class ThingModelMessage {
+
+ /**
+ * 消息ID
+ */
+ private String id;
+
+ /**
+ * 扩展功能的参数
+ */
+ private Object sys;
+
+ /**
+ * 请求方法 例如:thing.event.property.post
+ */
+ private String method;
+
+ /**
+ * 请求参数
+ */
+ private Object params;
+
+ /**
+ * 属性上报时间戳
+ */
+ private Long time;
+
+ /**
+ * 设备信息
+ */
+ private String productKey;
+
+ /**
+ * 设备名称
+ */
+ private String deviceName;
+
+ /**
+ * 设备 key
+ */
+ private String deviceKey;
+
+ /**
+ * 转换为 Map 类型
+ */
+ public Map dataToMap() {
+ Map mapData = new HashMap<>();
+ if (params instanceof Map) {
+ ((Map, ?>) params).forEach((key, value) -> mapData.put(key.toString(), value));
+ }
+ return mapData;
+ }
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java
index 0c1a87f7ff..c82d0d3057 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/emq/service/EmqxServiceImpl.java
@@ -1,5 +1,8 @@
package cn.iocoder.yudao.module.iot.emq.service;
+import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
+import cn.iocoder.yudao.module.iot.service.device.IotDeviceDataService;
+import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@@ -16,13 +19,20 @@ import org.springframework.stereotype.Service;
@Service
public class EmqxServiceImpl implements EmqxService {
+ @Resource
+ private IotDeviceDataService iotDeviceDataService;
+
// TODO 多线程处理消息
@Override
public void subscribeCallback(String topic, MqttMessage mqttMessage) {
log.info("收到消息,主题: {}, 内容: {}", topic, new String(mqttMessage.getPayload()));
// 根据不同的主题,处理不同的业务逻辑
if (topic.contains("/property/post")) {
- // 设备上报数据
+ // 设备上报数据 topic /sys/f13f57c63e9/dianbiao1/thing/event/property/post
+ String productKey = topic.split("/")[2];
+ String deviceName = topic.split("/")[3];
+ String message = new String(mqttMessage.getPayload());
+ iotDeviceDataService.saveDeviceData(productKey, deviceName, message);
}
}
@@ -30,7 +40,7 @@ public class EmqxServiceImpl implements EmqxService {
public void subscribe(MqttClient client) {
try {
// 订阅默认主题,可以根据需要修改
-// client.subscribe("$share/yudao/+/+/#", 1);
+ client.subscribe("/sys/+/+/#", 1);
log.info("订阅默认主题成功");
} catch (Exception e) {
log.error("订阅默认主题失败", e);
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataService.java
new file mode 100644
index 0000000000..4f3ef37d6c
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataService.java
@@ -0,0 +1,26 @@
+package cn.iocoder.yudao.module.iot.service.device;
+
+import cn.iocoder.yudao.framework.common.pojo.PageResult;
+import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDevicePageReqVO;
+import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceSaveReqVO;
+import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceStatusUpdateReqVO;
+import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
+import jakarta.validation.Valid;
+
+/**
+ * IoT 设备数据 Service 接口
+ *
+ * @author 芋道源码
+ */
+public interface IotDeviceDataService {
+
+
+ /**
+ * 保存设备数据
+ *
+ * @param productKey 产品 key
+ * @param deviceName 设备名称
+ * @param message 消息
+ */
+ void saveDeviceData(String productKey, String deviceName, String message);
+}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataServiceImpl.java
new file mode 100644
index 0000000000..8f0c859ad6
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceDataServiceImpl.java
@@ -0,0 +1,39 @@
+package cn.iocoder.yudao.module.iot.service.device;
+
+import cn.hutool.json.JSONObject;
+import cn.iocoder.yudao.module.iot.dal.dataobject.device.IotDeviceDO;
+import cn.iocoder.yudao.module.iot.dal.dataobject.tdengine.ThingModelMessage;
+import cn.iocoder.yudao.module.iot.service.tdengine.IotThingModelMessageService;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Service
+public class IotDeviceDataServiceImpl implements IotDeviceDataService {
+
+ @Resource
+ private IotDeviceService deviceService;
+ @Resource
+ private IotThingModelMessageService thingModelMessageService;
+
+ @Override
+ public void saveDeviceData(String productKey, String deviceName, String message) {
+ // 1. 根据产品 key 和设备名称,获得设备信息
+ IotDeviceDO device = deviceService.getDeviceByProductKeyAndDeviceName(productKey, deviceName);
+ // 2. 解析消息,保存数据
+ JSONObject jsonObject = new JSONObject(message);
+ log.info("[saveDeviceData][productKey({}) deviceName({}) data({})]", productKey, deviceName, jsonObject);
+ ThingModelMessage thingModelMessage = ThingModelMessage.builder()
+ .id(jsonObject.getStr("id"))
+ .sys(jsonObject.get("sys"))
+ .method(jsonObject.getStr("method"))
+ .params(jsonObject.get("params"))
+ .time(jsonObject.getLong("time") == null ? System.currentTimeMillis() : jsonObject.getLong("time"))
+ .productKey(productKey)
+ .deviceName(deviceName)
+ .deviceKey(device.getDeviceKey())
+ .build();
+ thingModelMessageService.saveThingModelMessage(thingModelMessage);
+ }
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java
index 032a7478ee..c967a957ed 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceService.java
@@ -64,4 +64,13 @@ public interface IotDeviceService {
* @return 设备数量
*/
Long getDeviceCountByProductId(Long productId);
+
+ /**
+ * 根据产品 key 和设备名称,获得设备信息
+ *
+ * @param productKey 产品 key
+ * @param deviceName 设备名称
+ * @return 设备信息
+ */
+ IotDeviceDO getDeviceByProductKeyAndDeviceName(String productKey, String deviceName);
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java
similarity index 95%
rename from yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java
rename to yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java
index 2ae08bb94b..d24975b72e 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/DeviceServiceImpl.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/device/IotDeviceServiceImpl.java
@@ -4,6 +4,7 @@ import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.PageResult;
import cn.iocoder.yudao.framework.common.util.object.BeanUtils;
+import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnore;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDevicePageReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceSaveReqVO;
import cn.iocoder.yudao.module.iot.controller.admin.device.vo.IotDeviceStatusUpdateReqVO;
@@ -32,7 +33,7 @@ import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.*;
@Service
@Validated
@Slf4j
-public class DeviceServiceImpl implements IotDeviceService {
+public class IotDeviceServiceImpl implements IotDeviceService {
@Resource
private IotDeviceMapper deviceMapper;
@@ -224,4 +225,10 @@ public class DeviceServiceImpl implements IotDeviceService {
return deviceMapper.selectCountByProductId(productId);
}
+ @Override
+ @TenantIgnore
+ public IotDeviceDO getDeviceByProductKeyAndDeviceName(String productKey, String deviceName) {
+ return deviceMapper.selectByProductKeyAndDeviceName(productKey, deviceName);
+ }
+
}
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/IotDbStructureDataServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/IotDbStructureDataServiceImpl.java
index d42fd72fc9..0506546be5 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/IotDbStructureDataServiceImpl.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/tdengine/IotDbStructureDataServiceImpl.java
@@ -25,7 +25,7 @@ import java.util.stream.Collectors;
public class IotDbStructureDataServiceImpl implements IotDbStructureDataService {
@Resource
- private TdEngineService tdEngineService;
+ private IotTdEngineService iotTdEngineService;
@Resource
private TdRestApi tdRestApi;
@@ -87,7 +87,7 @@ public class IotDbStructureDataServiceImpl implements IotDbStructureDataService
// 5. 创建超级表
String dataBaseName = url.substring(url.lastIndexOf("/") + 1);
- tdEngineService.createSuperTable(schemaFields, tagsFields, dataBaseName, superTableName);
+ iotTdEngineService.createSuperTable(schemaFields, tagsFields, dataBaseName, superTableName);
}
@Override
@@ -107,7 +107,7 @@ public class IotDbStructureDataServiceImpl implements IotDbStructureDataService
private List getTableFields(String tableName) {
List fields = new ArrayList<>();
// 获取超级表的描述信息
- List