Files
ai-agent/workflow/service/flow/flow_execution_service.go

1101 lines
35 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package flow
import (
"ai-agent/workflow/consts/flow"
"ai-agent/workflow/consts/node"
fileDao "ai-agent/workflow/dao/file"
flowDao "ai-agent/workflow/dao/flow"
"ai-agent/workflow/model/dto"
fileDto "ai-agent/workflow/model/dto/file"
flowDto "ai-agent/workflow/model/dto/flow"
"ai-agent/workflow/model/entity"
"context"
"errors"
"fmt"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
"gitea.com/red-future/common/utils"
"github.com/cloudwego/eino/compose"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.opentelemetry.io/otel/trace"
)
var FlowExecutionService = &flowExecutionService{}
type flowExecutionService struct{}
func (s *flowExecutionService) Get(ctx context.Context, req *flowDto.GetFlowExecutionReq) (res *flowDto.VOFlowExecution, err error) {
r, err := flowDao.FlowExecutionDao.Get(ctx, req)
if err != nil {
return nil, err
}
res = new(flowDto.VOFlowExecution)
res.ImgAddressPrefix, err = utils.GetFileAddressPrefix(ctx)
if err != nil {
return nil, err
}
err = gconv.Struct(r, &res)
return res, err
}
func (s *flowExecutionService) List(ctx context.Context, req *flowDto.ListFlowExecutionReq) (res *flowDto.ListFlowExecutionTreeRes, err error) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return
}
req.Creator = user.UserName
list, _, err := flowDao.FlowExecutionDao.List(ctx, req)
if err != nil {
return nil, err
}
// ===================== 核心修复:只统计【有数据】的执行记录,空的直接跳过 =====================
executionNumber := make(map[int64]int) // executionId -> 倒序编号(最新=1
// 第一次遍历:只处理【有输出参数】的记录,统计并分配编号
var validList []*entity.FlowExecution // 只存有效(非空)记录
for _, execution := range list {
if g.IsEmpty(execution.OutputParams) {
continue // 空数据直接过滤,不参与编号、不展示
}
validList = append(validList, execution)
}
// 给有效记录分配【时间倒序编号】(最新=1
totalValid := len(validList)
for idx, execution := range validList {
executionNumber[execution.Id] = totalValid - idx
}
// 2. 分组映射:日期 -> 流程节点
type flowWrap struct {
flowNode flowDto.FlowNode
createdAt *gtime.Time
}
dateMap := make(map[string]*[]flowWrap)
// 遍历【有效数据】构建结构
for _, execution := range validList {
createDate := execution.CreatedAt.Format("Y-m-d")
flowName := execution.FlowName
outputParams := execution.OutputParams
// 编号只算有效数据,不会把空的算进去
num := executionNumber[execution.Id]
displayFlowName := fmt.Sprintf("会话-%d(%s)", num, flowName)
// 3. 解析 outputParams
var tempItems []flowDto.OutputItem
for _, paramMap := range outputParams {
for tsKey, value := range paramMap {
if _, err := strconv.ParseInt(tsKey, 10, 64); err != nil {
continue
}
tempItems = append(tempItems, flowDto.OutputItem{
Timestamp: tsKey,
Content: gconv.String(value),
})
}
}
// ===================== 修复1如果解析后依然为空直接跳过不生成第二层节点 =====================
if len(tempItems) == 0 {
continue
}
// 时间戳正序
sort.Slice(tempItems, func(i, j int) bool {
t1, _ := strconv.ParseInt(tempItems[i].Timestamp, 10, 64)
t2, _ := strconv.ParseInt(tempItems[j].Timestamp, 10, 64)
return t1 < t2
})
// 标号:相同类型递增,不同重置
suffixCount := make(map[string]int)
for idx := range tempItems {
item := &tempItems[idx]
val := item.Content
suffix := "内容"
switch {
case strings.Contains(val, "img") || strings.Contains(val, "png") || strings.Contains(val, "jpg"):
suffix = "图片"
case strings.Contains(val, "html") || strings.Contains(val, "HTML"):
suffix = "HTML"
case strings.Contains(val, "txt") || len(val) > 50:
suffix = "文案"
}
suffixCount[suffix]++
item.Label = fmt.Sprintf("%s_%d", suffix, suffixCount[suffix])
}
// 组装节点
node := flowDto.FlowNode{
FlowName: displayFlowName,
Id: execution.Id,
SessionId: gconv.String(execution.SessionId),
Items: tempItems,
}
if dateMap[createDate] == nil {
dateMap[createDate] = &[]flowWrap{}
}
*dateMap[createDate] = append(*dateMap[createDate], flowWrap{
flowNode: node,
createdAt: execution.CreatedAt,
})
}
// 6. 构建树 + 排序
var tree []flowDto.DateNode
for date, wraps := range dateMap {
// 第二层按创建时间倒序(最新在前)
sort.Slice(*wraps, func(i, j int) bool {
return (*wraps)[i].createdAt.After((*wraps)[j].createdAt)
})
var flowNodes []flowDto.FlowNode
for _, w := range *wraps {
flowNodes = append(flowNodes, w.flowNode)
}
// ===================== 修复2日期下没有流程也过滤掉 =====================
if len(flowNodes) == 0 {
continue
}
tree = append(tree, flowDto.DateNode{
CreateDate: date,
Flows: flowNodes,
})
}
// 第一层日期倒序
sort.Slice(tree, func(i, j int) bool {
return tree[i].CreateDate > tree[j].CreateDate
})
imgPrefix, err := utils.GetFileAddressPrefix(ctx)
return &flowDto.ListFlowExecutionTreeRes{
Tree: tree,
ImgAddressPrefix: imgPrefix,
}, nil
}
// ModelCallback 模型回调接口
func (s *flowExecutionService) ModelCallback(ctx context.Context, req *flowDto.ModelCallbackReq) (err error) {
// 唤醒等待的任务
Notify(req.TaskId, req)
return nil
}
// 全局等待任务回调的工具
var (
asyncMu sync.Mutex
asyncTasks = make(map[string]chan any)
)
// Wait 阻塞等待回调结果
// 调用后会一直卡住,直到 Notify 唤醒 或 超时/取消
func Wait(ctx context.Context, taskId string) (any, error) {
asyncMu.Lock()
ch := make(chan any, 1)
asyncTasks[taskId] = ch
asyncMu.Unlock()
select {
case result := <-ch:
return result, nil
case <-ctx.Done():
asyncMu.Lock()
delete(asyncTasks, taskId)
asyncMu.Unlock()
return nil, ctx.Err()
}
}
// Notify 回调时调用,唤醒等待的任务
func Notify(taskId string, result any) {
asyncMu.Lock()
defer asyncMu.Unlock()
ch, exist := asyncTasks[taskId]
if !exist {
return
}
ch <- result
delete(asyncTasks, taskId)
}
// ===================== 核心改造:替换为 sync.Map 存储取消上下文 =====================
var (
// cancelMap: traceID -> context.CancelFunc
cancelMap sync.Map
)
func (s *flowExecutionService) Cancel(ctx context.Context, req *flowDto.CancelReq) (err error) {
getRes, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
SessionId: req.SessionId,
})
if err != nil {
return err
}
if g.IsEmpty(getRes) {
return fmt.Errorf("会话[%s] 不存在", req.SessionId)
}
// 从 sync.Map 获取取消函数
cancelVal, exist := cancelMap.Load(getRes.TraceId)
if !exist {
return fmt.Errorf("traceID[%s] 不存在或已执行完成", getRes.TraceId)
}
// 执行取消
cancel, ok := cancelVal.(context.CancelFunc)
if !ok {
return fmt.Errorf("traceID[%s] 对应的取消函数类型错误", getRes.TraceId)
}
cancel()
// 取消后清理(可选:也可以在流程结束时统一清理)
cancelMap.Delete(getRes.TraceId)
// 同步更新流程执行状态为已取消
_, err = flowDao.FlowExecutionDao.Update(ctx, &flowDto.UpdateFlowExecutionReq{
Id: getRes.Id,
Status: flow.FlowExecutionStatusCancel.Code(),
})
if err != nil {
return fmt.Errorf("更新取消状态失败: %v", err)
}
return nil
}
func (s *flowExecutionService) Execute(ctx context.Context, req *flowDto.ExecuteReq) (res *flowDto.ExecuteRes, err error) {
// ===================== 核心改造1创建可取消的上下文 =====================
execCtx, cancel := context.WithCancel(ctx)
traceId := ""
defer func() {
// 流程结束(成功/失败)时清理 cancelMap
if traceId != "" {
cancelMap.Delete(traceId)
}
cancel()
}()
flowInfo, err := flowDao.FlowExecutionDao.Get(ctx, &flowDto.GetFlowExecutionReq{
SessionId: req.SessionId,
})
if err != nil {
return
}
var executionId int64
var isDialogue bool
if flowInfo == nil {
isDialogue = false
var r = new(flowDto.CreateFlowExecutionReq)
r.FlowUserId = req.FlowId
r.FlowName = req.FlowName
r.TriggerType = flow.FlowExecutionTriggerTypeManual.Code()
r.FlowContent = req.FlowContent
r.NodeInputParams = req.NodeInputParams
r.SessionId = req.SessionId
r.Status = flow.FlowExecutionStatusRunning.Code()
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
r.TraceId = span.SpanContext().TraceID().String()
traceId = r.TraceId
cancelMap.Store(traceId, cancel)
}
executionId, err = flowDao.FlowExecutionDao.Insert(ctx, r)
if err != nil {
return
}
} else {
isDialogue = true
executionId = flowInfo.Id
span := trace.SpanFromContext(ctx)
if span != nil && span.SpanContext().HasTraceID() {
traceId = span.SpanContext().TraceID().String()
cancelMap.Store(traceId, cancel)
}
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusRunning.Code(),
TraceId: traceId,
}
_, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err != nil {
return
}
}
if !g.IsEmpty(req.FileUrl) {
createFileTempReq := make([]*fileDto.CreateFileTempReq, 0, len(req.FileUrl))
for _, fileUrl := range req.FileUrl {
var createReq = new(fileDto.CreateFileTempReq)
createReq.BusinessId = req.SessionId
createReq.FileUrl = fileUrl
createFileTempReq = append(createFileTempReq, createReq)
}
_, err = fileDao.FileTempDao.BatchInsert(ctx, createFileTempReq)
if err != nil {
return nil, err
}
}
if isDialogue && !g.IsEmpty(flowInfo) {
// 查询节点中是否包含结果合并节点
var htmlUrl []string
var textNodeId string
var textModelName string
var textModelResponse map[string]any
textResultFrom := make(map[string]any)
var imgNodeId string
var imgModelName string
var imgModelResponse map[string]any
imgResultFrom := make(map[string]any)
for _, item := range flowInfo.NodeInputParams {
if item.NodeCode == node.NodeTypeMerge {
for _, outputParamsItem := range flowInfo.OutputParams {
outputParamsMap := gconv.Map(outputParamsItem)
for _, mapItem := range outputParamsMap {
if strings.HasSuffix(gconv.String(mapItem), ".html") {
htmlUrl = append(htmlUrl, gconv.String(mapItem))
}
}
}
}
if item.NodeCode == node.NodeTypeTextModel {
textNodeId = item.Id
textModelName = item.ModelConfig.ModelName
textModelResponse = item.ModelConfig.ModelResponse
for key, modelFormItem := range item.ModelConfig.ModelForm {
textResultFrom[key] = map[string]any{
"value": modelFormItem,
}
}
}
if item.NodeCode == node.NodeTypeImageModel {
imgNodeId = item.Id
imgModelName = item.ModelConfig.ModelName
imgModelResponse = item.ModelConfig.ModelResponse
for key, modelFormItem := range item.ModelConfig.ModelForm {
imgResultFrom[key] = map[string]any{
"value": modelFormItem,
}
}
}
}
var url string
url, err = utils.GetFileAddressPrefix(ctx)
if err != nil {
return nil, err
}
if strings.HasSuffix(gconv.String(req.ResultUrl), ".md") {
resultUserFrom := make(map[string]any)
resultUserFrom["desc"] = req.Desc
var textNode []node.NodeFormField
textNode, err = TextNode(ctx, textNodeId, req.SessionId, textModelName, req.SkillName, textResultFrom, resultUserFrom, textModelResponse, req.FileUrl)
if err != nil {
return nil, err
}
var textContent []string
var textUrl []string
for _, item := range textNode {
if strings.Contains(item.Field, "text_content") {
textContent = append(textContent, item.Value)
}
if strings.Contains(item.Field, "text_url") {
textUrl = append(textUrl, item.Value)
}
}
}
content := ""
// 第二步 执行目标节点
if content == "text" {
resultUserFrom := make(map[string]any)
resultUserFrom["desc"] = req.Desc
var textNode []node.NodeFormField
textNode, err = TextNode(ctx, textNodeId, req.SessionId, textModelName, req.SkillName, textResultFrom, resultUserFrom, textModelResponse, req.FileUrl)
if err != nil {
return nil, err
}
var htmlTags []string
var textUrl []string
for _, item := range textNode {
if strings.Contains(item.Field, "text_content") {
htmlTags = append(htmlTags, item.Value)
}
if strings.Contains(item.Field, "text_url") {
textUrl = append(textUrl, item.Value)
}
}
var htmlContentUrl []string
if !g.IsEmpty(htmlUrl) {
for i, item := range htmlUrl {
// 获取当前要替换的文本内容
textContent := htmlTags[i]
// 1. 读取 HTML 文件内容
var htmlBytes []byte
htmlBytes, err = os.ReadFile(url + item)
if err != nil {
fmt.Printf("读取文件失败 %s: %v", url+item, err)
continue
}
htmlContent := string(htmlBytes)
// 2. 构建要替换成的新 div 标签
newTextTag := fmt.Sprintf(`<div class="text">%s</div>`, textContent)
re := regexp.MustCompile(`<text>.*?</text>`)
result := re.ReplaceAllString(htmlContent, newTextTag)
fmt.Printf("成功处理文件:%s", result)
// 上传OSS每条独立上传
fileName := fmt.Sprintf("item_%d_%d.html", i, time.Now().UnixMilli())
var ossResult *dto.UploadFileBytesRes
ossResult, err = Upload(ctx, &dto.UploadFileBytesReq{
FileBytes: []byte(result),
FileName: fileName,
})
if err != nil {
return nil, err
}
fmt.Printf("上传OSS成功%s", ossResult.FileURL)
htmlContentUrl = append(htmlContentUrl, ossResult.FileURL)
}
}
var summaryResult []map[string]interface{}
if !g.IsEmpty(textUrl) {
for _, outputParamsItem := range flowInfo.OutputParams {
if !g.IsEmpty(htmlContentUrl) {
if strings.HasSuffix(gconv.String(outputParamsItem), ".html") {
continue
}
}
if strings.HasSuffix(gconv.String(outputParamsItem), ".txt") {
continue
}
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = outputParamsItem
summaryResult = append(summaryResult, item)
}
for _, textItem := range textUrl {
// 生成 毫秒时间戳 作为 KEY
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = textItem
summaryResult = append(summaryResult, item)
}
}
if !g.IsEmpty(htmlContentUrl) {
for _, outputParamsItem := range flowInfo.OutputParams {
if !g.IsEmpty(textUrl) {
if strings.HasSuffix(gconv.String(outputParamsItem), ".txt") {
continue
}
}
if strings.HasSuffix(gconv.String(outputParamsItem), ".html") {
continue
}
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = outputParamsItem
summaryResult = append(summaryResult, item)
}
for _, textItem := range htmlContentUrl {
// 生成 毫秒时间戳 作为 KEY
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = textItem
summaryResult = append(summaryResult, item)
}
}
if !g.IsEmpty(summaryResult) {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: flowInfo.Id,
Status: flow.FlowExecutionStatusSuccess.Code(),
OutputParams: summaryResult,
}
_, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
}
} else if content == "img" {
resultUserFrom := make(map[string]any)
resultUserFrom["desc"] = req.Desc
var imgNode []node.NodeFormField
imgNode, err = ImgNode(ctx, imgNodeId, req.SessionId, imgModelName, req.SkillName, imgResultFrom, resultUserFrom, imgModelResponse, req.FileUrl)
if err != nil {
return nil, err
}
var imgCount int
var imgUrl []string
var htmlBuilder strings.Builder
htmlBuilder.WriteString(`<div class="image-group">`)
for _, item := range imgNode {
if strings.Contains(item.Field, "img_url") {
imgCount = imgCount + 1
htmlBuilder.WriteString(fmt.Sprintf(`<img src="%s" alt="图片"/>`, item.Value))
imgUrl = append(imgUrl, item.Value)
}
}
htmlBuilder.WriteString(`</div>`)
var htmlContentUrl []string
if !g.IsEmpty(htmlUrl) && imgCount > 0 {
for i, item := range htmlUrl {
// 1. 读取 HTML 文件内容
var htmlBytes []byte
htmlBytes, err = os.ReadFile(url + item)
if err != nil {
fmt.Printf("读取文件失败 %s: %v", url+item, err)
continue
}
htmlContent := string(htmlBytes)
re := regexp.MustCompile(`<div class="image-group">[\s\S]*?</div>`)
result := re.ReplaceAllString(htmlContent, htmlBuilder.String())
fmt.Printf("成功处理文件:%s", result)
// 上传OSS每条独立上传
fileName := fmt.Sprintf("item_%d_%d.html", i, time.Now().UnixMilli())
var ossResult *dto.UploadFileBytesRes
ossResult, err = Upload(ctx, &dto.UploadFileBytesReq{
FileBytes: []byte(result),
FileName: fileName,
})
if err != nil {
return nil, err
}
fmt.Printf("上传OSS成功%s", ossResult.FileURL)
htmlContentUrl = append(htmlContentUrl, ossResult.FileURL)
}
}
var summaryResult []map[string]interface{}
if !g.IsEmpty(imgCount) {
for _, outputParamsItem := range flowInfo.OutputParams {
if !g.IsEmpty(htmlContentUrl) {
if strings.HasSuffix(gconv.String(outputParamsItem), ".html") {
continue
}
}
if strings.HasSuffix(gconv.String(outputParamsItem), ".png") {
continue
}
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = outputParamsItem
summaryResult = append(summaryResult, item)
}
for _, textItem := range imgUrl {
// 生成 毫秒时间戳 作为 KEY
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = textItem
summaryResult = append(summaryResult, item)
}
}
if !g.IsEmpty(htmlContentUrl) {
for _, outputParamsItem := range flowInfo.OutputParams {
if !g.IsEmpty(imgUrl) {
if strings.HasSuffix(gconv.String(outputParamsItem), ".png") {
continue
}
}
if strings.HasSuffix(gconv.String(outputParamsItem), ".html") {
continue
}
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = outputParamsItem
summaryResult = append(summaryResult, item)
}
for _, textItem := range htmlContentUrl {
// 生成 毫秒时间戳 作为 KEY
timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
item := make(map[string]interface{})
item[timeKey] = textItem
summaryResult = append(summaryResult, item)
}
}
if !g.IsEmpty(summaryResult) {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: flowInfo.Id,
Status: flow.FlowExecutionStatusSuccess.Code(),
OutputParams: summaryResult,
}
_, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
}
} else if content == "text_img" {
//userFrom := make(map[string]any)
//userFrom["desc"] = req.Desc
//
//var textNode []node.NodeFormField
//textNode, err = TextNode(ctx, textNodeId, req.SessionId, textModelName, req.SkillName, textResultFrom, userFrom, textModelResponse, req.FileUrl)
//if err != nil {
// return nil, err
//}
//
//var htmlTags []string
//var textUrl []string
//for _, item := range textNode {
// if strings.Contains(item.Field, "text_content") {
// htmlTags = append(htmlTags, StripHtmlTags(item.Value, false))
// }
// if strings.Contains(item.Field, "text_url") {
// textUrl = append(textUrl, item.Value)
// }
//}
//
//userFrom["prompt"] = htmlTags
//var imgNode []node.NodeFormField
//imgNode, err = ImgNode(ctx, imgNodeId, req.SessionId, imgModelName, req.SkillName, imgResultFrom, userFrom, imgModelResponse, req.FileUrl)
//if err != nil {
// return nil, err
//}
//var imgCount int
//var imgUrl []string
//var htmlBuilder strings.Builder
//htmlBuilder.WriteString(`<div class="image-group">`)
//for _, item := range imgNode {
// if strings.Contains(item.Field, "img_url") {
// imgCount = imgCount + 1
// htmlBuilder.WriteString(fmt.Sprintf(`<img src="%s" alt="图片"/>`, item.Value))
// imgUrl = append(imgUrl, item.Value)
// }
//}
//htmlBuilder.WriteString(`</div>`)
//
//var htmlContentUrl []string
//if !g.IsEmpty(htmlUrl) && imgCount > 0 {
// for i, item := range htmlUrl {
// // 获取当前要替换的文本内容
// textContent := htmlTags[i]
// // 1. 读取 HTML 文件内容
// var htmlBytes []byte
// htmlBytes, err = os.ReadFile(url + item)
// if err != nil {
// fmt.Printf("读取文件失败 %s: %v", url+item, err)
// continue
// }
// htmlContent := string(htmlBytes)
//
// re := regexp.MustCompile(`<div class="image-group">[\s\S]*?</div>`)
// result := re.ReplaceAllString(htmlContent, htmlBuilder.String())
// // 2. 构建要替换成的新 div 标签
// newTextTag := fmt.Sprintf(`<div class="text">%s</div>`, textContent)
// ret := regexp.MustCompile(`<text>.*?</text>`)
// result = ret.ReplaceAllString(htmlContent, newTextTag)
// fmt.Printf("成功处理文件:%s", result)
//
// // 上传OSS每条独立上传
// fileName := fmt.Sprintf("item_%d_%d.html", i, time.Now().UnixMilli())
// var ossResult *dto.UploadFileBytesRes
// ossResult, err = Upload(ctx, &dto.UploadFileBytesReq{
// FileBytes: []byte(result),
// FileName: fileName,
// })
// if err != nil {
// return nil, err
// }
// fmt.Printf("上传OSS成功%s", ossResult.FileURL)
// htmlContentUrl = append(htmlContentUrl, ossResult.FileURL)
// }
//}
//if !g.IsEmpty(htmlContentUrl) {
// for _, outputParamsItem := range flowInfo.OutputParams {
// if !g.IsEmpty(imgUrl) {
// if strings.HasSuffix(gconv.String(outputParamsItem), ".png") {
// continue
// }
// }
// if strings.HasSuffix(gconv.String(outputParamsItem), ".html") {
// continue
// }
// timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
// item := make(map[string]interface{})
// item[timeKey] = outputParamsItem
// summaryResult = append(summaryResult, item)
// }
// for _, textItem := range htmlContentUrl {
// // 生成 毫秒时间戳 作为 KEY
// timeKey := strconv.FormatInt(time.Now().UnixMilli(), 10)
// item := make(map[string]interface{})
// item[timeKey] = textItem
// summaryResult = append(summaryResult, item)
// }
//}
//if !g.IsEmpty(summaryResult) {
// executionReq := flowDto.UpdateFlowExecutionReq{
// Id: flowInfo.Id,
// Status: flow.FlowExecutionStatusSuccess.Code(),
// OutputParams: summaryResult,
// }
// _, err = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
//}
} else {
return nil, fmt.Errorf("意图识别失败")
}
} else {
// =========================================================================
// ✅【第1步】给所有判断节点自动生成意图识别节点
// =========================================================================
judge2IntentNodeMap := make(map[string]string)
finalNodes := make([]entity.FlowNode, 0, len(req.FlowContent.Nodes)*2)
for _, item := range req.FlowContent.Nodes {
finalNodes = append(finalNodes, item)
// 判断节点自动加 intent 节点
if item.NodeCode == node.NodeTypeJudge {
intentNodeID := fmt.Sprintf("intent_%s", item.Id)
intentNode := entity.FlowNode{
Id: intentNodeID,
NodeCode: node.NodeTypeIntent,
Name: fmt.Sprintf("意图识别-%s", item.Name),
InputSource: item.InputSource, // ✅ 正确赋值
FormConfig: item.FormConfig, // ✅ 用户配置
ModelConfig: item.ModelConfig, // ✅ 系统配置
}
finalNodes = append(finalNodes, intentNode)
judge2IntentNodeMap[item.Id] = intentNodeID
}
}
summaryNodeID := "summary_node"
summaryNode := entity.FlowNode{
Id: summaryNodeID,
NodeCode: node.NodeTypeCustomNode, // 复用自定义节点类型,也可新增专属类型
Name: "结果汇总节点",
InputSource: []entity.FlowNodeInputSource{}, // 后续自动聚合所有节点输出
FormConfig: nil,
ModelConfig: node.ModelItem{},
}
finalNodes = append(finalNodes, summaryNode)
// 替换节点列表
req.FlowContent.Nodes = finalNodes
// =========================================================================
// ✅【第2步】构建执行图
// =========================================================================
var runGraph compose.Runnable[any, any]
runGraph, err = BuildGraphFromFlowContent(execCtx, req.FlowContent, judge2IntentNodeMap, summaryNodeID)
if err != nil {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusFailed.Code(),
ErrorMessage: err.Error(),
}
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err1 != nil {
return
}
return nil, fmt.Errorf("执行工作流失败: %v", err)
}
// =========================================================================
// ✅【第3步】构建 ConfigMap
// =========================================================================
configMap := make(map[string]*entity.FlowNode)
for _, cfg := range req.NodeInputParams {
configMap[cfg.Id] = cfg
}
// 自动给意图节点复制配置
for judgeID, intentID := range judge2IntentNodeMap {
if cfg, ok := configMap[judgeID]; ok {
configMap[intentID] = cfg
}
}
// 初始化汇总节点配置
configMap[summaryNodeID] = &summaryNode
// =========================================================================
// ✅【第4步】构建全局执行入参现在 schemaMap 是有值的!)
// =========================================================================
execInput := &flowDto.FlowExecutionInput{
IsDialogue: isDialogue,
ExecutionId: executionId,
ConfigMap: configMap,
SessionId: req.SessionId,
Desc: req.Desc,
SkillName: req.SkillName,
FileUrl: req.FileUrl,
}
// 执行工作流
_, err = runGraph.Invoke(execCtx, execInput)
if err != nil {
// 检测是否是取消导致的错误
if errors.Is(execCtx.Err(), context.Canceled) {
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusCancel.Code(),
}
_, _ = flowDao.FlowExecutionDao.Update(ctx, &executionReq)
return nil, fmt.Errorf("工作流已被取消: %v", err)
}
executionReq := flowDto.UpdateFlowExecutionReq{
Id: executionId,
Status: flow.FlowExecutionStatusFailed.Code(),
ErrorMessage: err.Error(),
}
_, err1 := flowDao.FlowExecutionDao.Update(ctx, &executionReq)
if err1 != nil {
return
}
return nil, fmt.Errorf("执行工作流失败: %v", err)
}
}
return
}
// BuildGraphFromFlowContent 根据前端保存的工作流JSON自动构建执行图
func BuildGraphFromFlowContent(ctx context.Context, flowContent *entity.FlowInfo, judge2IntentNodeMap map[string]string, summaryNodeID string) (compose.Runnable[any, any], error) {
graph := compose.NewGraph[any, any]()
nodeMap := make(map[string]entity.FlowNode)
// 注册所有节点
for _, item := range flowContent.Nodes {
nodeMap[item.Id] = item
if item.NodeCode != node.NodeTypeJudge {
registerNodeToGraph(graph, item)
}
}
// 构建边关系
upstreamMap := make(map[string][]string)
edgeMap := make(map[string][]entity.FlowEdge)
for _, edge := range flowContent.Edges {
edgeMap[edge.From] = append(edgeMap[edge.From], edge)
upstreamMap[edge.To] = append(upstreamMap[edge.To], edge.From)
}
// 处理连线 & 分支
for fromNodeID, edges := range edgeMap {
fromNode := nodeMap[fromNodeID]
// --------------------------
// 判断节点 → 分支处理
// --------------------------
if fromNode.NodeCode == node.NodeTypeJudge {
intentNodeID, ok := judge2IntentNodeMap[fromNodeID]
if !ok {
return nil, fmt.Errorf("判断节点[%s]未生成意图节点", fromNodeID)
}
branchMap := make(map[string]bool)
for _, e := range edges {
branchMap[e.To] = true
}
judgeLambda := func(ctx context.Context, input any) (string, error) {
execInput, ok := input.(*flowDto.FlowExecutionInput)
if !ok {
return "", fmt.Errorf("入参类型错误")
}
currentConfig := execInput.ConfigMap[fromNodeID]
if currentConfig == nil {
return "", fmt.Errorf("判断节点%s无配置", fromNodeID)
}
branchIdNameMap := make(map[string]string)
var branchIDs []string
for nodeID := range branchMap {
branchIDs = append(branchIDs, nodeID)
// 从configMap获取分支节点的名称
if branchNodeCfg, ok := execInput.ConfigMap[nodeID]; ok {
branchIdNameMap[nodeID] = branchNodeCfg.Name
} else {
branchIdNameMap[nodeID] = "未命名节点" // 兜底
}
}
// 把分支ID-名称映射塞进 ModelConfig带给意图节点
m := make(map[string]interface{})
m["branch_ids"] = branchIDs
m["branch_id_name_map"] = branchIdNameMap // 传递ID-名称映射
currentConfig.Config = m
// 从意图节点取输出
if intentCfg, ok := execInput.ConfigMap[intentNodeID]; ok {
currentConfig.OutputResult = intentCfg.OutputResult
}
// 关键修改:构造 NodeExecutionInput 传入 JudgeLambda
nodeExecInput := &flowDto.NodeExecutionInput{
Config: currentConfig, // 当前判断节点配置
Global: execInput, // 全局执行入参
}
return JudgeLambda(ctx, nodeExecInput) // 传入 NodeExecutionInput 类型
}
_ = graph.AddBranch(intentNodeID, compose.NewGraphBranch(judgeLambda, branchMap))
continue
}
// --------------------------
// 普通节点连线
// --------------------------
for _, e := range edges {
toNode := nodeMap[e.To]
if toNode.NodeCode == node.NodeTypeJudge {
_ = graph.AddEdge(e.From, fmt.Sprintf("intent_%s", toNode.Id))
continue
}
_ = graph.AddEdge(e.From, e.To)
}
}
// ==================== 第四步:处理开始/结束节点 ====================
if flowContent.StartNodeId != "" {
_ = graph.AddEdge(compose.START, flowContent.StartNodeId)
}
originalEndNodes := findEndNodes(flowContent.StartNodeId, flowContent.Edges)
for _, endID := range originalEndNodes {
_ = graph.AddEdge(endID, summaryNodeID)
}
_ = graph.AddEdge(summaryNodeID, compose.END)
return graph.Compile(ctx, compose.WithGraphName("auto_build_workflow"))
}
// -------------------------- 节点自动注册器(核心分发) --------------------------
func registerNodeToGraph(graph *compose.Graph[any, any], flowNode entity.FlowNode) {
nodeID := flowNode.Id
code := flowNode.NodeCode
// 通用包装:全程入参都是 *FlowExecutionInput
wrapLambda := func(lambda func(ctx context.Context, input any) (any, error)) func(ctx context.Context, input any) (any, error) {
return func(ctx context.Context, input any) (any, error) {
// ✅ 【关键】全程入参类型永远不变
execInput, ok := input.(*flowDto.FlowExecutionInput)
if !ok {
return nil, fmt.Errorf("入参必须是 *FlowExecutionInput, 实际是 %T", input)
}
configMap := execInput.ConfigMap
currentConfig := configMap[nodeID]
if currentConfig == nil {
return nil, fmt.Errorf("节点%s无配置", nodeID)
}
// 获取入参 - 适配切片类型:遍历所有来源节点
var realInput any
if len(flowNode.InputSource) > 0 { // 改为判断切片长度
// 遍历所有指定的来源节点,聚合输出结果
for _, inputSource := range flowNode.InputSource { // 遍历切片
if sourceConfig, ok := configMap[inputSource.NodeId]; ok {
currentConfig.OutputResult = append(currentConfig.OutputResult, sourceConfig.OutputResult...)
}
}
}
// ✅ 封装节点执行入参(配置+表单架构)
realInput = &flowDto.NodeExecutionInput{
Config: currentConfig,
Global: execInput, // ✅ 把【全部节点】的对象直接塞进来
}
// 执行节点
output, err := lambda(ctx, realInput)
if err != nil {
return nil, err
}
// ✅ 自动把当前节点ID 加入已执行列表
execInput.ExecutedNodes = append(execInput.ExecutedNodes, nodeID)
// 输出存入 FlowNodeConfig
if outConfig, ok := output.(*entity.FlowNode); ok {
currentConfig.OutputResult = outConfig.OutputResult
}
// ✅ 关键:返回整个 execInput让下一个节点继续用
return execInput, nil
}
}
if nodeID == "summary_node" {
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(SummaryLambda)))
return
}
switch code {
case "__start__":
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(StartLambda)))
case node.NodeTypeTextModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(TextModelLambda)))
case node.NodeTypeImageModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(ImageModelLambda)))
case node.NodeTypeVideoModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(VideoModelLambda)))
case node.NodeTypeAudioModel:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(AudioModelLambda)))
case node.NodeTypeCustomNode:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(CustomLambda)))
case node.NodeTypeForm:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(FormLambda)))
case node.NodeTypeIntent:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(IntentLambda)))
case node.NodeTypeMerge:
_ = graph.AddLambdaNode(nodeID, compose.InvokableLambda(wrapLambda(MergeLambda)))
}
}
// --------------------------------------------------------------------
// ✅【工具方法】找出所有没有出边的节点 → 作为结束节点连接 END
// --------------------------------------------------------------------
func findEndNodes(startNodeId string, edges []entity.FlowEdge) []string {
// 构建 节点 → 后续节点 的映射
nextMap := make(map[string][]string)
for _, e := range edges {
nextMap[e.From] = append(nextMap[e.From], e.To)
}
endNodeSet := make(map[string]struct{})
// 🚀 只从【开始节点】递归遍历(关键修复)
findLeafNodes(startNodeId, nextMap, endNodeSet)
// 转成数组返回
endNodes := make([]string, 0, len(endNodeSet))
for id := range endNodeSet {
endNodes = append(endNodes, id)
}
return endNodes
}
// --------------------------------------------------------------------
// ✅ 递归:查找以 nodeId 开头的所有叶子节点
// --------------------------------------------------------------------
func findLeafNodes(nodeId string, nextMap map[string][]string, endNodeSet map[string]struct{}) {
nextNodes := nextMap[nodeId]
// 🚩 没有下一个节点 = 真实结束节点
if len(nextNodes) == 0 {
endNodeSet[nodeId] = struct{}{}
return
}
// 递归继续找下一个
for _, nextId := range nextNodes {
findLeafNodes(nextId, nextMap, endNodeSet)
}
}