package video import ( "bytes" "context" "encoding/json" "fmt" "io" "mime/multipart" "net/http" "os" "path/filepath" "strings" "time" analysisDao "media/dao/video" dto "media/model/dto/video" entity "media/model/entity/video" "gitea.redpowerfuture.com/red-future/common/beans" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/guid" ) type analysisService struct{} // Analysis 视频分析服务单例 var Analysis = new(analysisService) // ---------- 异步任务管理 ---------- // CreateAsyncTask 创建异步分析任务,返回 taskId,后台串行处理 func (s *analysisService) CreateAsyncTask(ctx context.Context, videoURLs []string, callbackURL string) (string, error) { if len(videoURLs) == 0 { return "", fmt.Errorf("视频URL列表不能为空") } taskID := "anl_" + guid.S() task := &entity.AnalysisTask{ TaskID: taskID, CallbackURL: callbackURL, Status: "pending", Total: len(videoURLs), } if _, err := analysisDao.AnalysisTask.Insert(ctx, task); err != nil { return "", fmt.Errorf("创建任务失败: %v", err) } // 批量创建明细 if err := analysisDao.AnalysisTaskDetail.BatchInsert(ctx, taskID, videoURLs); err != nil { return "", fmt.Errorf("创建任务明细失败: %v", err) } // 提取调用方用户信息,传给 goroutine user := getUserFromCtx(ctx) g.Log().Infof(ctx, "[视频分析] 创建任务 %s, 视频数=%d, 回调=%s", taskID, len(videoURLs), callbackURL) // 异步处理 go s.processAsyncTask(user, taskID, videoURLs, callbackURL) return taskID, nil } // GetTaskResult 查询异步任务结果 func (s *analysisService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetAnalysisTaskRes, error) { task, err := analysisDao.AnalysisTask.GetByTaskID(ctx, taskID) if err != nil { return nil, fmt.Errorf("查询任务失败: %v", err) } if task == nil { return nil, fmt.Errorf("任务不存在: %s", taskID) } details, err := analysisDao.AnalysisTaskDetail.GetByTaskID(ctx, taskID) if err != nil { return nil, fmt.Errorf("查询任务明细失败: %v", err) } return analysisDao.AnalysisEntityToTaskRes(task, details), nil } // processAsyncTask 后台串行处理异步分析任务 func (s *analysisService) processAsyncTask(user *beans.User, taskID string, videoURLs []string, callbackURL string) { bgCtx := context.Background() bgCtx = context.WithValue(bgCtx, "user", user) analysisDao.AnalysisTask.UpdateProcessing(bgCtx, taskID) defer func() { if r := recover(); r != nil { errMsg := fmt.Sprintf("视频分析异常: %v", r) g.Log().Errorf(bgCtx, "[视频分析 %s] %s", taskID, errMsg) analysisDao.AnalysisTask.UpdateError(bgCtx, taskID, errMsg) s.analysisCallback(bgCtx, taskID, callbackURL) } }() successCount := 0 failedCount := 0 // 逐个串行处理视频 for _, videoURL := range videoURLs { g.Log().Infof(bgCtx, "[视频分析 %s] 开始处理视频: %s", taskID, videoURL) // 1. 下载视频到永久存储目录 savePath, dlErr := s.downloadVideo(bgCtx, taskID, videoURL) if dlErr != nil { errMsg := fmt.Sprintf("下载视频失败 %s: %v", videoURL, dlErr) g.Log().Errorf(bgCtx, "[视频分析 %s] %s", taskID, errMsg) analysisDao.AnalysisTaskDetail.UpdateError(bgCtx, taskID, videoURL, errMsg) failedCount++ analysisDao.AnalysisTask.UpdateProgress(bgCtx, taskID, successCount, failedCount) continue } g.Log().Infof(bgCtx, "[视频分析 %s] 视频下载完成: %s -> %s", taskID, videoURL, savePath) // 2. 调用第三方 caption 接口(一次一个视频,form-data 上传) captionResult, cpErr := s.callCaptionAPI(bgCtx, savePath) if cpErr != nil { errMsg := fmt.Sprintf("调用Caption接口失败 %s: %v", videoURL, cpErr) g.Log().Errorf(bgCtx, "[视频分析 %s] %s", taskID, errMsg) analysisDao.AnalysisTaskDetail.UpdateError(bgCtx, taskID, videoURL, errMsg) failedCount++ analysisDao.AnalysisTask.UpdateProgress(bgCtx, taskID, successCount, failedCount) continue } g.Log().Infof(bgCtx, "[视频分析 %s] Caption接口调用成功: %s", taskID, videoURL) // 3. 保存结果到数据库(视频不删除,永久保留) captionJSON, _ := json.Marshal(captionResult) analysisDao.AnalysisTaskDetail.UpdateSuccess(bgCtx, taskID, videoURL, savePath, string(captionJSON)) successCount++ analysisDao.AnalysisTask.UpdateProgress(bgCtx, taskID, successCount, failedCount) g.Log().Infof(bgCtx, "[视频分析 %s] 视频处理完成: %s (成功=%d, 失败=%d)", taskID, videoURL, successCount, failedCount) } // 更新任务最终状态 if failedCount == 0 { analysisDao.AnalysisTask.UpdateSuccess(bgCtx, taskID, successCount, failedCount) } else if successCount == 0 { analysisDao.AnalysisTask.UpdateError(bgCtx, taskID, fmt.Sprintf("所有视频处理失败(共%d个)", len(videoURLs))) } else { analysisDao.AnalysisTask.UpdateSuccess(bgCtx, taskID, successCount, failedCount) } g.Log().Infof(bgCtx, "[视频分析 %s] 任务完成, 总视频=%d, 成功=%d, 失败=%d", taskID, len(videoURLs), successCount, failedCount) // 回调通知 if callbackURL != "" { s.analysisCallback(bgCtx, taskID, callbackURL) } } // downloadVideo 下载视频到永久存储目录 func (s *analysisService) downloadVideo(ctx context.Context, taskID string, videoURL string) (string, error) { // 从配置获取视频永久存储目录 videoDir := g.Cfg().MustGet(ctx, "analysis.video_dir", "resource/videos").String() if !filepath.IsAbs(videoDir) { absDir, _ := filepath.Abs(videoDir) videoDir = absDir } // 按 taskId 子目录组织 taskDir := filepath.Join(videoDir, taskID) os.MkdirAll(taskDir, 0755) // 从URL提取文件名 segments := strings.Split(videoURL, "/") fileName := segments[len(segments)-1] if fileName == "" { fileName = fmt.Sprintf("video_%d.mp4", time.Now().UnixMilli()) } savePath := filepath.Join(taskDir, fileName) client := &http.Client{Timeout: 10 * time.Minute} resp, err := client.Get(videoURL) if err != nil { return "", err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("HTTP %d", resp.StatusCode) } out, err := os.Create(savePath) if err != nil { return "", err } defer out.Close() _, err = io.Copy(out, resp.Body) if err != nil { os.Remove(savePath) return "", err } return savePath, nil } // callCaptionAPI 调用第三方 caption 接口(form-data 上传单个视频) // 根据配置 analysis.mock_caption 决定使用 mock 数据还是真实调用 func (s *analysisService) callCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) { if g.Cfg().MustGet(ctx, "analysis.mock_caption", true).Bool() { return s.mockCallCaptionAPI(ctx, videoPath) } return s.realCallCaptionAPI(ctx, videoPath) } // mockCallCaptionAPI 返回 mock 的 caption 数据 func (s *analysisService) mockCallCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) { g.Log().Infof(ctx, "[呼叫Caption-Mock] 使用mock数据, 文件=%s", videoPath) return map[string]interface{}{ "caption": "Scene: 视频展示了一个产品演示和讲解过程。Events: 一个人走进画面开始介绍产品, 展示了产品的各个功能模块, 最后总结产品优势。", "scene": "这是一个产品演示和讲解的场景。视频中有人在画面中介绍一款产品,展示了产品的各个功能模块和使用方式,包括产品的外观、核心功能、操作界面等。视频整体节奏适中,配合专业的产品讲解。", "events": []map[string]interface{}{ { "start": 0.0, "end": 5.0, "description": "视频开场,人物走进画面,开始打招呼并介绍本次演示的主题", }, { "start": 5.5, "end": 15.0, "description": "展示产品外观包装,详细说明产品设计理念和特点", }, { "start": 15.5, "end": 30.0, "description": "开机演示,展示产品主界面和核心功能入口", }, { "start": 30.5, "end": 50.0, "description": "详细演示产品主要功能模块的操作流程和使用方法", }, }, }, nil } // realCallCaptionAPI 真实调用第三方 caption 接口(form-data 上传单个视频) func (s *analysisService) realCallCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) { // 获取 caption 服务地址 captionURL := g.Cfg().MustGet(ctx, "analysis.caption_url", "http://192.168.3.49:8900/caption").String() // 构建 multipart/form-data 表单 var buf bytes.Buffer mw := multipart.NewWriter(&buf) // 添加视频文件字段 file, err := os.Open(videoPath) if err != nil { return nil, fmt.Errorf("打开视频文件失败: %v", err) } defer file.Close() fw, err := mw.CreateFormFile("video", filepath.Base(videoPath)) if err != nil { return nil, fmt.Errorf("创建表单文件字段失败: %v", err) } if _, err = io.Copy(fw, file); err != nil { return nil, fmt.Errorf("写入文件内容失败: %v", err) } // 添加 max_new_tokens 参数 if err = mw.WriteField("max_new_tokens", "2048"); err != nil { return nil, fmt.Errorf("写入表单字段失败: %v", err) } mw.Close() // 发送请求(长超时,caption 接口耗时较长) client := &http.Client{Timeout: 30 * time.Minute} req, err := http.NewRequest("POST", captionURL, &buf) if err != nil { return nil, fmt.Errorf("创建请求失败: %v", err) } req.Header.Set("Content-Type", mw.FormDataContentType()) g.Log().Debugf(ctx, "[呼叫Caption] 请求URL=%s, 文件=%s", captionURL, videoPath) resp, err := client.Do(req) if err != nil { return nil, fmt.Errorf("请求Caption接口失败: %v", err) } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("读取Caption响应失败: %v", err) } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("Caption接口返回非200: %d, body=%s", resp.StatusCode, string(body)) } // 解析响应 JSON var result map[string]interface{} if err = json.Unmarshal(body, &result); err != nil { return nil, fmt.Errorf("解析Caption响应JSON失败: %v, body=%s", err, string(body)) } g.Log().Debugf(ctx, "[呼叫Caption] 响应成功, 文件=%s, 结果长度=%d", videoPath, len(body)) return result, nil } // analysisCallback 回调通知 func (s *analysisService) analysisCallback(ctx context.Context, taskID, callbackURL string) { if callbackURL == "" { return } task, err := analysisDao.AnalysisTask.GetByTaskID(ctx, taskID) if err != nil || task == nil { g.Log().Errorf(ctx, "[视频分析回调 %s] 查询任务失败: %v", taskID, err) return } details, err := analysisDao.AnalysisTaskDetail.GetByTaskID(ctx, taskID) if err != nil { g.Log().Errorf(ctx, "[视频分析回调 %s] 查询明细失败: %v", taskID, err) return } // 构造回调结果 var results []map[string]interface{} for _, d := range details { item := map[string]interface{}{ "video_url": d.VideoURL, "video_save_path": d.VideoSavePath, } if d.Status == "success" && d.CaptionResult != "" { var captionResult interface{} json.Unmarshal([]byte(d.CaptionResult), &captionResult) item["caption_result"] = captionResult } if d.Status == "failed" { item["caption_result"] = nil item["fail_reason"] = d.FailReason } results = append(results, item) } payload := map[string]interface{}{ "task_id": task.TaskID, "status": task.Status, "total": task.Total, "success_count": task.SuccessCount, "failed_count": task.FailedCount, "results": results, } body, _ := json.Marshal(payload) g.Log().Infof(ctx, "[视频分析回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL) cbReq, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) cbReq.Header.Set("Content-Type", "application/json") // 透传调用方用户信息 cbUser := getUserFromCtx(ctx) userJSON, _ := json.Marshal(cbUser) cbReq.Header.Set("X-User-Info", string(userJSON)) // 打印 curl 命令方便调试 escapedBody := strings.ReplaceAll(string(body), "'", "'\\''") g.Log().Infof(ctx, "[视频分析回调 %s] curl 调试命令:\ncurl -X POST '%s' \\\n -H 'Content-Type: application/json' \\\n -H 'X-User-Info: %s' \\\n -d '%s'", taskID, callbackURL, string(userJSON), escapedBody) client := &http.Client{Timeout: 2 * time.Minute} resp, reqErr := client.Do(cbReq) if reqErr != nil { g.Log().Errorf(ctx, "[视频分析回调 %s] 请求失败: %v", taskID, reqErr) return } defer resp.Body.Close() respBody, _ := io.ReadAll(resp.Body) g.Log().Infof(ctx, "[视频分析回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody)) }