diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index fb3cf8562d..0a9d0bf454 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -626,6 +626,12 @@ vertx-web ${vertx.version} + + + io.vertx + vertx-mqtt + ${vertx.version} + diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java deleted file mode 100644 index c7a0500030..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java +++ /dev/null @@ -1,40 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.config; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Data -@Configuration -@ConfigurationProperties(prefix = "mqtt") -public class MqttConfig { - /** - * MQTT 代理地址 - */ - private String broker; - - /** - * MQTT 用户名 - */ - private String username; - - /** - * MQTT 密码 - */ - private String password; - - /** - * MQTT 客户端 ID - */ - private String clientId; - - /** - * MQTT 请求主题 - */ - private String requestTopic; - - /** - * MQTT 响应主题前缀 - */ - private String responseTopicPrefix; -} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java deleted file mode 100644 index 90ce2a3875..0000000000 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/server/RpcServer.java +++ /dev/null @@ -1,100 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.server; - -import cn.hutool.core.lang.UUID; -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest; -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse; -import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils; -import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.*; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.HashMap; -import java.util.Map; - -// TODO @芋艿:server 逻辑,再瞅瞅; -// TODO @haohao:如果只写在 iot biz 里,那么后续 server => client 貌似不方便?微信再讨论下~; -@Service -@Slf4j -public class RpcServer { - - private final MqttConfig mqttConfig; - private final MqttClient mqttClient; - private final Map methodRegistry = new HashMap<>(); - - public RpcServer(MqttConfig mqttConfig) throws MqttException { - this.mqttConfig = mqttConfig; - this.mqttClient = new MqttClient(mqttConfig.getBroker(), "rpc-server-" + UUID.randomUUID(), new MemoryPersistence()); - MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - options.setCleanSession(true); - options.setUserName(mqttConfig.getUsername()); - options.setPassword(mqttConfig.getPassword().toCharArray()); - this.mqttClient.connect(options); - } - - @PostConstruct - public void init() throws MqttException { - mqttClient.subscribe(mqttConfig.getRequestTopic(), this::handleRequest); - log.info("RPC Server subscribed to topic: {}", mqttConfig.getRequestTopic()); - } - - private void handleRequest(String topic, MqttMessage message) { - RpcRequest request = SerializationUtils.deserialize(new String(message.getPayload()), RpcRequest.class); - RpcResponse response = new RpcResponse(); - response.setCorrelationId(request.getCorrelationId()); - - try { - MethodInvoker invoker = methodRegistry.get(request.getMethod()); - if (invoker == null) { - throw new NoSuchMethodException("Unknown method: " + request.getMethod()); - } - Object result = invoker.invoke(request.getParams()); - response.setResult(result); - } catch (Exception e) { - response.setError(e.getMessage()); - log.error("Error processing RPC request: {}", e.getMessage(), e); - } - - String replyPayload = SerializationUtils.serialize(response); - MqttMessage replyMessage = new MqttMessage(replyPayload.getBytes()); - replyMessage.setQos(1); - try { - mqttClient.publish(request.getReplyTo(), replyMessage); - log.info("Published response to {}", request.getReplyTo()); - } catch (MqttException e) { - log.error("Failed to publish response: {}", e.getMessage(), e); - } - } - - /** - * 注册可调用的方法 - * - * @param methodName 方法名称 - * @param invoker 方法调用器 - */ - public void registerMethod(String methodName, MethodInvoker invoker) { - methodRegistry.put(methodName, invoker); - log.info("Registered method: {}", methodName); - } - - @PreDestroy - public void cleanup() throws MqttException { - mqttClient.disconnect(); - log.info("RPC Server disconnected"); - } - - /** - * 方法调用器接口 - */ - @FunctionalInterface - public interface MethodInvoker { - - Object invoke(Object[] params) throws Exception; - - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties new file mode 100644 index 0000000000..a23bafcf79 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties @@ -0,0 +1,6 @@ +plugin.id=emqx-plugin +plugin.class=cn.iocoder.yudao.module.iot.plugin.EmqxPlugin +plugin.version=0.0.1 +plugin.provider=ahh +plugin.dependencies= +plugin.description=emqx-plugin-0.0.1 diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml new file mode 100644 index 0000000000..43d67f5207 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml @@ -0,0 +1,164 @@ + + + + yudao-module-iot-plugin + cn.iocoder.boot + ${revision} + + 4.0.0 + jar + + yudao-module-iot-emqx-plugin + + ${project.artifactId} + + 物联网 插件模块 - emqx 插件 + + + + + emqx-plugin + cn.iocoder.yudao.module.iot.plugin.EmqxPlugin + 0.0.1 + ahh + emqx-plugin-0.0.1 + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.6 + + + unzip jar file + package + + + + + + + run + + + + + + + maven-assembly-plugin + 2.3 + + + + src/main/assembly/assembly.xml + + + false + + + + make-assembly + package + + attached + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + ${plugin.id} + ${plugin.class} + ${plugin.version} + ${plugin.provider} + ${plugin.description} + ${plugin.dependencies} + + + + + + + maven-deploy-plugin + + true + + + + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.pf4j + pf4j-spring + provided + + + + cn.iocoder.boot + yudao-module-iot-api + ${revision} + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + io.vertx + vertx-core + + + + io.vertx + vertx-web + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + + \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml new file mode 100644 index 0000000000..daec9e4315 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml @@ -0,0 +1,31 @@ + + plugin + + zip + + false + + + false + runtime + lib + + *:jar:* + + + + + + + target/plugin-classes + classes + + + diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java new file mode 100644 index 0000000000..e64695b06d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java @@ -0,0 +1,45 @@ +package cn.iocoder.yudao.module.iot.plugin; + +import cn.iocoder.yudao.module.iot.api.ServiceRegistry; +import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; + +import javax.annotation.Resource; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class EmqxPlugin extends Plugin { + + private ExecutorService executorService; + @Resource + private DeviceDataApi deviceDataApi; + + public EmqxPlugin(PluginWrapper wrapper) { + super(wrapper); + this.executorService = Executors.newSingleThreadExecutor(); + } + + @Override + public void start() { + log.info("EmqxPlugin.start()"); + + if (executorService.isShutdown() || executorService.isTerminated()) { + executorService = Executors.newSingleThreadExecutor(); + } + + deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); + if (deviceDataApi == null) { + log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); + return; + } + + } + + @Override + public void stop() { + log.info("EmqxPlugin.stop()"); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml index 29c0200f1c..22cb439681 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml @@ -147,20 +147,11 @@ ${lombok.version} provided - - - io.vertx - vertx-core - - + io.vertx vertx-web - - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 + 4.5.11 \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java deleted file mode 100644 index 4615dcf96f..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/controller/RpcController.java +++ /dev/null @@ -1,38 +0,0 @@ - -package cn.iocoder.yudao.module.iot.controller; - -import cn.iocoder.yudao.module.iot.mqttrpc.client.RpcClient; -import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -import javax.annotation.Resource; -import java.util.concurrent.CompletableFuture; - -// TODO 芋艿:后续 review 下 -/** - * 插件实例 RPC 接口 - * - * @author 芋道源码 - */ -@RestController -@RequestMapping("/rpc") -@RequiredArgsConstructor -public class RpcController { - - @Resource - private RpcClient rpcClient; - - @PostMapping("/add") - public CompletableFuture add(@RequestParam int a, @RequestParam int b) throws Exception { - return rpcClient.call("add", new Object[]{a, b}, 10); - } - - @PostMapping("/concat") - public CompletableFuture concat(@RequestParam String str1, @RequestParam String str2) throws Exception { - return rpcClient.call("concat", new Object[]{str1, str2}, 10); - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java deleted file mode 100644 index b73f88c537..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/client/RpcClient.java +++ /dev/null @@ -1,93 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.client; - -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcRequest; -import cn.iocoder.yudao.module.iot.mqttrpc.common.RpcResponse; -import cn.iocoder.yudao.module.iot.mqttrpc.common.SerializationUtils; -import cn.iocoder.yudao.module.iot.mqttrpc.config.MqttConfig; -import lombok.extern.slf4j.Slf4j; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.stereotype.Service; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import java.util.UUID; -import java.util.concurrent.*; - -// TODO @芋艿:需要考虑,怎么公用! -@Service -@Slf4j -public class RpcClient { - - private final MqttConfig mqttConfig; - private final MqttClient mqttClient; - private final ConcurrentMap> pendingRequests = new ConcurrentHashMap<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - - public RpcClient(MqttConfig mqttConfig) throws MqttException { - this.mqttConfig = mqttConfig; - this.mqttClient = new MqttClient(mqttConfig.getBroker(), mqttConfig.getClientId(), new MemoryPersistence()); - MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - options.setCleanSession(true); - options.setUserName(mqttConfig.getUsername()); - options.setPassword(mqttConfig.getPassword().toCharArray()); - this.mqttClient.connect(options); - } - - @PostConstruct - public void init() throws MqttException { - mqttClient.subscribe(mqttConfig.getResponseTopicPrefix() + "#", this::handleResponse); - log.info("RPC Client subscribed to topics: {}", mqttConfig.getResponseTopicPrefix() + "#"); - } - - private void handleResponse(String topic, MqttMessage message) { - String correlationId = topic.substring(mqttConfig.getResponseTopicPrefix().length()); - RpcResponse response = SerializationUtils.deserialize(new String(message.getPayload()), RpcResponse.class); - CompletableFuture future = pendingRequests.remove(correlationId); - if (future != null) { - if (response.getError() != null) { - future.completeExceptionally(new RuntimeException(response.getError())); - } else { - future.complete(response); - } - } else { - log.warn("Received response for unknown correlationId: {}", correlationId); - } - } - - public CompletableFuture call(String method, Object[] params, int timeoutSeconds) throws MqttException { - String correlationId = UUID.randomUUID().toString(); - String replyTo = mqttConfig.getResponseTopicPrefix() + correlationId; - - RpcRequest request = new RpcRequest(method, params, correlationId, replyTo); - String payload = SerializationUtils.serialize(request); - MqttMessage message = new MqttMessage(payload.getBytes()); - message.setQos(1); - mqttClient.publish(mqttConfig.getRequestTopic(), message); - - CompletableFuture futureResponse = new CompletableFuture<>(); - pendingRequests.put(correlationId, futureResponse); - - // 设置超时 - scheduler.schedule(() -> { - CompletableFuture removed = pendingRequests.remove(correlationId); - if (removed != null) { - removed.completeExceptionally(new TimeoutException("RPC call timed out")); - } - }, timeoutSeconds, TimeUnit.SECONDS); - - // 返回最终的结果 - return futureResponse.thenApply(RpcResponse::getResult); - } - - @PreDestroy - public void cleanup() throws MqttException { - mqttClient.disconnect(); - scheduler.shutdown(); - log.info("RPC Client disconnected"); - } -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java deleted file mode 100644 index 89569b0c3d..0000000000 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/src/main/java/cn/iocoder/yudao/module/iot/mqttrpc/config/MqttConfig.java +++ /dev/null @@ -1,41 +0,0 @@ -package cn.iocoder.yudao.module.iot.mqttrpc.config; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Data -@Configuration -@ConfigurationProperties(prefix = "mqtt") -public class MqttConfig { - - /** - * MQTT 代理地址 - */ - private String broker; - - /** - * MQTT 用户名 - */ - private String username; - - /** - * MQTT 密码 - */ - private String password; - - /** - * MQTT 客户端 ID - */ - private String clientId; - - /** - * MQTT 请求主题 - */ - private String requestTopic; - - /** - * MQTT 响应主题前缀 - */ - private String responseTopicPrefix; -} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties index 31050c5bac..939e0f6929 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties @@ -1,6 +1,7 @@ plugin.id=mqtt-plugin +plugin.description=Vert.x MQTT plugin plugin.class=cn.iocoder.yudao.module.iot.plugin.MqttPlugin -plugin.version=0.0.1 +plugin.version=1.0.0 +plugin.requires= plugin.provider=ahh -plugin.dependencies= -plugin.description=mqtt-plugin-0.0.1 +plugin.license=Apache-2.0 diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml index 9607e0f93c..462fbd0901 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml @@ -145,10 +145,11 @@ ${lombok.version} provided - + - org.eclipse.paho - org.eclipse.paho.client.mqttv3 + io.vertx + vertx-mqtt + 4.5.11 \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java index b3749e4025..54ff31f36b 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java @@ -1,45 +1,36 @@ package cn.iocoder.yudao.module.iot.plugin; -import cn.iocoder.yudao.module.iot.api.ServiceRegistry; -import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; import lombok.extern.slf4j.Slf4j; import org.pf4j.Plugin; import org.pf4j.PluginWrapper; -import javax.annotation.Resource; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - @Slf4j public class MqttPlugin extends Plugin { - private ExecutorService executorService; - @Resource - private DeviceDataApi deviceDataApi; + private MqttServerExtension mqttServerExtension; public MqttPlugin(PluginWrapper wrapper) { super(wrapper); - this.executorService = Executors.newSingleThreadExecutor(); } @Override public void start() { - log.info("MqttPlugin.start()"); - - if (executorService.isShutdown() || executorService.isTerminated()) { - executorService = Executors.newSingleThreadExecutor(); - } - - deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); - if (deviceDataApi == null) { - log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); - return; - } - + log.info("MQTT Plugin started."); + mqttServerExtension = new MqttServerExtension(); + mqttServerExtension.startMqttServer(); } @Override public void stop() { - log.info("MqttPlugin.stop()"); + log.info("MQTT Plugin stopped."); + if (mqttServerExtension != null) { + mqttServerExtension.stopMqttServer().onComplete(ar -> { + if (ar.succeeded()) { + log.info("Stopped MQTT Server successfully"); + } else { + log.error("Failed to stop MQTT Server: {}", ar.cause().getMessage()); + } + }); + } } -} \ No newline at end of file +} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java new file mode 100644 index 0000000000..868d238ee9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java @@ -0,0 +1,231 @@ +package cn.iocoder.yudao.module.iot.plugin; + +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttEndpoint; +import io.vertx.mqtt.MqttServer; +import io.vertx.mqtt.MqttServerOptions; +import io.vertx.mqtt.MqttTopicSubscription; +import io.vertx.mqtt.messages.MqttDisconnectMessage; +import io.vertx.mqtt.messages.MqttPublishMessage; +import io.vertx.mqtt.messages.MqttSubscribeMessage; +import io.vertx.mqtt.messages.MqttUnsubscribeMessage; +import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.Extension; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +/** + * 根据官方示例,整合常见 MQTT 功能到 PF4J 的 Extension 类中 + */ +@Slf4j +@Extension +public class MqttServerExtension { + + private Vertx vertx; + private MqttServer mqttServer; + + /** + * 启动 MQTT 服务端 + * 可根据需要决定是否启用 SSL/TLS、WebSocket、多实例部署等 + */ + public void startMqttServer() { + // 初始化 Vert.x + vertx = Vertx.vertx(); + + // ========== 如果需要 SSL/TLS,请参考下面注释,启用注释并替换端口、证书路径等 ========== + // MqttServerOptions options = new MqttServerOptions() + // .setPort(8883) + // .setKeyCertOptions(new PemKeyCertOptions() + // .setKeyPath("./src/test/resources/tls/server-key.pem") + // .setCertPath("./src/test/resources/tls/server-cert.pem")) + // .setSsl(true); + + // ========== 如果需要 WebSocket,请设置 setUseWebSocket(true) ========== + // options.setUseWebSocket(true); + + // ========== 默认不启用 SSL 的示例 ========== + MqttServerOptions options = new MqttServerOptions() + .setPort(1883) + .setHost("0.0.0.0") + .setUseWebSocket(false); // 如果需要 WebSocket,请改为 true + + mqttServer = MqttServer.create(vertx, options); + + // 指定 endpointHandler,处理客户端连接等 + mqttServer.endpointHandler(endpoint -> { + handleClientConnect(endpoint); + handleDisconnect(endpoint); + handleSubscribe(endpoint); + handleUnsubscribe(endpoint); + handlePublish(endpoint); + handlePing(endpoint); + }); + + // 启动监听 + mqttServer.listen(ar -> { + if (ar.succeeded()) { + log.info("MQTT server is listening on port {}", mqttServer.actualPort()); + } else { + log.error("Error on starting the server", ar.cause()); + } + }); + } + + /** + * 优雅关闭 MQTT 服务端 + */ + public Future stopMqttServer() { + if (mqttServer != null) { + return mqttServer.close().onComplete(ar -> { + if (ar.succeeded()) { + log.info("MQTT server closed."); + if (vertx != null) { + vertx.close(); + log.info("Vert.x instance closed."); + } + } else { + log.error("Failed to close MQTT server: {}", ar.cause().getMessage()); + } + }); + } + return Future.succeededFuture(); + } + + // ==================== 以下为官方示例中常见事件的处理封装 ==================== + + /** + * 处理客户端连接 (CONNECT) + */ + private void handleClientConnect(MqttEndpoint endpoint) { + // 打印 CONNECT 的主要信息 + log.info("MQTT client [{}] request to connect, clean session = {}", + endpoint.clientIdentifier(), endpoint.isCleanSession()); + + if (endpoint.auth() != null) { + log.info("[username = {}, password = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword()); + } + log.info("[properties = {}]", endpoint.connectProperties()); + + if (endpoint.will() != null) { + log.info("[will topic = {}, msg = {}, QoS = {}, isRetain = {}]", + endpoint.will().getWillTopic(), + new String(endpoint.will().getWillMessageBytes()), + endpoint.will().getWillQos(), + endpoint.will().isWillRetain()); + } + + log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds()); + + // 接受远程客户端的连接 + endpoint.accept(false); + } + + /** + * 处理客户端主动断开 (DISCONNECT) + */ + private void handleDisconnect(MqttEndpoint endpoint) { + endpoint.disconnectMessageHandler((MqttDisconnectMessage disconnectMessage) -> { + log.info("Received disconnect from client [{}], reason code = {}", + endpoint.clientIdentifier(), disconnectMessage.code()); + }); + } + + /** + * 处理客户端订阅 (SUBSCRIBE) + */ + private void handleSubscribe(MqttEndpoint endpoint) { + endpoint.subscribeHandler((MqttSubscribeMessage subscribe) -> { + List reasonCodes = new ArrayList<>(); + for (MqttTopicSubscription s : subscribe.topicSubscriptions()) { + log.info("Subscription for {} with QoS {}", s.topicName(), s.qualityOfService()); + // 将客户端请求的 QoS 转换为返回给客户端的 reason code(可能是错误码或实际 granted QoS) + reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService())); + } + // 回复 SUBACK,MQTT 5.0 时可指定 reasonCodes、properties + endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); + }); + } + + /** + * 处理客户端取消订阅 (UNSUBSCRIBE) + */ + private void handleUnsubscribe(MqttEndpoint endpoint) { + endpoint.unsubscribeHandler((MqttUnsubscribeMessage unsubscribe) -> { + for (String topic : unsubscribe.topics()) { + log.info("Unsubscription for {}", topic); + } + // 回复 UNSUBACK,MQTT 5.0 时可指定 reasonCodes、properties + endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); + }); + } + + /** + * 处理客户端发布的消息 (PUBLISH) + */ + private void handlePublish(MqttEndpoint endpoint) { + // 接收 PUBLISH 消息 + endpoint.publishHandler((MqttPublishMessage message) -> { + String payload = message.payload().toString(Charset.defaultCharset()); + log.info("Received message [{}] on topic [{}] with QoS [{}]", + payload, message.topicName(), message.qosLevel()); + + // 根据不同 QoS,回复客户端 + if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + endpoint.publishAcknowledge(message.messageId()); + } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { + endpoint.publishReceived(message.messageId()); + } + }); + + // 如果 QoS = 2,需要处理 PUBREL + endpoint.publishReleaseHandler(messageId -> { + endpoint.publishComplete(messageId); + }); + } + + /** + * 处理客户端 PINGREQ + */ + private void handlePing(MqttEndpoint endpoint) { + endpoint.pingHandler(v -> { + // 这里仅做日志, PINGRESP 已自动发送 + log.info("Ping received from client [{}]", endpoint.clientIdentifier()); + }); + } + + // ==================== 如果需要服务端向客户端发布消息,可用以下示例 ==================== + + /** + * 服务端主动向已连接的某个 endpoint 发布消息的示例 + * 如果使用 MQTT 5.0,可以传递更多消息属性 + */ + public void publishToClient(MqttEndpoint endpoint, String topic, String content) { + endpoint.publish(topic, + Buffer.buffer(content), + MqttQoS.AT_LEAST_ONCE, // QoS 自行选择 + false, + false); + + // 处理 QoS 1 和 QoS 2 的 ACK + endpoint.publishAcknowledgeHandler(messageId -> { + log.info("Received PUBACK from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId); + }).publishReceivedHandler(messageId -> { + endpoint.publishRelease(messageId); + }).publishCompletionHandler(messageId -> { + log.info("Received PUBCOMP from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId); + }); + } + + // ==================== 如果需要多实例部署,用于多核扩展,可参考以下思路 ==================== + // 例如,在宿主应用或插件中循环启动多个 MqttServerExtension 实例,或使用 Vert.x 的 deployVerticle: + // DeploymentOptions options = new DeploymentOptions().setInstances(10); + // vertx.deployVerticle(() -> new MyMqttVerticle(), options); + +}