diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml
index 9148242c27..acecec22d9 100644
--- a/yudao-module-iot/yudao-module-iot-biz/pom.xml
+++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml
@@ -28,6 +28,12 @@
yudao-module-iot-api
${revision}
+
+ cn.iocoder.boot
+ yudao-module-iot-core
+ ${revision}
+
+
cn.iocoder.boot
@@ -80,10 +86,11 @@
+
org.apache.rocketmq
rocketmq-spring-boot-starter
- true
+
org.springframework.kafka
@@ -147,8 +154,6 @@
-
-
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageBusSubscriber.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageBusSubscriber.java
new file mode 100644
index 0000000000..4b42781acc
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageBusSubscriber.java
@@ -0,0 +1,49 @@
+package cn.iocoder.yudao.module.iot.mq.consumer.device;
+
+import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
+import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
+import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
+import cn.iocoder.yudao.module.iot.service.device.data.IotDeviceLogService;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * 针对 {@link IotDeviceMessage} 的消费者,记录设备日志
+ *
+ * @author 芋道源码
+ */
+@Component
+@Slf4j
+public class IotDeviceLogMessageBusSubscriber implements IotMessageBusSubscriber {
+
+ @Resource
+ private IotMessageBus messageBus;
+
+ @Resource
+ private IotDeviceLogService deviceLogService;
+
+ @PostConstruct
+ public void init() {
+ messageBus.register(this);
+ }
+
+ @Override
+ public String getTopic() {
+ return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
+ }
+
+ @Override
+ public String getGroup() {
+ return "iot_device_log_consumer";
+ }
+
+ // TODO @芋艿:后续再对接这个细节逻辑;
+ @Override
+ public void onMessage(IotDeviceMessage message) {
+ log.info("[onMessage][消息内容({})]", message);
+// deviceLogService.createDeviceLog(message);
+ }
+
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageConsumer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageConsumer.java
deleted file mode 100644
index 2972677918..0000000000
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mq/consumer/device/IotDeviceLogMessageConsumer.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package cn.iocoder.yudao.module.iot.mq.consumer.device;
-
-import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
-import cn.iocoder.yudao.module.iot.service.device.data.IotDeviceLogService;
-import jakarta.annotation.Resource;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.event.EventListener;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Component;
-
-/**
- * 针对 {@link IotDeviceMessage} 的消费者,记录设备日志
- *
- * @author 芋道源码
- */
-@Component
-@Slf4j
-public class IotDeviceLogMessageConsumer {
-
- @Resource
- private IotDeviceLogService deviceLogService;
-
- @EventListener
- @Async
- public void onMessage(IotDeviceMessage message) {
- log.info("[onMessage][消息内容({})]", message);
- deviceLogService.createDeviceLog(message);
- }
-
-}
diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java
index 43b9f2e5d4..0a5c8fbd3d 100644
--- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java
+++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/message/IotDeviceMessage.java
@@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.core.mq.message;
+import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageTypeEnum;
import lombok.AllArgsConstructor;
@@ -8,6 +9,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
+import java.util.Map;
// TODO @芋艿:参考阿里云的物模型,优化 IoT 上下行消息的设计,尽量保持一致(渐进式,不要一口气)!
@@ -20,6 +22,18 @@ import java.time.LocalDateTime;
@Builder
public class IotDeviceMessage {
+ /**
+ * 【消息总线】应用的设备消息 Topic,由 iot-gateway 发给 iot-biz 进行消费
+ */
+ public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
+
+ /**
+ * 【消息总线】设备消息 Topic,由 iot-biz 发送给 iot-gateway 的某个 “server”(protocol) 进行消费
+ *
+ * 其中,%s 就是该“server”(protocol) 的标识
+ */
+ public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "/%s";
+
/**
* 请求编号
*/
@@ -69,9 +83,42 @@ public class IotDeviceMessage {
*/
private LocalDateTime reportTime;
+ /**
+ * 服务编号,该消息由哪个消息发送
+ */
+ private String serverId;
+
/**
* 租户编号
*/
private Long tenantId;
+ public IotDeviceMessage ofPropertyReport(Map properties) {
+ this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
+ this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier());
+ this.setData(properties);
+ return this;
+ }
+
+ public static IotDeviceMessage of(String productKey, String deviceName, String deviceKey,
+ String requestId, LocalDateTime reportTime,
+ String serverId, Long tenantId) {
+ if (requestId == null) {
+ requestId = IdUtil.fastSimpleUUID();
+ }
+ if (reportTime == null) {
+ reportTime = LocalDateTime.now();
+ }
+ return IotDeviceMessage.builder()
+ .requestId(requestId).reportTime(reportTime)
+ .productKey(productKey).deviceName(deviceName).deviceKey(deviceKey)
+ .serverId(serverId).tenantId(tenantId).build();
+ }
+
+ // ========== Topic 相关 ==========
+
+ public static String getMessageBusGatewayDeviceMessageTopic(String serverId) {
+ return String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
+ }
+
}
diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/producer/IotDeviceMessageProducer.java b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/producer/IotDeviceMessageProducer.java
index 7e23dc4b6d..5cf15305ec 100644
--- a/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/producer/IotDeviceMessageProducer.java
+++ b/yudao-module-iot/yudao-module-iot-core/src/main/java/cn/iocoder/yudao/module/iot/core/mq/producer/IotDeviceMessageProducer.java
@@ -12,18 +12,6 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class IotDeviceMessageProducer {
- /**
- * 【消息总线】应用的设备消息 Topic,由 iot-gateway 发给 iot-biz 进行消费
- */
- private static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
-
- /**
- * 【消息总线】设备消息 Topic,由 iot-biz 发送给 iot-gateway 的某个 “server”(protocol) 进行消费
- *
- * 其中,%s 就是该“server”(protocol) 的标识
- */
- private static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "/%s";
-
private final IotMessageBus messageBus;
/**
@@ -32,17 +20,17 @@ public class IotDeviceMessageProducer {
* @param message 设备消息
*/
public void sendDeviceMessage(IotDeviceMessage message) {
- messageBus.post(MESSAGE_BUS_DEVICE_MESSAGE_TOPIC, message);
+ messageBus.post(IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC, message);
}
/**
* 发送网关设备消息
*
- * @param server 网关的 server 标识
+ * @param serverId 网关的 serverId 标识
* @param message 设备消息
*/
- public void sendGatewayDeviceMessage(String server, Object message) {
- messageBus.post(String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, server), message);
+ public void sendGatewayDeviceMessage(String serverId, Object message) {
+ messageBus.post(IotDeviceMessage.getMessageBusGatewayDeviceMessageTopic(serverId), message);
}
}
diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/resources/META-INF/spring.factories b/yudao-module-iot/yudao-module-iot-core/src/main/resources/META-INF/spring.factories
deleted file mode 100644
index bfb44267ce..0000000000
--- a/yudao-module-iot/yudao-module-iot-core/src/main/resources/META-INF/spring.factories
+++ /dev/null
@@ -1,2 +0,0 @@
-org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
-cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-module-iot/yudao-module-iot-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
new file mode 100644
index 0000000000..4c183f8227
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-core/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -0,0 +1 @@
+cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/pom.xml b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/pom.xml
index ce968c4395..6bbf140fd9 100644
--- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/pom.xml
+++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-core/pom.xml
@@ -24,6 +24,12 @@
${revision}
+
+ cn.iocoder.boot
+ yudao-module-iot-core
+ ${revision}
+
+
cn.iocoder.boot
diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java
index 1aa5903d47..a8ea951763 100644
--- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java
+++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/config/IotNetComponentHttpAutoConfiguration.java
@@ -2,12 +2,13 @@ package cn.iocoder.yudao.module.iot.net.component.http.config;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
-import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl;
-import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer;
+import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
+import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl;
+import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -15,7 +16,6 @@ import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Lazy;
@@ -86,25 +86,14 @@ public class IotNetComponentHttpAutoConfiguration {
/**
* 创建设备上行服务器
- *
- * @param vertx Vert.x 实例
- * @param deviceUpstreamApi 设备上行 API
- * @param properties HTTP 组件配置属性
- * @param applicationContext 应用上下文
- * @return 设备上行服务器
*/
@Bean(name = "httpDeviceUpstreamServer", initMethod = "start", destroyMethod = "stop")
public IotDeviceUpstreamServer deviceUpstreamServer(
@Lazy @Qualifier("httpVertx") Vertx vertx,
IotDeviceUpstreamApi deviceUpstreamApi,
IotNetComponentHttpProperties properties,
- ApplicationContext applicationContext) {
- if (log.isDebugEnabled()) {
- log.debug("HTTP 服务器配置: port={}", properties.getServerPort());
- } else {
- log.info("HTTP 服务器将监听端口: {}", properties.getServerPort());
- }
- return new IotDeviceUpstreamServer(vertx, properties, deviceUpstreamApi, applicationContext);
+ IotDeviceMessageProducer deviceMessageProducer) {
+ return new IotDeviceUpstreamServer(vertx, properties, deviceUpstreamApi, deviceMessageProducer);
}
/**
diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/IotDeviceUpstreamServer.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/IotDeviceUpstreamServer.java
index 05af7bf2d8..eb185fbfa1 100644
--- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/IotDeviceUpstreamServer.java
+++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/IotDeviceUpstreamServer.java
@@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.net.component.http.upstream;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
+import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.http.config.IotNetComponentHttpProperties;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.router.IotDeviceUpstreamVertxHandler;
import io.vertx.core.AbstractVerticle;
@@ -8,9 +9,8 @@ import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.annotation.Lazy;
/**
* IoT 设备上行服务器
@@ -19,48 +19,24 @@ import org.springframework.context.annotation.Lazy;
*
* @author 芋道源码
*/
+@RequiredArgsConstructor
@Slf4j
public class IotDeviceUpstreamServer extends AbstractVerticle {
- /**
- * Vert.x 实例
- */
private final Vertx vertx;
- /**
- * HTTP 组件配置属性
- */
private final IotNetComponentHttpProperties httpProperties;
- /**
- * 设备上行 API
- */
private final IotDeviceUpstreamApi deviceUpstreamApi;
- /**
- * Spring 应用上下文
- */
- private final ApplicationContext applicationContext;
+ private final IotDeviceMessageProducer deviceMessageProducer;
- /**
- * 构造函数
- *
- * @param vertx Vert.x 实例
- * @param httpProperties HTTP 组件配置属性
- * @param deviceUpstreamApi 设备上行 API
- * @param applicationContext Spring 应用上下文
- */
- public IotDeviceUpstreamServer(
- @Lazy Vertx vertx,
- IotNetComponentHttpProperties httpProperties,
- IotDeviceUpstreamApi deviceUpstreamApi,
- ApplicationContext applicationContext) {
- this.vertx = vertx;
- this.httpProperties = httpProperties;
- this.deviceUpstreamApi = deviceUpstreamApi;
- this.applicationContext = applicationContext;
+ @Override
+ public void start() throws Exception {
+ start(Promise.promise());
}
+ // TODO @haohao:这样貌似初始化不到;我临时拷贝上去了
@Override
public void start(Promise startPromise) {
// 创建路由
@@ -69,7 +45,7 @@ public class IotDeviceUpstreamServer extends AbstractVerticle {
// 创建处理器
IotDeviceUpstreamVertxHandler upstreamHandler = new IotDeviceUpstreamVertxHandler(
- deviceUpstreamApi, applicationContext);
+ deviceUpstreamApi, deviceMessageProducer);
// 添加路由处理器
router.post(IotDeviceUpstreamVertxHandler.PROPERTY_PATH).handler(upstreamHandler::handle);
diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/router/IotDeviceUpstreamVertxHandler.java b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/router/IotDeviceUpstreamVertxHandler.java
index 85bfdc0be4..47e90e0d46 100644
--- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/router/IotDeviceUpstreamVertxHandler.java
+++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-http/src/main/java/cn/iocoder/yudao/module/iot/net/component/http/upstream/router/IotDeviceUpstreamVertxHandler.java
@@ -8,17 +8,16 @@ import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
-import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
-import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
-import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
+import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
+import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.core.constants.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.net.component.core.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.context.ApplicationContext;
import java.time.LocalDateTime;
import java.util.Map;
@@ -33,6 +32,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
*
* @author 芋道源码
*/
+@RequiredArgsConstructor
@Slf4j
public class IotDeviceUpstreamVertxHandler implements Handler {
@@ -70,17 +70,10 @@ public class IotDeviceUpstreamVertxHandler implements Handler {
* 设备上行 API
*/
private final IotDeviceUpstreamApi deviceUpstreamApi;
-
/**
- * 构造函数
- *
- * @param deviceUpstreamApi 设备上行 API
- * @param applicationContext 应用上下文
+ * 设备消息生产者
*/
- public IotDeviceUpstreamVertxHandler(IotDeviceUpstreamApi deviceUpstreamApi,
- ApplicationContext applicationContext) {
- this.deviceUpstreamApi = deviceUpstreamApi;
- }
+ private final IotDeviceMessageProducer deviceMessageProducer;
@Override
public void handle(RoutingContext routingContext) {
@@ -170,18 +163,17 @@ public class IotDeviceUpstreamVertxHandler implements Handler {
*/
private void handlePropertyPost(RoutingContext routingContext, String productKey, String deviceName,
String requestId, JsonObject body) {
- // 处理属性上报
- IotDevicePropertyReportReqDTO reportReqDTO = parsePropertyReportRequest(productKey, deviceName,
- requestId, body);
+ // 1.1 构建设备消息
+ String deviceKey = "xxx"; // TODO @芋艿:待支持
+ Long tenantId = 1L; // TODO @芋艿:待支持
+ IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, deviceKey,
+ requestId, LocalDateTime.now(), IotNetComponentCommonUtils.getProcessId(), tenantId)
+ .ofPropertyReport(parsePropertiesFromBody(body));
+ // 1.2 发送消息
+ deviceMessageProducer.sendDeviceMessage(message);
- // 设备上线
- updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
-
- // 属性上报
- CommonResult result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
-
- // 返回响应
- sendResponse(routingContext, requestId, PROPERTY_METHOD, result);
+ // 2. 返回响应
+ sendResponse(routingContext, requestId, PROPERTY_METHOD, null);
}
/**
@@ -200,9 +192,6 @@ public class IotDeviceUpstreamVertxHandler implements Handler {
IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier,
requestId, body);
- // 设备上线
- updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
-
// 事件上报
CommonResult result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
String method = EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX;
@@ -221,8 +210,11 @@ public class IotDeviceUpstreamVertxHandler implements Handler {
*/
private void sendResponse(RoutingContext routingContext, String requestId, String method,
CommonResult result) {
+ // TODO @芋艿:后续再优化
IotStandardResponse response;
- if (result.isSuccess()) {
+ if (result == null ) {
+ response = IotStandardResponse.success(requestId, method, null);
+ } else if (result.isSuccess()) {
response = IotStandardResponse.success(requestId, method, result.getData());
} else {
response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
@@ -265,45 +257,7 @@ public class IotDeviceUpstreamVertxHandler implements Handler {
EVENT_METHOD_SUFFIX;
}
- /**
- * 更新设备状态
- *
- * @param productKey 产品 Key
- * @param deviceName 设备名称
- */
- private void updateDeviceState(String productKey, String deviceName) {
- IotDeviceStateUpdateReqDTO reqDTO = ((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO()
- .setRequestId(IdUtil.fastSimpleUUID())
- .setProcessId(IotNetComponentCommonUtils.getProcessId())
- .setReportTime(LocalDateTime.now())
- .setProductKey(productKey)
- .setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState());
- deviceUpstreamApi.updateDeviceState(reqDTO);
- }
-
- /**
- * 解析属性上报请求
- *
- * @param productKey 产品 Key
- * @param deviceName 设备名称
- * @param requestId 请求 ID
- * @param body 请求体
- * @return 属性上报请求 DTO
- */
- private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName,
- String requestId, JsonObject body) {
- // 解析属性
- Map properties = parsePropertiesFromBody(body);
-
- // 构建属性上报请求 DTO
- return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
- .setRequestId(requestId)
- .setProcessId(IotNetComponentCommonUtils.getProcessId())
- .setReportTime(LocalDateTime.now())
- .setProductKey(productKey)
- .setDeviceName(deviceName)).setProperties(properties);
- }
-
+ // TODO @芋艿:这块在看看
/**
* 从请求体解析属性
*
diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/pom.xml b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/pom.xml
index 4c2a612205..eaf000ef50 100644
--- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/pom.xml
+++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/pom.xml
@@ -50,6 +50,12 @@
${revision}
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+
+
@@ -72,4 +78,4 @@
-
\ No newline at end of file
+
\ No newline at end of file
diff --git a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml
index ccaa7000a5..76385c51fe 100644
--- a/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml
+++ b/yudao-module-iot/yudao-module-iot-net-components/yudao-module-iot-net-component-server/src/main/resources/application.yml
@@ -26,13 +26,13 @@ yudao:
upstream-url: http://127.0.0.1:48080 # 主程序 API 地址
upstream-connect-timeout: 30s # 连接超时
upstream-read-timeout: 30s # 读取超时
-
+
# 下行通信配置,用于接收主程序的控制指令
downstream-port: 18888 # 下行服务器端口
-
+
# 组件服务唯一标识
server-key: yudao-module-iot-net-component-server
-
+
# 心跳频率,单位:毫秒
heartbeat-interval: 30000
@@ -50,15 +50,26 @@ yudao:
enabled: true # 启用EMQX组件
mqtt-host: 127.0.0.1 # MQTT服务器主机地址
mqtt-port: 1883 # MQTT服务器端口
- mqtt-username: yudao # MQTT服务器用户名
- mqtt-password: 123456 # MQTT服务器密码
+ mqtt-username: admin # MQTT服务器用户名
+ mqtt-password: admin123 # MQTT服务器密码
mqtt-ssl: false # 是否启用SSL
mqtt-topics: # 订阅的主题列表
- "/sys/#"
auth-port: 8101 # 认证端口
+ message-bus:
+ type: rocketmq # 消息总线的类型
# 日志配置
logging:
level:
cn.iocoder.yudao: INFO
- root: INFO
\ No newline at end of file
+ root: INFO
+
+--- #################### 消息队列相关 ####################
+
+# rocketmq 配置项,对应 RocketMQProperties 配置类
+rocketmq:
+ name-server: 127.0.0.1:9876 # RocketMQ Namesrv
+ # Producer 配置项
+ producer:
+ group: ${spring.application.name}_PRODUCER # 生产者分组
diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml
index 37b783d57d..b702244061 100644
--- a/yudao-server/src/main/resources/application.yaml
+++ b/yudao-server/src/main/resources/application.yaml
@@ -311,6 +311,9 @@ yudao:
kd100:
key: pLXUGAwK5305
customer: E77DF18BE109F454A5CD319E44BF5177
+ iot:
+ message-bus:
+ type: rocketmq # 消息总线的类型
debug: false
# 插件配置 TODO 芋艿:【IOT】需要处理下