diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java index 04df143bed..1fc47c0c00 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/DictTypeConstants.java @@ -15,5 +15,7 @@ public class DictTypeConstants { public static final String VALIDATE_TYPE = "iot_validate_type"; public static final String DEVICE_STATE = "iot_device_state"; + public static final String IOT_DATA_BRIDGE_DIRECTION_ENUM = "iot_data_bridge_direction_enum"; + public static final String IOT_DATA_BRIDGE_TYPE_ENUM = "iot_data_bridge_type_enum"; } diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java index 3d12c4b594..7b84c66fcf 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/ErrorCodeConstants.java @@ -70,4 +70,7 @@ public interface ErrorCodeConstants { // ========== MQTT 通信相关 1-050-009-000 ========== ErrorCode MQTT_TOPIC_ILLEGAL = new ErrorCode(1_050_009_000, "topic illegal"); + // ========== IoT 数据桥梁 1-050-010-000 ========== + ErrorCode DATA_BRIDGE_NOT_EXISTS = new ErrorCode(1_050_010_000, "IoT 数据桥梁不存在"); + } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgDirectionEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeDirectionEnum.java similarity index 78% rename from yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgDirectionEnum.java rename to yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeDirectionEnum.java index ff4993d0ea..eb4b999163 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgDirectionEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeDirectionEnum.java @@ -13,14 +13,14 @@ import java.util.Arrays; */ @RequiredArgsConstructor @Getter -public enum IotDataBridgDirectionEnum implements ArrayValuable { +public enum IotDataBridgeDirectionEnum implements ArrayValuable { INPUT(1), // 输入 OUTPUT(2); // 输出 private final Integer type; - public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgDirectionEnum::getType).toArray(Integer[]::new); + public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeDirectionEnum::getType).toArray(Integer[]::new); @Override public Integer[] array() { diff --git a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeTypeEnum.java similarity index 53% rename from yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java rename to yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeTypeEnum.java index 295f35cffd..25c7e8c1fe 100644 --- a/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgTypeEnum.java +++ b/yudao-module-iot/yudao-module-iot-api/src/main/java/cn/iocoder/yudao/module/iot/enums/rule/IotDataBridgeTypeEnum.java @@ -13,25 +13,26 @@ import java.util.Arrays; */ @RequiredArgsConstructor @Getter -public enum IotDataBridgTypeEnum implements ArrayValuable { +public enum IotDataBridgeTypeEnum implements ArrayValuable { - HTTP(1), - TCP(2), - WEBSOCKET(3), + HTTP(1, "HTTP"), + TCP(2, "TCP"), + WEBSOCKET(3, "WEBSOCKET"), - MQTT(10), + MQTT(10, "MQTT"), - DATABASE(20), - REDIS_STREAM(21), + DATABASE(20, "DATABASE"), + REDIS_STREAM(21, "REDIS_STREAM"), - ROCKETMQ(30), - RABBITMQ(31), - KAFKA(32) - ; + ROCKETMQ(30, "ROCKETMQ"), + RABBITMQ(31, "RABBITMQ"), + KAFKA(32, "KAFKA"); private final Integer type; - public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgTypeEnum::getType).toArray(Integer[]::new); + private final String name; + + public static final Integer[] ARRAYS = Arrays.stream(values()).map(IotDataBridgeTypeEnum::getType).toArray(Integer[]::new); @Override public Integer[] array() { diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/IotDataBridgeController.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/IotDataBridgeController.java new file mode 100644 index 0000000000..2d2f8fb75b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/IotDataBridgeController.java @@ -0,0 +1,93 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule; + +import cn.iocoder.yudao.framework.apilog.core.annotation.ApiAccessLog; +import cn.iocoder.yudao.framework.common.pojo.CommonResult; +import cn.iocoder.yudao.framework.common.pojo.PageParam; +import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.framework.excel.core.util.ExcelUtils; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeRespVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.service.rule.IotDataBridgeService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletResponse; +import jakarta.validation.Valid; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.io.IOException; +import java.util.List; + +import static cn.iocoder.yudao.framework.apilog.core.enums.OperateTypeEnum.EXPORT; +import static cn.iocoder.yudao.framework.common.pojo.CommonResult.success; + +@Tag(name = "管理后台 - IoT 数据桥梁") +@RestController +@RequestMapping("/iot/data-bridge") +@Validated +public class IotDataBridgeController { + + @Resource + private IotDataBridgeService dataBridgeService; + + @PostMapping("/create") + @Operation(summary = "创建IoT 数据桥梁") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:create')") + public CommonResult createDataBridge(@Valid @RequestBody IotDataBridgeSaveReqVO createReqVO) { + return success(dataBridgeService.createDataBridge(createReqVO)); + } + + @PutMapping("/update") + @Operation(summary = "更新IoT 数据桥梁") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:update')") + public CommonResult updateDataBridge(@Valid @RequestBody IotDataBridgeSaveReqVO updateReqVO) { + dataBridgeService.updateDataBridge(updateReqVO); + return success(true); + } + + @DeleteMapping("/delete") + @Operation(summary = "删除IoT 数据桥梁") + @Parameter(name = "id", description = "编号", required = true) + @PreAuthorize("@ss.hasPermission('iot:data-bridge:delete')") + public CommonResult deleteDataBridge(@RequestParam("id") Long id) { + dataBridgeService.deleteDataBridge(id); + return success(true); + } + + @GetMapping("/get") + @Operation(summary = "获得IoT 数据桥梁") + @Parameter(name = "id", description = "编号", required = true, example = "1024") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:query')") + public CommonResult getDataBridge(@RequestParam("id") Long id) { + IotDataBridgeDO dataBridge = dataBridgeService.getDataBridge(id); + return success(BeanUtils.toBean(dataBridge, IotDataBridgeRespVO.class)); + } + + @GetMapping("/page") + @Operation(summary = "获得IoT 数据桥梁分页") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:query')") + public CommonResult> getDataBridgePage(@Valid IotDataBridgePageReqVO pageReqVO) { + PageResult pageResult = dataBridgeService.getDataBridgePage(pageReqVO); + return success(BeanUtils.toBean(pageResult, IotDataBridgeRespVO.class)); + } + + @GetMapping("/export-excel") + @Operation(summary = "导出IoT 数据桥梁 Excel") + @PreAuthorize("@ss.hasPermission('iot:data-bridge:export')") + @ApiAccessLog(operateType = EXPORT) + public void exportDataBridgeExcel(@Valid IotDataBridgePageReqVO pageReqVO, + HttpServletResponse response) throws IOException { + pageReqVO.setPageSize(PageParam.PAGE_SIZE_NONE); + List list = dataBridgeService.getDataBridgePage(pageReqVO).getList(); + // 导出 Excel + ExcelUtils.write(response, "IoT 数据桥梁.xls", "数据", IotDataBridgeRespVO.class, + BeanUtils.toBean(list, IotDataBridgeRespVO.class)); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgePageReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgePageReqVO.java new file mode 100644 index 0000000000..7da87be07a --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgePageReqVO.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; + +import cn.iocoder.yudao.framework.common.pojo.PageParam; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.springframework.format.annotation.DateTimeFormat; + +import java.time.LocalDateTime; + +import static cn.iocoder.yudao.framework.common.util.date.DateUtils.FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND; + +@Schema(description = "管理后台 - IoT 数据桥梁分页 Request VO") +@Data +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class IotDataBridgePageReqVO extends PageParam { + + @Schema(description = "桥梁名称", example = "赵六") + private String name; + + @Schema(description = "桥梁描述", example = "随便") + private String description; + + @Schema(description = "桥梁状态", example = "2") + private Integer status; + + @Schema(description = "桥梁方向") + private Integer direction; + + @Schema(description = "桥梁类型", example = "1") + private Integer type; + + @Schema(description = "创建时间") + @DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND) + private LocalDateTime[] createTime; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java new file mode 100644 index 0000000000..3e50dc4d5b --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeRespVO.java @@ -0,0 +1,57 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; + +import cn.iocoder.yudao.framework.excel.core.annotations.DictFormat; +import cn.iocoder.yudao.framework.excel.core.convert.DictConvert; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig; +import com.alibaba.excel.annotation.ExcelIgnoreUnannotated; +import com.alibaba.excel.annotation.ExcelProperty; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +import java.time.LocalDateTime; + +import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_DIRECTION_ENUM; +import static cn.iocoder.yudao.module.iot.enums.DictTypeConstants.IOT_DATA_BRIDGE_TYPE_ENUM; +import static cn.iocoder.yudao.module.system.enums.DictTypeConstants.COMMON_STATUS; + +@Schema(description = "管理后台 - IoT 数据桥梁 Response VO") +@Data +@ExcelIgnoreUnannotated +public class IotDataBridgeRespVO { + + @Schema(description = "桥梁编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "18564") + @ExcelProperty("桥梁编号") + private Long id; + + @Schema(description = "桥梁名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六") + @ExcelProperty("桥梁名称") + private String name; + + @Schema(description = "桥梁描述", example = "随便") + @ExcelProperty("桥梁描述") + private String description; + + @Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2") + @ExcelProperty(value = "桥梁状态", converter = DictConvert.class) + @DictFormat(COMMON_STATUS) + private Integer status; + + @Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED) + @ExcelProperty(value = "桥梁方向", converter = DictConvert.class) + @DictFormat(IOT_DATA_BRIDGE_DIRECTION_ENUM) + private Integer direction; + + @Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + @ExcelProperty(value = "桥梁类型", converter = DictConvert.class) + @DictFormat(IOT_DATA_BRIDGE_TYPE_ENUM) + private Integer type; + + @Schema(description = "桥梁配置") + @ExcelProperty("桥梁配置") + private IotDataBridgeConfig config; + + @Schema(description = "创建时间", requiredMode = Schema.RequiredMode.REQUIRED) + @ExcelProperty("创建时间") + private LocalDateTime createTime; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java new file mode 100644 index 0000000000..96620a1ca4 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/IotDataBridgeSaveReqVO.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge; + +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig; +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +@Schema(description = "管理后台 - IoT 数据桥梁新增/修改 Request VO") +@Data +public class IotDataBridgeSaveReqVO { + + @Schema(description = "桥梁编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "18564") + private Long id; + + @Schema(description = "桥梁名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "赵六") + @NotEmpty(message = "桥梁名称不能为空") + private String name; + + @Schema(description = "桥梁描述", example = "随便") + private String description; + + @Schema(description = "桥梁状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "2") + @NotNull(message = "桥梁状态不能为空") + private Integer status; + + @Schema(description = "桥梁方向", requiredMode = Schema.RequiredMode.REQUIRED) + @NotNull(message = "桥梁方向不能为空") + private Integer direction; + + @Schema(description = "桥梁类型", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + @NotNull(message = "桥梁类型不能为空") + private Integer type; + + @Schema(description = "桥梁配置") + @NotNull(message = "桥梁配置不能为空") + private IotDataBridgeConfig config; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeConfig.java new file mode 100644 index 0000000000..f37a51edb3 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeConfig.java @@ -0,0 +1,32 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; + +/** + * 抽象类 IotDataBridgeConfig + * + * 用于表示数据桥梁配置数据的通用类型,根据具体的 "type" 字段动态映射到对应的子类。 + * 提供多态支持,适用于不同类型的数据结构序列化和反序列化场景。 + * + * @author HUIHUI + */ +@Data +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible = true) +@JsonSubTypes({ + @JsonSubTypes.Type(value = IotDataBridgeHttpConfig.class, name = "HTTP"), + @JsonSubTypes.Type(value = IotDataBridgeKafkaMQConfig.class, name = "KAFKA"), + @JsonSubTypes.Type(value = IotDataBridgeMqttConfig.class, name = "MQTT"), + @JsonSubTypes.Type(value = IotDataBridgeRabbitMQConfig.class, name = "RABBITMQ"), + @JsonSubTypes.Type(value = IotDataBridgeRedisStreamMQConfig.class, name = "REDIS_STREAM"), + @JsonSubTypes.Type(value = IotDataBridgeRocketMQConfig.class, name = "ROCKETMQ"), +}) +public abstract class IotDataBridgeConfig { + + /** + * 配置类型 + */ + private String type; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeHttpConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeHttpConfig.java new file mode 100644 index 0000000000..69cdc71f7f --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeHttpConfig.java @@ -0,0 +1,36 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +import java.util.Map; + +/** + * HTTP 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeHttpConfig extends IotDataBridgeConfig { + + /** + * 请求 URL + */ + private String url; + /** + * 请求方法 + */ + private String method; + /** + * 请求头 + */ + private Map headers; + /** + * 请求参数 + */ + private Map query; + /** + * 请求体 + */ + private String body; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeKafkaMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeKafkaMQConfig.java new file mode 100644 index 0000000000..3acd646f33 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeKafkaMQConfig.java @@ -0,0 +1,35 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * Kafka 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeKafkaMQConfig extends IotDataBridgeConfig { + + /** + * Kafka 服务器地址 + */ + private String bootstrapServers; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + /** + * 是否启用 SSL + */ + private Boolean ssl; + + /** + * 主题 + */ + private String topic; + +} diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeMqttConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeMqttConfig.java new file mode 100644 index 0000000000..0bf7067bc1 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeMqttConfig.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * MQTT 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeMqttConfig extends IotDataBridgeConfig { + + /** + * MQTT 服务器地址 + */ + private String url; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + /** + * 客户端编号 + */ + private String clientId; + /** + * 主题 + */ + private String topic; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRabbitMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRabbitMQConfig.java new file mode 100644 index 0000000000..29bf328979 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRabbitMQConfig.java @@ -0,0 +1,46 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * RabbitMQ 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeRabbitMQConfig extends IotDataBridgeConfig { + + /** + * RabbitMQ 服务器地址 + */ + private String host; + /** + * 端口 + */ + private Integer port; + /** + * 虚拟主机 + */ + private String virtualHost; + /** + * 用户名 + */ + private String username; + /** + * 密码 + */ + private String password; + + /** + * 交换机名称 + */ + private String exchange; + /** + * 路由键 + */ + private String routingKey; + /** + * 队列名称 + */ + private String queue; +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRedisStreamMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRedisStreamMQConfig.java new file mode 100644 index 0000000000..db7b2b2bcb --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRedisStreamMQConfig.java @@ -0,0 +1,34 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * Redis Stream MQ 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeRedisStreamMQConfig extends IotDataBridgeConfig { + + /** + * Redis 服务器地址 + */ + private String host; + /** + * 端口 + */ + private Integer port; + /** + * 密码 + */ + private String password; + /** + * 数据库索引 + */ + private Integer database; + + /** + * 主题 + */ + private String topic; +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRocketMQConfig.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRocketMQConfig.java new file mode 100644 index 0000000000..e911461e4d --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/databridge/config/IotDataBridgeRocketMQConfig.java @@ -0,0 +1,39 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config; + +import lombok.Data; + +/** + * RocketMQ 配置 + * + * @author HUIHUI + */ +@Data +public class IotDataBridgeRocketMQConfig extends IotDataBridgeConfig { + + /** + * RocketMQ 名称服务器地址 + */ + private String nameServer; + /** + * 访问密钥 + */ + private String accessKey; + /** + * 秘密钥匙 + */ + private String secretKey; + + /** + * 生产者组 + */ + private String group; + /** + * 主题 + */ + private String topic; + /** + * 标签 + */ + private String tags; + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/package-info.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/package-info.java new file mode 100644 index 0000000000..a977d86e93 --- /dev/null +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/controller/admin/rule/vo/package-info.java @@ -0,0 +1 @@ +package cn.iocoder.yudao.module.iot.controller.admin.rule.vo; \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java index bb0623a1ff..05493b916f 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/dataobject/rule/IotDataBridgeDO.java @@ -1,22 +1,22 @@ package cn.iocoder.yudao.module.iot.dal.dataobject.rule; import cn.iocoder.yudao.framework.mybatis.core.dataobject.BaseDO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeConfig; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeDirectionEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import com.baomidou.mybatisplus.annotation.KeySequence; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.*; -import java.util.Map; - /** * IoT 数据桥梁 DO * * @author 芋道源码 */ -@TableName("iot_data_bridge") +@TableName(value = "iot_data_bridge", autoResultMap = true) @KeySequence("iot_data_bridge_seq") // 用于 Oracle、PostgreSQL、Kingbase、DB2、H2 数据库的主键自增。如果是 MySQL 等数据库,可不写。 @Data @EqualsAndHashCode(callSuper = true) @@ -48,14 +48,14 @@ public class IotDataBridgeDO extends BaseDO { /** * 桥梁方向 * - * 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgDirectionEnum} + * 枚举 {@link IotDataBridgeDirectionEnum} */ private Integer direction; /** * 桥梁类型 * - * 枚举 {@link cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum} + * 枚举 {@link IotDataBridgeTypeEnum} */ private Integer type; @@ -63,211 +63,6 @@ public class IotDataBridgeDO extends BaseDO { * 桥梁配置 */ @TableField(typeHandler = JacksonTypeHandler.class) - private Config config; - - /** - * 文件客户端的配置 - * 不同实现的客户端,需要不同的配置,通过子类来定义 - * - * @author 芋道源码 - */ - @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) - // @JsonTypeInfo 注解的作用,Jackson 多态 - // 1. 序列化到时数据库时,增加 @class 属性。 - // 2. 反序列化到内存对象时,通过 @class 属性,可以创建出正确的类型 - public interface Config { - } - - /** - * HTTP 配置 - */ - @Data - public static class HttpConfig implements Config { - - /** - * 请求 URL - */ - private String url; - /** - * 请求方法 - */ - private String method; - /** - * 请求头 - */ - private Map headers; - /** - * 请求参数 - */ - private Map query; - /** - * 请求体 - */ - private String body; - - } - - /** - * MQTT 配置 - */ - @Data - public static class MqttConfig implements Config { - - /** - * MQTT 服务器地址 - */ - private String url; - /** - * 用户名 - */ - private String username; - /** - * 密码 - */ - private String password; - /** - * 客户端编号 - */ - private String clientId; - /** - * 主题 - */ - private String topic; - - } - - /** - * RocketMQ 配置 - */ - @Data - public static class RocketMQConfig implements Config { - - /** - * RocketMQ 名称服务器地址 - */ - private String nameServer; - /** - * 访问密钥 - */ - private String accessKey; - /** - * 秘密钥匙 - */ - private String secretKey; - - /** - * 生产者组 - */ - private String group; - /** - * 主题 - */ - private String topic; - /** - * 标签 - */ - private String tags; - - } - - /** - * Kafka 配置 - */ - @Data - public static class KafkaMQConfig implements Config { - - /** - * Kafka 服务器地址 - */ - private String bootstrapServers; - /** - * 用户名 - */ - private String username; - /** - * 密码 - */ - private String password; - /** - * 是否启用 SSL - */ - private Boolean ssl; - - /** - * 主题 - */ - private String topic; - - } - - /** - * RabbitMQ 配置 - */ - @Data - public static class RabbitMQConfig implements Config { - - /** - * RabbitMQ 服务器地址 - */ - private String host; - /** - * 端口 - */ - private Integer port; - /** - * 虚拟主机 - */ - private String virtualHost; - /** - * 用户名 - */ - private String username; - /** - * 密码 - */ - private String password; - - /** - * 交换机名称 - */ - private String exchange; - /** - * 路由键 - */ - private String routingKey; - /** - * 队列名称 - */ - private String queue; - } - - /** - * Redis Stream MQ 配置 - */ - @Data - public static class RedisStreamMQConfig implements Config { - - /** - * Redis 服务器地址 - */ - private String host; - /** - * 端口 - */ - private Integer port; - /** - * 密码 - */ - private String password; - /** - * 数据库索引 - */ - private Integer database; - - /** - * 主题 - */ - private String topic; - } + private IotDataBridgeConfig config; } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java index 076b341f00..1609aec34a 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/dal/mysql/rule/IotDataBridgeMapper.java @@ -1,9 +1,29 @@ package cn.iocoder.yudao.module.iot.dal.mysql.rule; +import cn.iocoder.yudao.framework.common.pojo.PageResult; import cn.iocoder.yudao.framework.mybatis.core.mapper.BaseMapperX; +import cn.iocoder.yudao.framework.mybatis.core.query.LambdaQueryWrapperX; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import org.apache.ibatis.annotations.Mapper; +/** + * IoT 数据桥梁 Mapper + * + * @author HUIHUI + */ @Mapper public interface IotDataBridgeMapper extends BaseMapperX { -} + + default PageResult selectPage(IotDataBridgePageReqVO reqVO) { + return selectPage(reqVO, new LambdaQueryWrapperX() + .likeIfPresent(IotDataBridgeDO::getName, reqVO.getName()) + .likeIfPresent(IotDataBridgeDO::getDescription, reqVO.getDescription()) + .eqIfPresent(IotDataBridgeDO::getStatus, reqVO.getStatus()) + .eqIfPresent(IotDataBridgeDO::getDirection, reqVO.getDirection()) + .eqIfPresent(IotDataBridgeDO::getType, reqVO.getType()) + .betweenIfPresent(IotDataBridgeDO::getCreateTime, reqVO.getCreateTime()) + .orderByDesc(IotDataBridgeDO::getId)); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java index cbf0b36c23..f720a1d15d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeService.java @@ -1,20 +1,54 @@ package cn.iocoder.yudao.module.iot.service.rule; +import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import jakarta.validation.Valid; /** - * IoT 数据桥梁的 Service 接口 + * IoT 数据桥梁 Service 接口 * - * @author 芋道源码 + * @author HUIHUI */ public interface IotDataBridgeService { /** - * 获得指定数据桥梁 + * 创建IoT 数据桥梁 * - * @param id 数据桥梁编号 - * @return 数据桥梁 + * @param createReqVO 创建信息 + * @return 编号 */ - IotDataBridgeDO getIotDataBridge(Long id); + Long createDataBridge(@Valid IotDataBridgeSaveReqVO createReqVO); -} + /** + * 更新IoT 数据桥梁 + * + * @param updateReqVO 更新信息 + */ + void updateDataBridge(@Valid IotDataBridgeSaveReqVO updateReqVO); + + /** + * 删除IoT 数据桥梁 + * + * @param id 编号 + */ + void deleteDataBridge(Long id); + + /** + * 获得IoT 数据桥梁 + * + * @param id 编号 + * @return IoT 数据桥梁 + */ + IotDataBridgeDO getDataBridge(Long id); + + /** + * 获得IoT 数据桥梁分页 + * + * @param pageReqVO 分页查询 + * @return IoT 数据桥梁分页 + */ + PageResult getDataBridgePage(IotDataBridgePageReqVO pageReqVO); + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java index 7814fd9bbc..9e439fc996 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/IotDataBridgeServiceImpl.java @@ -1,45 +1,70 @@ package cn.iocoder.yudao.module.iot.service.rule; -import cn.hutool.core.map.MapUtil; -import cn.iocoder.yudao.framework.common.util.json.JsonUtils; +import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.framework.common.util.object.BeanUtils; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgePageReqVO; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.IotDataBridgeSaveReqVO; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; import cn.iocoder.yudao.module.iot.dal.mysql.rule.IotDataBridgeMapper; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; -import java.util.Objects; +import static cn.iocoder.yudao.framework.common.exception.util.ServiceExceptionUtil.exception; +import static cn.iocoder.yudao.module.iot.enums.ErrorCodeConstants.DATA_BRIDGE_NOT_EXISTS; /** - * IoT 数据桥梁的 Service 实现类 + * IoT 数据桥梁 Service 实现类 * - * @author 芋道源码 + * @author HUIHUI */ @Service @Validated -@Slf4j public class IotDataBridgeServiceImpl implements IotDataBridgeService { @Resource private IotDataBridgeMapper dataBridgeMapper; - // TODO @芋艿:临时测试 @Override - public IotDataBridgeDO getIotDataBridge(Long id) { - if (Objects.equals(id, 1L)) { - IotDataBridgeDO.HttpConfig config = new IotDataBridgeDO.HttpConfig() - .setUrl("http://127.0.0.1:48080/test") -// .setMethod("POST") - .setMethod("GET") - .setQuery(MapUtil.of("aaa", "bbb")) - .setHeaders(MapUtil.of("ccc", "ddd")) - .setBody(JsonUtils.toJsonString(MapUtil.of("eee", "fff"))); - return IotDataBridgeDO.builder().id(1L).name("芋道").description("芋道源码").status(0).direction(1) - .type(IotDataBridgTypeEnum.HTTP.getType()).config(config).build(); + public Long createDataBridge(IotDataBridgeSaveReqVO createReqVO) { + // 插入 + IotDataBridgeDO dataBridge = BeanUtils.toBean(createReqVO, IotDataBridgeDO.class); + dataBridgeMapper.insert(dataBridge); + // 返回 + return dataBridge.getId(); + } + + @Override + public void updateDataBridge(IotDataBridgeSaveReqVO updateReqVO) { + // 校验存在 + validateDataBridgeExists(updateReqVO.getId()); + // 更新 + IotDataBridgeDO updateObj = BeanUtils.toBean(updateReqVO, IotDataBridgeDO.class); + dataBridgeMapper.updateById(updateObj); + } + + @Override + public void deleteDataBridge(Long id) { + // 校验存在 + validateDataBridgeExists(id); + // 删除 + dataBridgeMapper.deleteById(id); + } + + private void validateDataBridgeExists(Long id) { + if (dataBridgeMapper.selectById(id) == null) { + throw exception(DATA_BRIDGE_NOT_EXISTS); } + } + + @Override + public IotDataBridgeDO getDataBridge(Long id) { return dataBridgeMapper.selectById(id); } -} + @Override + public PageResult getDataBridgePage(IotDataBridgePageReqVO pageReqVO) { + return dataBridgeMapper.selectPage(pageReqVO); + } + +} \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java index 4cf1f8f285..a673b538ef 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneAction.java @@ -22,7 +22,7 @@ public interface IotRuleSceneAction { * 2. 非空的情况:设备触发 * @param config 配置 */ - void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config); + void execute(@Nullable IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception; /** * 获得类型 diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java index 6733331cb4..b38e181f93 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/IotRuleSceneDataBridgeAction.java @@ -26,17 +26,17 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { @Resource private IotDataBridgeService dataBridgeService; @Resource - private List dataBridgeExecutes; + private List> dataBridgeExecutes; @Override - public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) { + public void execute(IotDeviceMessage message, IotRuleSceneDO.ActionConfig config) throws Exception { // 1.1 如果消息为空,直接返回 if (message == null) { return; } // 1.2 获得数据桥梁 Assert.notNull(config.getDataBridgeId(), "数据桥梁编号不能为空"); - IotDataBridgeDO dataBridge = dataBridgeService.getIotDataBridge(config.getDataBridgeId()); + IotDataBridgeDO dataBridge = dataBridgeService.getDataBridge(config.getDataBridgeId()); if (dataBridge == null || dataBridge.getConfig() == null) { log.error("[execute][message({}) config({}) 对应的数据桥梁不存在]", message, config); return; @@ -47,7 +47,9 @@ public class IotRuleSceneDataBridgeAction implements IotRuleSceneAction { } // 2. 执行数据桥接操作 - dataBridgeExecutes.forEach(execute -> execute.execute(message, dataBridge)); + for (IotDataBridgeExecute execute : dataBridgeExecutes) { + execute.execute(message, dataBridge); + } } @Override diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java index ebd0f87764..d26c2dd436 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/AbstractCacheableDataBridgeExecute.java @@ -1,8 +1,11 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; +import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; import lombok.extern.slf4j.Slf4j; import java.time.Duration; @@ -15,22 +18,36 @@ import java.time.Duration; /** * 带缓存功能的数据桥梁执行器抽象类 * + * 该类提供了一个通用的缓存机制,用于管理各类数据桥接的生产者(Producer)实例。 + * + * 主要特点: + * - 基于Guava Cache实现高效的生产者实例缓存管理 + * - 自动处理生产者的生命周期(创建、获取、关闭) + * - 支持30分钟未访问自动过期清理机制 + * - 异常处理与日志记录,便于问题排查 + * + * 子类需要实现: + * - initProducer(Config) - 初始化特定类型的生产者实例 + * - closeProducer(Producer) - 关闭生产者实例并释放资源 + * + * @param 配置信息类型,用于初始化生产者 + * @param 生产者类型,负责将数据发送到目标系统 * @author HUIHUI */ @Slf4j -public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { +public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridgeExecute { - // TODO @huihui:AbstractCacheableDataBridgeExecute 这样,下面的 Object, Object 就有了类型;另外 IotDataBridgeDO.Config 可以替代一个 Object 哇, /** * Producer 缓存 */ - private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() - .expireAfterAccess(Duration.ofMinutes(30)) - .removalListener(notification -> { - Object producer = notification.getValue(); + private final LoadingCache PRODUCER_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(30)) // 30 分钟未访问就提前过期 + .removalListener((RemovalListener) notification -> { + Producer producer = notification.getValue(); if (producer == null) { return; } + try { closeProducer(producer); log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已关闭]", notification.getKey()); @@ -38,15 +55,18 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 关闭失败]", notification.getKey(), e); } }) - .build(new CacheLoader() { - + .build(new CacheLoader() { @Override - public Object load(Object config) throws Exception { - Object producer = initProducer(config); - log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); - return producer; + public Producer load(Config config) throws Exception { + try { + Producer producer = initProducer(config); + log.info("[PRODUCER_CACHE][配置({}) 对应的 producer 已创建并启动]", config); + return producer; + } catch (Exception e) { + log.error("[PRODUCER_CACHE][配置({}) 对应的 producer 创建启动失败]", config, e); + throw e; // 抛出异常,触发缓存加载失败机制 + } } - }); /** @@ -55,7 +75,7 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg * @param config 配置信息 * @return 生产者对象 */ - protected Object getProducer(Object config) throws Exception { + protected Producer getProducer(Config config) throws Exception { return PRODUCER_CACHE.get(config); } @@ -66,13 +86,29 @@ public abstract class AbstractCacheableDataBridgeExecute implements IotDataBridg * @return 生产者对象 * @throws Exception 如果初始化失败 */ - protected abstract Object initProducer(Object config) throws Exception; + protected abstract Producer initProducer(Config config) throws Exception; /** * 关闭生产者 * * @param producer 生产者对象 */ - protected abstract void closeProducer(Object producer); + protected abstract void closeProducer(Producer producer) throws Exception; + + @Override + @SuppressWarnings({"unchecked"}) + public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { + // 1.1 校验数据桥梁类型 + if (!getType().equals(dataBridge.getType())) { + return; + } + + // 1.2 执行对应的数据桥梁发送消息 + try { + execute0(message, (Config) dataBridge.getConfig()); + } catch (Exception e) { + log.error("[execute][桥梁配置 config({}) 对应的 message({}) 发送异常]", dataBridge.getConfig(), message, e); + } + } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java index a10f75103c..ce3d0f1938 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotDataBridgeExecute.java @@ -9,9 +9,14 @@ import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; * * @author HUIHUI */ -public interface IotDataBridgeExecute { +public interface IotDataBridgeExecute { - // TODO @huihui:要不搞个 getType?然后 execute0 由子类实现。这样,子类的 executeRedisStream ,其实就是 execute0 了。 + /** + * 获取数据桥梁类型 + * + * @return 数据桥梁类型 + */ + Integer getType(); /** * 执行数据桥梁操作 @@ -19,6 +24,23 @@ public interface IotDataBridgeExecute { * @param message 设备消息 * @param dataBridge 数据桥梁 */ - void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge); + @SuppressWarnings({"unchecked"}) + default void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) throws Exception { + // 1.1 校验数据桥梁类型 + if (!getType().equals(dataBridge.getType())) { + return; + } + + // 1.2 执行对应的数据桥梁发送消息 + execute0(message, (Config) dataBridge.getConfig()); + } + + /** + * 【真正】执行数据桥梁操作 + * + * @param message 设备消息 + * @param config 桥梁配置 + */ + void execute0(IotDeviceMessage message, Config config) throws Exception; } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java index 27b8bc6bba..22b72e055e 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotHttpDataBridgeExecute.java @@ -3,8 +3,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.common.util.http.HttpUtils; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeHttpConfig; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -25,23 +25,19 @@ import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_ */ @Component @Slf4j -public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { +public class IotHttpDataBridgeExecute implements IotDataBridgeExecute { @Resource private RestTemplate restTemplate; @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == HTTP - if (!IotDataBridgTypeEnum.HTTP.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 HTTP 请求 - executeHttp(message, (IotDataBridgeDO.HttpConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgeTypeEnum.HTTP.getType(); } + @Override @SuppressWarnings({"unchecked", "deprecation"}) - private void executeHttp(IotDeviceMessage message, IotDataBridgeDO.HttpConfig config) { + public void execute0(IotDeviceMessage message, IotDataBridgeHttpConfig config) { String url = null; HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); HttpEntity requestEntity = null; diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java index e95d4ced95..3b7f99bf42 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotKafkaMQDataBridgeExecute.java @@ -1,11 +1,13 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeKafkaMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -15,65 +17,53 @@ import java.time.LocalDateTime; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Kafka 的 {@link IotDataBridgeExecute} 实现类 * * @author HUIHUI */ +@ConditionalOnClass(name = "org.springframework.kafka.core.KafkaTemplate") @Component @Slf4j -public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotKafkaMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute> { - private static final Duration SEND_TIMEOUT = Duration.ofMillis(10); + private static final Duration SEND_TIMEOUT = Duration.ofMillis(10000); // 10 秒超时时间 @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1. 校验数据桥梁的类型 == KAFKA - if (!IotDataBridgTypeEnum.KAFKA.getType().equals(dataBridge.getType())) { - return; - } - // 2. 执行 Kafka 发送消息 - executeKafka(message, (IotDataBridgeDO.KafkaMQConfig) dataBridge.getConfig()); - } - - @SuppressWarnings("unchecked") - private void executeKafka(IotDeviceMessage message, IotDataBridgeDO.KafkaMQConfig config) { - try { - // 1. 获取或创建 KafkaTemplate - KafkaTemplate kafkaTemplate = (KafkaTemplate) getProducer(config); - - // 2. 发送消息并等待结果 - kafkaTemplate.send(config.getTopic(), message.toString()) - .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 - log.info("[executeKafka][message({}) 发送成功]", message); - } catch (TimeoutException e) { - log.error("[executeKafka][message({}) config({}) 发送超时]", message, config, e); - } catch (Exception e) { - log.error("[executeKafka][message({}) config({}) 发送异常]", message, config, e); - } + public Integer getType() { + return IotDataBridgeTypeEnum.KAFKA.getType(); } @Override - protected Object initProducer(Object config) { - IotDataBridgeDO.KafkaMQConfig kafkaConfig = (IotDataBridgeDO.KafkaMQConfig) config; - + public void execute0(IotDeviceMessage message, IotDataBridgeKafkaMQConfig config) throws Exception { + // 1. 获取或创建 KafkaTemplate + KafkaTemplate kafkaTemplate = getProducer(config); + + // 2. 发送消息并等待结果 + kafkaTemplate.send(config.getTopic(), message.toString()) + .get(SEND_TIMEOUT.getSeconds(), TimeUnit.SECONDS); // 添加超时等待 + log.info("[execute0][message({}) 发送成功]", message); + } + + @Override + protected KafkaTemplate initProducer(IotDataBridgeKafkaMQConfig config) { // 1.1 构建生产者配置 Map props = new HashMap<>(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers()); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 1.2 如果配置了认证信息 - if (kafkaConfig.getUsername() != null && kafkaConfig.getPassword() != null) { + if (config.getUsername() != null && config.getPassword() != null) { props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" - + kafkaConfig.getUsername() + "\" password=\"" + kafkaConfig.getPassword() + "\";"); + + config.getUsername() + "\" password=\"" + config.getPassword() + "\";"); } // 1.3 如果启用 SSL - if (Boolean.TRUE.equals(kafkaConfig.getSsl())) { + if (Boolean.TRUE.equals(config.getSsl())) { props.put("security.protocol", "SSL"); } @@ -83,10 +73,8 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec } @Override - protected void closeProducer(Object producer) { - if (producer instanceof KafkaTemplate) { - ((KafkaTemplate) producer).destroy(); - } + protected void closeProducer(KafkaTemplate producer) { + producer.destroy(); } // TODO @芋艿:测试代码,后续清理 @@ -95,7 +83,7 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec IotKafkaMQDataBridgeExecute action = new IotKafkaMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.KafkaMQConfig config = new IotDataBridgeDO.KafkaMQConfig(); + IotDataBridgeKafkaMQConfig config = new IotDataBridgeKafkaMQConfig(); config.setBootstrapServers("127.0.0.1:9092"); config.setTopic("test-topic"); config.setSsl(false); @@ -117,10 +105,10 @@ public class IotKafkaMQDataBridgeExecute extends AbstractCacheableDataBridgeExec // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.executeKafka(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.executeKafka(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java index 5611873155..54485c091d 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRabbitMQDataBridgeExecute.java @@ -1,12 +1,14 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRabbitMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; @@ -17,51 +19,44 @@ import java.time.LocalDateTime; * * @author HUIHUI */ +@ConditionalOnClass(name = "com.rabbitmq.client.Channel") @Component @Slf4j -public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRabbitMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute { + @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == RABBITMQ - if (!IotDataBridgTypeEnum.RABBITMQ.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 RabbitMQ 发送消息 - executeRabbitMQ(message, (IotDataBridgeDO.RabbitMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgeTypeEnum.RABBITMQ.getType(); } - private void executeRabbitMQ(IotDeviceMessage message, IotDataBridgeDO.RabbitMQConfig config) { - try { - // 1. 获取或创建 Channel - Channel channel = (Channel) getProducer(config); + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeRabbitMQConfig config) throws Exception { + // 1. 获取或创建 Channel + Channel channel = getProducer(config); - // 2.1 声明交换机、队列和绑定关系 - channel.exchangeDeclare(config.getExchange(), "direct", true); - channel.queueDeclare(config.getQueue(), true, false, false, null); - channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); + // 2.1 声明交换机、队列和绑定关系 + channel.exchangeDeclare(config.getExchange(), "direct", true); + channel.queueDeclare(config.getQueue(), true, false, false, null); + channel.queueBind(config.getQueue(), config.getExchange(), config.getRoutingKey()); - // 2.2 发送消息 - channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, - message.toString().getBytes(StandardCharsets.UTF_8)); - log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); - } catch (Exception e) { - log.error("[executeRabbitMQ][message({}) config({}) 发送异常]", message, config, e); - } + // 2.2 发送消息 + channel.basicPublish(config.getExchange(), config.getRoutingKey(), null, + message.toString().getBytes(StandardCharsets.UTF_8)); + log.info("[executeRabbitMQ][message({}) config({}) 发送成功]", message, config); } @Override @SuppressWarnings("resource") - protected Object initProducer(Object config) throws Exception { - IotDataBridgeDO.RabbitMQConfig rabbitConfig = (IotDataBridgeDO.RabbitMQConfig) config; - + protected Channel initProducer(IotDataBridgeRabbitMQConfig config) throws Exception { // 1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(rabbitConfig.getHost()); - factory.setPort(rabbitConfig.getPort()); - factory.setVirtualHost(rabbitConfig.getVirtualHost()); - factory.setUsername(rabbitConfig.getUsername()); - factory.setPassword(rabbitConfig.getPassword()); + factory.setHost(config.getHost()); + factory.setPort(config.getPort()); + factory.setVirtualHost(config.getVirtualHost()); + factory.setUsername(config.getUsername()); + factory.setPassword(config.getPassword()); // 2. 创建连接 Connection connection = factory.newConnection(); @@ -71,20 +66,13 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe } @Override - protected void closeProducer(Object producer) { - if (producer instanceof Channel) { - try { - Channel channel = (Channel) producer; - if (channel.isOpen()) { - channel.close(); - } - Connection connection = channel.getConnection(); - if (connection.isOpen()) { - connection.close(); - } - } catch (Exception e) { - log.error("[closeProducer][关闭 RabbitMQ 连接异常]", e); - } + protected void closeProducer(Channel channel) throws Exception { + if (channel.isOpen()) { + channel.close(); + } + Connection connection = channel.getConnection(); + if (connection.isOpen()) { + connection.close(); } } @@ -94,7 +82,7 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe IotRabbitMQDataBridgeExecute action = new IotRabbitMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.RabbitMQConfig config = new IotDataBridgeDO.RabbitMQConfig(); + IotDataBridgeRabbitMQConfig config = new IotDataBridgeRabbitMQConfig(); config.setHost("localhost"); config.setPort(5672); config.setVirtualHost("/"); @@ -118,10 +106,12 @@ public class IotRabbitMQDataBridgeExecute extends AbstractCacheableDataBridgeExe .build(); // 4. 执行两次测试,验证缓存 - log.info("[main][第一次执行,应该会创建新的 channel]"); - action.executeRabbitMQ(message, config); + // 4. 执行两次测试,验证缓存 + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); - log.info("[main][第二次执行,应该会复用缓存的 channel]"); - action.executeRabbitMQ(message, config); + log.info("[main][第二次执行,应该会复用缓存的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } + } diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java index 552fc3425b..5616c7e648 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRedisStreamMQDataBridgeExecute.java @@ -2,8 +2,9 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRedisStreamMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; @@ -29,47 +30,36 @@ import java.time.LocalDateTime; */ @Component @Slf4j -public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRedisStreamMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute> { @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁类型 - if (!IotDataBridgTypeEnum.REDIS_STREAM.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行消息发送 - executeRedisStream(message, (IotDataBridgeDO.RedisStreamMQConfig) dataBridge.getConfig()); - } - - @SuppressWarnings("unchecked") - // TODO @huihui:try catch 交给父类来做,子类不处理异常 - private void executeRedisStream(IotDeviceMessage message, IotDataBridgeDO.RedisStreamMQConfig config) { - try { - // 1. 获取 RedisTemplate - RedisTemplate redisTemplate = (RedisTemplate) getProducer(config); - - // 2. 创建并发送 Stream 记录 - ObjectRecord record = StreamRecords.newRecord() - .ofObject(message).withStreamKey(config.getTopic()); - String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); - log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); - } catch (Exception e) { - log.error("[executeRedisStream][消息发送失败] message: {}, config: {}", message, config, e); - } + public Integer getType() { + return IotDataBridgeTypeEnum.REDIS_STREAM.getType(); } @Override - protected Object initProducer(Object config) { - IotDataBridgeDO.RedisStreamMQConfig redisConfig = (IotDataBridgeDO.RedisStreamMQConfig) config; + public void execute0(IotDeviceMessage message, IotDataBridgeRedisStreamMQConfig config) throws Exception { + // 1. 获取 RedisTemplate + RedisTemplate redisTemplate = getProducer(config); + // 2. 创建并发送 Stream 记录 + ObjectRecord record = StreamRecords.newRecord() + .ofObject(message).withStreamKey(config.getTopic()); + String recordId = String.valueOf(redisTemplate.opsForStream().add(record)); + log.info("[executeRedisStream][消息发送成功] messageId: {}, config: {}", recordId, config); + } + + @Override + protected RedisTemplate initProducer(IotDataBridgeRedisStreamMQConfig config) { // 1.1 创建 Redisson 配置 Config redissonConfig = new Config(); SingleServerConfig serverConfig = redissonConfig.useSingleServer() - .setAddress("redis://" + redisConfig.getHost() + ":" + redisConfig.getPort()) - .setDatabase(redisConfig.getDatabase()); + .setAddress("redis://" + config.getHost() + ":" + config.getPort()) + .setDatabase(config.getDatabase()); // 1.2 设置密码(如果有) - if (StrUtil.isNotBlank(redisConfig.getPassword())) { - serverConfig.setPassword(redisConfig.getPassword()); + if (StrUtil.isNotBlank(config.getPassword())) { + serverConfig.setPassword(config.getPassword()); } // TODO @huihui:看看能不能简化一些。按道理说,不用这么多的哈。 @@ -90,17 +80,10 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid } @Override - protected void closeProducer(Object producer) { - // TODO @huihui:try catch 交给父类来做。子类不处理异常 - if (producer instanceof RedisTemplate) { - RedisConnectionFactory factory = ((RedisTemplate) producer).getConnectionFactory(); - try { - if (factory != null) { - ((RedissonConnectionFactory) factory).destroy(); - } - } catch (Exception e) { - log.error("[closeProducer][关闭 redisson 连接异常]", e); - } + protected void closeProducer(RedisTemplate producer) throws Exception { + RedisConnectionFactory factory = producer.getConnectionFactory(); + if (factory != null) { + ((RedissonConnectionFactory) factory).destroy(); } } @@ -119,7 +102,7 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid IotRedisStreamMQDataBridgeExecute action = new IotRedisStreamMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.RedisStreamMQConfig config = new IotDataBridgeDO.RedisStreamMQConfig(); + IotDataBridgeRedisStreamMQConfig config = new IotDataBridgeRedisStreamMQConfig(); config.setHost("127.0.0.1"); config.setPort(6379); config.setDatabase(0); @@ -140,11 +123,11 @@ public class IotRedisStreamMQDataBridgeExecute extends AbstractCacheableDataBrid .build(); // 4. 执行两次测试,验证缓存 - log.info("[main][第一次执行,应该会创建新的 RedisTemplate]"); - action.executeRedisStream(message, config); + log.info("[main][第一次执行,应该会创建新的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); - log.info("[main][第二次执行,应该会复用缓存的 RedisTemplate]"); - action.executeRedisStream(message, config); + log.info("[main][第二次执行,应该会复用缓存的 producer]"); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } \ No newline at end of file diff --git a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java index 3b53252539..541bd181e0 100644 --- a/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java +++ b/yudao-module-iot/yudao-module-iot-biz/src/main/java/cn/iocoder/yudao/module/iot/service/rule/action/databridge/IotRocketMQDataBridgeExecute.java @@ -1,7 +1,8 @@ package cn.iocoder.yudao.module.iot.service.rule.action.databridge; +import cn.iocoder.yudao.module.iot.controller.admin.rule.vo.databridge.config.IotDataBridgeRocketMQConfig; import cn.iocoder.yudao.module.iot.dal.dataobject.rule.IotDataBridgeDO; -import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgTypeEnum; +import cn.iocoder.yudao.module.iot.enums.rule.IotDataBridgeTypeEnum; import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -9,6 +10,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.stereotype.Component; import java.time.LocalDateTime; @@ -18,58 +20,49 @@ import java.time.LocalDateTime; * * @author HUIHUI */ +@ConditionalOnClass(name = "org.apache.rocketmq.client.producer.DefaultMQProducer") @Component @Slf4j -public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExecute { +public class IotRocketMQDataBridgeExecute extends + AbstractCacheableDataBridgeExecute { @Override - public void execute(IotDeviceMessage message, IotDataBridgeDO dataBridge) { - // 1.1 校验数据桥梁的类型 == ROCKETMQ - if (!IotDataBridgTypeEnum.ROCKETMQ.getType().equals(dataBridge.getType())) { - return; - } - // 1.2 执行 RocketMQ 发送消息 - executeRocketMQ(message, (IotDataBridgeDO.RocketMQConfig) dataBridge.getConfig()); + public Integer getType() { + return IotDataBridgeTypeEnum.ROCKETMQ.getType(); } - private void executeRocketMQ(IotDeviceMessage message, IotDataBridgeDO.RocketMQConfig config) { - try { - // 1. 获取或创建 Producer - DefaultMQProducer producer = (DefaultMQProducer) getProducer(config); + @Override + public void execute0(IotDeviceMessage message, IotDataBridgeRocketMQConfig config) throws Exception { + // 1. 获取或创建 Producer + DefaultMQProducer producer = getProducer(config); - // 2.1 创建消息对象,指定Topic、Tag和消息体 - Message msg = new Message( - config.getTopic(), - config.getTags(), - message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) - ); - // 2.2 发送同步消息并处理结果 - SendResult sendResult = producer.send(msg); - // 2.3 处理发送结果 - if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { - log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); - } else { - log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); - } - } catch (Exception e) { - log.error("[executeRocketMQ][message({}) config({}) 发送异常]", message, config, e); + // 2.1 创建消息对象,指定Topic、Tag和消息体 + Message msg = new Message( + config.getTopic(), + config.getTags(), + message.toString().getBytes(RemotingHelper.DEFAULT_CHARSET) + ); + // 2.2 发送同步消息并处理结果 + SendResult sendResult = producer.send(msg); + // 2.3 处理发送结果 + if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) { + log.info("[executeRocketMQ][message({}) config({}) 发送成功,结果({})]", message, config, sendResult); + } else { + log.error("[executeRocketMQ][message({}) config({}) 发送失败,结果({})]", message, config, sendResult); } } @Override - protected Object initProducer(Object config) throws Exception { - IotDataBridgeDO.RocketMQConfig rocketMQConfig = (IotDataBridgeDO.RocketMQConfig) config; - DefaultMQProducer producer = new DefaultMQProducer(rocketMQConfig.getGroup()); - producer.setNamesrvAddr(rocketMQConfig.getNameServer()); + protected DefaultMQProducer initProducer(IotDataBridgeRocketMQConfig config) throws Exception { + DefaultMQProducer producer = new DefaultMQProducer(config.getGroup()); + producer.setNamesrvAddr(config.getNameServer()); producer.start(); return producer; } @Override - protected void closeProducer(Object producer) { - if (producer instanceof DefaultMQProducer) { - ((DefaultMQProducer) producer).shutdown(); - } + protected void closeProducer(DefaultMQProducer producer) { + producer.shutdown(); } // TODO @芋艿:测试代码,后续清理 @@ -78,7 +71,7 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe IotRocketMQDataBridgeExecute action = new IotRocketMQDataBridgeExecute(); // 2. 创建共享的配置 - IotDataBridgeDO.RocketMQConfig config = new IotDataBridgeDO.RocketMQConfig(); + IotDataBridgeRocketMQConfig config = new IotDataBridgeRocketMQConfig(); config.setNameServer("127.0.0.1:9876"); config.setGroup("test-group"); config.setTopic("test-topic"); @@ -99,10 +92,10 @@ public class IotRocketMQDataBridgeExecute extends AbstractCacheableDataBridgeExe // 4. 执行两次测试,验证缓存 log.info("[main][第一次执行,应该会创建新的 producer]"); - action.executeRocketMQ(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); log.info("[main][第二次执行,应该会复用缓存的 producer]"); - action.executeRocketMQ(message, config); + action.execute(message, new IotDataBridgeDO().setType(action.getType()).setConfig(config)); } } diff --git a/yudao-server/pom.xml b/yudao-server/pom.xml index 17403fef85..2c7708a517 100644 --- a/yudao-server/pom.xml +++ b/yudao-server/pom.xml @@ -114,6 +114,19 @@ yudao-module-iot-biz ${revision} + + + org.apache.rocketmq + rocketmq-spring-boot-starter + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-amqp +