refactor: 重构工作流执行图构建与节点上下文处理
This commit is contained in:
@@ -363,49 +363,12 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
|
||||
}
|
||||
return nil, errors.New("文件格式不支持")
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// ✅【第1步】给所有判断节点自动生成意图识别节点
|
||||
// =========================================================================
|
||||
judge2IntentNodeMap := make(map[string]string)
|
||||
finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2)
|
||||
for _, item := range req.FlowContent.Nodes {
|
||||
finalNodes = append(finalNodes, item)
|
||||
// 判断节点自动加 intent 节点
|
||||
if item.NodeCode == node.NodeTypeJudge {
|
||||
intentNodeID := fmt.Sprintf("intent_%s", item.Id)
|
||||
intentNode := entity.FlowNode{
|
||||
Id: intentNodeID,
|
||||
NodeCode: node.NodeTypeIntent,
|
||||
Name: fmt.Sprintf("意图识别-%s", item.Name),
|
||||
InputSource: item.InputSource, // ✅ 正确赋值
|
||||
FormConfig: item.FormConfig, // ✅ 用户配置
|
||||
ModelConfig: item.ModelConfig, // ✅ 系统配置
|
||||
}
|
||||
finalNodes = append(finalNodes, intentNode)
|
||||
judge2IntentNodeMap[item.Id] = intentNodeID
|
||||
}
|
||||
}
|
||||
|
||||
summaryNodeID := "summary_node"
|
||||
summaryNode := entity.FlowNode{
|
||||
Id: summaryNodeID,
|
||||
NodeCode: node.NodeTypeCustomNode, // 复用自定义节点类型,也可新增专属类型
|
||||
Name: "结果汇总节点",
|
||||
InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出
|
||||
FormConfig: nil,
|
||||
ModelConfig: node.ModelItem{},
|
||||
}
|
||||
finalNodes = append(finalNodes, summaryNode)
|
||||
|
||||
// 替换节点列表
|
||||
req.FlowContent.Nodes = finalNodes
|
||||
|
||||
// =========================================================================
|
||||
// ✅【第2步】构建执行图
|
||||
// =========================================================================
|
||||
var nodeList []entity.FlowNode
|
||||
var runGraph compose.Runnable[any, any]
|
||||
runGraph, err = BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID)
|
||||
nodeList, runGraph, err = BuildGraphFromFlowContent(execCtx, req.FlowContent)
|
||||
if err != nil {
|
||||
executionReq := flowDto.UpdateFlowExecutionReq{
|
||||
Id: executionId,
|
||||
@@ -418,7 +381,6 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
|
||||
}
|
||||
return nil, fmt.Errorf("执行工作流失败: %v", err)
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// ✅【第3步】构建 ConfigMap
|
||||
// =========================================================================
|
||||
@@ -426,15 +388,9 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
|
||||
for _, cfg := range req.NodeInputParams {
|
||||
configMap[cfg.Id] = cfg
|
||||
}
|
||||
// 自动给意图节点复制配置
|
||||
for judgeID, intentID := range judge2IntentNodeMap {
|
||||
if cfg, ok := configMap[judgeID]; ok {
|
||||
configMap[intentID] = cfg
|
||||
}
|
||||
for _, i := range nodeList {
|
||||
configMap[i.Id] = &i
|
||||
}
|
||||
// 初始化汇总节点配置
|
||||
configMap[summaryNodeID] = &summaryNode
|
||||
|
||||
// =========================================================================
|
||||
// ✅【第4步】构建全局执行入参(现在 schemaMap 是有值的!)
|
||||
// =========================================================================
|
||||
@@ -475,7 +431,7 @@ func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.Execute
|
||||
}
|
||||
|
||||
// BuildGraphFromFlowContent 根据前端保存的工作流JSON,自动构建执行图
|
||||
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo, judge2IntentNodeMap map[string]string, summaryNodeID string) (compose.Runnable[any, any], error) {
|
||||
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo) ([]entity.FlowNode, compose.Runnable[any, any], error) {
|
||||
// 注册自定义合并函数:处理 *flowDto.FlowExecutionInput 类型合并
|
||||
// 由于 ConfigMap 是 map 引用类型,所有并行分支修改已经写入共享内存
|
||||
// 直接返回第一个实例即可,所有修改都已经可见
|
||||
@@ -488,9 +444,26 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
|
||||
})
|
||||
|
||||
graph := compose.NewGraph[any, any]()
|
||||
nodeMap := make(map[string]entity.FlowNode)
|
||||
|
||||
var nodeList []entity.FlowNode
|
||||
nodeId := uuid.NewString()
|
||||
originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
|
||||
for i := range originalEndNodes {
|
||||
sprintf := fmt.Sprintf("%v_%d", nodeId, i)
|
||||
summaryNode := entity.FlowNode{
|
||||
Id: sprintf,
|
||||
NodeCode: node.NodeTypeSystemSum,
|
||||
Name: node.NodeNameSystemSum,
|
||||
InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出
|
||||
FormConfig: nil,
|
||||
ModelConfig: node.ModelItem{},
|
||||
}
|
||||
nodeList = append(nodeList, summaryNode)
|
||||
flowContent.Nodes = append(flowContent.Nodes, summaryNode)
|
||||
}
|
||||
|
||||
// 注册所有节点
|
||||
nodeMap := make(map[string]entity.FlowNode)
|
||||
for _, item := range flowContent.Nodes {
|
||||
nodeMap[item.Id] = item
|
||||
if item.NodeCode != node.NodeTypeJudge {
|
||||
@@ -498,6 +471,16 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
|
||||
}
|
||||
}
|
||||
|
||||
// 注册所有边
|
||||
if flowContent.StartNodeId != "" {
|
||||
_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
|
||||
}
|
||||
for i, endID := range originalEndNodes {
|
||||
sprintf := fmt.Sprintf("%v_%d", nodeId, i)
|
||||
_ = graph.AddEdge(endID, sprintf)
|
||||
_ = graph.AddEdge(sprintf, compose.END)
|
||||
}
|
||||
|
||||
// 构建边关系
|
||||
upstreamMap := make(map[string][]string)
|
||||
edgeMap := make(map[string][]entity.FlowEdge)
|
||||
@@ -510,15 +493,8 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
|
||||
for fromNodeID, edges := range edgeMap {
|
||||
fromNode := nodeMap[fromNodeID]
|
||||
|
||||
// --------------------------
|
||||
// 判断节点 → 分支处理
|
||||
// --------------------------
|
||||
if fromNode.NodeCode == node.NodeTypeJudge {
|
||||
intentNodeID, ok := judge2IntentNodeMap[fromNodeID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("判断节点[%s]未生成意图节点", fromNodeID)
|
||||
}
|
||||
|
||||
branchMap := make(map[string]bool)
|
||||
for _, e := range edges {
|
||||
branchMap[e.To] = true
|
||||
@@ -553,11 +529,6 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
|
||||
m["branch_id_name_map"] = branchIdNameMap // 传递ID-名称映射
|
||||
currentConfig.Config = m
|
||||
|
||||
// 从意图节点取输出
|
||||
if intentCfg, ok := execInput.ConfigMap[intentNodeID]; ok {
|
||||
currentConfig.OutputResult = intentCfg.OutputResult
|
||||
}
|
||||
|
||||
// 关键修改:构造 NodeExecutionInput 传入 JudgeLambda
|
||||
nodeExecInput := &flowDto.NodeExecutionInput{
|
||||
Config: currentConfig, // 当前判断节点配置
|
||||
@@ -566,34 +537,21 @@ func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo
|
||||
return JudgeLambda(ctx, nodeExecInput) // 传入 NodeExecutionInput 类型
|
||||
}
|
||||
|
||||
_ = graph.AddBranch(intentNodeID, compose.NewGraphBranch(judgeLambda, branchMap))
|
||||
_ = graph.AddBranch(upstreamMap[fromNodeID][0], compose.NewGraphBranch(judgeLambda, branchMap))
|
||||
continue
|
||||
}
|
||||
|
||||
// --------------------------
|
||||
// 普通节点连线
|
||||
// --------------------------
|
||||
for _, e := range edges {
|
||||
toNode := nodeMap[e.To]
|
||||
if toNode.NodeCode == node.NodeTypeJudge {
|
||||
_ = graph.AddEdge(e.From, fmt.Sprintf("intent_%s", toNode.Id))
|
||||
continue
|
||||
}
|
||||
_ = graph.AddEdge(e.From, e.To)
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 第四步:处理开始/结束节点 ====================
|
||||
if flowContent.StartNodeId != "" {
|
||||
_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
|
||||
}
|
||||
originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
|
||||
for _, endID := range originalEndNodes {
|
||||
_ = graph.AddEdge(endID, summaryNodeID)
|
||||
}
|
||||
_ = graph.AddEdge(summaryNodeID, compose.END)
|
||||
|
||||
return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"), compose.WithNodeTriggerMode(compose.AllPredecessor))
|
||||
compile, err := graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
|
||||
return nodeList, compile, err
|
||||
}
|
||||
|
||||
// -------------------------- 节点自动注册器(核心分发) --------------------------
|
||||
@@ -703,13 +661,11 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
|
||||
return execInput, nil
|
||||
}
|
||||
}
|
||||
if nodeID == "summary_node" {
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
|
||||
return
|
||||
}
|
||||
switch code {
|
||||
case "__start__":
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StartLambda)))
|
||||
case node.NodeTypeSystemSum:
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
|
||||
case node.NodeTypeTextModel:
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(TextModelLambda)))
|
||||
case node.NodeTypeImageModel:
|
||||
@@ -722,12 +678,6 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(BatchModelLambda)))
|
||||
case node.NodeTypeDataConversionModel:
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataConversionLambda)))
|
||||
//case node.NodeTypeSenseOptimizeModel:
|
||||
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SenseOptimizeModelLambda)))
|
||||
//case node.NodeTypeStoryOptimizeModel:
|
||||
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StoryOptimizeModelLambda)))
|
||||
//case node.NodeTypeScriptOptimizeModel:
|
||||
// _ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(ScriptOptimizeModelLambda)))
|
||||
case node.NodeTypeCustomNode:
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda)))
|
||||
case node.NodeTypeForm:
|
||||
@@ -737,49 +687,42 @@ func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNod
|
||||
case node.NodeTypeMerge:
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda)))
|
||||
case node.NodeTypeDataMerge:
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataMergeLambda)))
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(DataMergeLambda)), compose.WithGraphCompileOptions(compose.WithNodeTriggerMode(compose.AllPredecessor)))
|
||||
case node.NodeTypeHttp:
|
||||
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(HttpLambda)))
|
||||
}
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
// ✅【工具方法】找出所有没有出边的节点 → 作为结束节点连接 END
|
||||
// --------------------------------------------------------------------
|
||||
func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string {
|
||||
// 构建 节点 → 后续节点 的映射
|
||||
nextMap := make(map[string][]string)
|
||||
for _, e := range edges {
|
||||
nextMap[e.From] = append(nextMap[e.From], e.To)
|
||||
}
|
||||
|
||||
endNodeSet := make(map[string]struct{})
|
||||
visited := make(map[string]struct{})
|
||||
queue := []string{startNodeId}
|
||||
|
||||
// 🚀 只从【开始节点】递归遍历(关键修复)
|
||||
findLeafNodes(startNodeId, nextMap, endNodeSet)
|
||||
for len(queue) > 0 {
|
||||
node := queue[0]
|
||||
queue = queue[1:]
|
||||
|
||||
// 转成数组返回
|
||||
endNodes := make([]string, 0, len(endNodeSet))
|
||||
for id := range endNodeSet {
|
||||
endNodes = append(endNodes, id)
|
||||
}
|
||||
return endNodes
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
// ✅ 递归:查找以 nodeId 开头的所有叶子节点
|
||||
// --------------------------------------------------------------------
|
||||
func findLeafNodes(nodeId string, nextMap map[string][]string, endNodeSet map[string]struct{}) {
|
||||
nextNodes := nextMap[nodeId]
|
||||
|
||||
// 🚩 没有下一个节点 = 真实结束节点
|
||||
if len(nextNodes) == 0 {
|
||||
endNodeSet[nodeId] = struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
// 递归继续找下一个
|
||||
for _, nextId := range nextNodes {
|
||||
findLeafNodes(nextId, nextMap, endNodeSet)
|
||||
if _, exist := visited[node]; exist {
|
||||
continue
|
||||
}
|
||||
visited[node] = struct{}{}
|
||||
|
||||
nextList := nextMap[node]
|
||||
if len(nextList) == 0 {
|
||||
endNodeSet[node] = struct{}{}
|
||||
continue
|
||||
}
|
||||
queue = append(queue, nextList...)
|
||||
}
|
||||
|
||||
res := make([]string, 0, len(endNodeSet))
|
||||
for k := range endNodeSet {
|
||||
res = append(res, k)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user