Files
media/service/video/concat_service.go

682 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package video
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
dao "media/dao/video"
dto "media/model/dto/video"
entity "media/model/entity/video"
"gitea.com/red-future/common/beans"
commonHttp "gitea.com/red-future/common/http"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/guid"
)
// concatService groups the video concatenation operations; the type has no fields.
type concatService struct{}
// Concat is the package-level singleton of the video concatenation service.
var Concat = new(concatService)
// ConcatReq describes a video concatenation request.
type ConcatReq struct {
VideoPaths []string // Paths of the video files to join; they are concatenated in this order.
OutputPath string // Output video file path; generated automatically when empty.
Method string // Concatenation method: auto/fast/reencode; defaults to auto.
Upload bool // Whether to upload the result to MinIO.
}
// ConcatRes describes the result of a video concatenation.
type ConcatRes struct {
OutputPath string `json:"outputPath"` // Path of the output file.
FileSize int64 `json:"fileSize"` // File size in bytes.
Duration float64 `json:"duration"` // Total duration after concatenation, in seconds.
DurationStr string `json:"durationStr"` // Human-readable duration.
MethodUsed string `json:"methodUsed"` // Concatenation method that was actually used.
InputFiles int `json:"inputFiles"` // Number of input files.
FileURL string `json:"fileURL"` // MinIO access URL; filled in only after an upload.
}
// Concat joins the videos in req.VideoPaths, in the given order, into one file.
//
// At least two input paths are required and every one of them must be
// accessible on disk. When req.OutputPath is empty the output is written next
// to the first input as concat_<stem>_x<count>_<HHMMSS><ext>, where <stem> is
// the first input's base name truncated to 20 runes. Method "fast" uses the
// stream-copying concat demuxer; every other value (including the default
// "auto") re-encodes through the concat filter. When req.Upload is set the
// output is uploaded through UploadToMinIO and the returned URL is stored in
// ConcatRes.FileURL.
//
// A non-nil error is returned when validation, the ffmpeg lookup, the
// concatenation itself, the stat of the output file, or the upload fails; the
// result is nil in that case. A failure to probe the output duration is only
// logged and leaves Duration at zero.
func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *ConcatRes, err error) {
	if req == nil {
		return nil, fmt.Errorf("拼接请求不能为空")
	}
	g.Log().Infof(ctx, "[Concat] 服务层收到请求: videoPaths=%v, method=%s, upload=%v",
		req.VideoPaths, req.Method, req.Upload)

	if len(req.VideoPaths) < 2 {
		return nil, fmt.Errorf("至少需要2个视频才能拼接")
	}

	// Every input must be reachable; report any stat failure, not only "not exist".
	for i, p := range req.VideoPaths {
		if _, statErr := os.Stat(p); statErr != nil {
			if os.IsNotExist(statErr) {
				return nil, fmt.Errorf("第%d个视频文件不存在: %s", i+1, p)
			}
			return nil, fmt.Errorf("第%d个视频文件无法访问: %s: %w", i+1, p, statErr)
		}
	}

	ffmpegPath, err := s.getFFmpegPath()
	if err != nil {
		return nil, err
	}

	// Derive an output path from the first input when the caller gave none:
	// first input's name + number of inputs + time of day, to keep provenance visible.
	outputPath := req.OutputPath
	if outputPath == "" {
		first := req.VideoPaths[0]
		baseName := filepath.Base(first)
		ext := filepath.Ext(baseName)
		stemRunes := []rune(strings.TrimSuffix(baseName, ext))
		if len(stemRunes) > 20 {
			// Truncate by rune so multi-byte characters are never split.
			stemRunes = stemRunes[:20]
		}
		outputPath = filepath.Join(filepath.Dir(first),
			fmt.Sprintf("concat_%s_x%d_%s%s", string(stemRunes), len(req.VideoPaths), time.Now().Format("150405"), ext))
	}

	method := req.Method
	if method == "" {
		method = "auto"
	}

	var methodUsed string
	switch method {
	case "fast":
		// Lossless concat: needs identical codec parameters; fast but may produce black frames.
		err = s.concatByDemuxer(ctx, ffmpegPath, req.VideoPaths, outputPath)
		methodUsed = "concat demuxer (无损)"
	default:
		// Re-encoding concat: normalises resolution/audio, works for any mix of inputs.
		err = s.concatByFilter(ctx, ffmpegPath, req.VideoPaths, outputPath)
		methodUsed = "concat filter (重编码)"
	}
	if err != nil {
		return nil, fmt.Errorf("视频拼接失败: %w", err)
	}

	stat, statErr := os.Stat(outputPath)
	if statErr != nil {
		return nil, fmt.Errorf("输出文件异常: %w", statErr)
	}

	// The duration is informational only, so a probe failure must not fail the request.
	duration, durErr := s.getVideoDuration(ctx, ffmpegPath, outputPath)
	if durErr != nil {
		g.Log().Warningf(ctx, "[Concat] 获取输出视频时长失败: %v", durErr)
	}

	res = &ConcatRes{
		OutputPath:  outputPath,
		FileSize:    stat.Size(),
		Duration:    duration,
		DurationStr: formatDuration(duration),
		MethodUsed:  methodUsed,
		InputFiles:  len(req.VideoPaths),
	}

	// Upload with an independent context so a dropped HTTP request does not cancel it.
	if req.Upload {
		uploadRes, uploadErr := s.UploadToMinIO(context.Background(), outputPath)
		if uploadErr != nil {
			return nil, fmt.Errorf("上传到MinIO失败: %w", uploadErr)
		}
		res.FileURL = uploadRes.FileURL
	}
	return res, nil
}
// concatByDemuxer joins inputs into output with ffmpeg's concat demuxer and
// stream copy (no re-encoding), which requires the inputs to share codec
// parameters.
//
// A list file is written next to the output and removed on return. It gets a
// unique name so that concurrent concatenations into the same directory do not
// overwrite each other's list. Input paths are made absolute, because ffmpeg
// resolves relative entries against the list file's directory rather than the
// working directory, and single quotes in paths are escaped for the list
// syntax.
//
// ffmpeg is started with a background context on purpose, so that a timed-out
// HTTP request does not get the process killed; ctx is therefore not used.
// On failure the returned error contains ffmpeg's combined output.
func (s *concatService) concatByDemuxer(ctx context.Context, ffmpegPath string, inputs []string, output string) error {
	var list strings.Builder
	for _, p := range inputs {
		abs, err := filepath.Abs(p)
		if err != nil {
			return fmt.Errorf("解析视频路径失败: %w", err)
		}
		// Inside a single-quoted token a quote is written as '\'' (close, escaped quote, reopen).
		list.WriteString("file '")
		list.WriteString(strings.ReplaceAll(abs, "'", `'\''`))
		list.WriteString("'\n")
	}

	listFile, err := os.CreateTemp(filepath.Dir(output), "concat_list_*.txt")
	if err != nil {
		return fmt.Errorf("创建文件列表失败: %w", err)
	}
	fileListPath := listFile.Name()
	defer os.Remove(fileListPath)

	_, writeErr := listFile.WriteString(list.String())
	closeErr := listFile.Close()
	if writeErr != nil {
		return fmt.Errorf("创建文件列表失败: %w", writeErr)
	}
	if closeErr != nil {
		return fmt.Errorf("创建文件列表失败: %w", closeErr)
	}

	args := []string{
		"-f", "concat",
		"-safe", "0",
		"-i", fileListPath,
		"-c", "copy", // copy the streams as they are, no re-encoding
		"-y",
		output,
	}

	// Independent context: an HTTP request timeout must not SIGKILL ffmpeg.
	cmd := exec.CommandContext(context.Background(), ffmpegPath, args...)
	outputBytes, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("ffmpeg demuxer 失败: %w\n%s", err, string(outputBytes))
	}
	return nil
}
// concatByFilter joins inputs into output by re-encoding through ffmpeg's
// concat filter.
//
// Every input is probed for its resolution; the output canvas is the largest
// width and the largest height seen (1920x1080 when nothing could be probed),
// rounded up to even numbers because common 4:2:0 encoders reject odd
// dimensions. Each video stream is scaled to fit, padded to the canvas, given
// square pixels and 30 fps; each audio stream is resampled to 44100 Hz. The
// filter graph references [i:a] for every input, so every input is expected
// to carry an audio stream.
//
// The filter graph is also written to a uniquely named debug file next to the
// output, which is removed on return; a failure to write it is only logged.
// ffmpeg is started with a background context on purpose, so that a timed-out
// HTTP request does not get the process killed. On failure the returned error
// contains ffmpeg's combined output.
func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, inputs []string, output string) error {
	n := len(inputs)

	// 1. Probe all inputs to settle on one output size.
	maxW, maxH := 0, 0
	for _, p := range inputs {
		w, h, probeErr := s.getVideoResolution(ctx, ffmpegPath, p)
		if probeErr != nil {
			g.Log().Warningf(ctx, "获取视频分辨率失败 %s: %v", p, probeErr)
			continue
		}
		if w > maxW {
			maxW = w
		}
		if h > maxH {
			maxH = h
		}
	}
	// Fallback when no usable resolution was found.
	if maxW == 0 {
		maxW = 1920
	}
	if maxH == 0 {
		maxH = 1080
	}
	// Round up to even values; odd sizes fail with 4:2:0 chroma subsampling.
	maxW += maxW % 2
	maxH += maxH % 2

	// 2. Build the filter graph: normalise every stream, then concat them in order.
	filterParts := make([]string, 0, 2*n)
	var concatInputs strings.Builder
	for i := 0; i < n; i++ {
		filterParts = append(filterParts,
			fmt.Sprintf(
				"[%d:v]scale=%d:%d:force_original_aspect_ratio=decrease,pad=%d:%d:(ow-iw)/2:(oh-ih)/2,setsar=1,fps=30[v%d]",
				i, maxW, maxH, maxW, maxH, i,
			),
			fmt.Sprintf("[%d:a]aresample=44100[a%d]", i, i),
		)
		fmt.Fprintf(&concatInputs, "[v%d][a%d]", i, i)
	}
	filterStr := fmt.Sprintf("%s;%sconcat=n=%d:v=1:a=1[outv][outa]",
		strings.Join(filterParts, ";"), concatInputs.String(), n)

	// 3. Assemble the command line: all inputs first, then the graph and encoder options.
	args := make([]string, 0, 2*n+11)
	for _, p := range inputs {
		args = append(args, "-i", p)
	}
	args = append(args,
		"-filter_complex", filterStr,
		"-map", "[outv]",
		"-map", "[outa]",
		"-preset", "fast",
		"-crf", "23",
		"-y",
		output,
	)

	// Debugging aid: log the full command.
	g.Log().Debugf(ctx, "concat filter 命令: %s %v", ffmpegPath, args)

	// Debugging aid: keep the filter graph on disk while ffmpeg runs. The name
	// is unique so concurrent tasks in the same directory do not clash.
	if f, tmpErr := os.CreateTemp(filepath.Dir(output), "concat_filter_*.txt"); tmpErr != nil {
		g.Log().Warningf(ctx, "保存 filter graph 失败: %v", tmpErr)
	} else {
		defer os.Remove(f.Name())
		if _, wErr := f.WriteString(filterStr); wErr != nil {
			g.Log().Warningf(ctx, "保存 filter graph 失败: %v", wErr)
		}
		if cErr := f.Close(); cErr != nil {
			g.Log().Warningf(ctx, "保存 filter graph 失败: %v", cErr)
		}
	}

	// Independent context: an HTTP request timeout must not SIGKILL ffmpeg.
	cmd := exec.CommandContext(context.Background(), ffmpegPath, args...)
	outputBytes, err := cmd.CombinedOutput()
	if err != nil {
		return fmt.Errorf("ffmpeg filter 失败: %w\n日志:\n%s", err, string(outputBytes))
	}
	return nil
}
// getVideoResolution returns the width and height of the first video stream of
// videoPath as reported by ffprobe.
//
// ffprobe is looked up next to ffmpegPath; when no such file exists the bare
// command name "ffprobe" is used instead. An error is returned when ffprobe
// fails or when its output cannot be parsed as "<width>,<height>"; both
// dimensions are zero in that case.
func (s *concatService) getVideoResolution(ctx context.Context, ffmpegPath, videoPath string) (width, height int, err error) {
	ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe")
	if _, statErr := os.Stat(ffprobePath); os.IsNotExist(statErr) {
		ffprobePath = "ffprobe"
	}

	cmd := exec.CommandContext(ctx, ffprobePath,
		"-v", "error",
		"-select_streams", "v:0",
		"-show_entries", "stream=width,height",
		"-of", "csv=p=0",
		videoPath,
	)
	output, err := cmd.Output()
	if err != nil {
		return 0, 0, err
	}

	// Surface unparsable output instead of silently reporting a 0x0 video.
	text := strings.TrimSpace(string(output))
	var w, h int
	if _, scanErr := fmt.Sscanf(text, "%d,%d", &w, &h); scanErr != nil {
		return 0, 0, fmt.Errorf("解析视频分辨率失败 %q: %w", text, scanErr)
	}
	return w, h, nil
}
// getVideoDuration returns the container duration of videoPath in seconds as
// reported by ffprobe.
//
// ffprobe is looked up next to ffmpegPath; when no such file exists the bare
// command name "ffprobe" is used instead. An error is returned when ffprobe
// fails or when its output is not a number; the duration is zero in that case.
func (s *concatService) getVideoDuration(ctx context.Context, ffmpegPath, videoPath string) (float64, error) {
	ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe")
	if _, statErr := os.Stat(ffprobePath); os.IsNotExist(statErr) {
		ffprobePath = "ffprobe"
	}

	cmd := exec.CommandContext(ctx, ffprobePath,
		"-v", "error",
		"-show_entries", "format=duration",
		"-of", "default=noprint_wrappers=1:nokey=1",
		videoPath,
	)
	output, err := cmd.Output()
	if err != nil {
		return 0, err
	}

	// Surface unparsable output instead of silently reporting a zero duration.
	text := strings.TrimSpace(string(output))
	var duration float64
	if _, scanErr := fmt.Sscanf(text, "%f", &duration); scanErr != nil {
		return 0, fmt.Errorf("解析视频时长失败 %q: %w", text, scanErr)
	}
	return duration, nil
}
// getFFmpegPath resolves the ffmpeg executable to use.
//
// The "ffmpeg.path" configuration value wins when it is non-empty and can be
// stat'ed; otherwise ffmpeg is searched on PATH. An error is returned when
// neither yields a binary.
func (s *concatService) getFFmpegPath() (string, error) {
	if configured := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String(); configured != "" {
		_, statErr := os.Stat(configured)
		if statErr == nil {
			return configured, nil
		}
	}

	resolved, lookErr := exec.LookPath("ffmpeg")
	if lookErr == nil {
		return resolved, nil
	}
	return "", fmt.Errorf("未找到 ffmpeg")
}
// formatDuration renders a duration given in seconds as HH:MM:SS.
// The fractional part is truncated, and hours are not capped at two digits.
func formatDuration(seconds float64) string {
	total := int(seconds)
	hours, remainder := total/3600, total%3600
	return fmt.Sprintf("%02d:%02d:%02d", hours, remainder/60, total%60)
}
// uploadFileRes is the data payload decoded from the OSS uploadFile response.
type uploadFileRes struct {
FileURL string `json:"fileURL" dc:"上传地址"`
FileSize int `json:"fileSize" dc:"文件大小"`
FileName string `json:"fileName" dc:"文件名称"`
FileFormat string `json:"fileFormat" dc:"文件格式"`
FileAddressPrefix string `json:"fileAddressPrefix"`
}
// UploadToMinIO uploads localFilePath to MinIO by posting it as
// multipart/form-data (field "file") to the OSS service endpoint
// "oss/file/uploadFile".
//
// Headers of the originating HTTP request, when ctx carries one, are forwarded
// to the OSS service. When neither Authorization nor X-User-Info is among
// them, a default admin user is injected through X-User-Info. The response is
// expected in the form {code, message, data}; codes 0 and 200 count as
// success.
//
// The whole file is buffered in memory before it is sent. An error is returned
// when the file cannot be read, the form cannot be built, the request fails,
// the response cannot be decoded, the service reports a failure code, or a
// successful response carries no data.
func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) (*uploadFileRes, error) {
	file, err := os.Open(localFilePath)
	if err != nil {
		return nil, fmt.Errorf("打开文件失败: %w", err)
	}
	defer file.Close()

	// Build the multipart/form-data body.
	var buf bytes.Buffer
	mw := multipart.NewWriter(&buf)
	fw, err := mw.CreateFormFile("file", filepath.Base(localFilePath))
	if err != nil {
		return nil, fmt.Errorf("创建表单文件字段失败: %w", err)
	}
	if _, err = io.Copy(fw, file); err != nil {
		return nil, fmt.Errorf("写入文件内容失败: %w", err)
	}
	// Close writes the terminating boundary; without it the body is malformed.
	if err = mw.Close(); err != nil {
		return nil, fmt.Errorf("写入文件内容失败: %w", err)
	}

	// Use the shared commonHttp client (with Consul service discovery) and give
	// it long timeouts for large uploads.
	client := commonHttp.Httpclient.Clone()
	// ResponseHeaderTimeout has to be set on the transport itself; SetTimeout
	// only sets Client.Timeout.
	if base, ok := http.DefaultTransport.(*http.Transport); ok {
		newTransport := base.Clone()
		newTransport.ResponseHeaderTimeout = 5 * time.Minute
		client.Transport = newTransport
	}
	client.SetTimeout(10 * time.Minute)

	// Pass the caller's headers through, noting whether any of them authenticates.
	hasAuthHeader := false
	if r := g.RequestFromCtx(ctx); r != nil {
		for k, v := range r.Header {
			if len(v) == 0 {
				continue
			}
			client.SetHeader(k, v[0])
			if strings.EqualFold(k, "Authorization") || strings.EqualFold(k, "X-User-Info") {
				hasAuthHeader = true
			}
		}
	}
	// Without credentials on the original request, inject the default user context.
	if !hasAuthHeader {
		userJSON, marshalErr := json.Marshal(beans.User{UserName: "admin", TenantId: 1})
		if marshalErr != nil {
			return nil, fmt.Errorf("调用OSS上传服务失败: %w", marshalErr)
		}
		client.SetHeader("X-User-Info", string(userJSON))
	}

	// Set the multipart Content-Type (including the boundary) after the
	// pass-through so a forwarded Content-Type cannot override it.
	client.SetHeader("Content-Type", mw.FormDataContentType())

	g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes",
		localFilePath, buf.Len())

	// Send the multipart body as raw bytes.
	response, err := client.Post(ctx, "oss/file/uploadFile", buf.Bytes())
	if err != nil {
		glog.Error(ctx, err)
		return nil, fmt.Errorf("调用OSS上传服务失败: %w", err)
	}
	defer response.Close()

	body := response.ReadAll()
	// Debugging aid: log the raw response.
	g.Log().Debugf(ctx, "[UploadToMinIO] OSS原始响应: %s", string(body))

	// Decode the standard GoFrame response envelope {code, message, data}.
	var apiResp struct {
		Code    int            `json:"code"`
		Message string         `json:"message"`
		Data    *uploadFileRes `json:"data"`
	}
	if err = json.Unmarshal(body, &apiResp); err != nil {
		return nil, fmt.Errorf("响应解析失败: %w", err)
	}
	if apiResp.Code != 200 && apiResp.Code != 0 {
		return nil, fmt.Errorf("OSS上传失败: %s", apiResp.Message)
	}
	// A success code with no data would otherwise be dereferenced below.
	if apiResp.Data == nil {
		return nil, fmt.Errorf("OSS上传失败: 响应缺少文件信息")
	}

	g.Log().Infof(ctx, "[UploadToMinIO] 上传成功 fileURL=%s size=%d", apiResp.Data.FileURL, apiResp.Data.FileSize)
	return apiResp.Data, nil
}
// CleanupConcat removes every file named in paths (the input videos of a
// concatenation). Removal is best effort: individual failures are ignored.
func CleanupConcat(paths []string) {
	for i := range paths {
		_ = os.Remove(paths[i])
	}
}
// ---------- 异步拼接任务管理 ----------
// CreateAsyncTask registers an asynchronous concatenation task in URL mode and
// returns its task ID.
//
// At least two video URLs are required. The task is stored with status
// "pending" and is then handled in a background goroutine, which downloads the
// videos before concatenating them.
func (s *concatService) CreateAsyncTask(ctx context.Context, videoURLs []string, method string, upload bool, callbackURL string) (string, error) {
	count := len(videoURLs)
	if count < 2 {
		return "", fmt.Errorf("至少需要2个视频才能拼接")
	}

	id := "concat_" + guid.S()
	record := entity.ConcatTask{
		TaskID:     id,
		Status:     "pending",
		MethodUsed: method,
	}
	_, insertErr := dao.ConcatTask.Insert(ctx, &record)
	if insertErr != nil {
		return "", fmt.Errorf("创建任务失败: %v", insertErr)
	}
	g.Log().Infof(ctx, "[异步拼接] 创建任务 %s, 视频数=%d, 回调=%s", id, count, callbackURL)

	// Background processing: download first, then concatenate.
	go s.processAsyncTask(id, videoURLs, method, upload, callbackURL)
	return id, nil
}
// CreateAsyncTaskWithFiles registers an asynchronous concatenation task in
// file-upload mode and returns its task ID.
//
// At least two local file paths are required. The task is stored with status
// "pending" and is then handled in a background goroutine that concatenates
// the local files directly.
func (s *concatService) CreateAsyncTaskWithFiles(ctx context.Context, filePaths []string, method string, upload bool, callbackURL string) (string, error) {
	count := len(filePaths)
	if count < 2 {
		return "", fmt.Errorf("至少需要2个视频才能拼接")
	}

	id := "concat_" + guid.S()
	record := entity.ConcatTask{
		TaskID:     id,
		Status:     "pending",
		MethodUsed: method,
	}
	_, insertErr := dao.ConcatTask.Insert(ctx, &record)
	if insertErr != nil {
		return "", fmt.Errorf("创建任务失败: %v", insertErr)
	}
	g.Log().Infof(ctx, "[异步拼接-文件] 创建任务 %s, 文件数=%d, 回调=%s", id, count, callbackURL)

	// Background processing: the files are already local, concatenate right away.
	go s.processAsyncTaskWithFiles(id, filePaths, method, upload, callbackURL)
	return id, nil
}
// GetTaskResult looks up an asynchronous task by its ID and returns it
// converted to the response DTO. An error is returned when the lookup fails
// or when no task with that ID exists.
func (s *concatService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetConcatTaskRes, error) {
	found, lookupErr := dao.ConcatTask.GetByTaskID(ctx, taskID)
	switch {
	case lookupErr != nil:
		return nil, fmt.Errorf("查询任务失败: %v", lookupErr)
	case found == nil:
		return nil, fmt.Errorf("任务不存在: %s", taskID)
	}
	return dao.EntityToTaskRes(found), nil
}
// processAsyncTaskWithFiles runs an asynchronous concatenation task in the
// background for file-upload mode, where the inputs are already on local disk.
//
// It works on a fresh background context that carries a default admin user
// under the "user" key. The task is marked running, the concatenation is
// executed, the outcome is recorded on the task, and the callback (if any) is
// notified. A panic anywhere in this flow, including the initial status
// update, is recovered and recorded as a task error so that it cannot take
// the process down.
func (s *concatService) processAsyncTaskWithFiles(taskID string, filePaths []string, method string, upload bool, callbackURL string) {
	// The plain string key is kept as is: it is what the original code stores
	// the user under, and consumers elsewhere presumably read that same key.
	bgCtx := context.WithValue(context.Background(), "user", &beans.User{UserName: "admin", TenantId: 1})

	// Register the recovery before any work, so the status update is covered too.
	defer func() {
		if r := recover(); r != nil {
			errMsg := fmt.Sprintf("异步拼接异常: %v", r)
			g.Log().Errorf(bgCtx, "[异步拼接 %s] %s", taskID, errMsg)
			dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg)
			s.concatCallback(bgCtx, taskID, callbackURL)
		}
	}()

	dao.ConcatTask.UpdateRunning(bgCtx, taskID)

	if concatErr := s.executeConcat(bgCtx, taskID, filePaths, method, upload); concatErr != nil {
		dao.ConcatTask.UpdateError(bgCtx, taskID, concatErr.Error())
		s.concatCallback(bgCtx, taskID, callbackURL)
		return
	}

	g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID)
	if callbackURL != "" {
		s.concatCallback(bgCtx, taskID, callbackURL)
	}
}
// processAsyncTask runs an asynchronous concatenation task in the background
// for URL mode, where the videos have to be downloaded first.
//
// It works on a fresh background context that carries a default admin user
// under the "user" key. Videos are downloaded into the directory configured as
// "ffmpeg.temp_dir" (default "resource/temp"); a failed download is logged and
// skipped, and the task fails when fewer than two videos remain. Downloaded
// files are removed when the function returns, whatever the outcome. The
// outcome is recorded on the task and the callback (if any) is notified. A
// panic anywhere in this flow is recovered and recorded as a task error so
// that it cannot take the process down.
func (s *concatService) processAsyncTask(taskID string, videoURLs []string, method string, upload bool, callbackURL string) {
	// The plain string key is kept as is: it is what the original code stores
	// the user under, and consumers elsewhere presumably read that same key.
	bgCtx := context.WithValue(context.Background(), "user", &beans.User{UserName: "admin", TenantId: 1})

	// Register the recovery before any work, so the status update is covered too.
	defer func() {
		if r := recover(); r != nil {
			errMsg := fmt.Sprintf("异步拼接异常: %v", r)
			g.Log().Errorf(bgCtx, "[异步拼接 %s] %s", taskID, errMsg)
			dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg)
			s.concatCallback(bgCtx, taskID, callbackURL)
		}
	}()

	dao.ConcatTask.UpdateRunning(bgCtx, taskID)

	// Fails the task with the given message and notifies the callback.
	fail := func(errMsg string) {
		dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg)
		s.concatCallback(bgCtx, taskID, callbackURL)
	}

	tempDir := g.Cfg().MustGet(bgCtx, "ffmpeg.temp_dir", "resource/temp").String()
	if mkErr := os.MkdirAll(tempDir, 0755); mkErr != nil {
		fail(fmt.Sprintf("创建临时目录失败: %v", mkErr))
		return
	}

	// Download the videos; the deferred cleanup also runs when a later step panics.
	var savePaths []string
	defer func() { CleanupConcat(savePaths) }()
	for _, videoURL := range videoURLs {
		savePath, dlErr := downloadFile(bgCtx, videoURL, tempDir)
		if dlErr != nil {
			g.Log().Warningf(bgCtx, "[异步拼接 %s] 下载失败 %s: %v", taskID, videoURL, dlErr)
			continue
		}
		savePaths = append(savePaths, savePath)
	}
	if len(savePaths) < 2 {
		fail(fmt.Sprintf("成功下载的视频不足2个(共%d)", len(videoURLs)))
		return
	}

	// Run the concatenation.
	if concatErr := s.executeConcat(bgCtx, taskID, savePaths, method, upload); concatErr != nil {
		fail(concatErr.Error())
		return
	}

	g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID)
	if callbackURL != "" {
		s.concatCallback(bgCtx, taskID, callbackURL)
	}
}
// executeConcat concatenates filePaths for the given task and records the
// successful result on the task.
//
// The output is written next to the first input as
// concat_<taskID>_x<count>_<HHMMSS>.mp4 and is removed again before returning,
// on success as well as on failure.
// NOTE(review): with upload == false the output is deleted while FileURL stays
// empty, so nothing of the result survives; confirm this is intended.
//
// It returns the concatenation error, if any; marking the task as failed is
// left to the caller.
func (s *concatService) executeConcat(ctx context.Context, taskID string, filePaths []string, method string, upload bool) error {
	if len(filePaths) == 0 {
		// Guard the filePaths[0] access below.
		return fmt.Errorf("没有可拼接的视频文件")
	}

	tempDir := filepath.Dir(filePaths[0])
	outputPath := filepath.Join(tempDir,
		fmt.Sprintf("concat_%s_x%d_%s.mp4", taskID, len(filePaths), time.Now().Format("150405")))
	// The local output is temporary in every outcome, including a panic further down.
	defer os.Remove(outputPath)

	res, concatErr := s.Concat(ctx, &ConcatReq{
		VideoPaths: filePaths,
		OutputPath: outputPath,
		Method:     method,
		Upload:     upload,
	})
	if concatErr != nil {
		return concatErr
	}

	// Mark the task as successful in the database.
	fileName := filepath.Base(outputPath)
	fileFormat := ""
	if idx := strings.LastIndex(fileName, "."); idx > 0 {
		fileFormat = fileName[idx+1:]
	}
	dao.ConcatTask.UpdateSuccess(ctx, taskID,
		res.FileURL, res.FileSize, fileName, fileFormat,
		"", res.MethodUsed, res.DurationStr)
	return nil
}
// concatCallback notifies callbackURL about the outcome of a task.
//
// Nothing happens when callbackURL is empty. Otherwise the task is read from
// the database and a JSON document with taskId and status is POSTed to the
// URL; fileURL and fileSize are added for status "success", errorMessage for
// status "failed". A default admin user is sent in the X-User-Info header.
// All failures are logged and otherwise ignored; the function never returns
// an error.
func (s *concatService) concatCallback(ctx context.Context, taskID, callbackURL string) {
	if callbackURL == "" {
		return
	}

	task, err := dao.ConcatTask.GetByTaskID(ctx, taskID)
	if err != nil || task == nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 查询任务失败: %v", taskID, err)
		return
	}

	payload := map[string]interface{}{
		"taskId": task.TaskID,
		"status": task.Status,
	}
	switch task.Status {
	case "success":
		payload["fileURL"] = task.FileURL
		payload["fileSize"] = task.FileSize
	case "failed":
		payload["errorMessage"] = task.ErrorMessage
	}
	body, err := json.Marshal(payload)
	if err != nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, err)
		return
	}

	g.Log().Infof(ctx, "[异步拼接回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL)

	// An invalid callback URL yields an error here; ignoring it would leave req
	// nil and panic on the first header access.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, callbackURL, bytes.NewReader(body))
	if err != nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, err)
		return
	}
	req.Header.Set("Content-Type", "application/json")
	if userJSON, marshalErr := json.Marshal(beans.User{UserName: "admin", TenantId: 1}); marshalErr == nil {
		req.Header.Set("X-User-Info", string(userJSON))
	}

	client := &http.Client{Timeout: 30 * time.Second}
	resp, reqErr := client.Do(req)
	if reqErr != nil {
		g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, reqErr)
		return
	}
	defer resp.Body.Close()

	// The body is only logged, so cap how much of it is read.
	respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 64<<10))
	g.Log().Infof(ctx, "[异步拼接回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody))
}
// downloadFile fetches rawURL with an HTTP GET and stores the body in a new
// file inside tempDir, returning that file's path.
//
// The file is named <unix-millis>_<random>_<name>, where <name> is the last
// segment of the URL path (or video_<unix-millis>.mp4 when that segment is
// empty) with '\', '*' and the OS path separator replaced by '_'. The random
// part keeps concurrent downloads of the same name from writing to one file.
// The request is bound to ctx, so cancelling ctx aborts the download; the
// client timeout is 10 minutes.
//
// An error is returned when the URL is invalid, the request fails, the status
// is not 200 OK, or the file cannot be created or written completely. No file
// is left behind in those cases.
func downloadFile(ctx context.Context, rawURL, tempDir string) (string, error) {
	parsedURL, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}

	segments := strings.Split(parsedURL.Path, "/")
	fileName := strings.Map(func(r rune) rune {
		// Keep the name a single path element and free of the CreateTemp wildcard.
		if r == '\\' || r == '*' || r == os.PathSeparator {
			return '_'
		}
		return r
	}, segments[len(segments)-1])
	if fileName == "" {
		fileName = fmt.Sprintf("video_%d.mp4", time.Now().UnixMilli())
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
	if err != nil {
		return "", err
	}
	client := &http.Client{Timeout: 10 * time.Minute}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("HTTP %d", resp.StatusCode)
	}

	// The file is created only after the status check, so failed requests leave nothing behind.
	out, err := os.CreateTemp(tempDir, fmt.Sprintf("%d_*_%s", time.Now().UnixMilli(), fileName))
	if err != nil {
		return "", err
	}
	savePath := out.Name()

	_, copyErr := io.Copy(out, resp.Body)
	closeErr := out.Close()
	if copyErr == nil {
		// A failed Close can mean the data never reached the disk.
		copyErr = closeErr
	}
	if copyErr != nil {
		os.Remove(savePath)
		return "", copyErr
	}
	return savePath, nil
}