From 0233b092b81220b38007ff6be592f204fac91b10 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Fri, 24 Nov 2023 00:32:52 +0800 Subject: [PATCH] =?UTF-8?q?websocket=EF=BC=9A=E9=87=8D=E6=96=B0=E5=B0=81?= =?UTF-8?q?=E8=A3=85=20websocket=20=E7=BB=84=E4=BB=B6=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=20sender=20=E5=B9=BF=E6=92=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pom.xml | 43 +++++++ .../websocket/config/WebSocketProperties.java | 12 ++ .../YudaoWebSocketAutoConfiguration.java | 116 +++++++++++++++++- .../handler/JsonWebSocketMessageHandler.java | 7 +- .../core/message/JsonWebSocketMessage.java | 6 +- .../AbstractWebSocketMessageSender.java | 104 ++++++++++++++++ .../core/sender/WebSocketMessageSender.java | 52 ++++++++ .../sender/kafka/KafkaWebSocketMessage.java | 35 ++++++ .../kafka/KafkaWebSocketMessageConsumer.java | 28 +++++ .../kafka/KafkaWebSocketMessageSender.java | 67 ++++++++++ .../local/LocalWebSocketMessageSender.java | 20 +++ .../rabbitmq/RabbitMQWebSocketMessage.java | 37 ++++++ .../RabbitMQWebSocketMessageConsumer.java | 39 ++++++ .../RabbitMQWebSocketMessageSender.java | 62 ++++++++++ .../sender/redis/RedisWebSocketMessage.java | 34 +++++ .../redis/RedisWebSocketMessageConsumer.java | 23 ++++ .../redis/RedisWebSocketMessageSender.java | 57 +++++++++ .../rocketmq/RocketMQWebSocketMessage.java | 35 ++++++ .../RocketMQWebSocketMessageConsumer.java | 30 +++++ .../RocketMQWebSocketMessageSender.java | 61 +++++++++ .../session/WebSocketSessionManagerImpl.java | 16 ++- .../core/util/WebSocketFrameworkUtils.java | 14 ++- ...ŠèŠ‹é“ Spring Boot WebSocket 入门》.md | 1 + .../api/websocket/WebSocketSenderApi.java | 54 ++++++++ .../api/websocket/WebSocketSenderApiImpl.java | 34 +++++ .../DemoWebSocketMessageListener.java | 48 ++++++++ .../websocket/message/DemoReceiveMessage.java | 27 ++++ .../websocket/message/DemoSendMessage.java | 24 ++++ .../admin/notice/NoticeController.java | 19 +++ .../src/main/resources/application-local.yaml | 4 +- .../src/main/resources/application.yaml | 10 ++ 31 files changed, 1107 insertions(+), 12 deletions(-) create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java create mode 100644 yudao-framework/yudao-spring-boot-starter-websocket/ã€ŠèŠ‹é“ Spring Boot WebSocket 入门》.md create mode 100644 yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java create mode 100644 yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java create mode 100644 yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java create mode 100644 yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java create mode 100644 yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml index 3e9cecf3cf..b18ee47836 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-websocket/pom.xml @@ -30,12 +30,55 @@ --> cn.iocoder.boot yudao-spring-boot-starter-security + provided org.springframework.boot spring-boot-starter-websocket + + + + + cn.iocoder.boot + yudao-spring-boot-starter-security + provided + + + + + cn.iocoder.boot + yudao-spring-boot-starter-mq + + + org.springframework.kafka + spring-kafka + true + + + org.springframework.amqp + spring-rabbit + true + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + + + + + + cn.iocoder.boot + yudao-spring-boot-starter-biz-tenant + provided + \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java index 7c1bd5abed..aa618fb04e 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/WebSocketProperties.java @@ -4,6 +4,9 @@ import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.validation.annotation.Validated; +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + /** * WebSocket é…置项 * @@ -17,6 +20,15 @@ public class WebSocketProperties { /** * WebSocket 的连接路径 */ + @NotEmpty(message = "WebSocket 的连接路径ä¸èƒ½ä¸ºç©º") private String path = "/ws"; + /** + * 消æ¯å‘é€å™¨çš„类型 + * + * å¯é€‰å€¼ï¼šlocalã€redisã€rocketmqã€kafkaã€rabbitmq + */ + @NotNull(message = "WebSocket 的消æ¯å‘é€è€…ä¸èƒ½ä¸ºç©º") + private String senderType = "local"; + } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java index e116f3c2c5..80692c2227 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/config/YudaoWebSocketAutoConfiguration.java @@ -1,15 +1,31 @@ package cn.iocoder.yudao.framework.websocket.config; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; import cn.iocoder.yudao.framework.websocket.core.handler.JsonWebSocketMessageHandler; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import cn.iocoder.yudao.framework.websocket.core.security.LoginUserHandshakeInterceptor; +import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.kafka.KafkaWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.local.LocalWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq.RabbitMQWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.redis.RedisWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageConsumer; +import cn.iocoder.yudao.framework.websocket.core.sender.rocketmq.RocketMQWebSocketMessageSender; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionHandlerDecorator; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManagerImpl; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; @@ -25,7 +41,6 @@ import java.util.List; @AutoConfiguration @EnableWebSocket // å¼€å¯ websocket @ConditionalOnProperty(prefix = "yudao.websocket", value = "enable", matchIfMissing = true) // å…许使用 yudao.websocket.enable=false ç¦ç”¨ websocket - @EnableConfigurationProperties(WebSocketProperties.class) public class YudaoWebSocketAutoConfiguration { @@ -60,4 +75,103 @@ public class YudaoWebSocketAutoConfiguration { return new WebSocketSessionManagerImpl(); } + // ==================== Sender 相关 ==================== + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "local", matchIfMissing = true) + public class LocalWebSocketMessageSenderConfiguration { + + @Bean + public LocalWebSocketMessageSender localWebSocketMessageSender(WebSocketSessionManager sessionManager) { + return new LocalWebSocketMessageSender(sessionManager); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "redis", matchIfMissing = true) + public class RedisWebSocketMessageSenderConfiguration { + + @Bean + public RedisWebSocketMessageSender redisWebSocketMessageSender(WebSocketSessionManager sessionManager, + RedisMQTemplate redisMQTemplate) { + return new RedisWebSocketMessageSender(sessionManager, redisMQTemplate); + } + + // TODO 芋艿:需è¦é¢å¤–删除 YudaoRedisMQAutoConfiguration çš„ RedisMessageListenerContainer Bean 上的 @ConditionalOnBean 注解。å¯èƒ½æ˜¯ spring boot çš„ bugï¼ + @Bean + public RedisWebSocketMessageConsumer redisWebSocketMessageConsumer( + RedisWebSocketMessageSender redisWebSocketMessageSender) { + return new RedisWebSocketMessageConsumer(redisWebSocketMessageSender); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rocketmq", matchIfMissing = true) + public class RocketMQWebSocketMessageSenderConfiguration { + + @Bean + public RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender( + WebSocketSessionManager sessionManager, RocketMQTemplate rocketMQTemplate, + @Value("${yudao.websocket.sender-rocketmq.topic}") String topic) { + return new RocketMQWebSocketMessageSender(sessionManager, rocketMQTemplate, topic); + } + + @Bean + public RocketMQWebSocketMessageConsumer rocketMQWebSocketMessageConsumer( + RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender) { + return new RocketMQWebSocketMessageConsumer(rocketMQWebSocketMessageSender); + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "rabbitmq", matchIfMissing = true) + public class RabbitMQWebSocketMessageSenderConfiguration { + + @Bean + public RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender( + WebSocketSessionManager sessionManager, RabbitTemplate rabbitTemplate, + TopicExchange websocketTopicExchange) { + return new RabbitMQWebSocketMessageSender(sessionManager, rabbitTemplate, websocketTopicExchange); + } + + @Bean + public RabbitMQWebSocketMessageConsumer rabbitMQWebSocketMessageConsumer( + RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender) { + return new RabbitMQWebSocketMessageConsumer(rabbitMQWebSocketMessageSender); + } + + /** + * 创建 Topic Exchange + */ + @Bean + public TopicExchange websocketTopicExchange(@Value("${yudao.websocket.sender-rabbitmq.exchange}") String exchange) { + return new TopicExchange(exchange, + true, // durable: æ˜¯å¦æŒä¹…化 + false); // exclusive: æ˜¯å¦æŽ’å®ƒ + } + + } + + @Configuration + @ConditionalOnProperty(prefix = "yudao.websocket", name = "sender-type", havingValue = "kafka", matchIfMissing = true) + public class KafkaWebSocketMessageSenderConfiguration { + + @Bean + public KafkaWebSocketMessageSender kafkaWebSocketMessageSender( + WebSocketSessionManager sessionManager, KafkaTemplate kafkaTemplate, + @Value("${yudao.websocket.sender-kafka.topic}") String topic) { + return new KafkaWebSocketMessageSender(sessionManager, kafkaTemplate, topic); + } + + @Bean + public KafkaWebSocketMessageConsumer kafkaWebSocketMessageConsumer( + KafkaWebSocketMessageSender kafkaWebSocketMessageSender) { + return new KafkaWebSocketMessageConsumer(kafkaWebSocketMessageSender); + } + + } + } \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java index 06bc6c0fb9..120f529c23 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/handler/JsonWebSocketMessageHandler.java @@ -3,8 +3,10 @@ package cn.iocoder.yudao.framework.websocket.core.handler; import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHandler; @@ -70,8 +72,9 @@ public class JsonWebSocketMessageHandler extends TextWebSocketHandler { } // 2.3 å¤„ç†æ¶ˆæ¯ Type type = TypeUtil.getTypeArgument(messageListener.getClass(), 0); - Object messageObj = JsonUtils.parseObject(jsonMessage.getMessage(), type); - messageListener.onMessage(session, messageObj); + Object messageObj = JsonUtils.parseObject(jsonMessage.getContent(), type); + Long tenantId = WebSocketFrameworkUtils.getTenantId(session); + TenantUtils.execute(tenantId, () -> messageListener.onMessage(session, messageObj)); } catch (Throwable ex) { log.error("[handleTextMessage][session({}) message({}) 处ç†å¼‚常]", session.getId(), message.getPayload()); } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java index 2257760c9b..0a55cd691b 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/message/JsonWebSocketMessage.java @@ -3,13 +3,15 @@ package cn.iocoder.yudao.framework.websocket.core.message; import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; import lombok.Data; +import java.io.Serializable; + /** * JSON æ ¼å¼çš„ WebSocket 消æ¯å¸§ * * @author èŠ‹é“æºç  */ @Data -public class JsonWebSocketMessage { +public class JsonWebSocketMessage implements Serializable { /** * 消æ¯ç±»åž‹ @@ -22,6 +24,6 @@ public class JsonWebSocketMessage { * * è¦æ±‚ JSON 对象 */ - private String message; + private String content; } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java new file mode 100644 index 0000000000..4e0db44c9d --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/AbstractWebSocketMessageSender.java @@ -0,0 +1,104 @@ +package cn.iocoder.yudao.framework.websocket.core.sender; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.websocket.core.message.JsonWebSocketMessage; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** + * WebSocketMessageSender 实现类 + * + * @author èŠ‹é“æºç  + */ +@Slf4j +@RequiredArgsConstructor +public abstract class AbstractWebSocketMessageSender implements WebSocketMessageSender { + + private final WebSocketSessionManager sessionManager; + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + send(null, userType, userId, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + send(null, userType, null, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + send(sessionId, null, null, messageType, messageContent); + } + + /** + * å‘逿¶ˆæ¯ + * + * @param sessionId Session ç¼–å· + * @param userType 用户类型 + * @param userId ç”¨æˆ·ç¼–å· + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容 + */ + public void send(String sessionId, Integer userType, Long userId, String messageType, String messageContent) { + // 1. 获得 Session 列表 + List sessions = Collections.emptyList(); + if (StrUtil.isNotEmpty(sessionId)) { + WebSocketSession session = sessionManager.getSession(sessionId); + if (session != null) { + sessions = Collections.singletonList(session); + } + } else if (userType != null && userId != null) { + sessions = (List) sessionManager.getSessionList(userType, userId); + } else if (userType != null) { + sessions = (List) sessionManager.getSessionList(userType); + } + if (CollUtil.isEmpty(sessions)) { + log.info("[send][sessionId({}) userType({}) userId({}) messageType({}) messageContent({}) 未匹é…到会è¯]", + sessionId, userType, userId, messageType, messageContent); + } + // 2. 执行å‘é€ + doSend(sessions, messageType, messageContent); + } + + /** + * å‘逿¶ˆæ¯çš„具体实现 + * + * @param sessions Session 列表 + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容 + */ + public void doSend(Collection sessions, String messageType, String messageContent) { + JsonWebSocketMessage message = new JsonWebSocketMessage().setType(messageType).setContent(messageContent); + String payload = JsonUtils.toJsonString(message); // 关键,使用 JSON åºåˆ—化 + sessions.forEach(session -> { + // 1. å„ç§æ ¡éªŒï¼Œä¿è¯ Session å¯ä»¥è¢«å‘é€ + if (session == null) { + log.error("[doSend][session 为空, message({})]", message); + return; + } + if (!session.isOpen()) { + log.error("[doSend][session({}) 已关闭, message({})]", session.getId(), message); + return; + } + // 2. 执行å‘é€ + try { + session.sendMessage(new TextMessage(payload)); + log.info("[doSend][session({}) å‘逿¶ˆæ¯æˆåŠŸï¼Œmessage({})]", session.getId(), message); + } catch (IOException ex) { + log.error("[doSend][session({}) å‘逿¶ˆæ¯å¤±è´¥ï¼Œmessage({})]", session.getId(), message, ex); + } + }); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java new file mode 100644 index 0000000000..9f75ad52d3 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/WebSocketMessageSender.java @@ -0,0 +1,52 @@ +package cn.iocoder.yudao.framework.websocket.core.sender; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; + +/** + * WebSocket 消æ¯çš„å‘é€å™¨æŽ¥å£ + * + * @author èŠ‹é“æºç  + */ +public interface WebSocketMessageSender { + + /** + * å‘逿¶ˆæ¯ç»™æŒ‡å®šç”¨æˆ· + * + * @param userType 用户类型 + * @param userId ç”¨æˆ·ç¼–å· + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容,JSON æ ¼å¼ + */ + void send(Integer userType, Long userId, String messageType, String messageContent); + + /** + * å‘逿¶ˆæ¯ç»™æŒ‡å®šç”¨æˆ·ç±»åž‹ + * + * @param userType 用户类型 + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容,JSON æ ¼å¼ + */ + void send(Integer userType, String messageType, String messageContent); + + /** + * å‘逿¶ˆæ¯ç»™æŒ‡å®š Session + * + * @param sessionId Session ç¼–å· + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容,JSON æ ¼å¼ + */ + void send(String sessionId, String messageType, String messageContent); + + default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { + send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(Integer userType, String messageType, Object messageContent) { + send(userType, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(String sessionId, String messageType, Object messageContent) { + send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java new file mode 100644 index 0000000000..5a4cf53119 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessage.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import lombok.Data; + +/** + * Kafka 广播 WebSocket çš„æ¶ˆæ¯ + * + * @author èŠ‹é“æºç  + */ +@Data +public class KafkaWebSocketMessage { + + /** + * Session ç¼–å· + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * ç”¨æˆ·ç¼–å· + */ + private Long userId; + + /** + * 消æ¯ç±»åž‹ + */ + private String messageType; + /** + * 消æ¯å†…容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java new file mode 100644 index 0000000000..201e65d81f --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageConsumer.java @@ -0,0 +1,28 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.kafka.annotation.KafkaListener; + +/** + * {@link KafkaWebSocketMessage} 广播消æ¯çš„æ¶ˆè´¹è€…,真正把消æ¯å‘é€å‡ºåŽ» + * + * @author èŠ‹é“æºç  + */ +@RequiredArgsConstructor +public class KafkaWebSocketMessageConsumer { + + private final KafkaWebSocketMessageSender rabbitMQWebSocketMessageSender; + + @RabbitHandler + @KafkaListener( + topics = "${yudao.websocket.sender-kafka.topic}", + // 在 Group 上,使用 UUID 生æˆå…¶åŽç¼€ã€‚这样,å¯åŠ¨çš„ Consumer çš„ Group ä¸åŒï¼Œä»¥è¾¾åˆ°å¹¿æ’­æ¶ˆè´¹çš„目的 + groupId = "${yudao.websocket.sender-kafka.consumer-group}" + "-" + "#{T(java.util.UUID).randomUUID()}") + public void onMessage(KafkaWebSocketMessage message) { + rabbitMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java new file mode 100644 index 0000000000..47bb598ad5 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/kafka/KafkaWebSocketMessageSender.java @@ -0,0 +1,67 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.kafka; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.concurrent.ExecutionException; + +/** + * 基于 Kafka çš„ {@link WebSocketMessageSender} 实现类 + * + * @author èŠ‹é“æºç  + */ +@Slf4j +public class KafkaWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final KafkaTemplate kafkaTemplate; + + private final String topic; + + public KafkaWebSocketMessageSender(WebSocketSessionManager sessionManager, + KafkaTemplate kafkaTemplate, + String topic) { + super(sessionManager); + this.kafkaTemplate = kafkaTemplate; + this.topic = topic; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendKafkaMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendKafkaMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendKafkaMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 Kafka å¹¿æ’­æ¶ˆæ¯ + * + * @param sessionId Session ç¼–å· + * @param userId ç”¨æˆ·ç¼–å· + * @param userType 用户类型 + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容 + */ + private void sendKafkaMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + KafkaWebSocketMessage mqMessage = new KafkaWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + try { + kafkaTemplate.send(topic, mqMessage).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("[sendKafkaMessage][å‘逿¶ˆæ¯({}) 到 Kafka 失败]", mqMessage, e); + } + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java new file mode 100644 index 0000000000..66640ef344 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/local/LocalWebSocketMessageSender.java @@ -0,0 +1,20 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.local; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; + +/** + * 本地的 {@link WebSocketMessageSender} 实现类 + * + * 注æ„:仅仅适åˆå•机场景ï¼ï¼ï¼ + * + * @author èŠ‹é“æºç  + */ +public class LocalWebSocketMessageSender extends AbstractWebSocketMessageSender { + + public LocalWebSocketMessageSender(WebSocketSessionManager sessionManager) { + super(sessionManager); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java new file mode 100644 index 0000000000..80a4bc176d --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessage.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import lombok.Data; + +import java.io.Serializable; + +/** + * RabbitMQ 广播 WebSocket çš„æ¶ˆæ¯ + * + * @author èŠ‹é“æºç  + */ +@Data +public class RabbitMQWebSocketMessage implements Serializable { + + /** + * Session ç¼–å· + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * ç”¨æˆ·ç¼–å· + */ + private Long userId; + + /** + * 消æ¯ç±»åž‹ + */ + private String messageType; + /** + * 消æ¯å†…容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java new file mode 100644 index 0000000000..59e3824217 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageConsumer.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import lombok.RequiredArgsConstructor; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.*; + +/** + * {@link RabbitMQWebSocketMessage} 广播消æ¯çš„æ¶ˆè´¹è€…,真正把消æ¯å‘é€å‡ºåŽ» + * + * @author èŠ‹é“æºç  + */ +@RabbitListener( + bindings = @QueueBinding( + value = @Queue( + // 在 Queue çš„å字上,使用 UUID 生æˆå…¶åŽç¼€ã€‚这样,å¯åŠ¨çš„ Consumer çš„ Queue ä¸åŒï¼Œä»¥è¾¾åˆ°å¹¿æ’­æ¶ˆè´¹çš„目的 + name = "${yudao.websocket.sender-rabbitmq.queue}" + "-" + "#{T(java.util.UUID).randomUUID()}", + // Consumer 关闭时,该队列就å¯ä»¥è¢«è‡ªåŠ¨åˆ é™¤äº† + autoDelete = "true" + ), + exchange = @Exchange( + name = "${yudao.websocket.sender-rabbitmq.exchange}", + type = ExchangeTypes.TOPIC, + declare = "false" + ) + ) +) +@RequiredArgsConstructor +public class RabbitMQWebSocketMessageConsumer { + + private final RabbitMQWebSocketMessageSender rabbitMQWebSocketMessageSender; + + @RabbitHandler + public void onMessage(RabbitMQWebSocketMessage message) { + rabbitMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java new file mode 100644 index 0000000000..065a5d6bfe --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rabbitmq/RabbitMQWebSocketMessageSender.java @@ -0,0 +1,62 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rabbitmq; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +/** + * 基于 RabbitMQ çš„ {@link WebSocketMessageSender} 实现类 + * + * @author èŠ‹é“æºç  + */ +@Slf4j +public class RabbitMQWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RabbitTemplate rabbitTemplate; + + private final TopicExchange topicExchange; + + public RabbitMQWebSocketMessageSender(WebSocketSessionManager sessionManager, + RabbitTemplate rabbitTemplate, + TopicExchange topicExchange) { + super(sessionManager); + this.rabbitTemplate = rabbitTemplate; + this.topicExchange = topicExchange; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRabbitMQMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRabbitMQMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRabbitMQMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 RabbitMQ å¹¿æ’­æ¶ˆæ¯ + * + * @param sessionId Session ç¼–å· + * @param userId ç”¨æˆ·ç¼–å· + * @param userType 用户类型 + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容 + */ + private void sendRabbitMQMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RabbitMQWebSocketMessage mqMessage = new RabbitMQWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + rabbitTemplate.convertAndSend(topicExchange.getName(), null, mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java new file mode 100644 index 0000000000..fb9ea0ca03 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessage.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; +import lombok.Data; + +/** + * Redis 广播 WebSocket çš„æ¶ˆæ¯ + */ +@Data +public class RedisWebSocketMessage extends AbstractRedisChannelMessage { + + /** + * Session ç¼–å· + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * ç”¨æˆ·ç¼–å· + */ + private Long userId; + + /** + * 消æ¯ç±»åž‹ + */ + private String messageType; + /** + * 消æ¯å†…容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java new file mode 100644 index 0000000000..abce006953 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageConsumer.java @@ -0,0 +1,23 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; +import lombok.RequiredArgsConstructor; + +/** + * {@link RedisWebSocketMessage} 广播消æ¯çš„æ¶ˆè´¹è€…,真正把消æ¯å‘é€å‡ºåŽ» + * + * @author èŠ‹é“æºç  + */ +@RequiredArgsConstructor +public class RedisWebSocketMessageConsumer extends AbstractRedisChannelMessageListener { + + private final RedisWebSocketMessageSender redisWebSocketMessageSender; + + @Override + public void onMessage(RedisWebSocketMessage message) { + redisWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java new file mode 100644 index 0000000000..d6004ac6d2 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/redis/RedisWebSocketMessageSender.java @@ -0,0 +1,57 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.redis; + +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; + +/** + * 基于 Redis çš„ {@link WebSocketMessageSender} 实现类 + * + * @author èŠ‹é“æºç  + */ +@Slf4j +public class RedisWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RedisMQTemplate redisMQTemplate; + + public RedisWebSocketMessageSender(WebSocketSessionManager sessionManager, + RedisMQTemplate redisMQTemplate) { + super(sessionManager); + this.redisMQTemplate = redisMQTemplate; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRedisMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRedisMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRedisMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 Redis å¹¿æ’­æ¶ˆæ¯ + * + * @param sessionId Session ç¼–å· + * @param userId ç”¨æˆ·ç¼–å· + * @param userType 用户类型 + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容 + */ + private void sendRedisMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RedisWebSocketMessage mqMessage = new RedisWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + redisMQTemplate.send(mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java new file mode 100644 index 0000000000..91570e3e3c --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessage.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import lombok.Data; + +/** + * RocketMQ 广播 WebSocket çš„æ¶ˆæ¯ + * + * @author èŠ‹é“æºç  + */ +@Data +public class RocketMQWebSocketMessage { + + /** + * Session ç¼–å· + */ + private String sessionId; + /** + * 用户类型 + */ + private Integer userType; + /** + * ç”¨æˆ·ç¼–å· + */ + private Long userId; + + /** + * 消æ¯ç±»åž‹ + */ + private String messageType; + /** + * 消æ¯å†…容 + */ + private String messageContent; + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java new file mode 100644 index 0000000000..ab2e2c4dc5 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageConsumer.java @@ -0,0 +1,30 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import lombok.RequiredArgsConstructor; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; + +/** + * {@link RocketMQWebSocketMessage} 广播消æ¯çš„æ¶ˆè´¹è€…,真正把消æ¯å‘é€å‡ºåŽ» + * + * @author èŠ‹é“æºç  + */ +@RocketMQMessageListener( // é‡ç‚¹ï¼šæ·»åŠ  @RocketMQMessageListener 注解,声明消费的 topic + topic = "${yudao.websocket.sender-rocketmq.topic}", + consumerGroup = "${yudao.websocket.sender-rocketmq.consumer-group}", + messageModel = MessageModel.BROADCASTING // 设置为广播模å¼ï¼Œä¿è¯æ¯ä¸ªå®žä¾‹éƒ½èƒ½æ”¶åˆ°æ¶ˆæ¯ +) +@RequiredArgsConstructor +public class RocketMQWebSocketMessageConsumer implements RocketMQListener { + + private final RocketMQWebSocketMessageSender rocketMQWebSocketMessageSender; + + @Override + public void onMessage(RocketMQWebSocketMessage message) { + rocketMQWebSocketMessageSender.send(message.getSessionId(), + message.getUserType(), message.getUserId(), + message.getMessageType(), message.getMessageContent()); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java new file mode 100644 index 0000000000..ed059bac43 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/sender/rocketmq/RocketMQWebSocketMessageSender.java @@ -0,0 +1,61 @@ +package cn.iocoder.yudao.framework.websocket.core.sender.rocketmq; + +import cn.iocoder.yudao.framework.websocket.core.sender.AbstractWebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.session.WebSocketSessionManager; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +/** + * 基于 RocketMQ çš„ {@link WebSocketMessageSender} 实现类 + * + * @author èŠ‹é“æºç  + */ +@Slf4j +public class RocketMQWebSocketMessageSender extends AbstractWebSocketMessageSender { + + private final RocketMQTemplate rocketMQTemplate; + + private final String topic; + + public RocketMQWebSocketMessageSender(WebSocketSessionManager sessionManager, + RocketMQTemplate rocketMQTemplate, + String topic) { + super(sessionManager); + this.rocketMQTemplate = rocketMQTemplate; + this.topic = topic; + } + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + sendRocketMQMessage(null, userId, userType, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + sendRocketMQMessage(null, null, userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + sendRocketMQMessage(sessionId, null, null, messageType, messageContent); + } + + /** + * 通过 RocketMQ å¹¿æ’­æ¶ˆæ¯ + * + * @param sessionId Session ç¼–å· + * @param userId ç”¨æˆ·ç¼–å· + * @param userType 用户类型 + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容 + */ + private void sendRocketMQMessage(String sessionId, Long userId, Integer userType, + String messageType, String messageContent) { + RocketMQWebSocketMessage mqMessage = new RocketMQWebSocketMessage() + .setSessionId(sessionId).setUserId(userId).setUserType(userType) + .setMessageType(messageType).setMessageContent(messageContent); + rocketMQTemplate.syncSend(topic, mqMessage); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java index 6004cc5da5..aca572f90b 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/session/WebSocketSessionManagerImpl.java @@ -2,10 +2,14 @@ package cn.iocoder.yudao.framework.websocket.core.session; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.security.core.LoginUser; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; import org.springframework.web.socket.WebSocketSession; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -91,10 +95,18 @@ public class WebSocketSessionManagerImpl implements WebSocketSessionManager { return new ArrayList<>(); } LinkedList result = new LinkedList<>(); // é¿å…扩容 + Long contextTenantId = TenantContextHolder.getTenantId(); for (List sessions : userSessionsMap.values()) { - if (CollUtil.isNotEmpty(sessions)) { + if (CollUtil.isEmpty(sessions)) { continue; } + // 特殊:如果租户ä¸åŒ¹é…,则直接排除 + if (contextTenantId != null) { + Long userTenantId = WebSocketFrameworkUtils.getTenantId(sessions.get(0)); + if (!contextTenantId.equals(userTenantId)) { + continue; + } + } result.addAll(sessions); } return result; diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java index a77028df2b..58cdedc297 100644 --- a/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java +++ b/yudao-framework/yudao-spring-boot-starter-websocket/src/main/java/cn/iocoder/yudao/framework/websocket/core/util/WebSocketFrameworkUtils.java @@ -1,7 +1,6 @@ package cn.iocoder.yudao.framework.websocket.core.util; import cn.iocoder.yudao.framework.security.core.LoginUser; -import org.springframework.lang.Nullable; import org.springframework.web.socket.WebSocketSession; import java.util.Map; @@ -39,7 +38,6 @@ public class WebSocketFrameworkUtils { * * @return ç”¨æˆ·ç¼–å· */ - @Nullable public static Long getLoginUserId(WebSocketSession session) { LoginUser loginUser = getLoginUser(session); return loginUser != null ? loginUser.getId() : null; @@ -50,10 +48,20 @@ public class WebSocketFrameworkUtils { * * @return ç”¨æˆ·ç¼–å· */ - @Nullable public static Integer getLoginUserType(WebSocketSession session) { LoginUser loginUser = getLoginUser(session); return loginUser != null ? loginUser.getUserType() : null; } + /** + * 获得当å‰ç”¨æˆ·çš„ç§Ÿæˆ·ç¼–å· + * + * @param session Session + * @return ç§Ÿæˆ·ç¼–å· + */ + public static Long getTenantId(WebSocketSession session) { + LoginUser loginUser = getLoginUser(session); + return loginUser != null ? loginUser.getTenantId() : null; + } + } diff --git a/yudao-framework/yudao-spring-boot-starter-websocket/ã€ŠèŠ‹é“ Spring Boot WebSocket 入门》.md b/yudao-framework/yudao-spring-boot-starter-websocket/ã€ŠèŠ‹é“ Spring Boot WebSocket 入门》.md new file mode 100644 index 0000000000..8df5a7758b --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-websocket/ã€ŠèŠ‹é“ Spring Boot WebSocket 入门》.md @@ -0,0 +1 @@ + diff --git a/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java b/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java new file mode 100644 index 0000000000..38582c2f32 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-api/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApi.java @@ -0,0 +1,54 @@ +package cn.iocoder.yudao.module.infra.api.websocket; + +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; + +/** + * WebSocket å‘é€å™¨çš„ API æŽ¥å£ + * + * 对 WebSocketMessageSender 进行å°è£…,æä¾›ç»™å…¶å®ƒæ¨¡å—使用 + * + * @author èŠ‹é“æºç  + */ +public interface WebSocketSenderApi { + + /** + * å‘逿¶ˆæ¯ç»™æŒ‡å®šç”¨æˆ· + * + * @param userType 用户类型 + * @param userId ç”¨æˆ·ç¼–å· + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容,JSON æ ¼å¼ + */ + void send(Integer userType, Long userId, String messageType, String messageContent); + + /** + * å‘逿¶ˆæ¯ç»™æŒ‡å®šç”¨æˆ·ç±»åž‹ + * + * @param userType 用户类型 + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容,JSON æ ¼å¼ + */ + void send(Integer userType, String messageType, String messageContent); + + /** + * å‘逿¶ˆæ¯ç»™æŒ‡å®š Session + * + * @param sessionId Session ç¼–å· + * @param messageType 消æ¯ç±»åž‹ + * @param messageContent 消æ¯å†…容,JSON æ ¼å¼ + */ + void send(String sessionId, String messageType, String messageContent); + + default void sendObject(Integer userType, Long userId, String messageType, Object messageContent) { + send(userType, userId, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(Integer userType, String messageType, Object messageContent) { + send(userType, messageType, JsonUtils.toJsonString(messageContent)); + } + + default void sendObject(String sessionId, String messageType, Object messageContent) { + send(sessionId, messageType, JsonUtils.toJsonString(messageContent)); + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java new file mode 100644 index 0000000000..046cd2fc19 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/api/websocket/WebSocketSenderApiImpl.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.infra.api.websocket; + +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * WebSocket å‘é€å™¨çš„ API 实现类 + * + * @author èŠ‹é“æºç  + */ +@Component +public class WebSocketSenderApiImpl implements WebSocketSenderApi { + + @Resource + private WebSocketMessageSender webSocketMessageSender; + + @Override + public void send(Integer userType, Long userId, String messageType, String messageContent) { + webSocketMessageSender.send(userType, userId, messageType, messageContent); + } + + @Override + public void send(Integer userType, String messageType, String messageContent) { + webSocketMessageSender.send(userType, messageType, messageContent); + } + + @Override + public void send(String sessionId, String messageType, String messageContent) { + webSocketMessageSender.send(sessionId, messageType, messageContent); + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java new file mode 100644 index 0000000000..9ccf6070ea --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/DemoWebSocketMessageListener.java @@ -0,0 +1,48 @@ +package cn.iocoder.yudao.module.infra.websocket; + +import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; +import cn.iocoder.yudao.framework.websocket.core.listener.WebSocketMessageListener; +import cn.iocoder.yudao.framework.websocket.core.sender.WebSocketMessageSender; +import cn.iocoder.yudao.framework.websocket.core.util.WebSocketFrameworkUtils; +import cn.iocoder.yudao.module.infra.websocket.message.DemoReceiveMessage; +import cn.iocoder.yudao.module.infra.websocket.message.DemoSendMessage; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.WebSocketSession; + +import javax.annotation.Resource; + +/** + * WebSocket 示例:å•呿¶ˆæ¯ + * + * @author èŠ‹é“æºç  + */ +@Component +public class DemoWebSocketMessageListener implements WebSocketMessageListener { + + @Resource + private WebSocketMessageSender webSocketMessageSender; + + @Override + public void onMessage(WebSocketSession session, DemoSendMessage message) { + Long fromUserId = WebSocketFrameworkUtils.getLoginUserId(session); + // 情况一:å•å‘ + if (message.getToUserId() != null) { + DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) + .setText(message.getText()).setSingle(true); + webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), message.getToUserId(), // 给指定用户 + "demo-message-receive", toMessage); + return; + } + // æƒ…å†µäºŒï¼šç¾¤å‘ + DemoReceiveMessage toMessage = new DemoReceiveMessage().setFromUserId(fromUserId) + .setText(message.getText()).setSingle(false); + webSocketMessageSender.sendObject(UserTypeEnum.ADMIN.getValue(), // 给所有用户 + "demo-message-receive", toMessage); + } + + @Override + public String getType() { + return "demo-message-send"; + } + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java new file mode 100644 index 0000000000..03a246cf9a --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoReceiveMessage.java @@ -0,0 +1,27 @@ +package cn.iocoder.yudao.module.infra.websocket.message; + +import lombok.Data; + +/** + * 示例:server -> client åŒæ­¥æ¶ˆæ¯ + * + * @author èŠ‹é“æºç  + */ +@Data +public class DemoReceiveMessage { + + /** + * æŽ¥æ”¶äººçš„ç¼–å· + */ + private Long fromUserId; + /** + * 内容 + */ + private String text; + + /** + * 是å¦å•èŠ + */ + private Boolean single; + +} diff --git a/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java new file mode 100644 index 0000000000..f0c14f5d31 --- /dev/null +++ b/yudao-module-infra/yudao-module-infra-biz/src/main/java/cn/iocoder/yudao/module/infra/websocket/message/DemoSendMessage.java @@ -0,0 +1,24 @@ +package cn.iocoder.yudao.module.infra.websocket.message; + +import lombok.Data; + +/** + * 示例:client -> server å‘逿¶ˆæ¯ + * + * @author èŠ‹é“æºç  + */ +@Data +public class DemoSendMessage { + + /** + * å‘é€ç»™è° + * + * 如果为空,说明å‘é€ç»™æ‰€æœ‰äºº + */ + private Long toUserId; + /** + * 内容 + */ + private String text; + +} diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java index 0e19577858..5a566702f6 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/notice/NoticeController.java @@ -1,12 +1,16 @@ package cn.iocoder.yudao.module.system.controller.admin.notice; +import cn.hutool.core.lang.Assert; +import cn.iocoder.yudao.framework.common.enums.UserTypeEnum; import cn.iocoder.yudao.framework.common.pojo.CommonResult; import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.module.infra.api.websocket.WebSocketSenderApi; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeCreateReqVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticePageReqVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeRespVO; import cn.iocoder.yudao.module.system.controller.admin.notice.vo.NoticeUpdateReqVO; import cn.iocoder.yudao.module.system.convert.notice.NoticeConvert; +import cn.iocoder.yudao.module.system.dal.dataobject.notice.NoticeDO; import cn.iocoder.yudao.module.system.service.notice.NoticeService; import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.Parameter; @@ -29,6 +33,9 @@ public class NoticeController { @Resource private NoticeService noticeService; + @Resource + private WebSocketSenderApi webSocketSenderApi; + @PostMapping("/create") @Operation(summary = "创建通知公告") @PreAuthorize("@ss.hasPermission('system:notice:create')") @@ -69,4 +76,16 @@ public class NoticeController { return success(NoticeConvert.INSTANCE.convert(noticeService.getNotice(id))); } + @PostMapping("/push") + @Operation(summary = "推é€é€šçŸ¥å…¬å‘Š", description = "åªå‘é€ç»™ websocket 连接在线的用户") + @Parameter(name = "id", description = "ç¼–å·", required = true, example = "1024") + @PreAuthorize("@ss.hasPermission('system:notice:update')") + public CommonResult push(@RequestParam("id") Long id) { + NoticeDO notice = noticeService.getNotice(id); + Assert.notNull(notice, "公告ä¸èƒ½ä¸ºç©º"); + // 通过 websocket 推é€ç»™åœ¨çº¿çš„用户 + webSocketSenderApi.sendObject(UserTypeEnum.ADMIN.getValue(), "notice-push", notice); + return success(true); + } + } diff --git a/yudao-server/src/main/resources/application-local.yaml b/yudao-server/src/main/resources/application-local.yaml index f15b0cf8dd..6dfe13fffd 100644 --- a/yudao-server/src/main/resources/application-local.yaml +++ b/yudao-server/src/main/resources/application-local.yaml @@ -123,8 +123,8 @@ spring: rabbitmq: host: 127.0.0.1 # RabbitMQ æœåŠ¡çš„åœ°å€ port: 5672 # RabbitMQ æœåŠ¡çš„ç«¯å£ - username: guest # RabbitMQ æœåŠ¡çš„è´¦å· - password: guest # RabbitMQ æœåŠ¡çš„å¯†ç  + username: rabbit # RabbitMQ æœåŠ¡çš„è´¦å· + password: rabbit # RabbitMQ æœåŠ¡çš„å¯†ç  # Kafka é…置项,对应 KafkaProperties é…置类 kafka: bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地å€ï¼Œå¯ä»¥è®¾ç½®å¤šä¸ªï¼Œä»¥é€—å·åˆ†éš” diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index 605eef2738..9544110227 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -147,6 +147,16 @@ yudao: websocket: enable: true # websocket的开关 path: /infra/ws # 路径 + sender-type: local # 消æ¯å‘é€çš„类型,å¯é€‰å€¼ä¸º localã€redisã€rocketmqã€kafkaã€rabbitmq + sender-rocketmq: + topic: ${spring.application.name}-websocket # 消æ¯å‘é€çš„ RocketMQ Topic + consumer-group: ${spring.application.name}-websocket-consumer # 消æ¯å‘é€çš„ RocketMQ Consumer Group + sender-rabbitmq: + exchange: ${spring.application.name}-websocket-exchange # 消æ¯å‘é€çš„ RabbitMQ Exchange + queue: ${spring.application.name}-websocket-queue # 消æ¯å‘é€çš„ RabbitMQ Queue + sender-kafka: + topic: ${spring.application.name}-websocket # 消æ¯å‘é€çš„ Kafka Topic + consumer-group: ${spring.application.name}-websocket-consumer # 消æ¯å‘é€çš„ Kafka Consumer Group swagger: title: 芋é“快速开å‘å¹³å° description: æä¾›ç®¡ç†åŽå°ã€ç”¨æˆ· App 的所有功能