feat: 添加工作流取消与临时文件管理功能

- 新增临时文件(FileTemp)的实体、DAO和DTO,支持文件临时存储与批量操作
- 实现工作流执行取消功能,使用sync.Map管理context.CancelFunc,支持按会话取消运行中的流程
- 将流程执行状态"暂停"变更为"取消",并处理取消导致的错误
- 引入IsDialogue标识区分对话模式,调整判断/文案/图片节点的表单数据组装逻辑
- 重构ComposeMessagesReq,使用BuildType替代IsBuild和ModelTypeId
- 优化HTML内容提取逻辑,修复文案纯文本与图片URL的标签过滤及标签命名
- 在结果汇总节点中使用事务更新执行状态并批量保存输出文件记录
This commit is contained in:
2026-05-15 09:37:23 +08:00
parent 2c3cbab11d
commit b1ee117f6c
18 changed files with 730 additions and 336 deletions

View File

@@ -3,10 +3,13 @@ package flow
import (
"ai-agent/workflow/consts/flow"
"ai-agent/workflow/consts/node"
fileDao "ai-agent/workflow/dao/file"
flowDao "ai-agent/workflow/dao/flow"
fileDto "ai-agent/workflow/model/dto/file"
flowDto "ai-agent/workflow/model/dto/flow"
"ai-agent/workflow/model/entity"
"context"
"errors"
"fmt"
"sort"
"strconv"
@@ -31,6 +34,10 @@ func (s *flowExecutionService) Get(ctx context.Context, req *flowDto.GetFlowExec
return nil, err
}
res = new(flowDto.VOFlowExecution)
res.ImgAddressPrefix, err = utils.GetFileAddressPrefix(ctx)
if err != nil {
return nil, err
}
err = gconv.Struct(r, &res)
return res, err
}
@@ -227,15 +234,61 @@ func Notify(taskId string, result any) {
delete(asyncTasks, taskId)
}
//func (s *flowExecutionService) Cancel(ctx context.Context, req *flowDto.CancelReq) (err error) {
// res, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
// Id: req.FlowId,
// })
// res.TraceId
// return
//}
// ===================== 核心改造:替换为 sync.Map 存储取消上下文 =====================
var (
// cancelMap: traceID -> context.CancelFunc
cancelMap sync.Map
)
func (s *flowExecutionService) Cancel(ctx context.Context, req *flowDto.CancelReq) (err error) {
getRes, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
SessionId: req.SessionId,
})
if err != nil {
return err
}
if g.IsEmpty(getRes) {
return fmt.Errorf("会话[%s] 不存在", req.SessionId)
}
// 从 sync.Map 获取取消函数
cancelVal, exist := cancelMap.Load(getRes.TraceId)
if !exist {
return fmt.Errorf("traceID[%s] 不存在或已执行完成", getRes.TraceId)
}
// 执行取消
cancel, ok := cancelVal.(context.CancelFunc)
if !ok {
return fmt.Errorf("traceID[%s] 对应的取消函数类型错误", getRes.TraceId)
}
cancel()
// 取消后清理(可选:也可以在流程结束时统一清理)
cancelMap.Delete(getRes.TraceId)
// 同步更新流程执行状态为已取消
_, err = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
Id: getRes.Id,
Status: flow.FlowExecutionStatusCancel.Code(),
})
if err != nil {
return fmt.Errorf("更新取消状态失败: %v", err)
}
return nil
}
func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.ExecuteReq) (res *flowDto.ExecuteRes, err error) {
// ===================== 核心改造1创建可取消的上下文 =====================
execCtx, cancel := context.WithCancel(ctx)
traceId := ""
defer func() {
// 流程结束(成功/失败)时清理 cancelMap
if traceId != "" {
cancelMap.Delete(traceId)
}
cancel()
}()
flowInfo, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
SessionId: req.SessionId,
@@ -244,7 +297,9 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
return
}
var executionId int64
var isDialogue bool
if flowInfo == nil {
isDialogue = false
var r = new(flowDto.CreateFlowExecutionReq)
r.FlowUserId = req.FlowId
r.FlowName = req.FlowName
@@ -256,24 +311,55 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
r.TraceId = span.SpanContext().TraceID().String()
traceId = r.TraceId
cancelMap.Store(traceId, cancel)
}
executionId, err = flowDao.FlowExecutionDao.Insert(ctx, r)
if err != nil {
return
}
} else {
isDialogue = true
executionId = flowInfo.Id
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
traceId = span.SpanContext().TraceID().String()
cancelMap.Store(traceId, cancel)
}
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusRunning.Code(),
TraceId: traceId,
}
_, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err != nil {
return
}
}
_, err = flowDao.FlowUserDao.Update(ctx, &flowDto.UpdateFlowUserReq{
Id: req.FlowId,
FlowContent: req.FlowContent,
NodeInputParams: req.NodeInputParams,
})
if err != nil {
return nil, err
if !g.IsEmpty(req.FileUrl) {
createFileTempReq := make([]*fileDto.CreateFileTempReq, 0, len(req.FileUrl))
for _, fileUrl := range req.FileUrl {
var createReq = new(fileDto.CreateFileTempReq)
createReq.BusinessId = req.SessionId
createReq.FileUrl = fileUrl
createFileTempReq = append(createFileTempReq, createReq)
}
_, err = fileDao.FileTempDao.BatchInsert(ctx, createFileTempReq)
if err != nil {
return nil, err
}
}
//_, err = flowDao.FlowUserDao.Update(ctx, &flowDto.UpdateFlowUserReq{
// Id: req.FlowId,
// FlowContent: req.FlowContent,
// NodeInputParams: req.NodeInputParams,
//})
//if err != nil {
// return nil, err
//}
//nodeInsert := make([]*nodeDto.CreateNodeExecutionReq, 0, len(flowInfo.NodeInputParams))
//for _, flowNode := range flowInfo.NodeInputParams {
// nodeInsert = append(nodeInsert, &nodeDto.CreateNodeExecutionReq{
@@ -329,18 +415,18 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
// =========================================================================
// ✅【第2步】构建执行图
// =========================================================================
runGraph, err := BuildGraphFromFlowContent(ctx, req.FlowContent, judge2IntentNodeMap, summaryNodeID)
runGraph, err := BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID)
if err != nil {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Id: executionId,
Status: flow.FlowExecutionStatusFailed.Code(),
ErrorMessage: err.Error(),
}
executionReq.Status = flow.FlowExecutionStatusFailed.Code()
executionReq.ErrorMessage = err.Error()
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err1 != nil {
return
}
return nil, err
return nil, fmt.Errorf("执行工作流失败: %v", err)
}
// =========================================================================
@@ -363,6 +449,7 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
// ✅【第4步】构建全局执行入参现在 schemaMap 是有值的!)
// =========================================================================
execInput := &flowDto.FlowExecutionInput{
IsDialogue: isDialogue,
ExecutionId: executionId,
ConfigMap: configMap,
SessionId: req.SessionId,
@@ -371,18 +458,27 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
FileUrl: req.FileUrl,
}
// 执行工作流
_, err = runGraph.Invoke(ctx, execInput)
_, err = runGraph.Invoke(execCtx, execInput)
if err != nil {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
// 检测是否是取消导致的错误
if errors.Is(execCtx.Err(), context.Canceled) {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusCancel.Code(),
}
_, _ = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
return nil, fmt.Errorf("工作流已被取消: %v", err)
}
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusFailed.Code(),
ErrorMessage: err.Error(),
}
executionReq.Status = flow.FlowExecutionStatusFailed.Code()
executionReq.ErrorMessage = err.Error()
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err1 != nil {
return
}
return nil, err
return nil, fmt.Errorf("执行工作流失败: %v", err)
}
return