From 9a40fd7e1e873fb27514a0b89edfabdcf718e77f Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Thu, 21 May 2026 20:56:30 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=9F=E6=88=90=E8=A7=86=E9=A2=91=E5=B9=B6?= =?UTF-8?q?=E4=B8=94=E4=B8=8A=E4=BC=A0=E5=88=B0minio?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- model/dto/audio/task_dto.go | 12 +++------ service/asr/task_service.go | 26 ++++++++++--------- service/video/concat_service.go | 44 ++++++++++++++++++++++++++------- 3 files changed, 52 insertions(+), 30 deletions(-) diff --git a/model/dto/audio/task_dto.go b/model/dto/audio/task_dto.go index d88d2e5..f0c27d5 100644 --- a/model/dto/audio/task_dto.go +++ b/model/dto/audio/task_dto.go @@ -93,16 +93,10 @@ type ListTaskRes struct { // ---------- 回调通知结构 ---------- -// CallbackPayload 回调通知内容 +// CallbackPayload 回调通知内容(与 GetTaskRes 出参一致) type CallbackPayload struct { - TaskID string `json:"taskId" dc:"任务ID"` - Status string `json:"status" dc:"任务状态"` - TotalFiles int `json:"totalFiles" dc:"文件总数"` - SuccessFiles int `json:"successFiles" dc:"成功文件数"` - FailFiles int `json:"failFiles" dc:"失败文件数"` - Result string `json:"result,omitempty" dc:"完整的处理结果JSON"` - ErrorMessage string `json:"errorMessage,omitempty" dc:"错误信息"` - DetailList []TranscribeTaskDetailItem `json:"detailList" dc:"明细列表"` + TaskInfo TranscribeTaskItem `json:"taskInfo" dc:"任务信息"` + DetailList []TranscribeTaskDetailItem `json:"detailList" dc:"明细列表(每视频一条)"` } // ---------- 任务处理结果结构(用于result JSONB) ---------- diff --git a/service/asr/task_service.go b/service/asr/task_service.go index b379768..17ae693 100644 --- a/service/asr/task_service.go +++ b/service/asr/task_service.go @@ -196,7 +196,7 @@ func (s *audioTaskService) processTask(taskID string, urls []string, model, lang g.Log().Infof(ctx, "[任务 %s] 全部处理流程结束", taskID) } -// callback 向回调地址 POST 任务结果 +// callback 向回调地址 POST 任务结果(与查询接口 GetTaskRes 出参一致) func (s *audioTaskService) callback(ctx context.Context, taskID, status, errMsg, callbackURL string) { if callbackURL == "" { return @@ -214,27 +214,29 @@ func (s *audioTaskService) callback(ctx context.Context, taskID, status, errMsg, detailItems = append(detailItems, dao.DetailEntityToItem(&detailList[i])) } + // 构建与查询接口一致的 taskInfo + taskInfo := dao.EntityToItem(task) + + // 与查询接口一致:从 result 中补全 scenes 等字段 + detailItems = enrichDetailsFromResult(task.Result, detailItems) + payload := dto.CallbackPayload{ - TaskID: taskID, - Status: status, - TotalFiles: task.TotalFiles, - SuccessFiles: task.SuccessFiles, - FailFiles: task.FailFiles, - ErrorMessage: errMsg, - Result: task.Result, - DetailList: detailItems, + TaskInfo: taskInfo, + DetailList: detailItems, } body, _ := json.Marshal(payload) g.Log().Infof(ctx, "[回调 %s] 触发回调, 状态=%s, 成功=%d 失败=%d, 错误=%s, 目标=%s", - taskID, status, payload.SuccessFiles, payload.FailFiles, errMsg, callbackURL) + taskID, taskInfo.Status, taskInfo.SuccessFiles, taskInfo.FailFiles, errMsg, callbackURL) g.Log().Debugf(ctx, "[回调 %s] 回调载荷长度=%d字节, 明细条数=%d", taskID, len(body), len(detailItems)) + // 透传调用方的用户信息,供回调方 GetUserInfo 从 X-User-Info 头获取 + userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) + g.Log().Infof(ctx, "[回调 %s] curl -X POST '%s' -H 'Content-Type: application/json' -H 'X-User-Info: %s' -d '%s'", + taskID, callbackURL, string(userJSON), strings.ReplaceAll(string(body), "'", "'\\''")) req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") - // 透传调用方的用户信息,供回调方 GetUserInfo 从 X-User-Info 头获取 - userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) req.Header.Set("X-User-Info", string(userJSON)) resp, reqErr := http.DefaultClient.Do(req) diff --git a/service/video/concat_service.go b/service/video/concat_service.go index e8aaa33..11622c8 100644 --- a/service/video/concat_service.go +++ b/service/video/concat_service.go @@ -7,11 +7,14 @@ import ( "fmt" "io" "mime/multipart" + "net/http" "os" "os/exec" "path/filepath" "strings" + "time" + "gitea.com/red-future/common/beans" commonHttp "gitea.com/red-future/common/http" "github.com/gogf/gf/v2/frame/g" @@ -64,7 +67,16 @@ func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *Concat outputPath := req.OutputPath if outputPath == "" { outputDir := filepath.Dir(req.VideoPaths[0]) - outputPath = filepath.Join(outputDir, "concat_output.mp4") + // 用第一个输入文件名 + 拼接数 + 时间戳,溯源更清晰 + baseName := filepath.Base(req.VideoPaths[0]) + ext := filepath.Ext(baseName) + stem := strings.TrimSuffix(baseName, ext) + stemRunes := []rune(stem) + if len(stemRunes) > 20 { + stemRunes = stemRunes[:20] + } + outputPath = filepath.Join(outputDir, + fmt.Sprintf("concat_%s_x%d_%s%s", string(stemRunes), len(req.VideoPaths), time.Now().Format("150405"), ext)) } method := req.Method @@ -304,9 +316,9 @@ type uploadFileRes struct { FileAddressPrefix string `json:"fileAddressPrefix"` } -// UploadToMinIO 通过 OSS 微服务的 multipart 文件上传接口上传到 MinIO +// UploadToMinIO 通过 OSS 微服务的 uploadFile 接口上传到 MinIO(multipart/form-data) func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) (*uploadFileRes, error) { - // 手动构建 multipart/form-data 表单 + // 构建 multipart/form-data 表单 var buf bytes.Buffer mw := multipart.NewWriter(&buf) @@ -325,25 +337,39 @@ func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) } mw.Close() + // 使用 commonHttp 的客户端(含 Consul 服务发现),大文件上传设置长超时 client := commonHttp.Httpclient.Clone() + // 必须单独设置 Transport.ResponseHeaderTimeout,SetTimeout 只设 Client.Timeout + newTransport := http.DefaultTransport.(*http.Transport).Clone() + newTransport.ResponseHeaderTimeout = 5 * time.Minute + client.Transport = newTransport + client.SetTimeout(10 * time.Minute) // 透传认证 headers + hasAuthHeader := false if r := g.RequestFromCtx(ctx); r != nil { for k, v := range r.Header { client.SetHeader(k, v[0]) + if strings.EqualFold(k, "Authorization") || strings.EqualFold(k, "X-User-Info") { + hasAuthHeader = true + } } } + // 原始请求无认证信息时,注入默认用户上下文 + if !hasAuthHeader { + userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) + client.SetHeader("X-User-Info", string(userJSON)) + } + // 设置 multipart Content-Type(含 boundary) contentType := mw.FormDataContentType() - g.Log().Debugf(ctx, "[UploadToMinIO] Content-Type: %s", contentType) client.SetHeader("Content-Type", contentType) - // 打印请求信息 - postBytes := buf.Bytes() - g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes, Boundary: %s", - localFilePath, len(postBytes), mw.Boundary()) + g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes", + localFilePath, buf.Len()) - response, err := client.Post(ctx, "oss/file/uploadFile", postBytes) + // 发送 multipart 请求(原始字节流) + response, err := client.Post(ctx, "oss/file/uploadFile", buf.Bytes()) if err != nil { glog.Error(ctx, err) return nil, fmt.Errorf("调用OSS上传服务失败: %v", err)