【功能新增】IoT:增加 IotRuleSceneJob 执行定时任务

This commit is contained in:
YunaiV 2025-02-04 12:39:56 +08:00
parent 8d0caaa16c
commit f6f162ad2f
5 changed files with 213 additions and 44 deletions

View File

@ -0,0 +1,27 @@
package cn.iocoder.yudao.module.iot.controller.admin.rule;
import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.annotation.security.PermitAll;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Tag(name = "管理后台 - IoT 规则场景")
@RestController
@RequestMapping("/iot/rule-scene")
@Validated
public class IotRuleSceneController {
@Resource
private IotRuleSceneService ruleSceneService;
@GetMapping("/test")
@PermitAll
public void test() {
ruleSceneService.test();
}
}

View File

@ -80,18 +80,18 @@ public class IotSchedulerManager {
* 添加或更新 Job Quartz
*
* @param jobClass 任务处理器的类
* @param jobHandlerName 任务处理器的
* @param jobName 任务
* @param cronExpression CRON 表达式
* @param jobDataMap 任务数据
* @throws SchedulerException 添加异常
*/
public void addOrUpdateJob(Class <? extends Job> jobClass, String jobHandlerName,
public void addOrUpdateJob(Class <? extends Job> jobClass, String jobName,
String cronExpression, Map<String, Object> jobDataMap)
throws SchedulerException {
if (scheduler.checkExists(new JobKey(jobHandlerName))) {
this.updateJob(jobHandlerName, cronExpression);
if (scheduler.checkExists(new JobKey(jobName))) {
this.updateJob(jobName, cronExpression);
} else {
this.addJob(jobClass, jobHandlerName, cronExpression, jobDataMap);
this.addJob(jobClass, jobName, cronExpression, jobDataMap);
}
}
@ -99,20 +99,20 @@ public class IotSchedulerManager {
* 添加 Job Quartz
*
* @param jobClass 任务处理器的类
* @param jobHandlerName 任务处理器的
* @param jobName 任务
* @param cronExpression CRON 表达式
* @param jobDataMap 任务数据
* @throws SchedulerException 添加异常
*/
public void addJob(Class <? extends Job> jobClass, String jobHandlerName,
public void addJob(Class <? extends Job> jobClass, String jobName,
String cronExpression, Map<String, Object> jobDataMap)
throws SchedulerException {
// 创建 JobDetail 对象
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.usingJobData(new JobDataMap(jobDataMap))
.withIdentity(jobHandlerName).build();
.withIdentity(jobName).build();
// 创建 Trigger 对象
Trigger trigger = this.buildTrigger(jobHandlerName, cronExpression);
Trigger trigger = this.buildTrigger(jobName, cronExpression);
// 新增 Job 调度
scheduler.scheduleJob(jobDetail, trigger);
}
@ -120,70 +120,70 @@ public class IotSchedulerManager {
/**
* 更新 Job Quartz
*
* @param jobHandlerName 任务处理器的
* @param jobName 任务
* @param cronExpression CRON 表达式
* @throws SchedulerException 更新异常
*/
public void updateJob(String jobHandlerName, String cronExpression)
public void updateJob(String jobName, String cronExpression)
throws SchedulerException {
// 创建新 Trigger 对象
Trigger newTrigger = this.buildTrigger(jobHandlerName, cronExpression);
Trigger newTrigger = this.buildTrigger(jobName, cronExpression);
// 修改调度
scheduler.rescheduleJob(new TriggerKey(jobHandlerName), newTrigger);
scheduler.rescheduleJob(new TriggerKey(jobName), newTrigger);
}
/**
* 删除 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的
* @param jobName 任务
* @throws SchedulerException 删除异常
*/
public void deleteJob(String jobHandlerName) throws SchedulerException {
public void deleteJob(String jobName) throws SchedulerException {
// 暂停 Trigger 对象
scheduler.pauseTrigger(new TriggerKey(jobHandlerName));
scheduler.pauseTrigger(new TriggerKey(jobName));
// 取消并删除 Job 调度
scheduler.unscheduleJob(new TriggerKey(jobHandlerName));
scheduler.deleteJob(new JobKey(jobHandlerName));
scheduler.unscheduleJob(new TriggerKey(jobName));
scheduler.deleteJob(new JobKey(jobName));
}
/**
* 暂停 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的
* @param jobName 任务
* @throws SchedulerException 暂停异常
*/
public void pauseJob(String jobHandlerName) throws SchedulerException {
scheduler.pauseJob(new JobKey(jobHandlerName));
public void pauseJob(String jobName) throws SchedulerException {
scheduler.pauseJob(new JobKey(jobName));
}
/**
* 启动 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的
* @param jobName 任务
* @throws SchedulerException 启动异常
*/
public void resumeJob(String jobHandlerName) throws SchedulerException {
scheduler.resumeJob(new JobKey(jobHandlerName));
scheduler.resumeTrigger(new TriggerKey(jobHandlerName));
public void resumeJob(String jobName) throws SchedulerException {
scheduler.resumeJob(new JobKey(jobName));
scheduler.resumeTrigger(new TriggerKey(jobName));
}
/**
* 立即触发一次 Quartz 中的 Job
*
* @param jobHandlerName 任务处理器的
* @param jobName 任务
* @throws SchedulerException 触发异常
*/
public void triggerJob(String jobHandlerName)
public void triggerJob(String jobName)
throws SchedulerException {
// 触发任务
JobDataMap data = new JobDataMap();
data.put(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobHandlerName);
scheduler.triggerJob(new JobKey(jobHandlerName), data);
data.put(JobDataKeyEnum.JOB_HANDLER_NAME.name(), jobName);
scheduler.triggerJob(new JobKey(jobName), data);
}
private Trigger buildTrigger(String jobHandlerName, String cronExpression) {
private Trigger buildTrigger(String jobName, String cronExpression) {
return TriggerBuilder.newTrigger()
.withIdentity(jobHandlerName)
.withIdentity(jobName)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression))
.build();
}

View File

@ -0,0 +1,57 @@
package cn.iocoder.yudao.module.iot.job.rule;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.service.rule.IotRuleSceneService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobExecutionContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.Map;
/**
* IoT 规则场景 Job用于执行 {@link IotRuleSceneTriggerTypeEnum#TIMER} 类型的规则场景
*
* @author 芋道源码
*/
@Slf4j
public class IotRuleSceneJob extends QuartzJobBean {
/**
* JobData Key - 规则场景编号
*/
public static final String JOB_DATA_KEY_RULE_SCENE_ID = "ruleSceneId";
@Resource
private IotRuleSceneService ruleSceneService;
@Override
protected void executeInternal(JobExecutionContext context) {
// 获得规则场景编号
Long ruleSceneId = context.getMergedJobDataMap().getLong(JOB_DATA_KEY_RULE_SCENE_ID);
// 执行规则场景
ruleSceneService.executeRuleSceneByTimer(ruleSceneId);
}
/**
* 创建 JobData Map
*
* @param ruleSceneId 规则场景编号
* @return JobData Map
*/
public static Map<String, Object> buildJobDataMap(Long ruleSceneId) {
return Map.of(JOB_DATA_KEY_RULE_SCENE_ID, ruleSceneId);
}
/**
* 创建 Job 名字
*
* @param ruleSceneId 规则场景编号
* @return Job 名字
*/
public static String buildJobName(Long ruleSceneId) {
return String.format("%s_%d", IotRuleSceneJob.class.getSimpleName(), ruleSceneId);
}
}

View File

@ -29,6 +29,16 @@ public interface IotRuleSceneService {
*/
void executeRuleSceneByDevice(IotDeviceMessage message);
// TODO @芋艿基于 timer 场景执行规则场景
/**
* 基于 {@link IotRuleSceneTriggerTypeEnum#TIMER} 场景执行规则场景
*
* @param id 场景编号
*/
void executeRuleSceneByTimer(Long id);
/**
* TODO 芋艿测试方法需要删除
*/
void test();
}

View File

@ -7,6 +7,7 @@ import cn.hutool.core.text.CharPool;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.common.enums.CommonStatusEnum;
import cn.iocoder.yudao.framework.common.util.number.NumberUtils;
import cn.iocoder.yudao.framework.common.util.object.ObjectUtils;
import cn.iocoder.yudao.framework.common.util.spring.SpringExpressionUtils;
@ -19,9 +20,12 @@ import cn.iocoder.yudao.module.iot.enums.device.IotDeviceMessageTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneActionTypeEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerConditionParameterOperatorEnum;
import cn.iocoder.yudao.module.iot.enums.rule.IotRuleSceneTriggerTypeEnum;
import cn.iocoder.yudao.module.iot.framework.job.core.IotSchedulerManager;
import cn.iocoder.yudao.module.iot.job.rule.IotRuleSceneJob;
import cn.iocoder.yudao.module.iot.mq.message.IotDeviceMessage;
import cn.iocoder.yudao.module.iot.service.rule.action.IotRuleSceneAction;
import jakarta.annotation.Resource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobKey;
import org.quartz.Scheduler;
@ -54,6 +58,9 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService {
@Resource
private List<IotRuleSceneAction> ruleSceneActions;
@Resource(name = "iotSchedulerManager")
private IotSchedulerManager schedulerManager;
// TODO 芋艿缓存待实现
@Override
@TenantIgnore // 忽略租户隔离因为 IotRuleSceneMessageHandler 调用时一般未传递租户所以需要忽略
@ -186,7 +193,7 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService {
public void executeRuleSceneByDevice(IotDeviceMessage message) {
TenantUtils.execute(message.getTenantId(), () -> {
// 1. 获得设备匹配的规则场景
List<IotRuleSceneDO> ruleScenes = getMatchedRuleSceneList(message);
List<IotRuleSceneDO> ruleScenes = getMatchedRuleSceneListByMessage(message);
if (CollUtil.isEmpty(ruleScenes)) {
return;
}
@ -196,13 +203,60 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService {
});
}
@Override
public void executeRuleSceneByTimer(Long id) {
// 1.1 获得规则场景
// IotRuleSceneDO scene = TenantUtils.executeIgnore(() -> ruleSceneMapper.selectById(id));
// TODO @芋艿这里临时测试后续删除
IotRuleSceneDO scene = new IotRuleSceneDO().setStatus(CommonStatusEnum.ENABLE.getStatus());
if (true) {
scene.setTenantId(1L);
IotRuleSceneDO.TriggerConfig triggerConfig = new IotRuleSceneDO.TriggerConfig();
triggerConfig.setType(IotRuleSceneTriggerTypeEnum.TIMER.getType());
scene.setTriggers(ListUtil.toList(triggerConfig));
// 动作
IotRuleSceneDO.ActionConfig action01 = new IotRuleSceneDO.ActionConfig();
action01.setType(IotRuleSceneActionTypeEnum.DEVICE_CONTROL.getType());
IotRuleSceneDO.ActionDeviceControl actionDeviceControl01 = new IotRuleSceneDO.ActionDeviceControl();
actionDeviceControl01.setProductKey("4aymZgOTOOCrDKRT");
actionDeviceControl01.setDeviceNames(ListUtil.of("small"));
actionDeviceControl01.setType(IotDeviceMessageTypeEnum.PROPERTY.getType());
actionDeviceControl01.setIdentifier(IotDeviceMessageIdentifierEnum.PROPERTY_SET.getIdentifier());
actionDeviceControl01.setData(MapUtil.<String, Object>builder()
.put("power", 1)
.put("color", "red")
.build());
action01.setDeviceControl(actionDeviceControl01);
scene.setActions(ListUtil.toList(action01));
}
if (scene == null) {
log.error("[executeRuleSceneByTimer][规则场景({}) 不存在]", id);
return;
}
if (CommonStatusEnum.isDisable(scene.getStatus())) {
log.info("[executeRuleSceneByTimer][规则场景({}) 已被禁用]", id);
return;
}
// 1.2 判断是否有定时触发器避免脏数据
IotRuleSceneDO.TriggerConfig config = CollUtil.findOne(scene.getTriggers(),
trigger -> ObjUtil.equals(trigger.getType(), IotRuleSceneTriggerTypeEnum.TIMER.getType()));
if (config == null) {
log.error("[executeRuleSceneByTimer][规则场景({}) 不存在定时触发器]", scene);
return;
}
// 2. 执行规则场景
TenantUtils.execute(scene.getTenantId(),
() -> executeRuleSceneAction(null, ListUtil.toList(scene)));
}
/**
* 获得匹配的规则场景列表
* 基于消息获得匹配的规则场景列表
*
* @param message 设备消息
* @return 规则场景列表
*/
private List<IotRuleSceneDO> getMatchedRuleSceneList(IotDeviceMessage message) {
private List<IotRuleSceneDO> getMatchedRuleSceneListByMessage(IotDeviceMessage message) {
// 1. 匹配设备
// TODO @芋艿可能需要 getSelf(); 缓存
List<IotRuleSceneDO> ruleScenes = getRuleSceneListByProductKeyAndDeviceNameFromCache(
@ -335,16 +389,37 @@ public class IotRuleSceneServiceImpl implements IotRuleSceneService {
});
}
// TODO @芋艿测试思路代码记得删除
// 1. Job IotRuleSceneJob
// 2. 参数id
// 3. jobHandlerNameIotRuleSceneJob + id
@Override
@SneakyThrows
public void test() {
// TODO @芋艿测试思路代码记得删除
// 1. Job IotRuleSceneJob DONE
// 2. 参数id DONE
// 3. jobHandlerNameIotRuleSceneJob + id DONE
// 新增addJob
// 修改不存在 addJob存在 updateJob
// + 禁用1存在停止2不存在不处理TODO 测试直接暂停是否可以结论可以
// + 开启1存在更新2不存在新增
// + 禁用开启1存在删除TODO 测试直接删除结论可以
// 新增addJob
// 修改不存在 addJob存在 updateJob
// + 禁用1存在停止2不存在不处理TODO 测试直接暂停是否可以结论可以pauseJob
// + 开启1存在更新2不存在新增结论使用 save(addOrUpdateJob)
// + 禁用开启1存在删除TODO 测试直接删除结论可以deleteJob
//
if (true) {
Long id = 1L;
Map<String, Object> jobDataMap = IotRuleSceneJob.buildJobDataMap(id);
schedulerManager.addOrUpdateJob(IotRuleSceneJob.class,
IotRuleSceneJob.buildJobName(id),
"0/10 * * * * ?",
jobDataMap);
}
if (false) {
Long id = 1L;
schedulerManager.pauseJob(IotRuleSceneJob.buildJobName(id));
}
if (true) {
}
}
public static void main2(String[] args) throws SchedulerException {
// System.out.println(QuartzJobBean.class);