【代码评审】IoT:网络组件的 review

This commit is contained in:
YunaiV 2025-04-04 07:50:24 +08:00
parent b5ce269fef
commit 72d8511d6b
15 changed files with 94 additions and 96 deletions

View File

@ -24,16 +24,6 @@
<artifactId>yudao-module-iot-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-http</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-emqx</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
@ -144,6 +134,17 @@
<!-- <artifactId>spring-boot-starter-amqp</artifactId>-->
<!-- </dependency>-->
<!-- IoT 网络组件:接收来自设备的上行数据 -->
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-http</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.iocoder.boot</groupId>
<artifactId>yudao-module-iot-component-emqx</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</project>

View File

@ -16,7 +16,7 @@ import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success;
*/
@RestController
@Validated
@Primary
@Primary // 保证优先匹配因为 yudao-module-iot-component-core 也有 IotDeviceUpstreamApi 的实现并且也可能会被 biz 引入
public class IoTDeviceUpstreamApiImpl implements IotDeviceUpstreamApi {
@Resource

View File

@ -133,3 +133,5 @@ Parameter 1 of method deviceDownstreamServer in IotPluginCommonAutoConfiguration
### 5. 使用默认配置
组件现已加入完善的默认配置和空值处理机制,使配置更加灵活。但需要注意的是,这些默认配置值必须通过在主应用配置文件中设置相应的属性才能生效。
// TODO 芋艿:后续继续完善 README.md

View File

@ -13,6 +13,7 @@
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<!-- TODO @芋艿description 后续统一优化一波 -->
<description>
物联网组件核心模块
</description>

View File

@ -24,12 +24,14 @@ import org.springframework.scheduling.annotation.EnableScheduling;
public class IotComponentCommonAutoConfiguration {
/**
* 创建EMQX设备下行服务器
* 当yudao.iot.component.emqx.enabled=true时使用emqxDeviceDownstreamHandler
* 创建 EMQX 设备下行服务器
*
* yudao.iot.component.emqx.enabled = true 优先使用 emqxDeviceDownstreamHandler
*/
@Bean
@ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true")
public IotDeviceDownstreamServer emqxDeviceDownstreamServer(IotComponentCommonProperties properties,
public IotDeviceDownstreamServer emqxDeviceDownstreamServer(
IotComponentCommonProperties properties,
@Qualifier("emqxDeviceDownstreamHandler") IotDeviceDownstreamHandler deviceDownstreamHandler) {
return new IotDeviceDownstreamServer(properties, deviceDownstreamHandler);
}

View File

@ -17,27 +17,24 @@ import java.util.concurrent.TimeUnit;
/**
* IoT 组件实例心跳定时任务
* <p>
* 将组件的状态定时上报给 server 服务器
* <p>
* 用于定时发送心跳给服务端
* 将组件的状态定时上报给 server 服务器
*/
@RequiredArgsConstructor
@Slf4j
public class IotComponentInstanceHeartbeatJob {
/**
* 内嵌模式的端口值固定为0
* 内嵌模式的端口值固定为 0
*/
private static final Integer EMBEDDED_PORT = 0;
private final IotDeviceUpstreamApi deviceUpstreamApi;
private final IotDeviceDownstreamServer deviceDownstreamServer;
private final IotDeviceDownstreamServer deviceDownstreamServer; // TODO @haohao这个变量还需要哇
private final IotComponentCommonProperties commonProperties;
private final IotComponentRegistry componentRegistry;
/**
* 初始化方法由Spring调用
* 注册当前组件并发送上线心跳
* 初始化方法 Spring调 注册当前组件并发送上线心跳
*/
public void init() {
// 将当前组件注册到注册表
@ -64,8 +61,7 @@ public class IotComponentInstanceHeartbeatJob {
}
/**
* 停止方法由Spring调用
* 发送下线心跳并注销组件
* 停止方法 Spring 调用发送下线心跳并注销组件
*/
public void stop() {
// 发送所有组件的下线心跳
@ -101,31 +97,29 @@ public class IotComponentInstanceHeartbeatJob {
}
/**
* 构建心跳DTO
* 构建心跳 DTO
*
* @param component 组件信息
* @param online 是否在线
* @return 心跳DTO
* @return 心跳 DTO
*/
private IotPluginInstanceHeartbeatReqDTO buildPluginInstanceHeartbeatReqDTO(IotComponentInfo component,
Boolean online) {
return new IotPluginInstanceHeartbeatReqDTO()
.setPluginKey(component.getPluginKey())
.setProcessId(component.getProcessId())
.setHostIp(component.getHostIp())
.setDownstreamPort(component.getDownstreamPort())
.setPluginKey(component.getPluginKey()).setProcessId(component.getProcessId())
.setHostIp(component.getHostIp()).setDownstreamPort(component.getDownstreamPort())
.setOnline(online);
}
// TODO @haohao要和 IotPluginCommonUtils 保持一致么
/**
* 获取当前进程ID
* 获取当前进程 ID
*
* @return 进程ID
* @return 进程 ID
*/
private String getProcessId() {
// 获取进程的 name
String name = ManagementFactory.getRuntimeMXBean().getName();
// 分割名称格式为 pid@hostname
// TODO @haohao是不是 SystemUtil.getCurrentPID(); 直接获取 pid
return name.split("@")[0];
}
}

View File

@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.component.core.heartbeat;
import lombok.Data;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@ -9,6 +8,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// TODO @haohao组件相关的注释要不把 组件 => 网络组件可能更容易理解
// TODO @haohaoyudao-module-iot-components => yudao-module-iot-net-components 增加一个 net 如何虽然会长一点但是意思更精准
/**
* IoT 组件注册表
* <p>
@ -22,14 +23,14 @@ public class IotComponentRegistry {
* 组件信息
*/
@Data
@ToString
public static class IotComponentInfo {
/**
* 组件Key
* 组件 Key
*/
private final String pluginKey;
/**
* 主机IP
* 主机 IP
*/
private final String hostIp;
/**
@ -37,23 +38,24 @@ public class IotComponentRegistry {
*/
private final Integer downstreamPort;
/**
* 进程ID
* 进程 ID
*/
private final String processId;
}
/**
* 组件映射表key为组件Key
* 组件映射表key 为组件 Key
*/
private final Map<String, IotComponentInfo> components = new ConcurrentHashMap<>();
/**
* 注册组件
*
* @param pluginKey 组件Key
* @param hostIp 主机IP
* @param pluginKey 组件 Key
* @param hostIp 主机 IP
* @param downstreamPort 下游端口
* @param processId 进程ID
* @param processId 进程 ID
*/
public void registerComponent(String pluginKey, String hostIp, Integer downstreamPort, String processId) {
log.info("[registerComponent][注册组件, pluginKey={}, hostIp={}, downstreamPort={}, processId={}]",
@ -64,7 +66,7 @@ public class IotComponentRegistry {
/**
* 注销组件
*
* @param pluginKey 组件Key
* @param pluginKey 组件 Key
*/
public void unregisterComponent(String pluginKey) {
log.info("[unregisterComponent][注销组件, pluginKey={}]", pluginKey);
@ -83,10 +85,11 @@ public class IotComponentRegistry {
/**
* 获取指定组件
*
* @param pluginKey 组件Key
* @param pluginKey 组件 Key
* @return 组件信息
*/
public IotComponentInfo getComponent(String pluginKey) {
return components.get(pluginKey);
}
}

View File

@ -62,13 +62,8 @@ public class IotStandardResponse {
* @return 成功响应
*/
public static IotStandardResponse success(String id, String method, Object data) {
return new IotStandardResponse()
.setId(id)
.setCode(200)
.setData(data)
.setMessage("success")
.setMethod(method)
.setVersion("1.0");
return new IotStandardResponse().setId(id).setCode(200).setData(data).setMessage("success")
.setMethod(method).setVersion("1.0");
}
/**
@ -81,13 +76,8 @@ public class IotStandardResponse {
* @return 错误响应
*/
public static IotStandardResponse error(String id, String method, Integer code, String message) {
return new IotStandardResponse()
.setId(id)
.setCode(code)
.setData(null)
.setMessage(message)
.setMethod(method)
.setVersion("1.0");
return new IotStandardResponse().setId(id).setCode(code).setData(null).setMessage(message)
.setMethod(method).setVersion("1.0");
}
}

View File

@ -9,6 +9,7 @@ import io.vertx.core.http.HttpHeaders;
import io.vertx.ext.web.RoutingContext;
import org.springframework.http.MediaType;
// TODO @haohao名字要改下哈
/**
* IoT 插件的通用工具类
*

View File

@ -31,41 +31,45 @@ import java.lang.management.ManagementFactory;
@AutoConfiguration
@EnableConfigurationProperties(IotComponentEmqxProperties.class)
@ConditionalOnProperty(prefix = "yudao.iot.component.emqx", name = "enabled", havingValue = "true", matchIfMissing = false)
// TODO @haohao是不是不用扫 cn.iocoder.yudao.module.iot.component.core 它尽量靠自动配置
@ComponentScan(basePackages = {
"cn.iocoder.yudao.module.iot.component.core", // 核心包
"cn.iocoder.yudao.module.iot.component.emqx" // EMQX组件包
"cn.iocoder.yudao.module.iot.component.emqx" // EMQX 组件包
})
public class IotComponentEmqxAutoConfiguration {
/**
* 组件key
* 组件 key
*/
private static final String PLUGIN_KEY = "emqx";
public IotComponentEmqxAutoConfiguration() {
// TODO @haohao这个日志融合到 initialize
log.info("[IotComponentEmqxAutoConfiguration][已启动]");
}
@EventListener(ApplicationStartedEvent.class)
public void initialize(ApplicationStartedEvent event) {
// 从应用上下文中获取需要的Bean
// 从应用上下文中获取需要的 Bean
IotComponentRegistry componentRegistry = event.getApplicationContext().getBean(IotComponentRegistry.class);
IotComponentCommonProperties commonProperties = event.getApplicationContext().getBean(IotComponentCommonProperties.class);
// 设置当前组件的核心标识
// TODO @haohao如果多个组件都去设置会不会冲突哈
commonProperties.setPluginKey(PLUGIN_KEY);
// EMQX组件注册到组件注册表
// EMQX 组件注册到组件注册表
componentRegistry.registerComponent(
PLUGIN_KEY,
SystemUtil.getHostInfo().getAddress(),
0, // 内嵌模式固定为0
0, // 内嵌模式固定为 0
getProcessId()
);
log.info("[initialize][IoT EMQX 组件初始化完成]");
}
// TODO @haohao这个可能要注意可能会有多个冲突
@Bean
public Vertx vertx() {
return Vertx.vertx();
@ -73,6 +77,7 @@ public class IotComponentEmqxAutoConfiguration {
@Bean
public MqttClient mqttClient(Vertx vertx, IotComponentEmqxProperties emqxProperties) {
// TODO @haohao这个日志要不要去掉避免过多哈
log.info("MQTT配置: host={}, port={}, username={}, ssl={}",
emqxProperties.getMqttHost(), emqxProperties.getMqttPort(),
emqxProperties.getMqttUsername(), emqxProperties.getMqttSsl());
@ -81,14 +86,12 @@ public class IotComponentEmqxAutoConfiguration {
.setClientId("yudao-iot-downstream-" + IdUtil.fastSimpleUUID())
.setUsername(emqxProperties.getMqttUsername())
.setPassword(emqxProperties.getMqttPassword());
// TODO @haohao可以用 ObjUtil.default
if (emqxProperties.getMqttSsl() != null) {
options.setSsl(emqxProperties.getMqttSsl());
} else {
options.setSsl(false);
log.warn("MQTT SSL配置为null默认设置为false");
}
return MqttClient.create(vertx, options);
}
@ -106,6 +109,7 @@ public class IotComponentEmqxAutoConfiguration {
return new IotDeviceDownstreamHandlerImpl(mqttClient);
}
// TODO @haohao这个通用下一下哈
/**
* 获取当前进程ID
*
@ -118,4 +122,5 @@ public class IotComponentEmqxAutoConfiguration {
String pid = name.split("@")[0];
return pid;
}
}

View File

@ -7,47 +7,48 @@ import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* IoT EMQX组件配置属性
* IoT EMQX 组件配置属性
*/
@ConfigurationProperties(prefix = "yudao.iot.component.emqx")
@Data
public class IotComponentEmqxProperties {
/**
* 是否启用EMQX组件
* 是否启用 EMQX 组件
*/
private Boolean enabled;
// TODO @haohao一般中英文之间加个空格哈写作注释习惯类似 MQTT 密码
/**
* 服务主机
*/
@NotBlank(message = "MQTT服务器主机不能为空")
@NotBlank(message = "MQTT 服务器主机不能为空")
private String mqttHost;
/**
* 服务端口
*/
@NotNull(message = "MQTT服务器端口不能为空")
@NotNull(message = "MQTT 服务器端口不能为空")
private Integer mqttPort;
/**
* 服务用户名
*/
@NotBlank(message = "MQTT服务器用户名不能为空")
@NotBlank(message = "MQTT 服务器用户名不能为空")
private String mqttUsername;
/**
* 服务密码
*/
@NotBlank(message = "MQTT服务器密码不能为空")
@NotBlank(message = "MQTT 服务器密码不能为空")
private String mqttPassword;
/**
* 是否启用 SSL
*/
@NotNull(message = "MQTT SSL配置不能为空")
@NotNull(message = "MQTT SSL 配置不能为空")
private Boolean mqttSsl;
/**
* 订阅的主题列表
*/
@NotEmpty(message = "MQTT订阅主题不能为空")
@NotEmpty(message = "MQTT 订阅主题不能为空")
private String[] mqttTopics;
/**

View File

@ -167,6 +167,7 @@ public class IotDeviceDownstreamHandlerImpl implements IotDeviceDownstreamHandle
log.info("[publishMessage][发送消息成功][topic: {}][payload: {}]", topic, payload);
}
// TODO @haohao这个要不抽到 IotPluginCommonUtils
/**
* 生成请求 ID
*/

View File

@ -1,6 +1,7 @@
package cn.iocoder.yudao.module.iot.component.emqx.upstream;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.api.device.IotDeviceUpstreamApi;
import cn.iocoder.yudao.module.iot.component.core.heartbeat.IotComponentRegistry;
import cn.iocoder.yudao.module.iot.component.emqx.config.IotComponentEmqxProperties;
@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
@Slf4j
public class IotDeviceUpstreamServer {
// TODO @haohao抽到 IotComponentEmqxProperties
/**
* 重连延迟时间(毫秒)
*/
@ -101,7 +103,7 @@ public class IotDeviceUpstreamServer {
CompletableFuture<Void> httpFuture = server.listen(finalAuthPort)
.toCompletionStage()
.toCompletableFuture()
.thenAccept(v -> log.info("[start][HTTP服务器启动完成,端口: {}]", server.actualPort()));
.thenAccept(v -> log.info("[start][HTTP 服务器启动完成,端口: {}]", server.actualPort()));
// 2. 连接 MQTT Broker
CompletableFuture<Void> mqttFuture = connectMqtt()
@ -110,7 +112,7 @@ public class IotDeviceUpstreamServer {
.thenAccept(v -> {
// 2.1 添加 MQTT 断开重连监听器
client.closeHandler(closeEvent -> {
log.warn("[closeHandler][MQTT连接已断开,准备重连]");
log.warn("[closeHandler][MQTT 连接已断开,准备重连]");
reconnectWithDelay();
});
// 2. 设置 MQTT 消息处理器
@ -135,7 +137,7 @@ public class IotDeviceUpstreamServer {
*/
private void setupMessageHandler() {
client.publishHandler(mqttMessageHandler::handle);
log.debug("[setupMessageHandler][MQTT消息处理器设置完成]");
log.debug("[setupMessageHandler][MQTT 消息处理器设置完成]");
}
/**
@ -159,22 +161,20 @@ public class IotDeviceUpstreamServer {
* @return 连接结果的Future
*/
private Future<Void> connectMqtt() {
// 检查必要的MQTT配置
// 检查必要的 MQTT 配置
String host = emqxProperties.getMqttHost();
Integer port = emqxProperties.getMqttPort();
if (host == null) {
String msg = "[connectMqtt][MQTT Host为null无法连接]";
String msg = "[connectMqtt][MQTT Host null无法连接]";
log.error(msg);
return Future.failedFuture(new IllegalStateException(msg));
}
if (port == null) {
log.warn("[connectMqtt][MQTT Port为null使用默认端口1883]");
port = 1883; // 默认MQTT端口
log.warn("[connectMqtt][MQTT Port null使用默认端口 1883]");
port = 1883; // 默认 MQTT 端口
}
final Integer finalPort = port; // 为lambda表达式创建final变量
final Integer finalPort = port;
return client.connect(finalPort, host)
.compose(connAck -> {
log.info("[connectMqtt][MQTT客户端连接成功]");
@ -195,19 +195,15 @@ public class IotDeviceUpstreamServer {
private Future<Void> subscribeToTopics() {
String[] topics = emqxProperties.getMqttTopics();
if (ArrayUtil.isEmpty(topics)) {
log.warn("[subscribeToTopics][未配置MQTT主题或为null使用默认主题]");
// 默认订阅所有设备上下行主题
topics = new String[]{"/device/#"};
log.warn("[subscribeToTopics][未配置 MQTT 主题或为 null使用默认主题]");
topics = new String[]{"/device/#"}; // 默认订阅所有设备上下行主题
}
log.info("[subscribeToTopics][开始订阅设备上行消息主题]");
Future<Void> compositeFuture = Future.succeededFuture();
for (String topic : topics) {
if (topic == null) {
continue; // 跳过null主题
}
String trimmedTopic = topic.trim();
if (trimmedTopic.isEmpty()) {
String trimmedTopic = StrUtil.trim(topic);
if (StrUtil.isBlank(trimmedTopic)) {
continue;
}
compositeFuture = compositeFuture.compose(v -> client.subscribe(trimmedTopic, DEFAULT_QOS.value())

View File

@ -19,6 +19,7 @@ import org.springframework.context.event.EventListener;
import java.lang.management.ManagementFactory;
// TODO @haohao类似 IotComponentEmqxAutoConfiguration 的建议
/**
* IoT 组件 HTTP 的自动配置类
*

View File

@ -5,7 +5,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;
/**
* IoT HTTP组件配置属性
* IoT HTTP 组件配置属性
*/
@ConfigurationProperties(prefix = "yudao.iot.component.http")
@Validated