diff --git a/plugins/yudao-module-iot-http-plugin-2.2.0-snapshot.jar b/plugins/yudao-module-iot-http-plugin-2.2.0-snapshot.jar index fa75769049..8b5e72b4a4 100644 Binary files a/plugins/yudao-module-iot-http-plugin-2.2.0-snapshot.jar and b/plugins/yudao-module-iot-http-plugin-2.2.0-snapshot.jar differ diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index fb3cf8562d..01e2f14547 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -67,7 +67,7 @@ 3.0.6 1.2.5 0.9.0 - 4.4.0 + 4.5.11 3.5.0 4.11.0 @@ -626,6 +626,12 @@ vertx-web ${vertx.version} + + + io.vertx + vertx-mqtt + ${vertx.version} + diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/ServiceRegistry.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/ServiceRegistry.java deleted file mode 100644 index a914e8029f..0000000000 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/api/ServiceRegistry.java +++ /dev/null @@ -1,37 +0,0 @@ -package cn.iocoder.yudao.module.iot.api; - -import java.util.HashMap; -import java.util.Map; - -// TODO 芋艿:纠结下 -/** - * 服务注册表 - 插架模块使用,无法使用 Spring 注入 - */ -public class ServiceRegistry { - - private static final Map, Object> services = new HashMap<>(); - - /** - * 注册服务 - * - * @param serviceClass 服务类 - * @param serviceImpl 服务实现 - * @param 服务类 - */ - public static void registerService(Class serviceClass, T serviceImpl) { - services.put(serviceClass, serviceImpl); - } - - /** - * 获得服务 - * - * @param serviceClass 服务类 - * @param 服务类 - * @return 服务实现 - */ - @SuppressWarnings("unchecked") - public static T getService(Class serviceClass) { - return (T) services.get(serviceClass); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcRequest.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcRequest.java deleted file mode 100644 index b2a9f03607..0000000000 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcRequest.java +++ /dev/null @@ -1,39 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.common; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -// TODO @芋艿:要不要加个 mqtt 值了的前缀 -/** - * MQTT RPC 请求 - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class RpcRequest { - - /** - * 方法名 - */ - private String method; - - /** - * 参数 - */ - // TODO @haohao:object 对象会不会不好序列化? - private Object[] params; - - /** - * 关联 ID - */ - private String correlationId; - - /** - * 回复地址 - */ - private String replyTo; - -} diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcResponse.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcResponse.java deleted file mode 100644 index f3225d08e7..0000000000 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/RpcResponse.java +++ /dev/null @@ -1,33 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.common; - -import lombok.AllArgsConstructor; -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -/** - * MQTT RPC 响应 - */ -@Data -@Builder -@NoArgsConstructor -@AllArgsConstructor -public class RpcResponse { - - /** - * 关联 ID - */ - private String correlationId; - - /** - * 结果 - */ - // TODO @haohao:object 对象会不会不好反序列化? - private Object result; - - /** - * 错误 - */ - private String error; - -} diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/SerializationUtils.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/SerializationUtils.java deleted file mode 100644 index 620b007635..0000000000 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/common/SerializationUtils.java +++ /dev/null @@ -1,19 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.common; - -import cn.hutool.json.JSONUtil; - -/** - * 序列化工具类 - * - */ -public class SerializationUtils { - - public static String serialize(Object obj) { - return JSONUtil.toJsonStr(obj); - } - - public static T deserialize(String json, Class clazz) { - return JSONUtil.toBean(json, clazz); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/PluginStart.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/PluginStart.java index 2cb688cfa5..96ca833690 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/PluginStart.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/PluginStart.java @@ -1,22 +1,20 @@ package cn.iocoder.yudao.module.iot.framework.plugin; -import java.util.List; - -import javax.annotation.Resource; - +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO; +import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum; +import cn.iocoder.yudao.module.iot.service.plugin.PluginInfoService; +import lombok.extern.slf4j.Slf4j; import org.pf4j.spring.SpringPluginManager; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; -import lombok.extern.slf4j.Slf4j; - -import cn.iocoder.yudao.module.iot.service.plugin.PluginInfoService; -import cn.hutool.core.collection.CollUtil; -import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; -import cn.iocoder.yudao.module.iot.dal.dataobject.plugininfo.PluginInfoDO; -import cn.iocoder.yudao.module.iot.enums.plugin.IotPluginStatusEnum; +import javax.annotation.Resource; +import java.util.List; +// TODO @芋艿:需要 review 下 @Component @Slf4j public class PluginStart implements ApplicationRunner { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/UnifiedConfiguration.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/UnifiedConfiguration.java index 374e3856a1..150051ce58 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/UnifiedConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/UnifiedConfiguration.java @@ -1,42 +1,35 @@ package cn.iocoder.yudao.module.iot.framework.plugin; -import cn.iocoder.yudao.module.iot.api.ServiceRegistry; -import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; import cn.iocoder.yudao.module.iot.framework.plugin.listener.CustomPluginStateListener; import lombok.extern.slf4j.Slf4j; import org.pf4j.spring.SpringPluginManager; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.DependsOn; -import javax.annotation.Resource; +import java.nio.file.Paths; +// TODO @芋艿:需要 review 下 @Slf4j @Configuration public class UnifiedConfiguration { - private static final String SERVICE_REGISTRY_INITIALIZED_MARKER = "serviceRegistryInitializedMarker"; - - @Resource - private DeviceDataApi deviceDataApi; - - @Bean(SERVICE_REGISTRY_INITIALIZED_MARKER) - public Object serviceRegistryInitializedMarker() { - ServiceRegistry.registerService(DeviceDataApi.class, deviceDataApi); - log.info("[init][将 DeviceDataApi 实例注册到 ServiceRegistry 中]"); - return new Object(); - } + @Value("${pf4j.pluginsDir:pluginsDir}") + private String pluginsDir; @Bean - @DependsOn(SERVICE_REGISTRY_INITIALIZED_MARKER) +// @DependsOn("deviceDataApiImpl") public SpringPluginManager pluginManager() { log.info("[init][实例化 SpringPluginManager]"); - SpringPluginManager springPluginManager = new SpringPluginManager() { + SpringPluginManager springPluginManager = new SpringPluginManager(Paths.get(pluginsDir)) { +// SpringPluginManager springPluginManager = new SpringPluginManager() { + @Override public void startPlugins() { // 禁用插件启动,避免插件启动时,启动所有插件 log.info("[init][禁用默认启动所有插件]"); } + }; springPluginManager.addPluginStateListener(new CustomPluginStateListener()); return springPluginManager; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/listener/CustomPluginStateListener.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/listener/CustomPluginStateListener.java index c0802d7f57..4542868b03 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/listener/CustomPluginStateListener.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/framework/plugin/listener/CustomPluginStateListener.java @@ -5,6 +5,7 @@ import org.pf4j.PluginStateEvent; import org.pf4j.PluginStateListener; import org.springframework.stereotype.Component; +// TODO @芋艿:需要 review 下 @Component @Slf4j public class CustomPluginStateListener implements PluginStateListener { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java deleted file mode 100644 index c7a0500030..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java +++ /dev/null @@ -1,40 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.config; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Data -@Configuration -@ConfigurationProperties(prefix = "mqtt") -public class MqttConfig { - /** - * MQTT 代理地址 - */ - private String broker; - - /** - * MQTT 用户名 - */ - private String username; - - /** - * MQTT 密码 - */ - private String password; - - /** - * MQTT 客户端 ID - */ - private String clientId; - - /** - * MQTT 请求主题 - */ - private String requestTopic; - - /** - * MQTT 响应主题前缀 - */ - private String responseTopicPrefix; -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java deleted file mode 100644 index 90ce2a3875..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java +++ /dev/null @@ -1,100 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.server; - -import cn.hutool.core.lang.UUID; -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest; -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse; -import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils; -import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.HashMap; -import java.util.Map; - -// TODO @芋艿:server 逻辑,再瞅瞅; -// TODO @haohao:如果只写在 iot biz 里,那么后续 server => client 貌似不方便?微信再讨论下~; -@Service -@Slf4j -public class RpcServer { - - private final MqttConfig mqttConfig; - private final MqttClient mqttClient; - private final Map methodRegistry = new HashMap<>(); - - public RpcServer(MqttConfig mqttConfig) throws MqttException { - this.mqttConfig = mqttConfig; - this.mqttClient = new MqttClient(mqttConfig.getBroker(), "rpc-server-" + UUID.randomUUID(), new MemoryPersistence()); - MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - options.setCleanSession(true); - options.setUserName(mqttConfig.getUsername()); - options.setPassword(mqttConfig.getPassword().toCharArray()); - this.mqttClient.connect(options); - } - - @PostConstruct - public void init() throws MqttException { - mqttClient.subscribe(mqttConfig.getRequestTopic(), this::handleRequest); - log.info("RPC Server subscribed to topic: {}", mqttConfig.getRequestTopic()); - } - - private void handleRequest(String topic, MqttMessage message) { - RpcRequest request = SerializationUtils.deserialize(new String(message.getPayload()), RpcRequest.class); - RpcResponse response = new RpcResponse(); - response.setCorrelationId(request.getCorrelationId()); - - try { - MethodInvoker invoker = methodRegistry.get(request.getMethod()); - if (invoker == null) { - throw new NoSuchMethodException("Unknown method: " + request.getMethod()); - } - Object result = invoker.invoke(request.getParams()); - response.setResult(result); - } catch (Exception e) { - response.setError(e.getMessage()); - log.error("Error processing RPC request: {}", e.getMessage(), e); - } - - String replyPayload = SerializationUtils.serialize(response); - MqttMessage replyMessage = new MqttMessage(replyPayload.getBytes()); - replyMessage.setQos(1); - try { - mqttClient.publish(request.getReplyTo(), replyMessage); - log.info("Published response to {}", request.getReplyTo()); - } catch (MqttException e) { - log.error("Failed to publish response: {}", e.getMessage(), e); - } - } - - /** - * 注册可调用的方法 - * - * @param methodName 方法名称 - * @param invoker 方法调用器 - */ - public void registerMethod(String methodName, MethodInvoker invoker) { - methodRegistry.put(methodName, invoker); - log.info("Registered method: {}", methodName); - } - - @PreDestroy - public void cleanup() throws MqttException { - mqttClient.disconnect(); - log.info("RPC Server disconnected"); - } - - /** - * 方法调用器接口 - */ - @FunctionalInterface - public interface MethodInvoker { - - Object invoke(Object[] params) throws Exception; - - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/ExampleService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/ExampleService.java deleted file mode 100644 index 22ebe8b4f2..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/plugin/ExampleService.java +++ /dev/null @@ -1,43 +0,0 @@ -package cn.iocoder.yudao.module.iot.service.plugin; - -import cn.iocoder.yudao.module.iot.mqttrpc.server.RpcServer; -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; - -@Service -@RequiredArgsConstructor -public class ExampleService { - - private final RpcServer rpcServer; - - @PostConstruct - public void registerMethods() { - rpcServer.registerMethod("add", params -> { - if (params.length != 2) { - throw new IllegalArgumentException("add方法需要两个参数"); - } - int a = ((Number) params[0]).intValue(); - int b = ((Number) params[1]).intValue(); - return add(a, b); - }); - - rpcServer.registerMethod("concat", params -> { - if (params.length != 2) { - throw new IllegalArgumentException("concat方法需要两个参数"); - } - String str1 = params[0].toString(); - String str2 = params[1].toString(); - return concat(str1, str2); - }); - } - - private int add(int a, int b) { - return a + b; - } - - private String concat(String a, String b) { - return a + b; - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties new file mode 100644 index 0000000000..a23bafcf79 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties @@ -0,0 +1,6 @@ +plugin.id=emqx-plugin +plugin.class=cn.iocoder.yudao.module.iot.plugin.EmqxPlugin +plugin.version=0.0.1 +plugin.provider=ahh +plugin.dependencies= +plugin.description=emqx-plugin-0.0.1 diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml new file mode 100644 index 0000000000..43d67f5207 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml @@ -0,0 +1,164 @@ + + + + yudao-module-iot-plugin + cn.iocoder.boot + ${revision} + + 4.0.0 + jar + + yudao-module-iot-emqx-plugin + + ${project.artifactId} + + 物联网 插件模块 - emqx 插件 + + + + + emqx-plugin + cn.iocoder.yudao.module.iot.plugin.EmqxPlugin + 0.0.1 + ahh + emqx-plugin-0.0.1 + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.6 + + + unzip jar file + package + + + + + + + run + + + + + + + maven-assembly-plugin + 2.3 + + + + src/main/assembly/assembly.xml + + + false + + + + make-assembly + package + + attached + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + ${plugin.id} + ${plugin.class} + ${plugin.version} + ${plugin.provider} + ${plugin.description} + ${plugin.dependencies} + + + + + + + maven-deploy-plugin + + true + + + + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.pf4j + pf4j-spring + provided + + + + cn.iocoder.boot + yudao-module-iot-api + ${revision} + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + io.vertx + vertx-core + + + + io.vertx + vertx-web + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + + \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml new file mode 100644 index 0000000000..daec9e4315 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml @@ -0,0 +1,31 @@ + + plugin + + zip + + false + + + false + runtime + lib + + *:jar:* + + + + + + + target/plugin-classes + classes + + + diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java new file mode 100644 index 0000000000..e64695b06d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java @@ -0,0 +1,45 @@ +package cn.iocoder.yudao.module.iot.plugin; + +import cn.iocoder.yudao.module.iot.api.ServiceRegistry; +import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; + +import javax.annotation.Resource; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class EmqxPlugin extends Plugin { + + private ExecutorService executorService; + @Resource + private DeviceDataApi deviceDataApi; + + public EmqxPlugin(PluginWrapper wrapper) { + super(wrapper); + this.executorService = Executors.newSingleThreadExecutor(); + } + + @Override + public void start() { + log.info("EmqxPlugin.start()"); + + if (executorService.isShutdown() || executorService.isTerminated()) { + executorService = Executors.newSingleThreadExecutor(); + } + + deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); + if (deviceDataApi == null) { + log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); + return; + } + + } + + @Override + public void stop() { + log.info("EmqxPlugin.stop()"); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml index 29c0200f1c..4658a1f6bf 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml @@ -127,7 +127,7 @@ org.springframework.boot - spring-boot-starter-web + spring-boot-starter @@ -147,20 +147,11 @@ ${lombok.version} provided - - - io.vertx - vertx-core - - + io.vertx vertx-web - - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 + 4.5.11 \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java index 6b553f92bf..2b871cadea 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/HttpPluginSpringbootApplication.java @@ -5,7 +5,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class HttpPluginSpringbootApplication { + public static void main(String[] args) { SpringApplication.run(HttpPluginSpringbootApplication.class, args); } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/config/TestConfiguration.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/config/TestConfiguration.java new file mode 100644 index 0000000000..b32a1f59fb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/config/TestConfiguration.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.iot.config; + +import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; +import cn.iocoder.yudao.module.iot.api.device.dto.DeviceDataCreateReqDTO; +import cn.iocoder.yudao.module.iot.plugin.HttpVertxPlugin; +import org.pf4j.DefaultPluginManager; +import org.pf4j.PluginWrapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +// TODO 芋艿:临时实现; +@Configuration +public class TestConfiguration { + + @Bean + public DeviceDataApi deviceDataApi() { + return new DeviceDataApi() { + + @Override + public void saveDeviceData(DeviceDataCreateReqDTO createDTO) { + System.out.println("saveDeviceData"); + } + + }; + } + + // TODO @haohao:可能要看下,有没更好的方式 + @Bean(initMethod = "start") + public HttpVertxPlugin HttpVertxPlugin() { + PluginWrapper pluginWrapper = new PluginWrapper(new DefaultPluginManager(), null, null, null); + return new HttpVertxPlugin(pluginWrapper); + } + +} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java deleted file mode 100644 index 4615dcf96f..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java +++ /dev/null @@ -1,38 +0,0 @@ - -package cn.iocoder.yudao.module.iot.controller; - -import cn.iocoder.yudao.module.iot.mqttrpc.client.RpcClient; -import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -import javax.annotation.Resource; -import java.util.concurrent.CompletableFuture; - -// TODO 芋艿:后续 review 下 -/** - * 插件实例 RPC 接口 - * - * @author 芋道源码 - */ -@RestController -@RequestMapping("/rpc") -@RequiredArgsConstructor -public class RpcController { - - @Resource - private RpcClient rpcClient; - - @PostMapping("/add") - public CompletableFuture add(@RequestParam int a, @RequestParam int b) throws Exception { - return rpcClient.call("add", new Object[]{a, b}, 10); - } - - @PostMapping("/concat") - public CompletableFuture concat(@RequestParam String str1, @RequestParam String str2) throws Exception { - return rpcClient.call("concat", new Object[]{str1, str2}, 10); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java deleted file mode 100644 index b73f88c537..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java +++ /dev/null @@ -1,93 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.client; - -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest; -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse; -import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils; -import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.UUID; -import java.util.concurrent.*; - -// TODO @芋艿:需要考虑,怎么公用! -@Service -@Slf4j -public class RpcClient { - - private final MqttConfig mqttConfig; - private final MqttClient mqttClient; - private final ConcurrentMap> pendingRequests = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - public RpcClient(MqttConfig mqttConfig) throws MqttException { - this.mqttConfig = mqttConfig; - this.mqttClient = new MqttClient(mqttConfig.getBroker(), mqttConfig.getClientId(), new MemoryPersistence()); - MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - options.setCleanSession(true); - options.setUserName(mqttConfig.getUsername()); - options.setPassword(mqttConfig.getPassword().toCharArray()); - this.mqttClient.connect(options); - } - - @PostConstruct - public void init() throws MqttException { - mqttClient.subscribe(mqttConfig.getResponseTopicPrefix() + "#", this::handleResponse); - log.info("RPC Client subscribed to topics: {}", mqttConfig.getResponseTopicPrefix() + "#"); - } - - private void handleResponse(String topic, MqttMessage message) { - String correlationId = topic.substring(mqttConfig.getResponseTopicPrefix().length()); - RpcResponse response = SerializationUtils.deserialize(new String(message.getPayload()), RpcResponse.class); - CompletableFuture future = pendingRequests.remove(correlationId); - if (future != null) { - if (response.getError() != null) { - future.completeExceptionally(new RuntimeException(response.getError())); - } else { - future.complete(response); - } - } else { - log.warn("Received response for unknown correlationId: {}", correlationId); - } - } - - public CompletableFuture call(String method, Object[] params, int timeoutSeconds) throws MqttException { - String correlationId = UUID.randomUUID().toString(); - String replyTo = mqttConfig.getResponseTopicPrefix() + correlationId; - - RpcRequest request = new RpcRequest(method, params, correlationId, replyTo); - String payload = SerializationUtils.serialize(request); - MqttMessage message = new MqttMessage(payload.getBytes()); - message.setQos(1); - mqttClient.publish(mqttConfig.getRequestTopic(), message); - - CompletableFuture futureResponse = new CompletableFuture<>(); - pendingRequests.put(correlationId, futureResponse); - - // 设置超时 - scheduler.schedule(() -> { - CompletableFuture removed = pendingRequests.remove(correlationId); - if (removed != null) { - removed.completeExceptionally(new TimeoutException("RPC call timed out")); - } - }, timeoutSeconds, TimeUnit.SECONDS); - - // 返回最终的结果 - return futureResponse.thenApply(RpcResponse::getResult); - } - - @PreDestroy - public void cleanup() throws MqttException { - mqttClient.disconnect(); - scheduler.shutdown(); - log.info("RPC Client disconnected"); - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java deleted file mode 100644 index 89569b0c3d..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java +++ /dev/null @@ -1,41 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.config; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Data -@Configuration -@ConfigurationProperties(prefix = "mqtt") -public class MqttConfig { - - /** - * MQTT 代理地址 - */ - private String broker; - - /** - * MQTT 用户名 - */ - private String username; - - /** - * MQTT 密码 - */ - private String password; - - /** - * MQTT 客户端 ID - */ - private String clientId; - - /** - * MQTT 请求主题 - */ - private String requestTopic; - - /** - * MQTT 响应主题前缀 - */ - private String responseTopicPrefix; -} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/HttpVertxPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/HttpVertxPlugin.java index 1d6fcad92b..54d9c7c2bc 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/HttpVertxPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/HttpVertxPlugin.java @@ -1,22 +1,22 @@ package cn.iocoder.yudao.module.iot.plugin; -import cn.iocoder.yudao.module.iot.api.ServiceRegistry; +import cn.hutool.extra.spring.SpringUtil; import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; import io.vertx.core.Vertx; import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.BodyHandler; +import lombok.extern.slf4j.Slf4j; import org.pf4j.PluginWrapper; import org.pf4j.spring.SpringPlugin; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; -import lombok.extern.slf4j.Slf4j; - @Slf4j public class HttpVertxPlugin extends SpringPlugin { private static final int PORT = 8092; private Vertx vertx; + private DeviceDataApi deviceDataApi; public HttpVertxPlugin(PluginWrapper wrapper) { @@ -28,7 +28,7 @@ public class HttpVertxPlugin extends SpringPlugin { log.info("HttpVertxPlugin.start()"); // 获取 DeviceDataApi 实例 - deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); + deviceDataApi = SpringUtil.getBean(DeviceDataApi.class); if (deviceDataApi == null) { log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); return; diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml index ea2234f83e..9056af48a3 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/resources/application.yml @@ -1,10 +1,7 @@ -server: - port: 8092 - spring: application: name: yudao-module-iot-http-plugin - + # MQTT-RPC 配置 mqtt: broker: tcp://chaojiniu.top:1883 diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties index 31050c5bac..939e0f6929 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties @@ -1,6 +1,7 @@ plugin.id=mqtt-plugin +plugin.description=Vert.x MQTT plugin plugin.class=cn.iocoder.yudao.module.iot.plugin.MqttPlugin -plugin.version=0.0.1 +plugin.version=1.0.0 +plugin.requires= plugin.provider=ahh -plugin.dependencies= -plugin.description=mqtt-plugin-0.0.1 +plugin.license=Apache-2.0 diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml index 9607e0f93c..462fbd0901 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml @@ -145,10 +145,11 @@ ${lombok.version} provided - + - org.eclipse.paho - org.eclipse.paho.client.mqttv3 + io.vertx + vertx-mqtt + 4.5.11 \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java index b3749e4025..54ff31f36b 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java @@ -1,45 +1,36 @@ package cn.iocoder.yudao.module.iot.plugin; -import cn.iocoder.yudao.module.iot.api.ServiceRegistry; -import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; import lombok.extern.slf4j.Slf4j; import org.pf4j.Plugin; import org.pf4j.PluginWrapper; -import javax.annotation.Resource; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - @Slf4j public class MqttPlugin extends Plugin { - private ExecutorService executorService; - @Resource - private DeviceDataApi deviceDataApi; + private MqttServerExtension mqttServerExtension; public MqttPlugin(PluginWrapper wrapper) { super(wrapper); - this.executorService = Executors.newSingleThreadExecutor(); } @Override public void start() { - log.info("MqttPlugin.start()"); - - if (executorService.isShutdown() || executorService.isTerminated()) { - executorService = Executors.newSingleThreadExecutor(); - } - - deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); - if (deviceDataApi == null) { - log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); - return; - } - + log.info("MQTT Plugin started."); + mqttServerExtension = new MqttServerExtension(); + mqttServerExtension.startMqttServer(); } @Override public void stop() { - log.info("MqttPlugin.stop()"); + log.info("MQTT Plugin stopped."); + if (mqttServerExtension != null) { + mqttServerExtension.stopMqttServer().onComplete(ar -> { + if (ar.succeeded()) { + log.info("Stopped MQTT Server successfully"); + } else { + log.error("Failed to stop MQTT Server: {}", ar.cause().getMessage()); + } + }); + } } -} \ No newline at end of file +} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java new file mode 100644 index 0000000000..868d238ee9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java @@ -0,0 +1,231 @@ +package cn.iocoder.yudao.module.iot.plugin; + +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttEndpoint; +import io.vertx.mqtt.MqttServer; +import io.vertx.mqtt.MqttServerOptions; +import io.vertx.mqtt.MqttTopicSubscription; +import io.vertx.mqtt.messages.MqttDisconnectMessage; +import io.vertx.mqtt.messages.MqttPublishMessage; +import io.vertx.mqtt.messages.MqttSubscribeMessage; +import io.vertx.mqtt.messages.MqttUnsubscribeMessage; +import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.Extension; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +/** + * 根据官方示例,整合常见 MQTT 功能到 PF4J 的 Extension 类中 + */ +@Slf4j +@Extension +public class MqttServerExtension { + + private Vertx vertx; + private MqttServer mqttServer; + + /** + * 启动 MQTT 服务端 + * 可根据需要决定是否启用 SSL/TLS、WebSocket、多实例部署等 + */ + public void startMqttServer() { + // 初始化 Vert.x + vertx = Vertx.vertx(); + + // ========== 如果需要 SSL/TLS,请参考下面注释,启用注释并替换端口、证书路径等 ========== + // MqttServerOptions options = new MqttServerOptions() + // .setPort(8883) + // .setKeyCertOptions(new PemKeyCertOptions() + // .setKeyPath("./src/test/resources/tls/server-key.pem") + // .setCertPath("./src/test/resources/tls/server-cert.pem")) + // .setSsl(true); + + // ========== 如果需要 WebSocket,请设置 setUseWebSocket(true) ========== + // options.setUseWebSocket(true); + + // ========== 默认不启用 SSL 的示例 ========== + MqttServerOptions options = new MqttServerOptions() + .setPort(1883) + .setHost("0.0.0.0") + .setUseWebSocket(false); // 如果需要 WebSocket,请改为 true + + mqttServer = MqttServer.create(vertx, options); + + // 指定 endpointHandler,处理客户端连接等 + mqttServer.endpointHandler(endpoint -> { + handleClientConnect(endpoint); + handleDisconnect(endpoint); + handleSubscribe(endpoint); + handleUnsubscribe(endpoint); + handlePublish(endpoint); + handlePing(endpoint); + }); + + // 启动监听 + mqttServer.listen(ar -> { + if (ar.succeeded()) { + log.info("MQTT server is listening on port {}", mqttServer.actualPort()); + } else { + log.error("Error on starting the server", ar.cause()); + } + }); + } + + /** + * 优雅关闭 MQTT 服务端 + */ + public Future stopMqttServer() { + if (mqttServer != null) { + return mqttServer.close().onComplete(ar -> { + if (ar.succeeded()) { + log.info("MQTT server closed."); + if (vertx != null) { + vertx.close(); + log.info("Vert.x instance closed."); + } + } else { + log.error("Failed to close MQTT server: {}", ar.cause().getMessage()); + } + }); + } + return Future.succeededFuture(); + } + + // ==================== 以下为官方示例中常见事件的处理封装 ==================== + + /** + * 处理客户端连接 (CONNECT) + */ + private void handleClientConnect(MqttEndpoint endpoint) { + // 打印 CONNECT 的主要信息 + log.info("MQTT client [{}] request to connect, clean session = {}", + endpoint.clientIdentifier(), endpoint.isCleanSession()); + + if (endpoint.auth() != null) { + log.info("[username = {}, password = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword()); + } + log.info("[properties = {}]", endpoint.connectProperties()); + + if (endpoint.will() != null) { + log.info("[will topic = {}, msg = {}, QoS = {}, isRetain = {}]", + endpoint.will().getWillTopic(), + new String(endpoint.will().getWillMessageBytes()), + endpoint.will().getWillQos(), + endpoint.will().isWillRetain()); + } + + log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds()); + + // 接受远程客户端的连接 + endpoint.accept(false); + } + + /** + * 处理客户端主动断开 (DISCONNECT) + */ + private void handleDisconnect(MqttEndpoint endpoint) { + endpoint.disconnectMessageHandler((MqttDisconnectMessage disconnectMessage) -> { + log.info("Received disconnect from client [{}], reason code = {}", + endpoint.clientIdentifier(), disconnectMessage.code()); + }); + } + + /** + * 处理客户端订阅 (SUBSCRIBE) + */ + private void handleSubscribe(MqttEndpoint endpoint) { + endpoint.subscribeHandler((MqttSubscribeMessage subscribe) -> { + List reasonCodes = new ArrayList<>(); + for (MqttTopicSubscription s : subscribe.topicSubscriptions()) { + log.info("Subscription for {} with QoS {}", s.topicName(), s.qualityOfService()); + // 将客户端请求的 QoS 转换为返回给客户端的 reason code(可能是错误码或实际 granted QoS) + reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService())); + } + // 回复 SUBACK,MQTT 5.0 时可指定 reasonCodes、properties + endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); + }); + } + + /** + * 处理客户端取消订阅 (UNSUBSCRIBE) + */ + private void handleUnsubscribe(MqttEndpoint endpoint) { + endpoint.unsubscribeHandler((MqttUnsubscribeMessage unsubscribe) -> { + for (String topic : unsubscribe.topics()) { + log.info("Unsubscription for {}", topic); + } + // 回复 UNSUBACK,MQTT 5.0 时可指定 reasonCodes、properties + endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); + }); + } + + /** + * 处理客户端发布的消息 (PUBLISH) + */ + private void handlePublish(MqttEndpoint endpoint) { + // 接收 PUBLISH 消息 + endpoint.publishHandler((MqttPublishMessage message) -> { + String payload = message.payload().toString(Charset.defaultCharset()); + log.info("Received message [{}] on topic [{}] with QoS [{}]", + payload, message.topicName(), message.qosLevel()); + + // 根据不同 QoS,回复客户端 + if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + endpoint.publishAcknowledge(message.messageId()); + } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { + endpoint.publishReceived(message.messageId()); + } + }); + + // 如果 QoS = 2,需要处理 PUBREL + endpoint.publishReleaseHandler(messageId -> { + endpoint.publishComplete(messageId); + }); + } + + /** + * 处理客户端 PINGREQ + */ + private void handlePing(MqttEndpoint endpoint) { + endpoint.pingHandler(v -> { + // 这里仅做日志, PINGRESP 已自动发送 + log.info("Ping received from client [{}]", endpoint.clientIdentifier()); + }); + } + + // ==================== 如果需要服务端向客户端发布消息,可用以下示例 ==================== + + /** + * 服务端主动向已连接的某个 endpoint 发布消息的示例 + * 如果使用 MQTT 5.0,可以传递更多消息属性 + */ + public void publishToClient(MqttEndpoint endpoint, String topic, String content) { + endpoint.publish(topic, + Buffer.buffer(content), + MqttQoS.AT_LEAST_ONCE, // QoS 自行选择 + false, + false); + + // 处理 QoS 1 和 QoS 2 的 ACK + endpoint.publishAcknowledgeHandler(messageId -> { + log.info("Received PUBACK from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId); + }).publishReceivedHandler(messageId -> { + endpoint.publishRelease(messageId); + }).publishCompletionHandler(messageId -> { + log.info("Received PUBCOMP from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId); + }); + } + + // ==================== 如果需要多实例部署,用于多核扩展,可参考以下思路 ==================== + // 例如,在宿主应用或插件中循环启动多个 MqttServerExtension 实例,或使用 Vert.x 的 deployVerticle: + // DeploymentOptions options = new DeploymentOptions().setInstances(10); + // vertx.deployVerticle(() -> new MyMqttVerticle(), options); + +}