From 890d304340f1d20c52cf5288d47956eaed5814ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E6=B5=A9=E6=B5=A9?= <1036606149@qq.com> Date: Wed, 15 Jan 2025 22:37:07 +0800 Subject: [PATCH] =?UTF-8?q?=E3=80=90=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E3=80=91IoT:=20=E6=9B=B4=E6=96=B0=20Vert.x=20=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E8=87=B3=204.5.1=EF=BC=8C=E6=96=B0=E5=A2=9E=20EMQX=20?= =?UTF-8?q?=E6=8F=92=E4=BB=B6=E5=8F=8A=E5=85=B6=E7=9B=B8=E5=85=B3=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=EF=BC=8C=E9=87=8D=E6=9E=84=20MQTT=20=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=E4=BB=A5=E6=94=AF=E6=8C=81=20Vert.x=20MQTT=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=EF=BC=8C=E4=BC=98=E5=8C=96=E6=8F=92?= =?UTF-8?q?=E4=BB=B6=E5=90=AF=E5=8A=A8=E5=92=8C=E5=81=9C=E6=AD=A2=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E6=9B=B4=E6=96=B0=E6=8F=92=E4=BB=B6=E6=8F=8F?= =?UTF-8?q?=E8=BF=B0=E4=BF=A1=E6=81=AF=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yudao-dependencies/pom.xml | 6 + .../plugin.properties | 6 + .../yudao-module-iot-emqx-plugin/pom.xml | 164 +++++++++++++ .../src/main/assembly/assembly.xml | 31 +++ .../yudao/module/iot/plugin/EmqxPlugin.java | 45 ++++ .../yudao-module-iot-http-plugin/pom.xml | 13 +- .../plugin.properties | 7 +- .../yudao-module-iot-mqtt-plugin/pom.xml | 7 +- .../yudao/module/iot/plugin/MqttPlugin.java | 39 ++- .../iot/plugin/MqttServerExtension.java | 231 ++++++++++++++++++ 10 files changed, 508 insertions(+), 41 deletions(-) create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java create mode 100644 yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index fb3cf8562d..0a9d0bf454 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -626,6 +626,12 @@ vertx-web ${vertx.version} + + + io.vertx + vertx-mqtt + ${vertx.version} + diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties new file mode 100644 index 0000000000..a23bafcf79 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/plugin.properties @@ -0,0 +1,6 @@ +plugin.id=emqx-plugin +plugin.class=cn.iocoder.yudao.module.iot.plugin.EmqxPlugin +plugin.version=0.0.1 +plugin.provider=ahh +plugin.dependencies= +plugin.description=emqx-plugin-0.0.1 diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml new file mode 100644 index 0000000000..43d67f5207 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/pom.xml @@ -0,0 +1,164 @@ + + + + yudao-module-iot-plugin + cn.iocoder.boot + ${revision} + + 4.0.0 + jar + + yudao-module-iot-emqx-plugin + + ${project.artifactId} + + 物联网 插件模块 - emqx 插件 + + + + + emqx-plugin + cn.iocoder.yudao.module.iot.plugin.EmqxPlugin + 0.0.1 + ahh + emqx-plugin-0.0.1 + + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.6 + + + unzip jar file + package + + + + + + + run + + + + + + + maven-assembly-plugin + 2.3 + + + + src/main/assembly/assembly.xml + + + false + + + + make-assembly + package + + attached + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + + ${plugin.id} + ${plugin.class} + ${plugin.version} + ${plugin.provider} + ${plugin.description} + ${plugin.dependencies} + + + + + + + maven-deploy-plugin + + true + + + + + + + + + org.springframework.boot + spring-boot-starter-web + + + + org.pf4j + pf4j-spring + provided + + + + cn.iocoder.boot + yudao-module-iot-api + ${revision} + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + io.vertx + vertx-core + + + + io.vertx + vertx-web + + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + + + \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml new file mode 100644 index 0000000000..daec9e4315 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/assembly/assembly.xml @@ -0,0 +1,31 @@ + + plugin + + zip + + false + + + false + runtime + lib + + *:jar:* + + + + + + + target/plugin-classes + classes + + + diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java new file mode 100644 index 0000000000..e64695b06d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-emqx-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/EmqxPlugin.java @@ -0,0 +1,45 @@ +package cn.iocoder.yudao.module.iot.plugin; + +import cn.iocoder.yudao.module.iot.api.ServiceRegistry; +import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.Plugin; +import org.pf4j.PluginWrapper; + +import javax.annotation.Resource; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public class EmqxPlugin extends Plugin { + + private ExecutorService executorService; + @Resource + private DeviceDataApi deviceDataApi; + + public EmqxPlugin(PluginWrapper wrapper) { + super(wrapper); + this.executorService = Executors.newSingleThreadExecutor(); + } + + @Override + public void start() { + log.info("EmqxPlugin.start()"); + + if (executorService.isShutdown() || executorService.isTerminated()) { + executorService = Executors.newSingleThreadExecutor(); + } + + deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); + if (deviceDataApi == null) { + log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); + return; + } + + } + + @Override + public void stop() { + log.info("EmqxPlugin.stop()"); + } +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml index 29c0200f1c..22cb439681 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-http-plugin/pom.xml @@ -147,20 +147,11 @@ ${lombok.version} provided - - - io.vertx - vertx-core - - + io.vertx vertx-web - - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 + 4.5.11 \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties index 31050c5bac..939e0f6929 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/plugin.properties @@ -1,6 +1,7 @@ plugin.id=mqtt-plugin +plugin.description=Vert.x MQTT plugin plugin.class=cn.iocoder.yudao.module.iot.plugin.MqttPlugin -plugin.version=0.0.1 +plugin.version=1.0.0 +plugin.requires= plugin.provider=ahh -plugin.dependencies= -plugin.description=mqtt-plugin-0.0.1 +plugin.license=Apache-2.0 diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml index 9607e0f93c..462fbd0901 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/pom.xml @@ -145,10 +145,11 @@ ${lombok.version} provided - + - org.eclipse.paho - org.eclipse.paho.client.mqttv3 + io.vertx + vertx-mqtt + 4.5.11 \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java index b3749e4025..54ff31f36b 100644 --- a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttPlugin.java @@ -1,45 +1,36 @@ package cn.iocoder.yudao.module.iot.plugin; -import cn.iocoder.yudao.module.iot.api.ServiceRegistry; -import cn.iocoder.yudao.module.iot.api.device.DeviceDataApi; import lombok.extern.slf4j.Slf4j; import org.pf4j.Plugin; import org.pf4j.PluginWrapper; -import javax.annotation.Resource; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - @Slf4j public class MqttPlugin extends Plugin { - private ExecutorService executorService; - @Resource - private DeviceDataApi deviceDataApi; + private MqttServerExtension mqttServerExtension; public MqttPlugin(PluginWrapper wrapper) { super(wrapper); - this.executorService = Executors.newSingleThreadExecutor(); } @Override public void start() { - log.info("MqttPlugin.start()"); - - if (executorService.isShutdown() || executorService.isTerminated()) { - executorService = Executors.newSingleThreadExecutor(); - } - - deviceDataApi = ServiceRegistry.getService(DeviceDataApi.class); - if (deviceDataApi == null) { - log.error("未能从 ServiceRegistry 获取 DeviceDataApi 实例,请确保主程序已正确注册!"); - return; - } - + log.info("MQTT Plugin started."); + mqttServerExtension = new MqttServerExtension(); + mqttServerExtension.startMqttServer(); } @Override public void stop() { - log.info("MqttPlugin.stop()"); + log.info("MQTT Plugin stopped."); + if (mqttServerExtension != null) { + mqttServerExtension.stopMqttServer().onComplete(ar -> { + if (ar.succeeded()) { + log.info("Stopped MQTT Server successfully"); + } else { + log.error("Failed to stop MQTT Server: {}", ar.cause().getMessage()); + } + }); + } } -} \ No newline at end of file +} diff --git a/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java new file mode 100644 index 0000000000..868d238ee9 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-plugin/yudao-module-iot-mqtt-plugin/src/main/java/cn/iocoder/yudao/module/iot/plugin/MqttServerExtension.java @@ -0,0 +1,231 @@ +package cn.iocoder.yudao.module.iot.plugin; + +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.mqtt.MqttEndpoint; +import io.vertx.mqtt.MqttServer; +import io.vertx.mqtt.MqttServerOptions; +import io.vertx.mqtt.MqttTopicSubscription; +import io.vertx.mqtt.messages.MqttDisconnectMessage; +import io.vertx.mqtt.messages.MqttPublishMessage; +import io.vertx.mqtt.messages.MqttSubscribeMessage; +import io.vertx.mqtt.messages.MqttUnsubscribeMessage; +import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode; +import lombok.extern.slf4j.Slf4j; +import org.pf4j.Extension; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +/** + * 根据官方示例,整合常见 MQTT 功能到 PF4J 的 Extension 类中 + */ +@Slf4j +@Extension +public class MqttServerExtension { + + private Vertx vertx; + private MqttServer mqttServer; + + /** + * 启动 MQTT 服务端 + * 可根据需要决定是否启用 SSL/TLS、WebSocket、多实例部署等 + */ + public void startMqttServer() { + // 初始化 Vert.x + vertx = Vertx.vertx(); + + // ========== 如果需要 SSL/TLS,请参考下面注释,启用注释并替换端口、证书路径等 ========== + // MqttServerOptions options = new MqttServerOptions() + // .setPort(8883) + // .setKeyCertOptions(new PemKeyCertOptions() + // .setKeyPath("./src/test/resources/tls/server-key.pem") + // .setCertPath("./src/test/resources/tls/server-cert.pem")) + // .setSsl(true); + + // ========== 如果需要 WebSocket,请设置 setUseWebSocket(true) ========== + // options.setUseWebSocket(true); + + // ========== 默认不启用 SSL 的示例 ========== + MqttServerOptions options = new MqttServerOptions() + .setPort(1883) + .setHost("0.0.0.0") + .setUseWebSocket(false); // 如果需要 WebSocket,请改为 true + + mqttServer = MqttServer.create(vertx, options); + + // 指定 endpointHandler,处理客户端连接等 + mqttServer.endpointHandler(endpoint -> { + handleClientConnect(endpoint); + handleDisconnect(endpoint); + handleSubscribe(endpoint); + handleUnsubscribe(endpoint); + handlePublish(endpoint); + handlePing(endpoint); + }); + + // 启动监听 + mqttServer.listen(ar -> { + if (ar.succeeded()) { + log.info("MQTT server is listening on port {}", mqttServer.actualPort()); + } else { + log.error("Error on starting the server", ar.cause()); + } + }); + } + + /** + * 优雅关闭 MQTT 服务端 + */ + public Future stopMqttServer() { + if (mqttServer != null) { + return mqttServer.close().onComplete(ar -> { + if (ar.succeeded()) { + log.info("MQTT server closed."); + if (vertx != null) { + vertx.close(); + log.info("Vert.x instance closed."); + } + } else { + log.error("Failed to close MQTT server: {}", ar.cause().getMessage()); + } + }); + } + return Future.succeededFuture(); + } + + // ==================== 以下为官方示例中常见事件的处理封装 ==================== + + /** + * 处理客户端连接 (CONNECT) + */ + private void handleClientConnect(MqttEndpoint endpoint) { + // 打印 CONNECT 的主要信息 + log.info("MQTT client [{}] request to connect, clean session = {}", + endpoint.clientIdentifier(), endpoint.isCleanSession()); + + if (endpoint.auth() != null) { + log.info("[username = {}, password = {}]", endpoint.auth().getUsername(), endpoint.auth().getPassword()); + } + log.info("[properties = {}]", endpoint.connectProperties()); + + if (endpoint.will() != null) { + log.info("[will topic = {}, msg = {}, QoS = {}, isRetain = {}]", + endpoint.will().getWillTopic(), + new String(endpoint.will().getWillMessageBytes()), + endpoint.will().getWillQos(), + endpoint.will().isWillRetain()); + } + + log.info("[keep alive timeout = {}]", endpoint.keepAliveTimeSeconds()); + + // 接受远程客户端的连接 + endpoint.accept(false); + } + + /** + * 处理客户端主动断开 (DISCONNECT) + */ + private void handleDisconnect(MqttEndpoint endpoint) { + endpoint.disconnectMessageHandler((MqttDisconnectMessage disconnectMessage) -> { + log.info("Received disconnect from client [{}], reason code = {}", + endpoint.clientIdentifier(), disconnectMessage.code()); + }); + } + + /** + * 处理客户端订阅 (SUBSCRIBE) + */ + private void handleSubscribe(MqttEndpoint endpoint) { + endpoint.subscribeHandler((MqttSubscribeMessage subscribe) -> { + List reasonCodes = new ArrayList<>(); + for (MqttTopicSubscription s : subscribe.topicSubscriptions()) { + log.info("Subscription for {} with QoS {}", s.topicName(), s.qualityOfService()); + // 将客户端请求的 QoS 转换为返回给客户端的 reason code(可能是错误码或实际 granted QoS) + reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService())); + } + // 回复 SUBACK,MQTT 5.0 时可指定 reasonCodes、properties + endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES); + }); + } + + /** + * 处理客户端取消订阅 (UNSUBSCRIBE) + */ + private void handleUnsubscribe(MqttEndpoint endpoint) { + endpoint.unsubscribeHandler((MqttUnsubscribeMessage unsubscribe) -> { + for (String topic : unsubscribe.topics()) { + log.info("Unsubscription for {}", topic); + } + // 回复 UNSUBACK,MQTT 5.0 时可指定 reasonCodes、properties + endpoint.unsubscribeAcknowledge(unsubscribe.messageId()); + }); + } + + /** + * 处理客户端发布的消息 (PUBLISH) + */ + private void handlePublish(MqttEndpoint endpoint) { + // 接收 PUBLISH 消息 + endpoint.publishHandler((MqttPublishMessage message) -> { + String payload = message.payload().toString(Charset.defaultCharset()); + log.info("Received message [{}] on topic [{}] with QoS [{}]", + payload, message.topicName(), message.qosLevel()); + + // 根据不同 QoS,回复客户端 + if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) { + endpoint.publishAcknowledge(message.messageId()); + } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) { + endpoint.publishReceived(message.messageId()); + } + }); + + // 如果 QoS = 2,需要处理 PUBREL + endpoint.publishReleaseHandler(messageId -> { + endpoint.publishComplete(messageId); + }); + } + + /** + * 处理客户端 PINGREQ + */ + private void handlePing(MqttEndpoint endpoint) { + endpoint.pingHandler(v -> { + // 这里仅做日志, PINGRESP 已自动发送 + log.info("Ping received from client [{}]", endpoint.clientIdentifier()); + }); + } + + // ==================== 如果需要服务端向客户端发布消息,可用以下示例 ==================== + + /** + * 服务端主动向已连接的某个 endpoint 发布消息的示例 + * 如果使用 MQTT 5.0,可以传递更多消息属性 + */ + public void publishToClient(MqttEndpoint endpoint, String topic, String content) { + endpoint.publish(topic, + Buffer.buffer(content), + MqttQoS.AT_LEAST_ONCE, // QoS 自行选择 + false, + false); + + // 处理 QoS 1 和 QoS 2 的 ACK + endpoint.publishAcknowledgeHandler(messageId -> { + log.info("Received PUBACK from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId); + }).publishReceivedHandler(messageId -> { + endpoint.publishRelease(messageId); + }).publishCompletionHandler(messageId -> { + log.info("Received PUBCOMP from client [{}] for messageId = {}", endpoint.clientIdentifier(), messageId); + }); + } + + // ==================== 如果需要多实例部署,用于多核扩展,可参考以下思路 ==================== + // 例如,在宿主应用或插件中循环启动多个 MqttServerExtension 实例,或使用 Vert.x 的 deployVerticle: + // DeploymentOptions options = new DeploymentOptions().setInstances(10); + // vertx.deployVerticle(() -> new MyMqttVerticle(), options); + +}