diff --git a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java index 8bb8765917..70b747bf9b 100644 --- a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java +++ b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/util/json/JsonUtils.java @@ -99,6 +99,18 @@ public class JsonUtils { } } + public static T parseObject(byte[] text, Type type) { + if (ArrayUtil.isEmpty(text)) { + return null; + } + try { + return objectMapper.readValue(text, objectMapper.getTypeFactory().constructType(type)); + } catch (IOException e) { + log.error("json parse err,json:{}", text, e); + throw new RuntimeException(e); + } + } + /** * 将字符串解析成指定类型的对象 * 使用 {@link #parseObject(String, Class)} 时,在@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) 的场景下, diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java index 2bd9d82d5f..aa216b55ad 100644 --- a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/config/IotMessageBusAutoConfiguration.java @@ -2,8 +2,12 @@ package cn.iocoder.yudao.module.iot.messagebus.config; import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.messagebus.core.local.LocalIotMessageBus; +import cn.iocoder.yudao.module.iot.messagebus.core.rocketmq.RocketMQIotMessageBus; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ApplicationContext; @@ -34,4 +38,19 @@ public class IotMessageBusAutoConfiguration { } + // ==================== RocketMQ 实现 ==================== + + @Configuration + @ConditionalOnProperty(prefix = "yudao.iot.message-bus", name = "type", havingValue = "rocketmq") + @ConditionalOnClass(RocketMQTemplate.class) + public static class RocketMQIotMessageBusConfiguration { + + @Bean + public IotMessageBus rocketMQIotMessageBus(RocketMQProperties rocketMQProperties, RocketMQTemplate rocketMQTemplate) { + log.info("[rocketMQIotMessageBus][创建 RocketMQ IoT 消息总线]"); + return new RocketMQIotMessageBus(rocketMQProperties, rocketMQTemplate); + } + + } + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java deleted file mode 100644 index bc4cd91840..0000000000 --- a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/AbstractIotMessageBus.java +++ /dev/null @@ -1,69 +0,0 @@ -package cn.iocoder.yudao.module.iot.messagebus.core; - -import cn.hutool.core.collection.CollUtil; -import lombok.extern.slf4j.Slf4j; - -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * IoT 消息总线抽象基类 - * - * 提供通用的订阅者管理功能 - * - * @author 芋道源码 - */ -@Slf4j -public abstract class AbstractIotMessageBus implements IotMessageBus { - - /** - * 订阅者映射表 - * Key: topic - */ - private final Map>> subscribers = new ConcurrentHashMap<>(); - - @Override - public void register(String topic, IotMessageBusSubscriber subscriber) { - // 执行注册 - doRegister(topic, subscriber); - - // 添加订阅者映射 - List> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()); - topicSubscribers.add(subscriber); - topicSubscribers.sort(Comparator.comparingInt(IotMessageBusSubscriber::order)); - log.info("[register][topic({}) 注册订阅者({})成功]", topic, subscriber.getClass().getName()); - } - - /** - * 注册订阅者 - * - * @param topic 主题 - * @param subscriber 订阅者 - */ - protected abstract void doRegister(String topic, IotMessageBusSubscriber subscriber); - - /** - * 通知订阅者 - * - * @param message 消息 - */ - @SuppressWarnings({"rawtypes", "unchecked"}) - protected void notifySubscribers(String topic, Object message) { - List> topicSubscribers = subscribers.get(topic); - if (CollUtil.isEmpty(topicSubscribers)) { - return; - } - for (IotMessageBusSubscriber subscriber : topicSubscribers) { - try { - subscriber.onMessage(topic, message); - } catch (Exception e) { - log.error("[notifySubscribers][topic({}) message({}) 通知订阅者({})失败]", - topic, e, subscriber.getClass().getName()); - } - } - } - -} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java index de8bc067a7..931e963989 100644 --- a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBus.java @@ -20,9 +20,8 @@ public interface IotMessageBus { /** * 注册消息订阅者 * - * @param topic 主题 * @param subscriber 订阅者 */ - void register(String topic, IotMessageBusSubscriber subscriber); + void register(IotMessageBusSubscriber subscriber); } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java index aec1b3b3e2..a8bdff9fa3 100644 --- a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/IotMessageBusSubscriber.java @@ -10,18 +10,20 @@ package cn.iocoder.yudao.module.iot.messagebus.core; public interface IotMessageBusSubscriber { /** - * 处理接收到的消息 - * - * @param topic 主题 - * @param message 消息内容 + * @return 主题 */ - void onMessage(String topic, T message); + String getTopic(); /** - * 获取订阅者的顺序 - * - * @return 顺序值 + * @return 分组 */ - int order(); + String getGroup(); + + /** + * 处理接收到的消息 + * + * @param message 消息内容 + */ + void onMessage(T message); } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java index eb09ac7de6..5a27a676b2 100644 --- a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBus.java @@ -1,6 +1,6 @@ package cn.iocoder.yudao.module.iot.messagebus.core.local; -import cn.iocoder.yudao.module.iot.messagebus.core.AbstractIotMessageBus; +import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus; import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber; import lombok.RequiredArgsConstructor; @@ -8,6 +8,13 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.context.event.EventListener; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + /** * 本地的 {@link IotMessageBus} 实现类 * @@ -17,23 +24,46 @@ import org.springframework.context.event.EventListener; */ @RequiredArgsConstructor @Slf4j -public class LocalIotMessageBus extends AbstractIotMessageBus { +public class LocalIotMessageBus implements IotMessageBus { private final ApplicationContext applicationContext; + /** + * 订阅者映射表 + * Key: topic + */ + private final Map>> subscribers = new HashMap<>(); + @Override public void post(String topic, Object message) { applicationContext.publishEvent(new LocalIotMessage(topic, message)); } @Override - protected void doRegister(String topic, IotMessageBusSubscriber subscriber) { - // 无需实现,交给 Spring @EventListener 监听 + public void register(IotMessageBusSubscriber subscriber) { + String topic = subscriber.getTopic(); + List> topicSubscribers = subscribers.computeIfAbsent(topic, k -> new ArrayList<>()); + topicSubscribers.add(subscriber); + log.info("[register][topic({}/{}) 注册消费者({})成功]", + topic, subscriber.getGroup(), subscriber.getClass().getName()); } @EventListener + @SuppressWarnings({"unchecked", "rawtypes"}) public void onMessage(LocalIotMessage message) { - notifySubscribers(message.getTopic(), message.getMessage()); + String topic = message.getTopic(); + List> topicSubscribers = subscribers.get(topic); + if (CollUtil.isEmpty(topicSubscribers)) { + return; + } + for (IotMessageBusSubscriber subscriber : topicSubscribers) { + try { + subscriber.onMessage(message.getMessage()); + } catch (Exception ex) { + log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]", + subscriber.getTopic(), subscriber.getGroup(), message.getMessage(), subscriber.getClass().getName(), ex); + } + } } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBus.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBus.java new file mode 100644 index 0000000000..346d66efbf --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/main/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBus.java @@ -0,0 +1,98 @@ +package cn.iocoder.yudao.module.iot.messagebus.core.rocketmq; + +import cn.hutool.core.util.TypeUtil; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +import jakarta.annotation.PreDestroy; + +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +/** + * 基于 RocketMQ 的 {@link IotMessageBus} 实现类 + * + * @author 芋道源码 + */ +@RequiredArgsConstructor +@Slf4j +public class RocketMQIotMessageBus implements IotMessageBus { + + private final RocketMQProperties rocketMQProperties; + + private final RocketMQTemplate rocketMQTemplate; + + /** + * 主题对应的消费者映射 + */ + private final List topicConsumers = new ArrayList<>(); + + @Override + public void post(String topic, Object message) { + SendResult result = rocketMQTemplate.syncSend(topic, JsonUtils.toJsonString(message)); + log.info("[post][topic({}) 发送消息({}) result({})]", topic, message, result); + } + + @Override + @SneakyThrows + public void register(IotMessageBusSubscriber subscriber) { + Type type = TypeUtil.getTypeArgument(subscriber.getClass(), 0); + if (type == null) { + throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName())); + } + + // 1.1 创建 DefaultMQPushConsumer + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); + consumer.setNamesrvAddr(rocketMQProperties.getNameServer()); + consumer.setConsumerGroup(subscriber.getGroup()); + // 1.2 订阅主题 + consumer.subscribe(subscriber.getTopic(), "*"); + // 1.3 设置消息监听器 + consumer.setMessageListener((MessageListenerConcurrently) (messages, context) -> { + for (MessageExt messageExt : messages) { + try { + byte[] body = messageExt.getBody(); + subscriber.onMessage(JsonUtils.parseObject(body, type)); + } catch (Exception ex) { + log.error("[onMessage][topic({}/{}) message({}) 消费者({}) 处理异常]", + subscriber.getTopic(), subscriber.getGroup(), messageExt, subscriber.getClass().getName(), ex); + return ConsumeConcurrentlyStatus.RECONSUME_LATER; + } + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + }); + // 1.4 启动消费者 + consumer.start(); + + // 2. 保存消费者引用 + topicConsumers.add(consumer); + } + + /** + * 销毁时关闭所有消费者 + */ + @PreDestroy + public void destroy() { + for (DefaultMQPushConsumer consumer : topicConsumers) { + try { + consumer.shutdown(); + log.info("[destroy][关闭 group({}) 的消费者成功]", consumer.getConsumerGroup()); + } catch (Exception e) { + log.error("[destroy]关闭 group({}) 的消费者异常]", consumer.getConsumerGroup(), e); + } + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/TestMessage.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/TestMessage.java new file mode 100644 index 0000000000..df12d601a5 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/TestMessage.java @@ -0,0 +1,12 @@ +package cn.iocoder.yudao.module.iot.messagebus.core; + +import lombok.Data; + +@Data +public class TestMessage { + + private String nickname; + + private Integer age; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java index cf6a1cd142..c44997f6c7 100644 --- a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/local/LocalIotMessageBusIntegrationTest.java @@ -6,7 +6,6 @@ import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.context.annotation.Import; import org.springframework.test.context.TestPropertySource; @@ -51,17 +50,21 @@ public class LocalIotMessageBusIntegrationTest { IotMessageBusSubscriber subscriber1 = new IotMessageBusSubscriber<>() { @Override - public void onMessage(String topic, String message) { - log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", topic, message); - subscriber1Count.incrementAndGet(); - assertEquals("test-topic", topic); - assertEquals(testMessage, message); - latch.countDown(); + public String getTopic() { + return topic; } @Override - public int order() { - return 1; + public String getGroup() { + return "group1"; + } + + @Override + public void onMessage(String message) { + log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message); + subscriber1Count.incrementAndGet(); + assertEquals(testMessage, message); + latch.countDown(); } }; @@ -69,35 +72,39 @@ public class LocalIotMessageBusIntegrationTest { IotMessageBusSubscriber subscriber2 = new IotMessageBusSubscriber<>() { @Override - public void onMessage(String topic, String message) { - log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", topic, message); + public String getTopic() { + return topic; + } + + @Override + public String getGroup() { + return "group2"; + } + + @Override + public void onMessage(String message) { + log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message); subscriber2Count.incrementAndGet(); - assertEquals("test-topic", topic); assertEquals(testMessage, message); latch.countDown(); } - @Override - public int order() { - return 0; - } - }; // 注册订阅者 - messageBus.register(topic, subscriber1); - messageBus.register(topic, subscriber2); + messageBus.register(subscriber1); + messageBus.register(subscriber2); // 发送消息 log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage); messageBus.post(topic, testMessage); - // 等待消息处理完成(最多等待5秒) - boolean completed = latch.await(5, TimeUnit.SECONDS); + // 等待消息处理完成(最多等待 10 秒) + boolean completed = latch.await(10, TimeUnit.SECONDS); // 验证结果 assertTrue(completed, "消息处理超时"); - assertEquals(1, subscriber1Count.get(), "订阅者1应该收到1条消息"); - assertEquals(1, subscriber2Count.get(), "订阅者2应该收到1条消息"); - log.info("[测试] 测试完成 - 订阅者1收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get()); + assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息"); + assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息"); + log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息,订阅者 2 收到{}条消息", subscriber1Count.get(), subscriber2Count.get()); } /** @@ -116,16 +123,20 @@ public class LocalIotMessageBusIntegrationTest { IotMessageBusSubscriber statusSubscriber = new IotMessageBusSubscriber<>() { @Override - public void onMessage(String topic, String message) { - log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", topic, message); - assertEquals(topic1, topic); - assertEquals(message1, message); - latch.countDown(); + public String getTopic() { + return topic1; } @Override - public int order() { - return 0; + public String getGroup() { + return "status-group"; + } + + @Override + public void onMessage(String message) { + log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message); + assertEquals(message1, message); + latch.countDown(); } }; @@ -133,28 +144,32 @@ public class LocalIotMessageBusIntegrationTest { IotMessageBusSubscriber dataSubscriber = new IotMessageBusSubscriber<>() { @Override - public void onMessage(String topic, String message) { - log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", topic, message); - assertEquals(topic2, topic); + public String getTopic() { + return topic2; + } + + @Override + public String getGroup() { + return "data-group"; + } + + @Override + public void onMessage(String message) { + log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message); assertEquals(message2, message); latch.countDown(); } - @Override - public int order() { - return 1; - } - }; // 注册订阅者到不同主题 - messageBus.register(topic1, statusSubscriber); - messageBus.register(topic2, dataSubscriber); + messageBus.register(statusSubscriber); + messageBus.register(dataSubscriber); // 发送消息到不同主题 messageBus.post(topic1, message1); messageBus.post(topic2, message2); // 等待消息处理完成 - boolean completed = latch.await(5, TimeUnit.SECONDS); + boolean completed = latch.await(10, TimeUnit.SECONDS); assertTrue(completed, "消息处理超时"); log.info("[测试] 多主题测试完成"); } diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java new file mode 100644 index 0000000000..bd8ab074d4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/java/cn/iocoder/yudao/module/iot/messagebus/core/rocketmq/RocketMQIotMessageBusTest.java @@ -0,0 +1,271 @@ +package cn.iocoder.yudao.module.iot.messagebus.core.rocketmq; + +import cn.hutool.core.util.IdUtil; +import cn.iocoder.yudao.module.iot.messagebus.config.IotMessageBusAutoConfiguration; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBus; +import cn.iocoder.yudao.module.iot.messagebus.core.IotMessageBusSubscriber; +import cn.iocoder.yudao.module.iot.messagebus.core.TestMessage; +import jakarta.annotation.Resource; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.annotation.Import; +import org.springframework.test.context.TestPropertySource; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * {@link RocketMQIotMessageBus} 集成测试 + * + * @author 芋道源码 + */ +@SpringBootTest(classes = RocketMQIotMessageBusTest.class) +@Import({RocketMQAutoConfiguration.class, IotMessageBusAutoConfiguration.class}) +@TestPropertySource(properties = { + "yudao.iot.message-bus.type=rocketmq", + "rocketmq.name-server=127.0.0.1:9876", + "rocketmq.producer.group=test-rocketmq-group", + "rocketmq.producer.send-message-timeout=10000" +}) +@Slf4j +public class RocketMQIotMessageBusTest { + + @Resource + private IotMessageBus messageBus; + + /** + * 1 topic 1 subscriber(string) + */ + @Test + public void testSendMessageWithOneSubscriber() throws InterruptedException { + // 准备 + String topic = "test-topic-" + IdUtil.simpleUUID(); +// String topic = "test-topic-pojo"; + String testMessage = "Hello IoT Message Bus!"; + // 用于等待消息处理完成 + CountDownLatch latch = new CountDownLatch(1); + // 用于记录接收到的消息 + AtomicInteger subscriberCount = new AtomicInteger(0); + AtomicReference subscriberMessageRef = new AtomicReference<>(); + + // 发送消息(需要提前发,保证 RocketMQ 路由的创建) + log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage); + messageBus.post(topic, testMessage); + + // 创建订阅者 + IotMessageBusSubscriber subscriber1 = new IotMessageBusSubscriber<>() { + + @Override + public String getTopic() { + return topic; + } + + @Override + public String getGroup() { + return "test-topic-" + IdUtil.simpleUUID() + "-consumer"; +// return "test-topic-consumer-01"; + } + + @Override + public void onMessage(String message) { + log.info("[订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message); + subscriberCount.incrementAndGet(); + subscriberMessageRef.set(message); + assertEquals(testMessage, message); + latch.countDown(); + } + + }; + // 注册订阅者 + messageBus.register(subscriber1); + + // 等待消息处理完成(最多等待 5 秒) + boolean completed = latch.await(10, TimeUnit.SECONDS); + + // 验证结果 + assertTrue(completed, "消息处理超时"); + assertEquals(1, subscriberCount.get(), "订阅者应该收到 1 条消息"); + log.info("[测试] 测试完成 - 订阅者收到{}条消息", subscriberCount.get()); + assertEquals(testMessage, subscriberMessageRef.get(), "接收到的消息内容不匹配"); + } + + /** + * 1 topic 2 subscriber(pojo) + */ + @Test + public void testSendMessageWithTwoSubscribers() throws InterruptedException { + // 准备 + String topic = "test-topic-" + IdUtil.simpleUUID(); +// String topic = "test-topic-pojo"; + TestMessage testMessage = new TestMessage().setNickname("yunai").setAge(18); + // 用于等待消息处理完成 + CountDownLatch latch = new CountDownLatch(2); + // 用于记录接收到的消息 + AtomicInteger subscriber1Count = new AtomicInteger(0); + AtomicReference subscriber1MessageRef = new AtomicReference<>(); + AtomicInteger subscriber2Count = new AtomicInteger(0); + AtomicReference subscriber2MessageRef = new AtomicReference<>(); + + // 发送消息(需要提前发,保证 RocketMQ 路由的创建) + log.info("[测试] 发送消息 - Topic: {}, Message: {}", topic, testMessage); + messageBus.post(topic, testMessage); + + // 创建第一个订阅者 + IotMessageBusSubscriber subscriber1 = new IotMessageBusSubscriber<>() { + + @Override + public String getTopic() { + return topic; + } + + @Override + public String getGroup() { + return "test-topic-" + IdUtil.simpleUUID() + "-consumer"; +// return "test-topic-consumer-01"; + } + + @Override + public void onMessage(TestMessage message) { + log.info("[订阅者1] 收到消息 - Topic: {}, Message: {}", getTopic(), message); + subscriber1Count.incrementAndGet(); + subscriber1MessageRef.set(message); + assertEquals(testMessage, message); + latch.countDown(); + } + + }; + // 创建第二个订阅者 + IotMessageBusSubscriber subscriber2 = new IotMessageBusSubscriber<>() { + + @Override + public String getTopic() { + return topic; + } + + @Override + public String getGroup() { + return "test-topic-" + IdUtil.simpleUUID() + "-consumer"; +// return "test-topic-consumer-02"; + } + + @Override + public void onMessage(TestMessage message) { + log.info("[订阅者2] 收到消息 - Topic: {}, Message: {}", getTopic(), message); + subscriber2Count.incrementAndGet(); + subscriber2MessageRef.set(message); + assertEquals(testMessage, message); + latch.countDown(); + } + + }; + // 注册订阅者 + messageBus.register(subscriber1); + messageBus.register(subscriber2); + + // 等待消息处理完成(最多等待 5 秒) + boolean completed = latch.await(10, TimeUnit.SECONDS); + + // 验证结果 + assertTrue(completed, "消息处理超时"); + assertEquals(1, subscriber1Count.get(), "订阅者 1 应该收到 1 条消息"); + assertEquals(1, subscriber2Count.get(), "订阅者 2 应该收到 1 条消息"); + log.info("[测试] 测试完成 - 订阅者 1 收到{}条消息,订阅者2收到{}条消息", subscriber1Count.get(), subscriber2Count.get()); + assertEquals(testMessage, subscriber1MessageRef.get(), "接收到的消息内容不匹配"); + assertEquals(testMessage, subscriber2MessageRef.get(), "接收到的消息内容不匹配"); + } + + /** + * 2 topic 2 subscriber + */ + @Test + public void testMultipleTopics() throws InterruptedException { + // 准备 + String topic1 = "device-status-" + IdUtil.simpleUUID(); + String topic2 = "device-data-" + IdUtil.simpleUUID(); + String message1 = "设备在线"; + String message2 = "温度:25°C"; + CountDownLatch latch = new CountDownLatch(2); + AtomicInteger subscriber1Count = new AtomicInteger(0); + AtomicReference subscriber1MessageRef = new AtomicReference<>(); + AtomicInteger subscriber2Count = new AtomicInteger(0); + AtomicReference subscriber2MessageRef = new AtomicReference<>(); + + + // 发送消息到不同主题(需要提前发,保证 RocketMQ 路由的创建) + log.info("[测试] 发送消息 - Topic1: {}, Message1: {}", topic1, message1); + messageBus.post(topic1, message1); + log.info("[测试] 发送消息 - Topic2: {}, Message2: {}", topic2, message2); + messageBus.post(topic2, message2); + + // 创建订阅者 1 - 只订阅设备状态 + IotMessageBusSubscriber statusSubscriber = new IotMessageBusSubscriber<>() { + + @Override + public String getTopic() { + return topic1; + } + + @Override + public String getGroup() { + return "status-group-" + IdUtil.simpleUUID(); + } + + @Override + public void onMessage(String message) { + log.info("[状态订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message); + subscriber1Count.incrementAndGet(); + subscriber1MessageRef.set(message); + assertEquals(message1, message); + latch.countDown(); + } + + }; + // 创建订阅者 2 - 只订阅设备数据 + IotMessageBusSubscriber dataSubscriber = new IotMessageBusSubscriber<>() { + + @Override + public String getTopic() { + return topic2; + } + + @Override + public String getGroup() { + return "data-group-" + IdUtil.simpleUUID(); + } + + @Override + public void onMessage(String message) { + log.info("[数据订阅者] 收到消息 - Topic: {}, Message: {}", getTopic(), message); + subscriber2Count.incrementAndGet(); + subscriber2MessageRef.set(message); + assertEquals(message2, message); + latch.countDown(); + } + + }; + // 注册订阅者到不同主题 + messageBus.register(statusSubscriber); + messageBus.register(dataSubscriber); + + // 等待消息处理完成 + boolean completed = latch.await(10, TimeUnit.SECONDS); + + // 验证结果 + assertTrue(completed, "消息处理超时"); + assertEquals(1, subscriber1Count.get(), "状态订阅者应该收到 1 条消息"); + assertEquals(message1, subscriber1MessageRef.get(), "状态订阅者接收到的消息内容不匹配"); + assertEquals(1, subscriber2Count.get(), "数据订阅者应该收到 1 条消息"); + assertEquals(message2, subscriber2MessageRef.get(), "数据订阅者接收到的消息内容不匹配"); + log.info("[测试] 多主题测试完成 - 状态订阅者收到{}条消息,数据订阅者收到{}条消息", subscriber1Count.get(), subscriber2Count.get()); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml b/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml deleted file mode 100644 index 59571cd4dd..0000000000 --- a/yudao-module-iot/yudao-module-iot-core/yudao-module-iot-message-bus/src/test/resources/application-test.yml +++ /dev/null @@ -1,4 +0,0 @@ -yudao: - iot: - message-bus: - type: local \ No newline at end of file