【代码评审】BPM:触发器 HTTP 异步

This commit is contained in:
YunaiV 2025-02-25 07:56:05 +08:00
parent fda6aff8af
commit d0fbb7677c
15 changed files with 44 additions and 31 deletions

View File

@ -1,7 +1,6 @@
package cn.iocoder.yudao.module.bpm.api.task;
import cn.iocoder.yudao.module.bpm.api.task.dto.BpmProcessInstanceCreateReqDTO;
import jakarta.validation.Valid;
/**
@ -20,12 +19,13 @@ public interface BpmProcessInstanceApi {
*/
String createProcessInstance(Long userId, @Valid BpmProcessInstanceCreateReqDTO reqDTO);
// TODO @jason新增 BpmProcessTaskApi 接口这个要不改成 triggerTask保持通用性 flowable 保持一致
/**
* 异步 HTTP 请求触发器回调, 为了唤醒流程继续执行
*
* @param processInstanceId 流程实例编号
* @param callbackId 回调编号, 对应 ReceiveTask Id
* @param callbackId 回调编号, 对应 ReceiveTask Id TODO @jason改成 taskDefineKey
*/
void asyncHttpTriggerCallback(String processInstanceId, String callbackId);
}

View File

@ -19,7 +19,7 @@ public enum BpmTriggerTypeEnum implements ArrayValuable<Integer> {
HTTP_REQUEST(1, "发起 HTTP 请求"),
FORM_UPDATE(2, "更新流程表单数据"),
FORM_DELETE(3, "删除流程表单数据"),
ASYNC_HTTP_REQUEST(4, "发起异步 HTTP 请求");
ASYNC_HTTP_REQUEST(4, "发起异步 HTTP 请求"); // TODO @jasonHTTP_REQUEST_ASYNC
/**
* 触发器执行动作类型

View File

@ -34,4 +34,5 @@ public class BpmProcessInstanceApiImpl implements BpmProcessInstanceApi {
public void asyncHttpTriggerCallback(String processInstanceId, String callbackId) {
bpmTaskService.triggerReceiveTask(processInstanceId, callbackId);
}
}

View File

@ -117,7 +117,8 @@ public class BpmSimpleModelNodeVO {
@Schema(description = "路由分支组", example = "[]")
private List<RouterSetting> routerGroups;
@Schema(description = "路由分支默认分支 ID", example = "Flow_xxx", hidden = true) // 由后端生成所以 hidden = true
@Schema(description = "路由分支默认分支 ID", example = "Flow_xxx", hidden = true) // 由后端生成不从前端传递所以 hidden = true
@JsonIgnore
private String routerDefaultFlowId; // 仅用于路由分支节点 BpmSimpleModelNodeType.ROUTER_BRANCH_NODE
/**
@ -125,12 +126,9 @@ public class BpmSimpleModelNodeVO {
*/
private TriggerSetting triggerSetting;
/**
* 附加节点 Id, 该节点不从前端传入 由程序生成. 由于当个节点无法完成功能 需要附加节点来完成
*/
@Schema(description = "附加节点 Id", example = "UserTask_xxx", hidden = true) // 由后端生成不从前端传递所以 hidden = true
@JsonIgnore
private String attachNodeId; // 目前用于异步触发器节点需要 UserTask ReceiveTask附加节点) 来完成
private String attachNodeId; // 目前用于触发器节点异步需要 UserTask ReceiveTask附加节点) 来完成
/**
* 子流程设置
@ -382,7 +380,6 @@ public class BpmSimpleModelNodeVO {
@Valid
private List<HttpRequestParam> body;
// TODO @json可能未来看情况搞个 HttpResponseParam得看看有没别的业务需要抽象统一
/**
* 请求返回处理设置用于修改流程表单值
* <p>
@ -392,10 +389,13 @@ public class BpmSimpleModelNodeVO {
@Schema(description = "请求返回处理设置", example = "[]")
private List<KeyValue<String, String>> response;
// TODO @jason改成 callbackTaskDefineKey
/**
* 异步 Http 请求需要指定回调 Id用于回调执行. 由后端指定
* 异步 Http 请求需要指定回调 ID用于回调执行
*/
@Schema(description = "回调 ID", example = "xxx", hidden = true)
private String callbackId;
}
@Schema(description = "流程表单触发器设置", example = "{}")

View File

@ -180,6 +180,7 @@ public class BpmProcessInstanceController {
return success(processInstanceService.getProcessInstanceBpmnModelView(id));
}
// TODO @jason要不去掉这个接口单体通过 asyncHttpTriggerCallback
@PostMapping("/http-trigger/callback")
@Operation(summary = "异步 HTTP 请求触发器回调")
@PermitAll // 允许外部调用不需要登录 TODO @芋艿 需要加一下验证签名吗

View File

@ -4,6 +4,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import lombok.Data;
// TODO @jason要不去掉这个接口单体通过 asyncHttpTriggerCallback
@Schema(description = "管理后台 - Bpm 异步 Http 触发器请求回调 Request VO")
@Data
public class BpmHttpTriggerCallbackReqVO {
@ -15,4 +16,5 @@ public class BpmHttpTriggerCallbackReqVO {
@Schema(description = "回调编号", requiredMode = Schema.RequiredMode.REQUIRED, example = "dca1cdcc-b8fe-11ef-99b5-01ff4722db8b")
@NotEmpty(message = "回调编号不能为空")
private String callbackId;
}

View File

@ -112,4 +112,5 @@ public class BpmTaskEventListener extends AbstractFlowableEngineEventListener {
taskService.triggerReceiveTask(event.getProcessInstanceId(), taskKey);
}
}
}

View File

@ -171,11 +171,12 @@ public class SimpleModelUtils {
// 情况二没有子节点则直接跟 targetNodeId 建立连线例如说结束节点条件分支分支节点的孩子节点或聚合节点的最后一个节点
String finalTargetNodeId = isChildNodeValid ? childNode.getId() : targetNodeId;
// 如果没有附加节点则直接建立连线
if (StrUtil.isEmpty(node.getAttachNodeId())) {
SequenceFlow sequenceFlow = buildBpmnSequenceFlow(node.getId(), finalTargetNodeId);
process.addFlowElement(sequenceFlow);
} else {
// 如果有附加节点. 需要先建立和附加节点的连线再建立附加节点和目标节点的连线
// 如果有附加节点需要先建立和附加节点的连线再建立附加节点和目标节点的连线例如说触发器节点异步
List<SequenceFlow> sequenceFlows = buildAttachNodeSequenceFlow(node.getId(), node.getAttachNodeId(), finalTargetNodeId);
sequenceFlows.forEach(process::addFlowElement);
}
@ -189,9 +190,9 @@ public class SimpleModelUtils {
/**
* 构建有附加节点的连线
*
* @param nodeId 当前节点 Id
* @param attachNodeId 附属节点 Id
* @param targetNodeId 目标节点 Id
* @param nodeId 当前节点 ID
* @param attachNodeId 附属节点 ID
* @param targetNodeId 目标节点 ID
*/
private static List<SequenceFlow> buildAttachNodeSequenceFlow(String nodeId, String attachNodeId, String targetNodeId) {
SequenceFlow sequenceFlow = buildBpmnSequenceFlow(nodeId, attachNodeId, null, null, null);
@ -748,8 +749,10 @@ public class SimpleModelUtils {
public static class TriggerNodeConvert implements NodeConvert {
// TODO @芋艿异步在看看
@Override
public List<? extends FlowElement> convertList(BpmSimpleModelNodeVO node) {
Assert.notNull(node.getTriggerSetting(), "触发器节点设置不能为空");
List<FlowElement> flowElements = new ArrayList<>(2);
// 触发器使用 ServiceTask 来实现
ServiceTask serviceTask = new ServiceTask();
@ -757,23 +760,23 @@ public class SimpleModelUtils {
serviceTask.setName(node.getName());
serviceTask.setImplementationType(ImplementationType.IMPLEMENTATION_TYPE_DELEGATEEXPRESSION);
serviceTask.setImplementation("${" + BpmTriggerTaskDelegate.BEAN_NAME + "}");
Assert.notNull(node.getTriggerSetting(), "触发器节点设置不能为空");
addExtensionElement(serviceTask, TRIGGER_TYPE, node.getTriggerSetting().getType());
flowElements.add(serviceTask);
// 异步 HTTP 请求需要附加一个 ReceiveTask发起请求后等待回调执行
// TODO @jason这里能挪到最后处理么这样代码的整体性更好
if (ASYNC_HTTP_REQUEST.getType().equals(node.getTriggerSetting().getType())) {
Assert.notNull(node.getTriggerSetting().getHttpRequestSetting(), "触发器 HTTP 请求设置不能为空");
String attachNodeId = "Activity_" + IdUtil.fastUUID();
ReceiveTask receiveTask = new ReceiveTask();
receiveTask.setId(attachNodeId);
receiveTask.setName("异步 HTTP 请求");
node.setAttachNodeId(attachNodeId);
// 设置 receiveId
Assert.notNull(node.getTriggerSetting().getHttpRequestSetting(), "触发器 http 请求设置不能为空");
node.getTriggerSetting().getHttpRequestSetting().setCallbackId(attachNodeId);
node.getTriggerSetting().getHttpRequestSetting().setCallbackId(attachNodeId); // 设置 receiveId
flowElements.add(receiveTask);
}
// TODO @jason是不是挪到 flowElements.add(serviceTask); 之前哈因为它在处理 serviceTask
if (node.getTriggerSetting().getHttpRequestSetting() != null) {
addExtensionElementJson(serviceTask, TRIGGER_PARAM, node.getTriggerSetting().getHttpRequestSetting());
}

View File

@ -276,8 +276,9 @@ public interface BpmTaskService {
*/
void processTaskTimeout(String processInstanceId, String taskDefineKey, Integer handlerType);
// TODO @jason改成 triggerTask然后触发 ReceiveTask让流程继续执行改成一些调用场景
/**
* 触发 ReceiveTask, 让流程继续执行
* 触发 ReceiveTask让流程继续执行
*
* @param processInstanceId 流程示例编号
* @param taskDefineKey 任务 Key

View File

@ -1329,7 +1329,7 @@ public class BpmTaskServiceImpl implements BpmTaskService {
.activityId(taskDefineKey)
.singleResult();
if (execution == null) {
log.error("[processDelayTimerTimeout][processInstanceId({}) activityId({}) 没有找到执行活动]",
log.error("[triggerReceiveTask][processInstanceId({}) activityId({}) 没有找到执行活动]",
processInstanceId, taskDefineKey);
return;
}

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.bpm.service.task.trigger;
package cn.iocoder.yudao.module.bpm.service.task.trigger.form;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.bpm.enums.definition.BpmTriggerTypeEnum;
import cn.iocoder.yudao.module.bpm.framework.flowable.core.util.BpmnModelUtils;
import cn.iocoder.yudao.module.bpm.framework.flowable.core.util.SimpleModelUtils;
import cn.iocoder.yudao.module.bpm.service.task.BpmProcessInstanceService;
import cn.iocoder.yudao.module.bpm.service.task.trigger.BpmTrigger;
import com.fasterxml.jackson.core.type.TypeReference;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.bpm.service.task.trigger;
package cn.iocoder.yudao.module.bpm.service.task.trigger.form;
import cn.hutool.core.collection.CollUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
@ -7,6 +7,7 @@ import cn.iocoder.yudao.module.bpm.enums.definition.BpmTriggerTypeEnum;
import cn.iocoder.yudao.module.bpm.framework.flowable.core.util.BpmnModelUtils;
import cn.iocoder.yudao.module.bpm.framework.flowable.core.util.SimpleModelUtils;
import cn.iocoder.yudao.module.bpm.service.task.BpmProcessInstanceService;
import cn.iocoder.yudao.module.bpm.service.task.trigger.BpmTrigger;
import com.fasterxml.jackson.core.type.TypeReference;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;

View File

@ -1,7 +1,8 @@
package cn.iocoder.yudao.module.bpm.service.task.trigger;
package cn.iocoder.yudao.module.bpm.service.task.trigger.http;
import cn.iocoder.yudao.module.bpm.controller.admin.definition.vo.model.simple.BpmSimpleModelNodeVO;
import cn.iocoder.yudao.module.bpm.framework.flowable.core.util.SimpleModelUtils;
import cn.iocoder.yudao.module.bpm.service.task.trigger.BpmTrigger;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.runtime.ProcessInstance;
@ -39,8 +40,7 @@ public abstract class BpmAbstractHttpRequestTrigger implements BpmTrigger {
HttpEntity<MultiValueMap<String, String>> requestEntity = new HttpEntity<>(body, headers);
ResponseEntity<String> responseEntity;
try {
responseEntity = restTemplate.exchange(url, HttpMethod.POST,
requestEntity, String.class);
responseEntity = restTemplate.exchange(url, HttpMethod.POST, requestEntity, String.class);
log.info("[sendHttpRequest][HTTP 触发器,请求头:{},请求体:{},响应结果:{}]", headers, body, responseEntity);
} catch (RestClientException e) {
log.error("[sendHttpRequest][HTTP 触发器,请求头:{},请求体:{},请求出错:{}]", headers, body, e.getMessage());
@ -66,4 +66,5 @@ public abstract class BpmAbstractHttpRequestTrigger implements BpmTrigger {
body.add("processInstanceId", processInstance.getId());
return body;
}
}

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.bpm.service.task.trigger;
package cn.iocoder.yudao.module.bpm.service.task.trigger.http;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
import cn.iocoder.yudao.module.bpm.controller.admin.definition.vo.model.simple.BpmSimpleModelNodeVO;
@ -42,7 +42,8 @@ public class BpmAsyncHttpRequestTrigger extends BpmAbstractHttpRequestTrigger {
MultiValueMap<String, String> headers = buildHttpHeaders(processInstance, setting.getHeader());
// 2.2 设置请求体
MultiValueMap<String, String> body = buildHttpBody(processInstance, setting.getBody());
body.add("callbackId", setting.getCallbackId()); // 异步请求 callbackId 需要传给被调用方. 用于回调执行
// TODO @芋艿异步在看看
body.add("callbackId", setting.getCallbackId()); // 异步请求 callbackId 需要传给被调用方用于回调执行
// 3. 发起请求
sendHttpRequest(setting.getUrl(), headers, body);

View File

@ -1,4 +1,4 @@
package cn.iocoder.yudao.module.bpm.service.task.trigger;
package cn.iocoder.yudao.module.bpm.service.task.trigger.http;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;