diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeAbstractConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeAbstractConfig.java
index 06219f7236..550550d195 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeAbstractConfig.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeAbstractConfig.java
@@ -15,12 +15,12 @@ import lombok.Data;
@Data
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true)
@JsonSubTypes({
- @JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "HTTP"),
- @JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "KAFKA"),
- @JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "MQTT"),
- @JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "RABBITMQ"),
- @JsonSubTypes.Type(value = IotDataBridgeRedisStreamMQConfig.class, name = "REDIS_STREAM"),
- @JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "ROCKETMQ"),
+ @JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "1"),
+ @JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "10"),
+ @JsonSubTypes.Type(value = IotDataBridgeRedisStreamMQConfig.class, name = "21"),
+ @JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "30"),
+ @JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "31"),
+ @JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "32"),
})
public abstract class IotDataBridgeAbstractConfig {
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java
index 1e8d939ec2..ce3d0f1938 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java
@@ -1,6 +1,5 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
-import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeAbstractConfig;
import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java
index 3b7f99bf42..5674c7d609 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java
@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeKafkaMQConfig;
-import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
@@ -13,7 +12,6 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
-import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -77,38 +75,4 @@ public class IotKafkaMQDataBridgeExecute extends
producer.destroy();
}
- // TODO @芋艿:测试代码,后续清理
- public static void main(String[] args) {
- // 1. 创建一个共享的实例
- IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
-
- // 2. 创建共享的配置
- IotDataBridgeKafkaMQConfig config = new IotDataBridgeKafkaMQConfig();
- config.setBootstrapServers("127.0.0.1:9092");
- config.setTopic("test-topic");
- config.setSsl(false);
- config.setUsername(null);
- config.setPassword(null);
-
- // 3. 创建共享的消息
- IotDeviceMessage message = IotDeviceMessage.builder()
- .requestId("TEST-001")
- .productKey("testProduct")
- .deviceName("testDevice")
- .deviceKey("testDeviceKey")
- .type("property")
- .identifier("temperature")
- .data("{\"value\": 60}")
- .reportTime(LocalDateTime.now())
- .tenantId(1L)
- .build();
-
- // 4. 执行两次测试,验证缓存
- log.info("[main][第一次执行,应该会创建新的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
-
- log.info("[main][第二次执行,应该会复用缓存的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
- }
-
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java
index 54485c091d..efe08b1fcb 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java
@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRabbitMQConfig;
-import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.rabbitmq.client.Channel;
@@ -12,7 +11,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
/**
* RabbitMQ 的 {@link IotDataBridgeExecute} 实现类
@@ -76,42 +74,4 @@ public class IotRabbitMQDataBridgeExecute extends
}
}
- // TODO @芋艿:测试代码,后续清理
- public static void main(String[] args) {
- // 1. 创建一个共享的实例
- IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
-
- // 2. 创建共享的配置
- IotDataBridgeRabbitMQConfig config = new IotDataBridgeRabbitMQConfig();
- config.setHost("localhost");
- config.setPort(5672);
- config.setVirtualHost("/");
- config.setUsername("admin");
- config.setPassword("123456");
- config.setExchange("test-exchange");
- config.setRoutingKey("test-key");
- config.setQueue("test-queue");
-
- // 3. 创建共享的消息
- IotDeviceMessage message = IotDeviceMessage.builder()
- .requestId("TEST-001")
- .productKey("testProduct")
- .deviceName("testDevice")
- .deviceKey("testDeviceKey")
- .type("property")
- .identifier("temperature")
- .data("{\"value\": 60}")
- .reportTime(LocalDateTime.now())
- .tenantId(1L)
- .build();
-
- // 4. 执行两次测试,验证缓存
- // 4. 执行两次测试,验证缓存
- log.info("[main][第一次执行,应该会创建新的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
-
- log.info("[main][第二次执行,应该会复用缓存的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
- }
-
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java
index 5616c7e648..2aac76619a 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java
@@ -3,7 +3,6 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamMQConfig;
-import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -21,8 +20,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
-import java.time.LocalDateTime;
-
/**
* Redis Stream MQ 的 {@link IotDataBridgeExecute} 实现类
*
@@ -96,38 +93,4 @@ public class IotRedisStreamMQDataBridgeExecute extends
return json;
}
- // TODO @芋艿:测试代码,后续清理
- public static void main(String[] args) {
- // 1. 创建一个共享的实例
- IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute();
-
- // 2. 创建共享的配置
- IotDataBridgeRedisStreamMQConfig config = new IotDataBridgeRedisStreamMQConfig();
- config.setHost("127.0.0.1");
- config.setPort(6379);
- config.setDatabase(0);
- config.setPassword("123456");
- config.setTopic("test-stream");
-
- // 3. 创建共享的消息
- IotDeviceMessage message = IotDeviceMessage.builder()
- .requestId("TEST-001")
- .productKey("testProduct")
- .deviceName("testDevice")
- .deviceKey("testDeviceKey")
- .type("property")
- .identifier("temperature")
- .data("{\"value\": 60}")
- .reportTime(LocalDateTime.now())
- .tenantId(1L)
- .build();
-
- // 4. 执行两次测试,验证缓存
- log.info("[main][第一次执行,应该会创建新的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
-
- log.info("[main][第二次执行,应该会复用缓存的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
- }
-
-}
\ No newline at end of file
+}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java
index c3e729dda3..d3ac77227a 100644
--- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java
+++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java
@@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRocketMQConfig;
-import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import lombok.extern.slf4j.Slf4j;
@@ -13,8 +12,6 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.stereotype.Component;
-import java.time.LocalDateTime;
-
/**
* RocketMQ 的 {@link IotDataBridgeExecute} 实现类
*
@@ -65,38 +62,4 @@ public class IotRocketMQDataBridgeExecute extends
producer.shutdown();
}
- // TODO @芋艿:测试代码,后续清理
- // TODO @puhui999:搞到测试类里。
- public static void main(String[] args) {
- // 1. 创建一个共享的实例
- IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
-
- // 2. 创建共享的配置
- IotDataBridgeRocketMQConfig config = new IotDataBridgeRocketMQConfig();
- config.setNameServer("127.0.0.1:9876");
- config.setGroup("test-group");
- config.setTopic("test-topic");
- config.setTags("test-tag");
-
- // 3. 创建共享的消息
- IotDeviceMessage message = IotDeviceMessage.builder()
- .requestId("TEST-001")
- .productKey("testProduct")
- .deviceName("testDevice")
- .deviceKey("testDeviceKey")
- .type("property")
- .identifier("temperature")
- .data("{\"value\": 60}")
- .reportTime(LocalDateTime.now())
- .tenantId(1L)
- .build();
-
- // 4. 执行两次测试,验证缓存
- log.info("[main][第一次执行,应该会创建新的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
-
- log.info("[main][第二次执行,应该会复用缓存的 producer]");
- action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
- }
-
}
diff --git a/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java
new file mode 100644
index 0000000000..03ea33d682
--- /dev/null
+++ b/yudao-module-iot/yudao-module-iot-biz/src/test/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecuteTest.java
@@ -0,0 +1,159 @@
+package cn.iocoder.yudao.module.iot.service.rule.action.databridge;
+
+import cn.iocoder.yudao.framework.test.core.ut.BaseMockitoUnitTest;
+import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.*;
+import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO;
+import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+import java.time.LocalDateTime;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+
+/**
+ * {@link IotDataBridgeExecute} 实现类的测试
+ *
+ * @author HUIHUI
+ */
+@Disabled // 默认禁用,需要手动启用测试
+@Slf4j
+public class IotDataBridgeExecuteTest extends BaseMockitoUnitTest {
+
+ private IotDeviceMessage message;
+
+ @Mock
+ private RestTemplate restTemplate;
+
+ @InjectMocks
+ private IotHttpDataBridgeExecute httpDataBridgeExecute;
+
+ @BeforeEach
+ public void setUp() {
+ // 创建共享的测试消息
+ message = IotDeviceMessage.builder()
+ .requestId("TEST-001")
+ .productKey("testProduct")
+ .deviceName("testDevice")
+ .deviceKey("testDeviceKey")
+ .type("property")
+ .identifier("temperature")
+ .data("{\"value\": 60}")
+ .reportTime(LocalDateTime.now())
+ .tenantId(1L)
+ .build();
+
+ // 配置 RestTemplate mock 返回成功响应
+ Mockito.when(restTemplate.exchange(anyString(), any(HttpMethod.class), any(), any(Class.class)))
+ .thenReturn(new ResponseEntity<>("Success", HttpStatus.OK));
+ }
+
+ @Test
+ public void testKafkaMQDataBridge() {
+ // 1. 创建执行器实例
+ IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute();
+
+ // 2. 创建配置
+ IotDataBridgeKafkaMQConfig config = new IotDataBridgeKafkaMQConfig();
+ config.setBootstrapServers("127.0.0.1:9092");
+ config.setTopic("test-topic");
+ config.setSsl(false);
+ config.setUsername(null);
+ config.setPassword(null);
+
+ // 3. 执行两次测试,验证缓存
+ log.info("[testKafkaMQDataBridge][第一次执行,应该会创建新的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+
+ log.info("[testKafkaMQDataBridge][第二次执行,应该会复用缓存的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+ }
+
+ @Test
+ public void testRabbitMQDataBridge() {
+ // 1. 创建执行器实例
+ IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute();
+
+ // 2. 创建配置
+ IotDataBridgeRabbitMQConfig config = new IotDataBridgeRabbitMQConfig();
+ config.setHost("localhost");
+ config.setPort(5672);
+ config.setVirtualHost("/");
+ config.setUsername("admin");
+ config.setPassword("123456");
+ config.setExchange("test-exchange");
+ config.setRoutingKey("test-key");
+ config.setQueue("test-queue");
+
+ // 3. 执行两次测试,验证缓存
+ log.info("[testRabbitMQDataBridge][第一次执行,应该会创建新的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+
+ log.info("[testRabbitMQDataBridge][第二次执行,应该会复用缓存的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+ }
+
+ @Test
+ public void testRedisStreamMQDataBridge() {
+ // 1. 创建执行器实例
+ IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute();
+
+ // 2. 创建配置
+ IotDataBridgeRedisStreamMQConfig config = new IotDataBridgeRedisStreamMQConfig();
+ config.setHost("127.0.0.1");
+ config.setPort(6379);
+ config.setDatabase(0);
+ config.setPassword("123456");
+ config.setTopic("test-stream");
+
+ // 3. 执行两次测试,验证缓存
+ log.info("[testRedisStreamMQDataBridge][第一次执行,应该会创建新的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+
+ log.info("[testRedisStreamMQDataBridge][第二次执行,应该会复用缓存的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+ }
+
+ @Test
+ public void testRocketMQDataBridge() {
+ // 1. 创建执行器实例
+ IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute();
+
+ // 2. 创建配置
+ IotDataBridgeRocketMQConfig config = new IotDataBridgeRocketMQConfig();
+ config.setNameServer("127.0.0.1:9876");
+ config.setGroup("test-group");
+ config.setTopic("test-topic");
+ config.setTags("test-tag");
+
+ // 3. 执行两次测试,验证缓存
+ log.info("[testRocketMQDataBridge][第一次执行,应该会创建新的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+
+ log.info("[testRocketMQDataBridge][第二次执行,应该会复用缓存的 producer]");
+ action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config));
+ }
+
+ @Test
+ public void testHttpDataBridge() throws Exception {
+ // 创建配置
+ IotDataBridgeHttpConfig config = new IotDataBridgeHttpConfig();
+ config.setUrl("https://doc.iocoder.cn/");
+ config.setMethod(HttpMethod.GET.name());
+
+ // 执行测试
+ log.info("[testHttpDataBridge][执行HTTP数据桥接测试]");
+ httpDataBridgeExecute.execute(message, new IotDataBridgeDO().setType(httpDataBridgeExecute.getType()).setConfig(config));
+ }
+
+}
diff --git a/yudao-server/pom.xml b/yudao-server/pom.xml
index f408ca3e5d..0251e7b649 100644
--- a/yudao-server/pom.xml
+++ b/yudao-server/pom.xml
@@ -115,19 +115,18 @@
${revision}
-
-
- org.apache.rocketmq
- rocketmq-spring-boot-starter
-
-
- org.springframework.kafka
- spring-kafka
-
-
- org.springframework.boot
- spring-boot-starter-amqp
-
+
+
+
+
+
+
+
+
+
+
+
+