Files
media/service/video/analysis_service.go
2026-06-10 16:10:10 +08:00

383 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package video
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"strings"
"time"
analysisDao "media/dao/video"
dto "media/model/dto/video"
entity "media/model/entity/video"
"gitea.redpowerfuture.com/red-future/common/beans"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/guid"
)
// analysisService groups the video-analysis operations of this package.
// It carries no state; all methods hang off the shared Analysis value.
type analysisService struct{}

// Analysis is the package-wide video analysis service instance.
var Analysis = &analysisService{}
// ---------- 异步任务管理 ----------
// CreateAsyncTask creates an asynchronous analysis task for videoURLs and
// returns its task ID immediately; the videos are then processed one after
// another by a background goroutine (processAsyncTask).
//
// callbackURL may be empty; when set, the task result is POSTed there once
// processing ends. An error is returned when videoURLs is empty or when the
// task row or its per-video detail rows cannot be persisted; DAO errors are
// wrapped with %w.
func (s *analysisService) CreateAsyncTask(ctx context.Context, videoURLs []string, callbackURL string) (string, error) {
	if len(videoURLs) == 0 {
		return "", fmt.Errorf("视频URL列表不能为空")
	}
	// NOTE(review): per-video rows are later updated by (taskID, videoURL), so
	// duplicate URLs within one request cannot be told apart — confirm whether
	// duplicates should be rejected here.

	// Copy the URLs: the background goroutine outlives this call and must not
	// observe later changes the caller makes to its slice.
	urls := append([]string(nil), videoURLs...)

	taskID := "anl_" + guid.S()
	task := &entity.AnalysisTask{
		TaskID:      taskID,
		CallbackURL: callbackURL,
		Status:      "pending",
		Total:       len(urls),
	}
	if _, err := analysisDao.AnalysisTask.Insert(ctx, task); err != nil {
		return "", fmt.Errorf("创建任务失败: %w", err)
	}
	// Create one detail row per video.
	if err := analysisDao.AnalysisTaskDetail.BatchInsert(ctx, taskID, urls); err != nil {
		// The task row already exists but no worker will ever pick it up;
		// mark it failed so it does not stay "pending" forever.
		analysisDao.AnalysisTask.UpdateError(ctx, taskID, fmt.Sprintf("创建任务明细失败: %v", err))
		return "", fmt.Errorf("创建任务明细失败: %w", err)
	}
	// The request context ends with the request, so extract the caller's
	// user now and hand it to the goroutine explicitly.
	user := getUserFromCtx(ctx)
	g.Log().Infof(ctx, "[视频分析] 创建任务 %s, 视频数=%d, 回调=%s", taskID, len(urls), callbackURL)
	go s.processAsyncTask(user, taskID, urls, callbackURL)
	return taskID, nil
}
// GetTaskResult loads analysis task taskID together with its per-video detail
// rows and converts them into the API response.
//
// It returns an error when the task does not exist or when either query fails;
// query errors are wrapped with %w so callers can inspect them with
// errors.Is / errors.As.
func (s *analysisService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetAnalysisTaskRes, error) {
	task, err := analysisDao.AnalysisTask.GetByTaskID(ctx, taskID)
	if err != nil {
		return nil, fmt.Errorf("查询任务失败: %w", err)
	}
	// The DAO reports "not found" as a nil task without an error.
	if task == nil {
		return nil, fmt.Errorf("任务不存在: %s", taskID)
	}
	details, err := analysisDao.AnalysisTaskDetail.GetByTaskID(ctx, taskID)
	if err != nil {
		return nil, fmt.Errorf("查询任务明细失败: %w", err)
	}
	return analysisDao.AnalysisEntityToTaskRes(task, details), nil
}
// processAsyncTask processes task taskID in the background: every video in
// videoURLs is downloaded and captioned in turn, per-video results and the
// aggregate progress are written through the DAO, the final task status is
// set, and the result is sent to callbackURL when one was given.
//
// It is started in its own goroutine by CreateAsyncTask. A panic during
// processing is recovered, recorded as a task error, and reported to the
// callback instead of terminating the process.
func (s *analysisService) processAsyncTask(user *beans.User, taskID string, videoURLs []string, callbackURL string) {
	// The originating request context is gone by now, so start from a fresh
	// background context that only carries the caller's user.
	// NOTE(review): the plain string key "user" must match whatever
	// getUserFromCtx reads; a private key type would be safer but has to be
	// changed in both places together.
	bgCtx := context.WithValue(context.Background(), "user", user)

	// Install the recover handler before doing any work: a panic escaping a
	// goroutine terminates the whole program, so the initial status update
	// below must be covered too.
	defer func() {
		if r := recover(); r != nil {
			errMsg := fmt.Sprintf("视频分析异常: %v", r)
			g.Log().Errorf(bgCtx, "[视频分析 %s] %s", taskID, errMsg)
			analysisDao.AnalysisTask.UpdateError(bgCtx, taskID, errMsg)
			s.analysisCallback(bgCtx, taskID, callbackURL)
		}
	}()

	analysisDao.AnalysisTask.UpdateProcessing(bgCtx, taskID)

	// Videos are processed strictly one at a time.
	successCount, failedCount := 0, 0
	for _, videoURL := range videoURLs {
		ok := s.processOneVideo(bgCtx, taskID, videoURL)
		if ok {
			successCount++
		} else {
			failedCount++
		}
		analysisDao.AnalysisTask.UpdateProgress(bgCtx, taskID, successCount, failedCount)
		if ok {
			g.Log().Infof(bgCtx, "[视频分析 %s] 视频处理完成: %s (成功=%d, 失败=%d)", taskID, videoURL, successCount, failedCount)
		}
	}

	// The task is only marked as failed when not a single video succeeded;
	// partial failures finish as success with a non-zero failed count.
	if successCount == 0 && failedCount > 0 {
		analysisDao.AnalysisTask.UpdateError(bgCtx, taskID, fmt.Sprintf("所有视频处理失败(共%d个)", len(videoURLs)))
	} else {
		analysisDao.AnalysisTask.UpdateSuccess(bgCtx, taskID, successCount, failedCount)
	}
	g.Log().Infof(bgCtx, "[视频分析 %s] 任务完成, 总视频=%d, 成功=%d, 失败=%d", taskID, len(videoURLs), successCount, failedCount)

	if callbackURL != "" {
		s.analysisCallback(bgCtx, taskID, callbackURL)
	}
}

// processOneVideo downloads one video, sends it to the caption service, and
// records the per-video outcome (success with the caption JSON, or the failure
// reason). It reports whether the video was processed successfully.
// Downloaded videos are kept on disk; nothing is deleted here.
func (s *analysisService) processOneVideo(ctx context.Context, taskID, videoURL string) bool {
	g.Log().Infof(ctx, "[视频分析 %s] 开始处理视频: %s", taskID, videoURL)

	// fail logs errMsg and stores it as this video's failure reason.
	fail := func(errMsg string) bool {
		g.Log().Errorf(ctx, "[视频分析 %s] %s", taskID, errMsg)
		analysisDao.AnalysisTaskDetail.UpdateError(ctx, taskID, videoURL, errMsg)
		return false
	}

	// 1. Download the video into the permanent storage directory.
	savePath, err := s.downloadVideo(ctx, taskID, videoURL)
	if err != nil {
		return fail(fmt.Sprintf("下载视频失败 %s: %v", videoURL, err))
	}
	g.Log().Infof(ctx, "[视频分析 %s] 视频下载完成: %s -> %s", taskID, videoURL, savePath)

	// 2. Call the caption service for this single video.
	captionResult, err := s.callCaptionAPI(ctx, savePath)
	if err != nil {
		return fail(fmt.Sprintf("调用Caption接口失败 %s: %v", videoURL, err))
	}
	g.Log().Infof(ctx, "[视频分析 %s] Caption接口调用成功: %s", taskID, videoURL)

	// 3. Persist the caption result as JSON text.
	captionJSON, err := json.Marshal(captionResult)
	if err != nil {
		return fail(fmt.Sprintf("序列化Caption结果失败 %s: %v", videoURL, err))
	}
	analysisDao.AnalysisTaskDetail.UpdateSuccess(ctx, taskID, videoURL, savePath, string(captionJSON))
	return true
}
// downloadVideo downloads videoURL into the permanent video directory and
// returns the path of the saved file.
//
// Files are stored under <analysis.video_dir>/<taskID>/ (the directory
// defaults to "resource/videos", resolved to an absolute path). The file name
// is the last path segment of the URL with any query string or fragment
// removed, or a timestamped "video_<ms>.mp4" when the URL has no usable
// segment. If a file with that name already exists in the task directory
// (e.g. two URLs ending in "video.mp4"), a unique prefix is added so an
// earlier download is never overwritten. A partially written file is removed
// on failure. Any non-200 response is treated as an error.
func (s *analysisService) downloadVideo(ctx context.Context, taskID string, videoURL string) (string, error) {
	videoDir := g.Cfg().MustGet(ctx, "analysis.video_dir", "resource/videos").String()
	if !filepath.IsAbs(videoDir) {
		absDir, err := filepath.Abs(videoDir)
		if err != nil {
			return "", fmt.Errorf("解析视频目录失败 %s: %w", videoDir, err)
		}
		videoDir = absDir
	}
	// One sub-directory per task.
	taskDir := filepath.Join(videoDir, taskID)
	if err := os.MkdirAll(taskDir, 0755); err != nil {
		return "", fmt.Errorf("创建视频目录失败 %s: %w", taskDir, err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, videoURL, nil)
	if err != nil {
		return "", err
	}
	client := &http.Client{Timeout: 10 * time.Minute}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("HTTP %d", resp.StatusCode)
	}

	out, savePath, err := s.createVideoFile(taskDir, s.videoFileName(videoURL))
	if err != nil {
		return "", err
	}
	if _, err := io.Copy(out, resp.Body); err != nil {
		out.Close()
		os.Remove(savePath)
		return "", err
	}
	// Close explicitly: a failed close can mean the data never reached disk.
	if err := out.Close(); err != nil {
		os.Remove(savePath)
		return "", err
	}
	return savePath, nil
}

// videoFileName derives a local file name from the last path segment of
// videoURL, ignoring any query string or fragment, and falls back to a
// timestamped "video_<ms>.mp4" name when there is no usable segment.
func (s *analysisService) videoFileName(videoURL string) string {
	p := videoURL
	if i := strings.IndexAny(p, "?#"); i >= 0 {
		p = p[:i]
	}
	name := p[strings.LastIndex(p, "/")+1:]
	if name == "" || name == "." || name == ".." {
		return fmt.Sprintf("video_%d.mp4", time.Now().UnixMilli())
	}
	return name
}

// createVideoFile creates fileName inside dir without truncating an existing
// file; on a name clash it retries once with a unique guid prefix. It returns
// the open file and its path.
func (s *analysisService) createVideoFile(dir, fileName string) (*os.File, string, error) {
	const flags = os.O_WRONLY | os.O_CREATE | os.O_EXCL
	savePath := filepath.Join(dir, fileName)
	f, err := os.OpenFile(savePath, flags, 0666)
	if os.IsExist(err) {
		savePath = filepath.Join(dir, guid.S()+"_"+fileName)
		f, err = os.OpenFile(savePath, flags, 0666)
	}
	if err != nil {
		return nil, "", err
	}
	return f, savePath, nil
}
// callCaptionAPI produces the caption result for one local video file.
// The config switch analysis.mock_caption (default true) chooses between the
// canned mock result and a real multipart upload to the caption service.
func (s *analysisService) callCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) {
	// NOTE(review): because the default is true, a deployment that omits
	// analysis.mock_caption silently stores mock captions as successful
	// results — confirm this default is intended.
	useMock := g.Cfg().MustGet(ctx, "analysis.mock_caption", true).Bool()
	if !useMock {
		return s.realCallCaptionAPI(ctx, videoPath)
	}
	return s.mockCallCaptionAPI(ctx, videoPath)
}
// mockCallCaptionAPI returns a fixed, canned caption result without contacting
// any service; videoPath is only logged. The result has the keys "caption",
// "scene" and "events"; each event has "start"/"end" offsets (float64,
// presumably seconds — confirm against the real service) and a "description".
// It never returns an error.
func (s *analysisService) mockCallCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) {
	g.Log().Infof(ctx, "[呼叫Caption-Mock] 使用mock数据, 文件=%s", videoPath)

	mockEvent := func(start, end float64, description string) map[string]interface{} {
		return map[string]interface{}{
			"start":       start,
			"end":         end,
			"description": description,
		}
	}
	events := []map[string]interface{}{
		mockEvent(0.0, 5.0, "视频开场,人物走进画面,开始打招呼并介绍本次演示的主题"),
		mockEvent(5.5, 15.0, "展示产品外观包装,详细说明产品设计理念和特点"),
		mockEvent(15.5, 30.0, "开机演示,展示产品主界面和核心功能入口"),
		mockEvent(30.5, 50.0, "详细演示产品主要功能模块的操作流程和使用方法"),
	}

	result := make(map[string]interface{}, 3)
	result["caption"] = "Scene: 视频展示了一个产品演示和讲解过程。Events: 一个人走进画面开始介绍产品, 展示了产品的各个功能模块, 最后总结产品优势。"
	result["scene"] = "这是一个产品演示和讲解的场景。视频中有人在画面中介绍一款产品,展示了产品的各个功能模块和使用方式,包括产品的外观、核心功能、操作界面等。视频整体节奏适中,配合专业的产品讲解。"
	result["events"] = events
	return result, nil
}
// realCallCaptionAPI uploads a single video to the caption service configured
// by analysis.caption_url as multipart/form-data and returns the decoded JSON
// response.
//
// The form holds the file field "video" and the text field
// "max_new_tokens"=2048. The file is streamed from disk through an io.Pipe
// instead of being buffered in memory, so memory use does not grow with the
// video size. Content-Length is computed up front (multipart overhead + file
// size), so the request is not sent with chunked transfer encoding.
// Any non-200 response or non-JSON body is returned as an error.
func (s *analysisService) realCallCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) {
	captionURL := g.Cfg().MustGet(ctx, "analysis.caption_url", "http://192.168.3.49:8900/caption").String()

	file, err := os.Open(videoPath)
	if err != nil {
		return nil, fmt.Errorf("打开视频文件失败: %w", err)
	}
	defer file.Close()
	info, err := file.Stat()
	if err != nil {
		return nil, fmt.Errorf("读取视频文件信息失败: %w", err)
	}
	fileName := filepath.Base(videoPath)

	pr, pw := io.Pipe()
	mw := multipart.NewWriter(pw)

	// Dry run without file content, using the same boundary, to measure the
	// multipart overhead (part headers, the extra field, closing boundary).
	var overhead bytes.Buffer
	probe := multipart.NewWriter(&overhead)
	if err := probe.SetBoundary(mw.Boundary()); err != nil {
		return nil, fmt.Errorf("创建请求失败: %w", err)
	}
	if err := s.writeCaptionForm(probe, fileName, nil); err != nil {
		return nil, err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, captionURL, pr)
	if err != nil {
		return nil, fmt.Errorf("创建请求失败: %w", err)
	}
	req.ContentLength = int64(overhead.Len()) + info.Size()
	req.Header.Set("Content-Type", mw.FormDataContentType())

	// Producer: write the form into the pipe. A nil error ends the body with
	// EOF; any other error is surfaced to the transport (and thus client.Do).
	go func() {
		pw.CloseWithError(s.writeCaptionForm(mw, fileName, file))
	}()
	// Make sure the producer is unblocked even if the transport stops reading
	// the body early.
	defer pr.Close()

	// Long timeout: the caption service is slow.
	client := &http.Client{Timeout: 30 * time.Minute}
	g.Log().Debugf(ctx, "[呼叫Caption] 请求URL=%s, 文件=%s", captionURL, videoPath)
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("请求Caption接口失败: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("读取Caption响应失败: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("Caption接口返回非200: %d, body=%s", resp.StatusCode, string(body))
	}

	var result map[string]interface{}
	if err = json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("解析Caption响应JSON失败: %v, body=%s", err, string(body))
	}
	g.Log().Debugf(ctx, "[呼叫Caption] 响应成功, 文件=%s, 结果长度=%d", videoPath, len(body))
	return result, nil
}

// writeCaptionForm writes the caption request form to mw and closes it: the
// file part "video" named fileName (its bytes copied from content, or left
// empty when content is nil, which is used to measure the form overhead),
// followed by the field max_new_tokens=2048.
func (s *analysisService) writeCaptionForm(mw *multipart.Writer, fileName string, content io.Reader) error {
	fw, err := mw.CreateFormFile("video", fileName)
	if err != nil {
		return fmt.Errorf("创建表单文件字段失败: %w", err)
	}
	if content != nil {
		if _, err := io.Copy(fw, content); err != nil {
			return fmt.Errorf("写入文件内容失败: %w", err)
		}
	}
	if err := mw.WriteField("max_new_tokens", "2048"); err != nil {
		return fmt.Errorf("写入表单字段失败: %w", err)
	}
	return mw.Close()
}
// analysisCallback POSTs the current state of task taskID to callbackURL as
// JSON. It is best-effort: it returns nothing, and every failure (missing
// task, query error, invalid URL, transport error, non-2xx response) is only
// logged, never panics.
//
// Payload keys: task_id, status, total, success_count, failed_count and
// results. results has one entry per detail row with video_url and
// video_save_path, plus caption_result (the stored caption JSON, embedded as
// a value) for successful rows, or caption_result=null and fail_reason for
// failed rows. The user returned by getUserFromCtx(ctx) is forwarded as JSON
// in the X-User-Info header.
func (s *analysisService) analysisCallback(ctx context.Context, taskID, callbackURL string) {
	if callbackURL == "" {
		return
	}
	task, err := analysisDao.AnalysisTask.GetByTaskID(ctx, taskID)
	if err != nil || task == nil {
		g.Log().Errorf(ctx, "[视频分析回调 %s] 查询任务失败: %v", taskID, err)
		return
	}
	details, err := analysisDao.AnalysisTaskDetail.GetByTaskID(ctx, taskID)
	if err != nil {
		g.Log().Errorf(ctx, "[视频分析回调 %s] 查询明细失败: %v", taskID, err)
		return
	}

	// Non-nil so that a task without detail rows is sent as "results": []
	// rather than null.
	results := make([]map[string]interface{}, 0, len(details))
	for _, d := range details {
		item := map[string]interface{}{
			"video_url":       d.VideoURL,
			"video_save_path": d.VideoSavePath,
		}
		switch {
		case d.Status == "success" && d.CaptionResult != "":
			// Embed the stored caption JSON as a nested value; if it cannot
			// be decoded the field is sent as null.
			var captionResult interface{}
			if err := json.Unmarshal([]byte(d.CaptionResult), &captionResult); err != nil {
				g.Log().Warningf(ctx, "[视频分析回调 %s] 解析Caption结果失败 %s: %v", taskID, d.VideoURL, err)
			}
			item["caption_result"] = captionResult
		case d.Status == "failed":
			item["caption_result"] = nil
			item["fail_reason"] = d.FailReason
		}
		results = append(results, item)
	}
	payload := map[string]interface{}{
		"task_id":       task.TaskID,
		"status":        task.Status,
		"total":         task.Total,
		"success_count": task.SuccessCount,
		"failed_count":  task.FailedCount,
		"results":       results,
	}
	body, err := json.Marshal(payload)
	if err != nil {
		g.Log().Errorf(ctx, "[视频分析回调 %s] 序列化回调数据失败: %v", taskID, err)
		return
	}
	g.Log().Infof(ctx, "[视频分析回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL)

	// An invalid callback URL must be reported here: an unchecked error would
	// leave cbReq nil and panic on the header write below.
	cbReq, err := http.NewRequestWithContext(ctx, http.MethodPost, callbackURL, bytes.NewReader(body))
	if err != nil {
		g.Log().Errorf(ctx, "[视频分析回调 %s] 创建请求失败: %v", taskID, err)
		return
	}
	cbReq.Header.Set("Content-Type", "application/json")
	// Forward the caller's user info. If marshalling fails the header is sent
	// empty, as before.
	cbUser := getUserFromCtx(ctx)
	userJSON, _ := json.Marshal(cbUser)
	cbReq.Header.Set("X-User-Info", string(userJSON))

	// Debug-only curl reproduction: it contains the whole payload and the
	// user's identity, so it is not logged at Info level. Single quotes are
	// escaped for a POSIX shell.
	shellEscape := func(v string) string { return strings.ReplaceAll(v, "'", "'\\''") }
	g.Log().Debugf(ctx, "[视频分析回调 %s] curl 调试命令:\ncurl -X POST '%s' \\\n -H 'Content-Type: application/json' \\\n -H 'X-User-Info: %s' \\\n -d '%s'",
		taskID, callbackURL, shellEscape(string(userJSON)), shellEscape(string(body)))

	client := &http.Client{Timeout: 2 * time.Minute}
	resp, reqErr := client.Do(cbReq)
	if reqErr != nil {
		g.Log().Errorf(ctx, "[视频分析回调 %s] 请求失败: %v", taskID, reqErr)
		return
	}
	defer resp.Body.Close()
	// Only a bounded prefix of the response is read, for logging.
	respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		g.Log().Errorf(ctx, "[视频分析回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody))
		return
	}
	g.Log().Infof(ctx, "[视频分析回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody))
}