diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml index 15bbaa42de..c8972f16b2 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml @@ -36,6 +36,7 @@ org.apache.rocketmq rocketmq-spring-boot-starter + true diff --git a/yudao-module-iot/yudao-module-iot-biz/pom.xml b/yudao-module-iot/yudao-module-iot-biz/pom.xml index 2a9669c0ee..ea1dde86fe 100644 --- a/yudao-module-iot/yudao-module-iot-biz/pom.xml +++ b/yudao-module-iot/yudao-module-iot-biz/pom.xml @@ -75,10 +75,11 @@ - + - cn.iocoder.boot - yudao-spring-boot-starter-mq + org.apache.rocketmq + rocketmq-spring-boot-starter + true diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index 7668c05b94..f87cd60011 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -1,33 +1,18 @@ package cn.iocoder.yudao.module.iot.service.rule.action; -import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum; -import cn.iocoder.yudao.framework.common.util.http.HttpUtils; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotRuleSceneDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService; +import cn.iocoder.yudao.module.iot.service.rule.execute.IotDataBridgeExecute; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.springframework.http.*; import org.springframework.stereotype.Component; -import org.springframework.web.client.RestTemplate; -import org.springframework.web.util.UriComponentsBuilder; -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; - -import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; +import java.util.List; /** * IoT 数据桥梁的 {@link IotRuleSceneAction} 实现类 @@ -41,11 +26,10 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_ @Slf4j public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { - @Resource - private RestTemplate restTemplate; - @Resource private IotDataBridgeService dataBridgeService; + @Resource + private List dataBridgeExecutes; @Override public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) { @@ -65,26 +49,8 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { return; } - // 2.1 执行 HTTP 请求 - if (IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) { - executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig()); - return; - } - // 2.2 执行 RocketMQ 发送消息 - if (IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) { - executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig()); - return; - } - - // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; - // TODO @芋艿:mq-redis - // TODO @芋艿:mq-数据库 - // TODO @芋艿:kafka - // TODO @芋艿:rocketmq - // TODO @芋艿:rabbitmq - // TODO @芋艿:mqtt - // TODO @芋艿:tcp - // TODO @芋艿:websocket + // 2.1 执行数据桥接操作 + dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge)); } @Override @@ -92,115 +58,4 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { return IotRuleSceneActionTypeEnum.DATA_BRIDGE; } - @SuppressWarnings({"unchecked", "deprecation"}) - private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { - String url = null; - HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); - HttpEntity requestEntity = null; - ResponseEntity responseEntity = null; - try { - // 1.1 构建 Header - HttpHeaders headers = new HttpHeaders(); - if (CollUtil.isNotEmpty(config.getHeaders())) { - config.getHeaders().putAll(config.getHeaders()); - } - headers.add(HEADER_TENANT_ID, message.getTenantId().toString()); - // 1.2 构建 URL - UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(config.getUrl()); - if (CollUtil.isNotEmpty(config.getQuery())) { - config.getQuery().forEach(uriBuilder::queryParam); - } - // 1.3 构建请求体 - if (method == HttpMethod.GET) { - uriBuilder.queryParam("message", HttpUtils.encodeUtf8(JsonUtils.toJsonString(message))); - url = uriBuilder.build().toUriString(); - requestEntity = new HttpEntity<>(headers); - } else { - url = uriBuilder.build().toUriString(); - Map requestBody = JsonUtils.parseObject(config.getBody(), Map.class); - if (requestBody == null) { - requestBody = new HashMap<>(); - } - requestBody.put("message", message); - headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE); - requestEntity = new HttpEntity<>(JsonUtils.toJsonString(requestBody), headers); - } - - // 2.1 发送请求 - responseEntity = restTemplate.exchange(url, method, requestEntity, String.class); - // 2.2 记录日志 - if (responseEntity.getStatusCode().is2xxSuccessful()) { - log.info("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求成功({})]", - message, config, url, method, requestEntity, responseEntity); - } else { - log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求失败({})]", - message, config, url, method, requestEntity, responseEntity); - } - } catch (Exception e) { - log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求异常({})]", - message, config, url, method, requestEntity, responseEntity, e); - } - } - - private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { - // 1.1 创建生产者实例,指定生产者组名 - DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); - // TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer - try { - // 1.2 设置 NameServer 地址 - producer.setNamesrvAddr(config.getNameServer()); - // 1.3 启动生产者 - producer.start(); - - // 2.1 创建消息对象,指定Topic、Tag和消息体 - Message msg = new Message( - config.getTopic(), - config.getTags(), - message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) - ); - // 2.2 发送同步消息并处理结果 - SendResult sendResult = producer.send(msg); - // 2.3 处理发送结果 - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { - log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); - } else { - log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); - } - } catch (Exception e) { - log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e); - } finally { - // 3. 关闭生产者 - producer.shutdown(); - } - } - - // TODO @芋艿:测试代码,后续清理 - public static void main(String[] args) { - // 1. 创建 IotRuleSceneDataBridgeAction 实例 - IotRuleSceneDataBridgeAction action = new IotRuleSceneDataBridgeAction(); - - // 2. 创建测试消息 - IotDeviceMessage message = IotDeviceMessage.builder() - .requestId("TEST-001") - .productKey("testProduct") - .deviceName("testDevice") - .deviceKey("testDeviceKey") - .type("property") - .identifier("temperature") - .data("{\"value\": 60}") - .reportTime(LocalDateTime.now()) - .tenantId(1L) - .build(); - - // 3. 创建 RocketMQ 配置 - IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig(); - config.setNameServer("127.0.0.1:9876"); - config.setGroup("test-group"); - config.setTopic("test-topic"); - config.setTags("test-tag"); - - // 4. 执行测试 - action.executeRocketMQ(message, config); - } - } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotDataBridgeExecute.java new file mode 100644 index 0000000000..8979d21150 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotDataBridgeExecute.java @@ -0,0 +1,32 @@ +package cn.iocoder.yudao.module.iot.service.rule.execute; + +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; + + +/** + * IoT 数据桥梁的执行器 execute 接口 + * + * @author HUIHUI + */ +public interface IotDataBridgeExecute { + + /** + * 执行数据桥接操作 + * + * @param message 设备消息 + * @param dataBridge 数据桥梁 + */ + void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge); + + // TODO @芋艿:因为下面的,都是有状态的,所以通过 guava 缓存连接,然后通过 RemovalNotification 实现关闭。例如说,一次新建有效期是 10 分钟; + // TODO @芋艿:mq-redis + // TODO @芋艿:mq-数据库 + // TODO @芋艿:kafka + // TODO @芋艿:rocketmq + // TODO @芋艿:rabbitmq + // TODO @芋艿:mqtt + // TODO @芋艿:tcp + // TODO @芋艿:websocket + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotHttpDataBridgeExecute.java new file mode 100644 index 0000000000..861fadd6c0 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotHttpDataBridgeExecute.java @@ -0,0 +1,93 @@ +package cn.iocoder.yudao.module.iot.service.rule.execute; + +import cn.hutool.core.collection.CollUtil; +import cn.iocoder.yudao.framework.common.util.http.HttpUtils; +import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.http.*; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriComponentsBuilder; + +import java.util.HashMap; +import java.util.Map; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * Http 的 {@link IotDataBridgeExecute} 实现类 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { + + @Resource + private RestTemplate restTemplate; + + @Override + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥接的类型 == HTTP + if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) { + return; + } + // 1.2 执行 HTTP 请求 + executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig()); + } + + @SuppressWarnings({"unchecked", "deprecation"}) + private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { + String url = null; + HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); + HttpEntity requestEntity = null; + ResponseEntity responseEntity = null; + try { + // 1.1 构建 Header + HttpHeaders headers = new HttpHeaders(); + if (CollUtil.isNotEmpty(config.getHeaders())) { + config.getHeaders().putAll(config.getHeaders()); + } + headers.add(HEADER_TENANT_ID, message.getTenantId().toString()); + // 1.2 构建 URL + UriComponentsBuilder uriBuilder = UriComponentsBuilder.fromUriString(config.getUrl()); + if (CollUtil.isNotEmpty(config.getQuery())) { + config.getQuery().forEach(uriBuilder::queryParam); + } + // 1.3 构建请求体 + if (method == HttpMethod.GET) { + uriBuilder.queryParam("message", HttpUtils.encodeUtf8(JsonUtils.toJsonString(message))); + url = uriBuilder.build().toUriString(); + requestEntity = new HttpEntity<>(headers); + } else { + url = uriBuilder.build().toUriString(); + Map requestBody = JsonUtils.parseObject(config.getBody(), Map.class); + if (requestBody == null) { + requestBody = new HashMap<>(); + } + requestBody.put("message", message); + headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE); + requestEntity = new HttpEntity<>(JsonUtils.toJsonString(requestBody), headers); + } + + // 2.1 发送请求 + responseEntity = restTemplate.exchange(url, method, requestEntity, String.class); + // 2.2 记录日志 + if (responseEntity.getStatusCode().is2xxSuccessful()) { + log.info("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求成功({})]", + message, config, url, method, requestEntity, responseEntity); + } else { + log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求失败({})]", + message, config, url, method, requestEntity, responseEntity); + } + } catch (Exception e) { + log.error("[executeHttp][message({}) config({}) url({}) method({}) requestEntity({}) 请求异常({})]", + message, config, url, method, requestEntity, responseEntity, e); + } + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java new file mode 100644 index 0000000000..3e02e6960a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/execute/IotRocketMQDataBridgeExecute.java @@ -0,0 +1,96 @@ +package cn.iocoder.yudao.module.iot.service.rule.execute; + +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * RocketMQ 的 {@link IotDataBridgeExecute} 实现类 + * + * @author HUIHUI + */ +@Component +@Slf4j +public class IotRocketMQDataBridgeExecute implements IotDataBridgeExecute { + + @Override + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥接的类型 == ROCKETMQ + if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) { + return; + } + // 1.2 执行 RocketMQ 发送消息 + executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig()); + } + + private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { + // 1.1 创建生产者实例,指定生产者组名 + DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); + // TODO @puhui999:可以考虑,基于 guava 做 cache,使用 config 作为 key,然后假个 listener 超时,销毁 producer + try { + // 1.2 设置 NameServer 地址 + producer.setNamesrvAddr(config.getNameServer()); + // 1.3 启动生产者 + producer.start(); + + // 2.1 创建消息对象,指定Topic、Tag和消息体 + Message msg = new Message( + config.getTopic(), + config.getTags(), + message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + // 2.2 发送同步消息并处理结果 + SendResult sendResult = producer.send(msg); + // 2.3 处理发送结果 + if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { + log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); + } else { + log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); + } + } catch (Exception e) { + log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e); + } finally { + // 3. 关闭生产者 + producer.shutdown(); + } + } + + // TODO @芋艿:测试代码,后续清理 + public static void main(String[] args) { + // 1. 创建 IotRocketMQDataBridgeExecute 实例 + IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute(); + + // 2. 创建测试消息 + IotDeviceMessage message = IotDeviceMessage.builder() + .requestId("TEST-001") + .productKey("testProduct") + .deviceName("testDevice") + .deviceKey("testDeviceKey") + .type("property") + .identifier("temperature") + .data("{\"value\": 60}") + .reportTime(LocalDateTime.now()) + .tenantId(1L) + .build(); + + // 3. 创建 RocketMQ 配置 + IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig(); + config.setNameServer("127.0.0.1:9876"); + config.setGroup("test-group"); + config.setTopic("test-topic"); + config.setTags("test-tag"); + + // 4. 执行测试 + action.executeRocketMQ(message, config); + } + +}