diff --git a/controller/audio/audio_extract_controller.go b/controller/audio/audio_extract_controller.go index 0a3aaf3..00fb865 100644 --- a/controller/audio/audio_extract_controller.go +++ b/controller/audio/audio_extract_controller.go @@ -8,6 +8,7 @@ import ( service "media/service/asr" "gitea.com/red-future/common/beans" + "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/frame/g" ) @@ -56,10 +57,23 @@ func (c *audio) ListTasks(ctx context.Context, req *dto.ListTaskReq) (res *dto.L return service.AudioTask.ListTasks(ctx, req) } -// withUser 为 context 注入默认用户(无认证基础设施时使用) +// withUser 优先从请求头/X-User-Info/Token 提取用户信息,没有则用默认 admin func withUser(ctx context.Context) context.Context { - if ctx.Value("user") == nil { - ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) + if ctx.Value("user") != nil { + return ctx } + + user, err := utils.GetUserInfo(ctx) + if err == nil && user != nil && user.TenantId > 0 { + g.Log().Infof(ctx, "[用户信息] 从请求头解析到用户: userName=%s, tenantId=%d", user.UserName, user.TenantId) + ctx = context.WithValue(ctx, "user", user) + return ctx + } + + if err != nil { + g.Log().Debugf(ctx, "[用户信息] 解析失败(%v), 使用默认admin/tenant=1", err) + } + + ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) return ctx } diff --git a/controller/video/concat_controller.go b/controller/video/concat_controller.go index fc239c1..f58e336 100644 --- a/controller/video/concat_controller.go +++ b/controller/video/concat_controller.go @@ -16,6 +16,7 @@ import ( service "media/service/video" "gitea.com/red-future/common/beans" + "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/frame/g" ) @@ -126,11 +127,24 @@ func (c *video) GetConcatTask(ctx context.Context, req *dto.GetConcatTaskReq) (r return service.Concat.GetTaskResult(ctx, req.TaskID) } -// withUser 为 context 注入默认用户(无认证基础设施时使用) +// withUser 优先从请求头/X-User-Info/Token 提取用户信息,没有则用默认 admin func withUser(ctx context.Context) context.Context { - if ctx.Value("user") == nil { - ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) + if ctx.Value("user") != nil { + return ctx } + + user, err := utils.GetUserInfo(ctx) + if err == nil && user != nil && user.TenantId > 0 { + g.Log().Infof(ctx, "[用户信息] 从请求头解析到用户: userName=%s, tenantId=%d", user.UserName, user.TenantId) + ctx = context.WithValue(ctx, "user", user) + return ctx + } + + if err != nil { + g.Log().Debugf(ctx, "[用户信息] 解析失败(%v), 使用默认admin/tenant=1", err) + } + + ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) return ctx } diff --git a/dao/video/concat_task_dao.go b/dao/video/concat_task_dao.go index cccd4e3..9bde22f 100644 --- a/dao/video/concat_task_dao.go +++ b/dao/video/concat_task_dao.go @@ -18,10 +18,11 @@ type concatTaskDao struct{} const concatTaskTable = "concat_task" -// Insert 创建任务 +// Insert 创建任务(排除 id 字段,让数据库自增) func (d *concatTaskDao) Insert(ctx context.Context, data *entity.ConcatTask) (id int64, err error) { r, err := gfdb.DB(ctx).Model(ctx, concatTaskTable). Data(data). + FieldsEx(entity.ConcatTaskCols.Id). Insert() if err != nil { return 0, err diff --git a/service/asr/task_service.go b/service/asr/task_service.go index 17ae693..d64a1c1 100644 --- a/service/asr/task_service.go +++ b/service/asr/task_service.go @@ -19,6 +19,7 @@ import ( serviceScene "media/service/scene" "gitea.com/red-future/common/beans" + "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" "github.com/gogf/gf/v2/util/guid" @@ -79,16 +80,19 @@ func (s *audioTaskService) Create(ctx context.Context, params *CreateTaskParams) g.Log().Infof(ctx, "[创建任务 %s] 文件数=%d, 模型=%s, 语言=%s, 回调=%s", taskID, len(params.InputData), params.Model, params.Language, params.CallbackURL) + // 提取调用方用户信息,传给 goroutine + user := getUserFromCtx(ctx) + // 异步处理 - go s.processTask(taskID, params.InputData, params.Model, params.Language, params.Threshold, params.CallbackURL) + go s.processTask(user, taskID, params.InputData, params.Model, params.Language, params.Threshold, params.CallbackURL) return &dto.CreateTaskRes{TaskID: taskID}, nil } // processTask 异步处理所有URL,每个文件生成一条明细 -func (s *audioTaskService) processTask(taskID string, urls []string, model, language string, threshold float64, callbackURL string) { +func (s *audioTaskService) processTask(user *beans.User, taskID string, urls []string, model, language string, threshold float64, callbackURL string) { ctx := context.Background() - ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) + ctx = context.WithValue(ctx, "user", user) defer func() { if r := recover(); r != nil { @@ -231,7 +235,8 @@ func (s *audioTaskService) callback(ctx context.Context, taskID, status, errMsg, g.Log().Debugf(ctx, "[回调 %s] 回调载荷长度=%d字节, 明细条数=%d", taskID, len(body), len(detailItems)) // 透传调用方的用户信息,供回调方 GetUserInfo 从 X-User-Info 头获取 - userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) + cbUser := getUserFromCtx(ctx) + userJSON, _ := json.Marshal(cbUser) g.Log().Infof(ctx, "[回调 %s] curl -X POST '%s' -H 'Content-Type: application/json' -H 'X-User-Info: %s' -d '%s'", taskID, callbackURL, string(userJSON), strings.ReplaceAll(string(body), "'", "'\\''")) @@ -316,6 +321,21 @@ func (s *audioTaskService) processSingleVideo(ctx context.Context, taskID, saveP } } +// getUserFromCtx 从 context 提取用户信息,没有则返回默认 admin +func getUserFromCtx(ctx context.Context) *beans.User { + if u := ctx.Value("user"); u != nil { + if user, ok := u.(*beans.User); ok { + return user + } + } + // 尝试用 common 库解析 + user, err := utils.GetUserInfo(ctx) + if err == nil && user != nil { + return user + } + return &beans.User{UserName: "admin", TenantId: 1} +} + // saveDetail 保存单文件明细到 transcribe_task_detail func (s *audioTaskService) saveDetail(ctx context.Context, taskID string, fileIndex int, fileName, text, scenes string, audioSize int64, audioDuration, model, language, errMsg string) { detail := &entity.TranscribeTaskDetail{ diff --git a/service/video/concat_service.go b/service/video/concat_service.go index 8d555ea..04b551f 100644 --- a/service/video/concat_service.go +++ b/service/video/concat_service.go @@ -204,22 +204,40 @@ func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, i inputArgs = append(inputArgs, "-i", p) } - // 3. 构建 filter_complex:每个视频 scale+pad 到统一尺寸,然后 concat + // 3. 检测每个视频是否有音频轨道及时长 + hasAudio := make([]bool, n) + videoDuration := make([]float64, n) + for i, p := range inputs { + hasAudio[i] = s.hasVideoAudio(ctx, ffmpegPath, p) + videoDuration[i], _ = s.getVideoDuration(ctx, ffmpegPath, p) + } + + // 4. 构建 filter_complex:每个视频 scale+pad 到统一尺寸,然后 concat var filterParts []string + var concatInputs []string for i := 0; i < n; i++ { filterParts = append(filterParts, fmt.Sprintf( "[%d:v]scale=%d:%d:force_original_aspect_ratio=decrease,pad=%d:%d:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=30[v%d]", i, maxW, maxH, maxW, maxH, i, )) - filterParts = append(filterParts, fmt.Sprintf( - "[%d:a]aresample=44100[a%d]", - i, i, - )) - } - // 收集归一化后的流 - var concatInputs []string - for i := 0; i < n; i++ { - concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i)) + if hasAudio[i] { + filterParts = append(filterParts, fmt.Sprintf( + "[%d:a]aresample=44100[a%d]", + i, i, + )) + concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i)) + } else { + // 无音频轨道,生成匹配视频时长的静音音频 + dur := videoDuration[i] + if dur <= 0 { + dur = 30 // 保底30秒 + } + filterParts = append(filterParts, fmt.Sprintf( + "aevalsrc=0:n=2:s=44100:d=%.2f[a%d]", + dur, i, + )) + concatInputs = append(concatInputs, fmt.Sprintf("[v%d][a%d]", i, i)) + } } filterStr := fmt.Sprintf("%s;%sconcat=n=%d:v=1:a=1[outv][outa]", strings.Join(filterParts, ";"), @@ -230,8 +248,10 @@ func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, i "-filter_complex", filterStr, "-map", "[outv]", "-map", "[outa]", - "-preset", "fast", - "-crf", "23", + "-c:v", "h264_videotoolbox", + "-b:v", "5M", + "-allow_sw", "true", + "-c:a", "aac", "-y", output, ) @@ -300,6 +320,28 @@ func (s *concatService) getVideoDuration(ctx context.Context, ffmpegPath, videoP return duration, nil } +// hasVideoAudio 检测视频文件是否有音频轨道 +func (s *concatService) hasVideoAudio(ctx context.Context, ffmpegPath, videoPath string) bool { + ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") + if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { + ffprobePath = "ffprobe" + } + + cmd := exec.CommandContext(ctx, ffprobePath, + "-v", "error", + "-select_streams", "a:0", + "-show_entries", "stream=codec_type", + "-of", "default=noprint_wrappers=1:nokey=1", + videoPath, + ) + output, err := cmd.Output() + if err != nil || len(strings.TrimSpace(string(output))) == 0 { + return false + } + // 检测视频时长,如果为0则用 aevalsrc 生成静音 + return true +} + func (s *concatService) getFFmpegPath() (string, error) { ffmpegPath := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String() if ffmpegPath != "" { @@ -359,7 +401,7 @@ func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) client.Transport = newTransport client.SetTimeout(10 * time.Minute) - // 透传认证 headers + // 透传认证 headers(优先从 HTTP 请求头取) hasAuthHeader := false if r := g.RequestFromCtx(ctx); r != nil { for k, v := range r.Header { @@ -369,9 +411,10 @@ func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) } } } - // 原始请求无认证信息时,注入默认用户上下文 + // 无 HTTP 请求时(异步 goroutine),从 context 的用户信息构造 header if !hasAuthHeader { - userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) + uploadUser := getUserFromCtx(ctx) + userJSON, _ := json.Marshal(uploadUser) client.SetHeader("X-User-Info", string(userJSON)) } @@ -438,10 +481,13 @@ func (s *concatService) CreateAsyncTask(ctx context.Context, videoURLs []string, return "", fmt.Errorf("创建任务失败: %v", err) } + // 提取调用方用户信息,传给 goroutine + user := getUserFromCtx(ctx) + g.Log().Infof(ctx, "[异步拼接] 创建任务 %s, 视频数=%d, 回调=%s", taskID, len(videoURLs), callbackURL) // 异步处理:先下载再拼接 - go s.processAsyncTask(taskID, videoURLs, method, upload, callbackURL) + go s.processAsyncTask(user, taskID, videoURLs, method, upload, callbackURL) return taskID, nil } @@ -462,14 +508,27 @@ func (s *concatService) CreateAsyncTaskWithFiles(ctx context.Context, filePaths return "", fmt.Errorf("创建任务失败: %v", err) } + // 提取调用方用户信息,传给 goroutine + user := getUserFromCtx(ctx) + g.Log().Infof(ctx, "[异步拼接-文件] 创建任务 %s, 文件数=%d, 回调=%s", taskID, len(filePaths), callbackURL) // 异步处理:已有本地文件,直接拼接 - go s.processAsyncTaskWithFiles(taskID, filePaths, method, upload, callbackURL) + go s.processAsyncTaskWithFiles(user, taskID, filePaths, method, upload, callbackURL) return taskID, nil } +// getUserFromCtx 从 context 中提取用户信息,如果没有则返回默认 admin +func getUserFromCtx(ctx context.Context) *beans.User { + if u := ctx.Value("user"); u != nil { + if user, ok := u.(*beans.User); ok { + return user + } + } + return &beans.User{UserName: "admin", TenantId: 1} +} + // GetTaskResult 查询异步任务结果 func (s *concatService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetConcatTaskRes, error) { task, err := dao.ConcatTask.GetByTaskID(ctx, taskID) @@ -484,9 +543,9 @@ func (s *concatService) GetTaskResult(ctx context.Context, taskID string) (*dto. } // processAsyncTaskWithFiles 后台处理异步拼接任务(文件上传模式,文件已在本地) -func (s *concatService) processAsyncTaskWithFiles(taskID string, filePaths []string, method string, upload bool, callbackURL string) { +func (s *concatService) processAsyncTaskWithFiles(user *beans.User, taskID string, filePaths []string, method string, upload bool, callbackURL string) { bgCtx := context.Background() - bgCtx = context.WithValue(bgCtx, "user", &beans.User{UserName: "admin", TenantId: 1}) + bgCtx = context.WithValue(bgCtx, "user", user) dao.ConcatTask.UpdateRunning(bgCtx, taskID) @@ -514,9 +573,9 @@ func (s *concatService) processAsyncTaskWithFiles(taskID string, filePaths []str } // processAsyncTask 后台处理异步拼接任务(URL模式,需要先下载) -func (s *concatService) processAsyncTask(taskID string, videoURLs []string, method string, upload bool, callbackURL string) { +func (s *concatService) processAsyncTask(user *beans.User, taskID string, videoURLs []string, method string, upload bool, callbackURL string) { bgCtx := context.Background() - bgCtx = context.WithValue(bgCtx, "user", &beans.User{UserName: "admin", TenantId: 1}) + bgCtx = context.WithValue(bgCtx, "user", user) dao.ConcatTask.UpdateRunning(bgCtx, taskID) @@ -628,10 +687,12 @@ func (s *concatService) concatCallback(ctx context.Context, taskID, callbackURL req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") - userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) + // 透传调用方用户信息 + cbUser := getUserFromCtx(ctx) + userJSON, _ := json.Marshal(cbUser) req.Header.Set("X-User-Info", string(userJSON)) - client := &http.Client{Timeout: 30 * time.Second} + client := &http.Client{Timeout: 2 * time.Minute} resp, reqErr := client.Do(req) if reqErr != nil { g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, reqErr)