reactor:【IoT 物联网】增加 http 网络组件,接入 rocketmq 消息总线

This commit is contained in:
YunaiV 2025-05-30 20:47:01 +08:00
parent 385cea8d90
commit 1b59aa9ccb
14 changed files with 177 additions and 174 deletions

View File

@ -28,6 +28,12 @@
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-core</artifactId>
<version>${revision}</version>
</dependency>
<!-- IoT 协议模块 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
@ -80,10 +86,11 @@
</dependency>
<!-- 消息队列相关 -->
<!-- TODO @芋艿:临时打开 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<optional>true</optional>
<!-- <optional>true</optional>-->
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
@ -147,8 +154,6 @@
<!-- <artifactId>yudao-module-iot-script</artifactId>-->
<!-- <version>${revision}</version>-->
<!-- </dependency>-->
</dependencies>
</project>

View File

@ -0,0 +1,49 @@
package cn.iocoder.yudao.module.iot.mq.consumer.device;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBus;
import cn.iocoder.yudao.module.iot.core.messagebus.core.IotMessageBusSubscriber;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.device.data.IotDeviceLogService;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 针对 {@link IotDeviceMessage} 的消费者记录设备日志
*
* @author 芋道源码
*/
@Component
@Slf4j
public class IotDeviceLogMessageBusSubscriber implements IotMessageBusSubscriber<IotDeviceMessage> {
@Resource
private IotMessageBus messageBus;
@Resource
private IotDeviceLogService deviceLogService;
@PostConstruct
public void init() {
messageBus.register(this);
}
@Override
public String getTopic() {
return IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC;
}
@Override
public String getGroup() {
return "iot_device_log_consumer";
}
// TODO @芋艿后续再对接这个细节逻辑
@Override
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][消息内容({})]", message);
// deviceLogService.createDeviceLog(message);
}
}

View File

@ -1,30 +0,0 @@
package cn.iocoder.yudao.module.iot.mq.consumer.device;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.device.data.IotDeviceLogService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 针对 {@link IotDeviceMessage} 的消费者记录设备日志
*
* @author 芋道源码
*/
@Component
@Slf4j
public class IotDeviceLogMessageConsumer {
@Resource
private IotDeviceLogService deviceLogService;
@EventListener
@Async
public void onMessage(IotDeviceMessage message) {
log.info("[onMessage][消息内容({})]", message);
deviceLogService.createDeviceLog(message);
}
}

View File

@ -1,5 +1,6 @@
package cn.iocoder.yudao.module.iot.core.mq.message;
import cn.hutool.core.util.IdUtil;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageIdentifierEnum;
import cn.iocoder.yudao.module.iot.core.enums.IotDeviceMessageTypeEnum;
import lombok.AllArgsConstructor;
@ -8,6 +9,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.Map;
// TODO @芋艿参考阿里云的物模型优化 IoT 上下行消息的设计尽量保持一致渐进式不要一口气
@ -20,6 +22,18 @@ import java.time.LocalDateTime;
@Builder
public class IotDeviceMessage {
/**
* 消息总线应用的设备消息 Topic iot-gateway 发给 iot-biz 进行消费
*/
public static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
/**
* 消息总线设备消息 Topic iot-biz 发送给 iot-gateway 的某个 server(protocol) 进行消费
*
* 其中%s 就是该server(protocol) 的标识
*/
public static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "/%s";
/**
* 请求编号
*/
@ -69,9 +83,42 @@ public class IotDeviceMessage {
*/
private LocalDateTime reportTime;
/**
* 服务编号该消息由哪个消息发送
*/
private String serverId;
/**
* 租户编号
*/
private Long tenantId;
public IotDeviceMessage ofPropertyReport(Map<String, Object> properties) {
this.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
this.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_REPORT.getIdentifier());
this.setData(properties);
return this;
}
public static IotDeviceMessage of(String productKey, String deviceName, String deviceKey,
String requestId, LocalDateTime reportTime,
String serverId, Long tenantId) {
if (requestId == null) {
requestId = IdUtil.fastSimpleUUID();
}
if (reportTime == null) {
reportTime = LocalDateTime.now();
}
return IotDeviceMessage.builder()
.requestId(requestId).reportTime(reportTime)
.productKey(productKey).deviceName(deviceName).deviceKey(deviceKey)
.serverId(serverId).tenantId(tenantId).build();
}
// ========== Topic 相关 ==========
public static String getMessageBusGatewayDeviceMessageTopic(String serverId) {
return String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, serverId);
}
}

View File

@ -12,18 +12,6 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class IotDeviceMessageProducer {
/**
* 消息总线应用的设备消息 Topic iot-gateway 发给 iot-biz 进行消费
*/
private static final String MESSAGE_BUS_DEVICE_MESSAGE_TOPIC = "iot_device_message";
/**
* 消息总线设备消息 Topic iot-biz 发送给 iot-gateway 的某个 server(protocol) 进行消费
*
* 其中%s 就是该server(protocol) 的标识
*/
private static final String MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC = MESSAGE_BUS_DEVICE_MESSAGE_TOPIC + "/%s";
private final IotMessageBus messageBus;
/**
@ -32,17 +20,17 @@ public class IotDeviceMessageProducer {
* @param message 设备消息
*/
public void sendDeviceMessage(IotDeviceMessage message) {
messageBus.post(MESSAGE_BUS_DEVICE_MESSAGE_TOPIC, message);
messageBus.post(IotDeviceMessage.MESSAGE_BUS_DEVICE_MESSAGE_TOPIC, message);
}
/**
* 发送网关设备消息
*
* @param server 网关的 server 标识
* @param serverId 网关的 serverId 标识
* @param message 设备消息
*/
public void sendGatewayDeviceMessage(String server, Object message) {
messageBus.post(String.format(MESSAGE_BUS_GATEWAY_DEVICE_MESSAGE_TOPIC, server), message);
public void sendGatewayDeviceMessage(String serverId, Object message) {
messageBus.post(IotDeviceMessage.getMessageBusGatewayDeviceMessageTopic(serverId), message);
}
}

View File

@ -1,2 +0,0 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration

View File

@ -0,0 +1 @@
cn.iocoder.yudao.module.iot.core.messagebus.config.IotMessageBusAutoConfiguration

View File

@ -24,6 +24,12 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-core</artifactId>
<version>${revision}</version>
</dependency>
<!-- IoT 协议模块 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>

View File

@ -2,12 +2,13 @@ package cn.iocoder.yudao.module.iot.net.component.http.config;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.core.config.IotNetComponentCommonProperties;
import cn.iocoder.yudao.module.iot.net.component.core.downstream.IotDeviceDownstreamHandler;
import cn.iocoder.yudao.module.iot.net.component.core.heartbeat.IotNetComponentRegistry;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import cn.iocoder.yudao.module.iot.net.component.http.downstream.IotDeviceDownstreamHandlerImpl;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.IotDeviceUpstreamServer;
import io.vertx.core.Vertx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
@ -15,7 +16,6 @@ import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Lazy;
@ -86,25 +86,14 @@ public class IotNetComponentHttpAutoConfiguration {
/**
* 创建设备上行服务器
*
* @param vertx Vert.x 实例
* @param deviceUpstreamApi 设备上行 API
* @param properties HTTP 组件配置属性
* @param applicationContext 应用上下文
* @return 设备上行服务器
*/
@Bean(name = "httpDeviceUpstreamServer", initMethod = "start", destroyMethod = "stop")
public IotDeviceUpstreamServer deviceUpstreamServer(
@Lazy @Qualifier("httpVertx") Vertx vertx,
IotDeviceUpstreamApi deviceUpstreamApi,
IotNetComponentHttpProperties properties,
ApplicationContext applicationContext) {
if (log.isDebugEnabled()) {
log.debug("HTTP 服务器配置: port={}", properties.getServerPort());
} else {
log.info("HTTP 服务器将监听端口: {}", properties.getServerPort());
}
return new IotDeviceUpstreamServer(vertx, properties, deviceUpstreamApi, applicationContext);
IotDeviceMessageProducer deviceMessageProducer) {
return new IotDeviceUpstreamServer(vertx, properties, deviceUpstreamApi, deviceMessageProducer);
}
/**

View File

@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.net.component.http.upstream;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.http.config.IotNetComponentHttpProperties;
import cn.iocoder.yudao.module.iot.net.component.http.upstream.router.IotDeviceUpstreamVertxHandler;
import io.vertx.core.AbstractVerticle;
@ -8,9 +9,8 @@ import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Lazy;
/**
* IoT 设备上行服务器
@ -19,48 +19,24 @@ import org.springframework.context.annotation.Lazy;
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceUpstreamServer extends AbstractVerticle {
/**
* Vert.x 实例
*/
private final Vertx vertx;
/**
* HTTP 组件配置属性
*/
private final IotNetComponentHttpProperties httpProperties;
/**
* 设备上行 API
*/
private final IotDeviceUpstreamApi deviceUpstreamApi;
/**
* Spring 应用上下文
*/
private final ApplicationContext applicationContext;
private final IotDeviceMessageProducer deviceMessageProducer;
/**
* 构造函数
*
* @param vertx Vert.x 实例
* @param httpProperties HTTP 组件配置属性
* @param deviceUpstreamApi 设备上行 API
* @param applicationContext Spring 应用上下文
*/
public IotDeviceUpstreamServer(
@Lazy Vertx vertx,
IotNetComponentHttpProperties httpProperties,
IotDeviceUpstreamApi deviceUpstreamApi,
ApplicationContext applicationContext) {
this.vertx = vertx;
this.httpProperties = httpProperties;
this.deviceUpstreamApi = deviceUpstreamApi;
this.applicationContext = applicationContext;
@Override
public void start() throws Exception {
start(Promise.promise());
}
// TODO @haohao这样貌似初始化不到我临时拷贝上去了
@Override
public void start(Promise<Void> startPromise) {
// 创建路由
@ -69,7 +45,7 @@ public class IotDeviceUpstreamServer extends AbstractVerticle {
// 创建处理器
IotDeviceUpstreamVertxHandler upstreamHandler = new IotDeviceUpstreamVertxHandler(
deviceUpstreamApi, applicationContext);
deviceUpstreamApi, deviceMessageProducer);
// 添加路由处理器
router.post(IotDeviceUpstreamVertxHandler.PROPERTY_PATH).handler(upstreamHandler::handle);

View File

@ -8,17 +8,16 @@ import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceEventReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDevicePropertyReportReqDTO;
import cn.iocoder.yudao.module.iot.api.device.dto.control.upstream.IotDeviceStateUpdateReqDTO;
import cn.iocoder.yudao.module.iot.enums.device.IotDeviceStateEnum;
import cn.iocoder.yudao.module.iot.core.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.core.mq.producer.IotDeviceMessageProducer;
import cn.iocoder.yudao.module.iot.net.component.core.constants.IotDeviceTopicEnum;
import cn.iocoder.yudao.module.iot.net.component.core.pojo.IotStandardResponse;
import cn.iocoder.yudao.module.iot.net.component.core.util.IotNetComponentCommonUtils;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RoutingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationContext;
import java.time.LocalDateTime;
import java.util.Map;
@ -33,6 +32,7 @@ import static cn.iocoder.yudao.framework.common.exception.enums.GlobalErrorCodeC
*
* @author 芋道源码
*/
@RequiredArgsConstructor
@Slf4j
public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
@ -70,17 +70,10 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
* 设备上行 API
*/
private final IotDeviceUpstreamApi deviceUpstreamApi;
/**
* 构造函数
*
* @param deviceUpstreamApi 设备上行 API
* @param applicationContext 应用上下文
* 设备消息生产者
*/
public IotDeviceUpstreamVertxHandler(IotDeviceUpstreamApi deviceUpstreamApi,
ApplicationContext applicationContext) {
this.deviceUpstreamApi = deviceUpstreamApi;
}
private final IotDeviceMessageProducer deviceMessageProducer;
@Override
public void handle(RoutingContext routingContext) {
@ -170,18 +163,17 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
*/
private void handlePropertyPost(RoutingContext routingContext, String productKey, String deviceName,
String requestId, JsonObject body) {
// 处理属性上报
IotDevicePropertyReportReqDTO reportReqDTO = parsePropertyReportRequest(productKey, deviceName,
requestId, body);
// 1.1 构建设备消息
String deviceKey = "xxx"; // TODO @芋艿待支持
Long tenantId = 1L; // TODO @芋艿待支持
IotDeviceMessage message = IotDeviceMessage.of(productKey, deviceName, deviceKey,
requestId, LocalDateTime.now(), IotNetComponentCommonUtils.getProcessId(), tenantId)
.ofPropertyReport(parsePropertiesFromBody(body));
// 1.2 发送消息
deviceMessageProducer.sendDeviceMessage(message);
// 设备上线
updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
// 属性上报
CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceProperty(reportReqDTO);
// 返回响应
sendResponse(routingContext, requestId, PROPERTY_METHOD, result);
// 2. 返回响应
sendResponse(routingContext, requestId, PROPERTY_METHOD, null);
}
/**
@ -200,9 +192,6 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
IotDeviceEventReportReqDTO reportReqDTO = parseEventReportRequest(productKey, deviceName, identifier,
requestId, body);
// 设备上线
updateDeviceState(reportReqDTO.getProductKey(), reportReqDTO.getDeviceName());
// 事件上报
CommonResult<Boolean> result = deviceUpstreamApi.reportDeviceEvent(reportReqDTO);
String method = EVENT_METHOD_PREFIX + identifier + EVENT_METHOD_SUFFIX;
@ -221,8 +210,11 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
*/
private void sendResponse(RoutingContext routingContext, String requestId, String method,
CommonResult<Boolean> result) {
// TODO @芋艿后续再优化
IotStandardResponse response;
if (result.isSuccess()) {
if (result == null ) {
response = IotStandardResponse.success(requestId, method, null);
} else if (result.isSuccess()) {
response = IotStandardResponse.success(requestId, method, result.getData());
} else {
response = IotStandardResponse.error(requestId, method, result.getCode(), result.getMsg());
@ -265,45 +257,7 @@ public class IotDeviceUpstreamVertxHandler implements Handler<RoutingContext> {
EVENT_METHOD_SUFFIX;
}
/**
* 更新设备状态
*
* @param productKey 产品 Key
* @param deviceName 设备名称
*/
private void updateDeviceState(String productKey, String deviceName) {
IotDeviceStateUpdateReqDTO reqDTO = ((IotDeviceStateUpdateReqDTO) new IotDeviceStateUpdateReqDTO()
.setRequestId(IdUtil.fastSimpleUUID())
.setProcessId(IotNetComponentCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(productKey)
.setDeviceName(deviceName)).setState(IotDeviceStateEnum.ONLINE.getState());
deviceUpstreamApi.updateDeviceState(reqDTO);
}
/**
* 解析属性上报请求
*
* @param productKey 产品 Key
* @param deviceName 设备名称
* @param requestId 请求 ID
* @param body 请求体
* @return 属性上报请求 DTO
*/
private IotDevicePropertyReportReqDTO parsePropertyReportRequest(String productKey, String deviceName,
String requestId, JsonObject body) {
// 解析属性
Map<String, Object> properties = parsePropertiesFromBody(body);
// 构建属性上报请求 DTO
return ((IotDevicePropertyReportReqDTO) new IotDevicePropertyReportReqDTO()
.setRequestId(requestId)
.setProcessId(IotNetComponentCommonUtils.getProcessId())
.setReportTime(LocalDateTime.now())
.setProductKey(productKey)
.setDeviceName(deviceName)).setProperties(properties);
}
// TODO @芋艿这块在看看
/**
* 从请求体解析属性
*

View File

@ -50,6 +50,12 @@
<version>${revision}</version>
</dependency>
<!-- TODO @芋艿:消息队列,后续可能去掉,默认不使用 rocketmq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
@ -72,4 +78,4 @@
</plugins>
</build>
</project>
</project>

View File

@ -26,13 +26,13 @@ yudao:
upstream-url: http://127.0.0.1:48080 # 主程序 API 地址
upstream-connect-timeout: 30s # 连接超时
upstream-read-timeout: 30s # 读取超时
# 下行通信配置,用于接收主程序的控制指令
downstream-port: 18888 # 下行服务器端口
# 组件服务唯一标识
server-key: yudao-module-iot-net-component-server
# 心跳频率,单位:毫秒
heartbeat-interval: 30000
@ -50,15 +50,26 @@ yudao:
enabled: true # 启用EMQX组件
mqtt-host: 127.0.0.1 # MQTT服务器主机地址
mqtt-port: 1883 # MQTT服务器端口
mqtt-username: yudao # MQTT服务器用户名
mqtt-password: 123456 # MQTT服务器密码
mqtt-username: admin # MQTT服务器用户名
mqtt-password: admin123 # MQTT服务器密码
mqtt-ssl: false # 是否启用SSL
mqtt-topics: # 订阅的主题列表
- "/sys/#"
auth-port: 8101 # 认证端口
message-bus:
type: rocketmq # 消息总线的类型
# 日志配置
logging:
level:
cn.iocoder.yudao: INFO
root: INFO
root: INFO
--- #################### 消息队列相关 ####################
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv
# Producer 配置项
producer:
group: ${spring.application.name}_PRODUCER # 生产者分组

View File

@ -311,6 +311,9 @@ yudao:
kd100:
key: pLXUGAwK5305
customer: E77DF18BE109F454A5CD319E44BF5177
iot:
message-bus:
type: rocketmq # 消息总线的类型
debug: false
# 插件配置 TODO 芋艿【IOT】需要处理下