package flow

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"sync"

	"ai-agent/workflow/consts/flow"
	"ai-agent/workflow/consts/node"
	fileDao "ai-agent/workflow/dao/file"
	flowDao "ai-agent/workflow/dao/flow"
	fileDto "ai-agent/workflow/model/dto/file"
	flowDto "ai-agent/workflow/model/dto/flow"
	"ai-agent/workflow/model/entity"

	"gitea.redpowerfuture.com/red-future/common/utils"
	"github.com/cloudwego/eino/compose"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gtime"
	"github.com/gogf/gf/v2/util/gconv"
	"go.opentelemetry.io/otel/trace"
)

// FlowExecutionService is the package-level instance used to reach the flow
// execution operations defined in this file.
var FlowExecutionService = &flowExecutionService{}

type flowExecutionService struct{}

// Get loads one flow execution through the DAO and converts it into its view
// object, filling in the file address prefix on the way.
func (s *flowExecutionService) Get(ctx context.Context, req *flowDto.GetFlowExecutionReq) (res *flowDto.VOFlowExecution, err error) {
	r, err := flowDao.FlowExecutionDao.Get(ctx, req)
	if err != nil {
		return nil, err
	}

	res = new(flowDto.VOFlowExecution)
	res.ImgAddressPrefix, err = utils.GetFileAddressPrefix(ctx)
	if err != nil {
		return nil, err
	}

	err = gconv.Struct(r, &res)
	return res, err
}

// List returns the current user's executions as a two-level tree: creation
// date (newest first) -> executions of that date (newest first) -> output
// items (oldest first).
//
// Executions without output parameters are dropped before numbering; an
// execution whose output parameters contain no numeric-timestamp key keeps its
// number but is not shown.
func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowExecutionReq) (res *flowDto.ListFlowExecutionTreeRes, err error) {
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}
	req.Creator = user.UserName

	list, _, err := flowDao.FlowExecutionDao.List(ctx, req)
	if err != nil {
		return nil, err
	}

	// Keep only executions that actually produced output.
	validList := make([]*entity.FlowExecution, 0, len(list))
	for _, execution := range list {
		if execution == nil || g.IsEmpty(execution.OutputParams) {
			continue
		}
		validList = append(validList, execution)
	}

	// flowWrap pairs a rendered node with its creation time so nodes can be
	// sorted inside their date bucket.
	type flowWrap struct {
		flowNode  flowDto.FlowNode
		createdAt *gtime.Time
	}
	groupsByDate := make(map[string][]flowWrap)

	total := len(validList)
	for idx, execution := range validList {
		// Position idx receives number total-idx, i.e. the last element of the
		// filtered DAO result is numbered 1.
		displayFlowName := fmt.Sprintf("会话-%d(%s)", total-idx, execution.FlowName)

		// Only keys that parse as an integer timestamp are output items.
		var items []flowDto.OutputItem
		for _, paramMap := range execution.OutputParams {
			for tsKey, value := range paramMap {
				if _, parseErr := strconv.ParseInt(tsKey, 10, 64); parseErr != nil {
					continue
				}
				items = append(items, flowDto.OutputItem{
					Timestamp: tsKey,
					Content:   gconv.String(value),
				})
			}
		}
		if len(items) == 0 {
			continue
		}

		// Oldest first. Items arrive in map-iteration order, so equal
		// timestamps are tie-broken on content to keep labels deterministic.
		sort.Slice(items, func(i, j int) bool {
			ti, _ := strconv.ParseInt(items[i].Timestamp, 10, 64)
			tj, _ := strconv.ParseInt(items[j].Timestamp, 10, 64)
			if ti != tj {
				return ti < tj
			}
			return items[i].Content < items[j].Content
		})

		// Label every item "<kind>_<n>", counting separately per kind.
		suffixCount := make(map[string]int)
		for i := range items {
			val := items[i].Content
			suffix := "内容"
			switch {
			case strings.Contains(val, "img") || strings.Contains(val, "png") || strings.Contains(val, "jpg"):
				suffix = "图片"
			case strings.Contains(val, "html") || strings.Contains(val, "HTML"):
				suffix = "HTML"
			case strings.Contains(val, "inc") || len(val) > 50:
				suffix = "文案"
			}
			suffixCount[suffix]++
			items[i].Label = fmt.Sprintf("%s_%d", suffix, suffixCount[suffix])
		}

		createDate := execution.CreatedAt.Format("Y-m-d")
		groupsByDate[createDate] = append(groupsByDate[createDate], flowWrap{
			flowNode: flowDto.FlowNode{
				FlowName:  displayFlowName,
				Id:        execution.Id,
				SessionId: gconv.String(execution.SessionId),
				Items:     items,
			},
			createdAt: execution.CreatedAt,
		})
	}

	// Assemble the tree: executions newest first inside each date.
	var tree []flowDto.DateNode
	for date, wraps := range groupsByDate {
		sort.Slice(wraps, func(i, j int) bool {
			return wraps[i].createdAt.After(wraps[j].createdAt)
		})
		flowNodes := make([]flowDto.FlowNode, 0, len(wraps))
		for _, w := range wraps {
			flowNodes = append(flowNodes, w.flowNode)
		}
		tree = append(tree, flowDto.DateNode{
			CreateDate: date,
			Flows:      flowNodes,
		})
	}

	// Dates newest first ("Y-m-d" strings compare chronologically).
	sort.Slice(tree, func(i, j int) bool {
		return tree[i].CreateDate > tree[j].CreateDate
	})

	// The prefix lookup error used to be silently dropped here; report it the
	// same way Get does.
	imgPrefix, err := utils.GetFileAddressPrefix(ctx)
	if err != nil {
		return nil, err
	}
	return &flowDto.ListFlowExecutionTreeRes{
		Tree:             tree,
		ImgAddressPrefix: imgPrefix,
	}, nil
}

// ModelCallback is the model callback endpoint: it hands the callback payload
// to whoever is blocked in Wait on the same task id.
func (s *flowExecutionService) ModelCallback(ctx context.Context, req *flowDto.ModelCallbackReq) (err error) {
	Notify(req.TaskId, req)
	return nil
}

// Registry of tasks that are blocked waiting for a callback.
var (
	asyncMu    sync.Mutex
	asyncTasks = make(map[string]chan any)
)

// Wait registers taskId and blocks until Notify delivers a result for it or
// ctx is done, in which case ctx.Err() is returned.
//
// Registering the same taskId again replaces the earlier waiter, which then
// only returns when its own ctx ends.
func Wait(ctx context.Context, taskId string) (any, error) {
	ch := make(chan any, 1)

	asyncMu.Lock()
	asyncTasks[taskId] = ch
	asyncMu.Unlock()

	select {
	case result := <-ch:
		return result, nil
	case <-ctx.Done():
		asyncMu.Lock()
		// Only remove our own registration: a newer Wait for the same task id
		// may have replaced it and must stay reachable for Notify.
		if asyncTasks[taskId] == ch {
			delete(asyncTasks, taskId)
		}
		asyncMu.Unlock()
		return nil, ctx.Err()
	}
}

// Notify delivers result to the waiter registered for taskId and removes the
// registration. It is a no-op when nobody is waiting.
func Notify(taskId string, result any) {
	asyncMu.Lock()
	ch, exist := asyncTasks[taskId]
	if exist {
		delete(asyncTasks, taskId)
	}
	asyncMu.Unlock()

	if !exist {
		return
	}
	// The channel has capacity 1 and was just unregistered under the lock, so
	// this is its only send and cannot block.
	ch <- result
}

var (
	// cancelMap maps traceID -> context.CancelFunc of a running Execute call.
	cancelMap sync.Map
)

// Cancel cancels the running execution of the given session and marks the
// execution record as cancelled.
//
// It fails when the session is unknown, when no cancel function is registered
// for the execution's trace id, or when the status update fails.
func (s *flowExecutionService) Cancel(ctx context.Context, req *flowDto.CancelReq) (err error) {
	getRes, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
		SessionId: req.SessionId,
	})
	if err != nil {
		return err
	}
	if g.IsEmpty(getRes) {
		return fmt.Errorf("会话[%s] 不存在", req.SessionId)
	}

	cancelVal, exist := cancelMap.Load(getRes.TraceId)
	if !exist {
		return fmt.Errorf("traceID[%s] 不存在或已执行完成", getRes.TraceId)
	}
	cancel, ok := cancelVal.(context.CancelFunc)
	if !ok {
		return fmt.Errorf("traceID[%s] 对应的取消函数类型错误", getRes.TraceId)
	}

	cancel()
	// Execute also removes the entry when it returns; deleting here as well
	// makes a repeated Cancel report "not found" right away.
	cancelMap.Delete(getRes.TraceId)

	_, err = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
		Id:     getRes.Id,
		Status: flow.FlowExecutionStatusCancel.Code(),
	})
	if err != nil {
		return fmt.Errorf("更新取消状态失败: %w", err)
	}
	return nil
}

// Execute runs a workflow for a session.
//
// A session without an execution record gets a new record; an existing one is
// switched back to "running" and treated as a dialogue continuation. A
// dialogue request carrying a ResultUrl is dispatched to the matching
// single-model handler by file suffix. Otherwise the flow is expanded (an
// intent node per judge node plus a summary node), compiled into a graph and
// invoked on a cancellable context that Cancel can reach via the trace id.
//
// The returned result is always nil; the error describes why the run was
// rejected, failed or was cancelled.
func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.ExecuteReq) (res *flowDto.ExecuteRes, err error) {
	execCtx, cancel := context.WithCancel(ctx)
	traceId := ""
	defer func() {
		// Unregister on every exit path, then release the context.
		if traceId != "" {
			cancelMap.Delete(traceId)
		}
		cancel()
	}()

	flowInfo, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
		SessionId: req.SessionId,
	})
	if err != nil {
		return nil, err
	}

	// Register the cancel function under the trace id so Cancel can find it.
	if span := trace.SpanFromContext(ctx); span != nil && span.SpanContext().HasTraceID() {
		traceId = span.SpanContext().TraceID().String()
		cancelMap.Store(traceId, cancel)
	}

	isDialogue := flowInfo != nil
	var executionId int64
	if !isDialogue {
		createReq := new(flowDto.CreateFlowExecutionReq)
		createReq.FlowUserId = req.FlowId
		createReq.FlowName = req.FlowName
		createReq.TriggerType = flow.FlowExecutionTriggerTypeManual.Code()
		createReq.FlowContent = req.FlowContent
		createReq.NodeInputParams = req.NodeInputParams
		createReq.SessionId = req.SessionId
		createReq.Status = flow.FlowExecutionStatusRunning.Code()
		createReq.TraceId = traceId
		executionId, err = flowDao.FlowExecutionDao.Insert(ctx, createReq)
		if err != nil {
			return nil, err
		}
	} else {
		executionId = flowInfo.Id
		_, err = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
			Id:      executionId,
			Status:  flow.FlowExecutionStatusRunning.Code(),
			TraceId: traceId,
		})
		if err != nil {
			return nil, err
		}
	}

	// failExecution records the failure on the execution row and builds the
	// error returned to the caller. A failing status update is reported in the
	// message instead of being dropped.
	failExecution := func(cause error) error {
		_, updErr := flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
			Id:           executionId,
			Status:       flow.FlowExecutionStatusFailed.Code(),
			ErrorMessage: cause.Error(),
		})
		if updErr != nil {
			return fmt.Errorf("执行工作流失败: %w (更新执行状态失败: %v)", cause, updErr)
		}
		return fmt.Errorf("执行工作流失败: %w", cause)
	}

	// Remember the uploaded files for this session.
	if !g.IsEmpty(req.FileUrl) {
		fileReqs := make([]*fileDto.CreateFileTempReq, 0, len(req.FileUrl))
		for _, fileUrl := range req.FileUrl {
			fileReq := new(fileDto.CreateFileTempReq)
			fileReq.BusinessId = req.SessionId
			fileReq.FileUrl = fileUrl
			fileReqs = append(fileReqs, fileReq)
		}
		if _, err = fileDao.FileTempDao.BatchInsert(ctx, fileReqs); err != nil {
			return nil, err
		}
	}

	// Dialogue follow-up on an existing result: pick the handler by suffix.
	// NOTE(review): these handlers run on ctx, not execCtx, so Cancel cannot
	// interrupt them — confirm whether that is intended before switching.
	if isDialogue && !g.IsEmpty(flowInfo) && !g.IsEmpty(req.ResultUrl) {
		resultUrl := gconv.String(req.ResultUrl)
		switch {
		case strings.HasSuffix(resultUrl, ".inc"):
			return nil, TextModelSingleLambda(ctx, req, flowInfo)
		case strings.HasSuffix(resultUrl, ".png"):
			return nil, ImgModelSingleLambda(ctx, req, flowInfo)
		case strings.HasSuffix(resultUrl, ".html"):
			return nil, TextImgModelSingleLambda(ctx, req, flowInfo)
		}
		return nil, errors.New("文件格式不支持")
	}

	// Everything below dereferences the flow definition.
	if req.FlowContent == nil {
		return nil, failExecution(errors.New("流程内容不能为空"))
	}

	// Step 1: give every judge node a companion intent node and append the
	// summary node.
	judge2IntentNodeMap := make(map[string]string)
	finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2)
	for _, item := range req.FlowContent.Nodes {
		finalNodes = append(finalNodes, item)
		if item.NodeCode != node.NodeTypeJudge {
			continue
		}
		intentNodeID := fmt.Sprintf("intent_%s", item.Id)
		finalNodes = append(finalNodes, entity.FlowNode{
			Id:          intentNodeID,
			NodeCode:    node.NodeTypeIntent,
			Name:        fmt.Sprintf("意图识别-%s", item.Name),
			InputSource: item.InputSource,
			FormConfig:  item.FormConfig,
			ModelConfig: item.ModelConfig,
		})
		judge2IntentNodeMap[item.Id] = intentNodeID
	}

	summaryNodeID := "summary_node"
	summaryNode := entity.FlowNode{
		Id:          summaryNodeID,
		NodeCode:    node.NodeTypeCustomNode, // reuses the custom node type
		Name:        "结果汇总节点",
		InputSource: []entity.FlowNodeInputSource{},
		FormConfig:  nil,
		ModelConfig: node.ModelItem{},
	}
	finalNodes = append(finalNodes, summaryNode)
	req.FlowContent.Nodes = finalNodes

	// Step 2: compile the execution graph.
	runGraph, err := BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID)
	if err != nil {
		return nil, failExecution(err)
	}

	// Step 3: per-node configuration lookup. Intent nodes share the config
	// object of the judge node they were generated for.
	configMap := make(map[string]*entity.FlowNode, len(req.NodeInputParams)+len(judge2IntentNodeMap)+1)
	for _, cfg := range req.NodeInputParams {
		if cfg == nil {
			continue
		}
		configMap[cfg.Id] = cfg
	}
	for judgeID, intentID := range judge2IntentNodeMap {
		if cfg, ok := configMap[judgeID]; ok {
			configMap[intentID] = cfg
		}
	}
	configMap[summaryNodeID] = &summaryNode

	// Step 4: global input shared by every node of the run.
	execInput := &flowDto.FlowExecutionInput{
		IsDialogue:  isDialogue,
		ExecutionId: executionId,
		ConfigMap:   configMap,
		SessionId:   req.SessionId,
		Desc:        req.Desc,
		SkillName:   req.SkillName,
		FileUrl:     req.FileUrl,
	}

	if _, err = runGraph.Invoke(execCtx, execInput); err != nil {
		// A cancelled execution context means the failure came from Cancel (or
		// from the parent context being cancelled), not from the flow itself.
		if errors.Is(execCtx.Err(), context.Canceled) {
			// Best effort: Cancel normally wrote this status already.
			_, _ = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
				Id:     executionId,
				Status: flow.FlowExecutionStatusCancel.Code(),
			})
			return nil, fmt.Errorf("工作流已被取消: %w", err)
		}
		return nil, failExecution(err)
	}
	return nil, nil
}

// BuildGraphFromFlowContent compiles the flow definition saved by the front
// end into a runnable graph.
//
// Judge nodes are not registered as graph nodes: edges pointing at a judge
// node are redirected to its "intent_<id>" node, and the judge node's outgoing
// edges become a branch attached to the intent node listed for it in
// judge2IntentNodeMap. Every leaf reachable from the start node is connected
// to summaryNodeID, which is connected to END.
//
// Wiring errors are returned immediately with the offending edge named.
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo, judge2IntentNodeMap map[string]string, summaryNodeID string) (compose.Runnable[any, any], error) {
	if flowContent == nil {
		return nil, errors.New("流程内容不能为空")
	}
	if flowContent.StartNodeId == "" {
		return nil, errors.New("流程未设置开始节点")
	}

	graph := compose.NewGraph[any, any]()

	// Register every executable node.
	nodeMap := make(map[string]entity.FlowNode, len(flowContent.Nodes))
	for _, item := range flowContent.Nodes {
		nodeMap[item.Id] = item
		if item.NodeCode != node.NodeTypeJudge {
			registerNodeToGraph(graph, item)
		}
	}

	// Group edges by source, remembering first-seen order so wiring does not
	// depend on map iteration order.
	edgeMap := make(map[string][]entity.FlowEdge)
	var fromIDs []string
	for _, edge := range flowContent.Edges {
		if _, seen := edgeMap[edge.From]; !seen {
			fromIDs = append(fromIDs, edge.From)
		}
		edgeMap[edge.From] = append(edgeMap[edge.From], edge)
	}

	for _, fromNodeID := range fromIDs {
		edges := edgeMap[fromNodeID]

		// Plain node: connect its edges, redirecting judge targets.
		if nodeMap[fromNodeID].NodeCode != node.NodeTypeJudge {
			for _, e := range edges {
				target := e.To
				if toNode := nodeMap[e.To]; toNode.NodeCode == node.NodeTypeJudge {
					target = fmt.Sprintf("intent_%s", toNode.Id)
				}
				if err := graph.AddEdge(e.From, target); err != nil {
					return nil, fmt.Errorf("连接节点[%s] -> [%s]失败: %w", e.From, target, err)
				}
			}
			continue
		}

		// Judge node: its outgoing edges become a branch on the intent node.
		// judgeID is a per-iteration copy; before Go 1.22 the range variable
		// is shared, so the closure below would otherwise see the last id.
		judgeID := fromNodeID
		intentNodeID, ok := judge2IntentNodeMap[judgeID]
		if !ok {
			return nil, fmt.Errorf("判断节点[%s]未生成意图节点", judgeID)
		}

		branchMap := make(map[string]bool, len(edges))
		for _, e := range edges {
			branchMap[e.To] = true
		}
		branchIDs := make([]string, 0, len(branchMap))
		for id := range branchMap {
			branchIDs = append(branchIDs, id)
		}
		sort.Strings(branchIDs)

		judgeLambda := func(ctx context.Context, input any) (string, error) {
			execInput, ok := input.(*flowDto.FlowExecutionInput)
			if !ok {
				return "", fmt.Errorf("入参类型错误")
			}
			currentConfig := execInput.ConfigMap[judgeID]
			if currentConfig == nil {
				return "", fmt.Errorf("判断节点%s无配置", judgeID)
			}

			// Resolve a display name for each branch target.
			branchIdNameMap := make(map[string]string, len(branchIDs))
			for _, id := range branchIDs {
				if branchCfg, ok := execInput.ConfigMap[id]; ok && branchCfg != nil {
					branchIdNameMap[id] = branchCfg.Name
				} else {
					branchIdNameMap[id] = "未命名节点"
				}
			}

			// Expose the branch candidates on the judge config. The id slice
			// is copied so each invocation owns its own.
			currentConfig.Config = map[string]any{
				"branch_ids":         append([]string(nil), branchIDs...),
				"branch_id_name_map": branchIdNameMap,
			}

			// The decision is based on the intent node's output.
			if intentCfg, ok := execInput.ConfigMap[intentNodeID]; ok && intentCfg != nil {
				currentConfig.OutputResult = intentCfg.OutputResult
			}

			return JudgeLambda(ctx, &flowDto.NodeExecutionInput{
				Config: currentConfig,
				Global: execInput,
			})
		}

		if err := graph.AddBranch(intentNodeID, compose.NewGraphBranch(judgeLambda, branchMap)); err != nil {
			return nil, fmt.Errorf("判断节点[%s]添加分支失败: %w", judgeID, err)
		}
	}

	// Start and end wiring.
	if err := graph.AddEdge(compose.START, flowContent.StartNodeId); err != nil {
		return nil, fmt.Errorf("连接节点[%s] -> [%s]失败: %w", compose.START, flowContent.StartNodeId, err)
	}
	for _, endID := range findEndNodes(flowContent.StartNodeId, flowContent.Edges) {
		if err := graph.AddEdge(endID, summaryNodeID); err != nil {
			return nil, fmt.Errorf("连接节点[%s] -> [%s]失败: %w", endID, summaryNodeID, err)
		}
	}
	if err := graph.AddEdge(summaryNodeID, compose.END); err != nil {
		return nil, fmt.Errorf("连接节点[%s] -> [%s]失败: %w", summaryNodeID, compose.END, err)
	}

	return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
}

// registerNodeToGraph adds flowNode to graph as a lambda node, choosing the
// handler by node id ("summary_node") or node code. Node codes without a
// handler are not registered.
//
// Every handler is wrapped so that the value flowing along the graph is always
// the shared *flowDto.FlowExecutionInput: the wrapper looks up the node's
// config, appends the outputs of the declared input sources to it, runs the
// handler, stores the handler's output on the config and passes the shared
// input on.
//
// NOTE(review): the wrapper mutates the shared input (ExecutedNodes, configs)
// without synchronisation — confirm the graph never runs nodes in parallel.
func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNode) {
	nodeID := flowNode.Id

	var handler func(ctx context.Context, input any) (any, error)
	if nodeID == "summary_node" {
		handler = SummaryLambda
	} else {
		switch flowNode.NodeCode {
		case "__start__":
			handler = StartLambda
		case node.NodeTypeTextModel:
			handler = TextModelLambda
		case node.NodeTypeImageModel:
			handler = ImageModelLambda
		case node.NodeTypeVideoModel:
			handler = VideoModelLambda
		case node.NodeTypeAudioModel:
			handler = AudioModelLambda
		case node.NodeTypeCustomNode:
			handler = CustomLambda
		case node.NodeTypeForm:
			handler = FormLambda
		case node.NodeTypeIntent:
			handler = IntentLambda
		case node.NodeTypeMerge:
			handler = MergeLambda
		}
	}
	if handler == nil {
		return
	}

	wrapped := func(ctx context.Context, input any) (any, error) {
		execInput, ok := input.(*flowDto.FlowExecutionInput)
		if !ok {
			return nil, fmt.Errorf("入参必须是 *FlowExecutionInput, 实际是 %T", input)
		}
		configMap := execInput.ConfigMap
		currentConfig := configMap[nodeID]
		if currentConfig == nil {
			return nil, fmt.Errorf("节点%s无配置", nodeID)
		}

		// Feed the outputs of every declared source node into this node.
		for _, inputSource := range flowNode.InputSource {
			if sourceConfig, ok := configMap[inputSource.NodeId]; ok && sourceConfig != nil {
				currentConfig.OutputResult = append(currentConfig.OutputResult, sourceConfig.OutputResult...)
			}
		}

		output, err := handler(ctx, &flowDto.NodeExecutionInput{
			Config: currentConfig,
			Global: execInput,
		})
		if err != nil {
			return nil, err
		}

		execInput.ExecutedNodes = append(execInput.ExecutedNodes, nodeID)
		if outConfig, ok := output.(*entity.FlowNode); ok {
			currentConfig.OutputResult = outConfig.OutputResult
		}
		// Hand the shared input to the next node.
		return execInput, nil
	}

	// The signature cannot report this error. NOTE(review): the graph is
	// expected to surface a failed registration again at Compile — confirm
	// against the eino version in use.
	_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapped))
}

// findEndNodes returns, sorted, the ids of all nodes reachable from
// startNodeId that have no outgoing edge. A start node without outgoing edges
// is its own end node.
func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string {
	nextMap := make(map[string][]string)
	for _, e := range edges {
		nextMap[e.From] = append(nextMap[e.From], e.To)
	}

	endNodeSet := make(map[string]struct{})
	findLeafNodes(startNodeId, nextMap, endNodeSet)

	endNodes := make([]string, 0, len(endNodeSet))
	for id := range endNodeSet {
		endNodes = append(endNodes, id)
	}
	sort.Strings(endNodes)
	return endNodes
}

// findLeafNodes adds to endNodeSet every node reachable from nodeId (nodeId
// included) that has no successor in nextMap.
//
// The walk is iterative and visits each node once, so it terminates on cyclic
// flows and does not re-walk shared sub-graphs.
func findLeafNodes(nodeId string, nextMap map[string][]string, endNodeSet map[string]struct{}) {
	visited := make(map[string]struct{})
	stack := []string{nodeId}
	for len(stack) > 0 {
		current := stack[len(stack)-1]
		stack = stack[:len(stack)-1]

		if _, seen := visited[current]; seen {
			continue
		}
		visited[current] = struct{}{}

		successors := nextMap[current]
		if len(successors) == 0 {
			endNodeSet[current] = struct{}{}
			continue
		}
		stack = append(stack, successors...)
	}
}