383 lines
12 KiB
Go
383 lines
12 KiB
Go
package video
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"mime/multipart"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"time"
|
||
|
||
analysisDao "media/dao/video"
|
||
dto "media/model/dto/video"
|
||
entity "media/model/entity/video"
|
||
|
||
"gitea.com/red-future/common/beans"
|
||
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/util/guid"
|
||
)
|
||
|
||
type analysisService struct{}
|
||
|
||
// Analysis 视频分析服务单例
|
||
var Analysis = new(analysisService)
|
||
|
||
// ---------- 异步任务管理 ----------
|
||
|
||
// CreateAsyncTask 创建异步分析任务,返回 taskId,后台串行处理
|
||
func (s *analysisService) CreateAsyncTask(ctx context.Context, videoURLs []string, callbackURL string) (string, error) {
|
||
if len(videoURLs) == 0 {
|
||
return "", fmt.Errorf("视频URL列表不能为空")
|
||
}
|
||
|
||
taskID := "anl_" + guid.S()
|
||
task := &entity.AnalysisTask{
|
||
TaskID: taskID,
|
||
CallbackURL: callbackURL,
|
||
Status: "pending",
|
||
Total: len(videoURLs),
|
||
}
|
||
if _, err := analysisDao.AnalysisTask.Insert(ctx, task); err != nil {
|
||
return "", fmt.Errorf("创建任务失败: %v", err)
|
||
}
|
||
|
||
// 批量创建明细
|
||
if err := analysisDao.AnalysisTaskDetail.BatchInsert(ctx, taskID, videoURLs); err != nil {
|
||
return "", fmt.Errorf("创建任务明细失败: %v", err)
|
||
}
|
||
|
||
// 提取调用方用户信息,传给 goroutine
|
||
user := getUserFromCtx(ctx)
|
||
|
||
g.Log().Infof(ctx, "[视频分析] 创建任务 %s, 视频数=%d, 回调=%s", taskID, len(videoURLs), callbackURL)
|
||
|
||
// 异步处理
|
||
go s.processAsyncTask(user, taskID, videoURLs, callbackURL)
|
||
|
||
return taskID, nil
|
||
}
|
||
|
||
// GetTaskResult 查询异步任务结果
|
||
func (s *analysisService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetAnalysisTaskRes, error) {
|
||
task, err := analysisDao.AnalysisTask.GetByTaskID(ctx, taskID)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询任务失败: %v", err)
|
||
}
|
||
if task == nil {
|
||
return nil, fmt.Errorf("任务不存在: %s", taskID)
|
||
}
|
||
|
||
details, err := analysisDao.AnalysisTaskDetail.GetByTaskID(ctx, taskID)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询任务明细失败: %v", err)
|
||
}
|
||
|
||
return analysisDao.AnalysisEntityToTaskRes(task, details), nil
|
||
}
|
||
|
||
// processAsyncTask 后台串行处理异步分析任务
|
||
func (s *analysisService) processAsyncTask(user *beans.User, taskID string, videoURLs []string, callbackURL string) {
|
||
bgCtx := context.Background()
|
||
bgCtx = context.WithValue(bgCtx, "user", user)
|
||
|
||
analysisDao.AnalysisTask.UpdateProcessing(bgCtx, taskID)
|
||
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
errMsg := fmt.Sprintf("视频分析异常: %v", r)
|
||
g.Log().Errorf(bgCtx, "[视频分析 %s] %s", taskID, errMsg)
|
||
analysisDao.AnalysisTask.UpdateError(bgCtx, taskID, errMsg)
|
||
s.analysisCallback(bgCtx, taskID, callbackURL)
|
||
}
|
||
}()
|
||
|
||
successCount := 0
|
||
failedCount := 0
|
||
|
||
// 逐个串行处理视频
|
||
for _, videoURL := range videoURLs {
|
||
g.Log().Infof(bgCtx, "[视频分析 %s] 开始处理视频: %s", taskID, videoURL)
|
||
|
||
// 1. 下载视频到永久存储目录
|
||
savePath, dlErr := s.downloadVideo(bgCtx, taskID, videoURL)
|
||
if dlErr != nil {
|
||
errMsg := fmt.Sprintf("下载视频失败 %s: %v", videoURL, dlErr)
|
||
g.Log().Errorf(bgCtx, "[视频分析 %s] %s", taskID, errMsg)
|
||
analysisDao.AnalysisTaskDetail.UpdateError(bgCtx, taskID, videoURL, errMsg)
|
||
failedCount++
|
||
analysisDao.AnalysisTask.UpdateProgress(bgCtx, taskID, successCount, failedCount)
|
||
continue
|
||
}
|
||
|
||
g.Log().Infof(bgCtx, "[视频分析 %s] 视频下载完成: %s -> %s", taskID, videoURL, savePath)
|
||
|
||
// 2. 调用第三方 caption 接口(一次一个视频,form-data 上传)
|
||
captionResult, cpErr := s.callCaptionAPI(bgCtx, savePath)
|
||
if cpErr != nil {
|
||
errMsg := fmt.Sprintf("调用Caption接口失败 %s: %v", videoURL, cpErr)
|
||
g.Log().Errorf(bgCtx, "[视频分析 %s] %s", taskID, errMsg)
|
||
analysisDao.AnalysisTaskDetail.UpdateError(bgCtx, taskID, videoURL, errMsg)
|
||
failedCount++
|
||
analysisDao.AnalysisTask.UpdateProgress(bgCtx, taskID, successCount, failedCount)
|
||
continue
|
||
}
|
||
|
||
g.Log().Infof(bgCtx, "[视频分析 %s] Caption接口调用成功: %s", taskID, videoURL)
|
||
|
||
// 3. 保存结果到数据库(视频不删除,永久保留)
|
||
captionJSON, _ := json.Marshal(captionResult)
|
||
analysisDao.AnalysisTaskDetail.UpdateSuccess(bgCtx, taskID, videoURL, savePath, string(captionJSON))
|
||
successCount++
|
||
analysisDao.AnalysisTask.UpdateProgress(bgCtx, taskID, successCount, failedCount)
|
||
|
||
g.Log().Infof(bgCtx, "[视频分析 %s] 视频处理完成: %s (成功=%d, 失败=%d)", taskID, videoURL, successCount, failedCount)
|
||
}
|
||
|
||
// 更新任务最终状态
|
||
if failedCount == 0 {
|
||
analysisDao.AnalysisTask.UpdateSuccess(bgCtx, taskID, successCount, failedCount)
|
||
} else if successCount == 0 {
|
||
analysisDao.AnalysisTask.UpdateError(bgCtx, taskID, fmt.Sprintf("所有视频处理失败(共%d个)", len(videoURLs)))
|
||
} else {
|
||
analysisDao.AnalysisTask.UpdateSuccess(bgCtx, taskID, successCount, failedCount)
|
||
}
|
||
|
||
g.Log().Infof(bgCtx, "[视频分析 %s] 任务完成, 总视频=%d, 成功=%d, 失败=%d", taskID, len(videoURLs), successCount, failedCount)
|
||
|
||
// 回调通知
|
||
if callbackURL != "" {
|
||
s.analysisCallback(bgCtx, taskID, callbackURL)
|
||
}
|
||
}
|
||
|
||
// downloadVideo 下载视频到永久存储目录
|
||
func (s *analysisService) downloadVideo(ctx context.Context, taskID string, videoURL string) (string, error) {
|
||
// 从配置获取视频永久存储目录
|
||
videoDir := g.Cfg().MustGet(ctx, "analysis.video_dir", "resource/videos").String()
|
||
if !filepath.IsAbs(videoDir) {
|
||
absDir, _ := filepath.Abs(videoDir)
|
||
videoDir = absDir
|
||
}
|
||
// 按 taskId 子目录组织
|
||
taskDir := filepath.Join(videoDir, taskID)
|
||
os.MkdirAll(taskDir, 0755)
|
||
|
||
// 从URL提取文件名
|
||
segments := strings.Split(videoURL, "/")
|
||
fileName := segments[len(segments)-1]
|
||
if fileName == "" {
|
||
fileName = fmt.Sprintf("video_%d.mp4", time.Now().UnixMilli())
|
||
}
|
||
savePath := filepath.Join(taskDir, fileName)
|
||
|
||
client := &http.Client{Timeout: 10 * time.Minute}
|
||
resp, err := client.Get(videoURL)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return "", fmt.Errorf("HTTP %d", resp.StatusCode)
|
||
}
|
||
|
||
out, err := os.Create(savePath)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
defer out.Close()
|
||
|
||
_, err = io.Copy(out, resp.Body)
|
||
if err != nil {
|
||
os.Remove(savePath)
|
||
return "", err
|
||
}
|
||
|
||
return savePath, nil
|
||
}
|
||
|
||
// callCaptionAPI 调用第三方 caption 接口(form-data 上传单个视频)
|
||
// 根据配置 analysis.mock_caption 决定使用 mock 数据还是真实调用
|
||
func (s *analysisService) callCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) {
|
||
if g.Cfg().MustGet(ctx, "analysis.mock_caption", true).Bool() {
|
||
return s.mockCallCaptionAPI(ctx, videoPath)
|
||
}
|
||
return s.realCallCaptionAPI(ctx, videoPath)
|
||
}
|
||
|
||
// mockCallCaptionAPI 返回 mock 的 caption 数据
|
||
func (s *analysisService) mockCallCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) {
|
||
g.Log().Infof(ctx, "[呼叫Caption-Mock] 使用mock数据, 文件=%s", videoPath)
|
||
|
||
return map[string]interface{}{
|
||
"caption": "Scene: 视频展示了一个产品演示和讲解过程。Events: 一个人走进画面开始介绍产品, 展示了产品的各个功能模块, 最后总结产品优势。",
|
||
"scene": "这是一个产品演示和讲解的场景。视频中有人在画面中介绍一款产品,展示了产品的各个功能模块和使用方式,包括产品的外观、核心功能、操作界面等。视频整体节奏适中,配合专业的产品讲解。",
|
||
"events": []map[string]interface{}{
|
||
{
|
||
"start": 0.0,
|
||
"end": 5.0,
|
||
"description": "视频开场,人物走进画面,开始打招呼并介绍本次演示的主题",
|
||
},
|
||
{
|
||
"start": 5.5,
|
||
"end": 15.0,
|
||
"description": "展示产品外观包装,详细说明产品设计理念和特点",
|
||
},
|
||
{
|
||
"start": 15.5,
|
||
"end": 30.0,
|
||
"description": "开机演示,展示产品主界面和核心功能入口",
|
||
},
|
||
{
|
||
"start": 30.5,
|
||
"end": 50.0,
|
||
"description": "详细演示产品主要功能模块的操作流程和使用方法",
|
||
},
|
||
},
|
||
}, nil
|
||
}
|
||
|
||
// realCallCaptionAPI 真实调用第三方 caption 接口(form-data 上传单个视频)
|
||
func (s *analysisService) realCallCaptionAPI(ctx context.Context, videoPath string) (map[string]interface{}, error) {
|
||
// 获取 caption 服务地址
|
||
captionURL := g.Cfg().MustGet(ctx, "analysis.caption_url", "http://192.168.3.49:8900/caption").String()
|
||
|
||
// 构建 multipart/form-data 表单
|
||
var buf bytes.Buffer
|
||
mw := multipart.NewWriter(&buf)
|
||
|
||
// 添加视频文件字段
|
||
file, err := os.Open(videoPath)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("打开视频文件失败: %v", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
fw, err := mw.CreateFormFile("video", filepath.Base(videoPath))
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建表单文件字段失败: %v", err)
|
||
}
|
||
if _, err = io.Copy(fw, file); err != nil {
|
||
return nil, fmt.Errorf("写入文件内容失败: %v", err)
|
||
}
|
||
|
||
// 添加 max_new_tokens 参数
|
||
if err = mw.WriteField("max_new_tokens", "2048"); err != nil {
|
||
return nil, fmt.Errorf("写入表单字段失败: %v", err)
|
||
}
|
||
mw.Close()
|
||
|
||
// 发送请求(长超时,caption 接口耗时较长)
|
||
client := &http.Client{Timeout: 30 * time.Minute}
|
||
req, err := http.NewRequest("POST", captionURL, &buf)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建请求失败: %v", err)
|
||
}
|
||
req.Header.Set("Content-Type", mw.FormDataContentType())
|
||
|
||
g.Log().Debugf(ctx, "[呼叫Caption] 请求URL=%s, 文件=%s", captionURL, videoPath)
|
||
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("请求Caption接口失败: %v", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
body, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("读取Caption响应失败: %v", err)
|
||
}
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return nil, fmt.Errorf("Caption接口返回非200: %d, body=%s", resp.StatusCode, string(body))
|
||
}
|
||
|
||
// 解析响应 JSON
|
||
var result map[string]interface{}
|
||
if err = json.Unmarshal(body, &result); err != nil {
|
||
return nil, fmt.Errorf("解析Caption响应JSON失败: %v, body=%s", err, string(body))
|
||
}
|
||
|
||
g.Log().Debugf(ctx, "[呼叫Caption] 响应成功, 文件=%s, 结果长度=%d", videoPath, len(body))
|
||
|
||
return result, nil
|
||
}
|
||
|
||
// analysisCallback 回调通知
|
||
func (s *analysisService) analysisCallback(ctx context.Context, taskID, callbackURL string) {
|
||
if callbackURL == "" {
|
||
return
|
||
}
|
||
|
||
task, err := analysisDao.AnalysisTask.GetByTaskID(ctx, taskID)
|
||
if err != nil || task == nil {
|
||
g.Log().Errorf(ctx, "[视频分析回调 %s] 查询任务失败: %v", taskID, err)
|
||
return
|
||
}
|
||
|
||
details, err := analysisDao.AnalysisTaskDetail.GetByTaskID(ctx, taskID)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "[视频分析回调 %s] 查询明细失败: %v", taskID, err)
|
||
return
|
||
}
|
||
|
||
// 构造回调结果
|
||
var results []map[string]interface{}
|
||
for _, d := range details {
|
||
item := map[string]interface{}{
|
||
"video_url": d.VideoURL,
|
||
"video_save_path": d.VideoSavePath,
|
||
}
|
||
if d.Status == "success" && d.CaptionResult != "" {
|
||
var captionResult interface{}
|
||
json.Unmarshal([]byte(d.CaptionResult), &captionResult)
|
||
item["caption_result"] = captionResult
|
||
}
|
||
if d.Status == "failed" {
|
||
item["caption_result"] = nil
|
||
item["fail_reason"] = d.FailReason
|
||
}
|
||
results = append(results, item)
|
||
}
|
||
|
||
payload := map[string]interface{}{
|
||
"task_id": task.TaskID,
|
||
"status": task.Status,
|
||
"total": task.Total,
|
||
"success_count": task.SuccessCount,
|
||
"failed_count": task.FailedCount,
|
||
"results": results,
|
||
}
|
||
|
||
body, _ := json.Marshal(payload)
|
||
g.Log().Infof(ctx, "[视频分析回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL)
|
||
|
||
cbReq, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body))
|
||
cbReq.Header.Set("Content-Type", "application/json")
|
||
// 透传调用方用户信息
|
||
cbUser := getUserFromCtx(ctx)
|
||
userJSON, _ := json.Marshal(cbUser)
|
||
cbReq.Header.Set("X-User-Info", string(userJSON))
|
||
|
||
// 打印 curl 命令方便调试
|
||
escapedBody := strings.ReplaceAll(string(body), "'", "'\\''")
|
||
g.Log().Infof(ctx, "[视频分析回调 %s] curl 调试命令:\ncurl -X POST '%s' \\\n -H 'Content-Type: application/json' \\\n -H 'X-User-Info: %s' \\\n -d '%s'",
|
||
taskID, callbackURL, string(userJSON), escapedBody)
|
||
|
||
client := &http.Client{Timeout: 2 * time.Minute}
|
||
resp, reqErr := client.Do(cbReq)
|
||
if reqErr != nil {
|
||
g.Log().Errorf(ctx, "[视频分析回调 %s] 请求失败: %v", taskID, reqErr)
|
||
return
|
||
}
|
||
defer resp.Body.Close()
|
||
respBody, _ := io.ReadAll(resp.Body)
|
||
g.Log().Infof(ctx, "[视频分析回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody))
|
||
}
|