From 036b5cec370fa6ecedbe515120b8d8ab2e876953 Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Fri, 22 May 2026 13:18:55 +0800 Subject: [PATCH] =?UTF-8?q?=E7=94=9F=E6=88=90=E8=A7=86=E9=A2=91=E5=B9=B6?= =?UTF-8?q?=E4=B8=94=E4=B8=8A=E4=BC=A0=E5=88=B0minio-=E5=BC=82=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- controller/video/concat_controller.go | 55 +++++ dao/video/concat_task_dao.go | 112 ++++++++++ model/dto/video/video_dto.go | 47 +++++ model/entity/video/concat_task.go | 48 +++++ service/video/concat_service.go | 282 +++++++++++++++++++++++++- sql/concat_task.sql | 39 ++++ 6 files changed, 579 insertions(+), 4 deletions(-) create mode 100644 dao/video/concat_task_dao.go create mode 100644 model/entity/video/concat_task.go create mode 100644 sql/concat_task.sql diff --git a/controller/video/concat_controller.go b/controller/video/concat_controller.go index b62590f..fc239c1 100644 --- a/controller/video/concat_controller.go +++ b/controller/video/concat_controller.go @@ -26,6 +26,9 @@ var Concat = new(video) // Concat 视频拼接(URL模式) POST /video/concat func (c *video) Concat(ctx context.Context, req *dto.ConcatReq) (res *dto.ConcatRes, err error) { ctx = withUser(ctx) + g.Log().Infof(ctx, "[视频拼接] 收到请求 入参: method=%s, upload=%v, video_urls=%v", + req.Method, req.Upload, req.VideoURLs) + if req.Method == "" { req.Method = "auto" } @@ -44,12 +47,33 @@ func (c *video) Concat(ctx context.Context, req *dto.ConcatReq) (res *dto.Concat if err != nil { return nil, err } + + defer os.Remove(svcRes.OutputPath) return toDTORes(svcRes), nil } +// ConcatAsync 视频拼接-异步(URL模式) POST /video/concat/async +func (c *video) ConcatAsync(ctx context.Context, req *dto.ConcatAsyncReq) (res *dto.CreateConcatTaskRes, err error) { + ctx = withUser(ctx) + g.Log().Infof(ctx, "[视频拼接-异步] 收到请求 入参: method=%s, upload=%v, callback=%s, video_urls=%v", + req.Method, req.Upload, req.CallbackURL, req.VideoURLs) + + if req.Method == "" { + req.Method = "auto" + } + + taskID, taskErr := service.Concat.CreateAsyncTask(ctx, req.VideoURLs, req.Method, req.Upload, req.CallbackURL) + if taskErr != nil { + return nil, taskErr + } + return &dto.CreateConcatTaskRes{TaskID: taskID}, nil +} + // ConcatUpload 视频拼接(文件上传模式) POST /video/concat/upload func (c *video) ConcatUpload(ctx context.Context, req *dto.ConcatUploadReq) (res *dto.ConcatRes, err error) { ctx = withUser(ctx) + g.Log().Infof(ctx, "[视频拼接-上传] 收到请求 入参: method=%s, upload=%v", req.Method, req.Upload) + savePaths, err := common.SaveUploadedFilesFromCtx(ctx) if err != nil || len(savePaths) < 2 { return nil, fmt.Errorf("至少需要2个视频,当前%d个", len(savePaths)) @@ -68,9 +92,40 @@ func (c *video) ConcatUpload(ctx context.Context, req *dto.ConcatUploadReq) (res if err != nil { return nil, err } + + defer os.Remove(svcRes.OutputPath) return toDTORes(svcRes), nil } +// ConcatUploadAsync 视频拼接-异步(文件上传模式) POST /video/concat/upload/async +func (c *video) ConcatUploadAsync(ctx context.Context, req *dto.ConcatUploadAsyncReq) (res *dto.CreateConcatTaskRes, err error) { + ctx = withUser(ctx) + g.Log().Infof(ctx, "[视频拼接-上传-异步] 收到请求 入参: method=%s, upload=%v, callback=%s", + req.Method, req.Upload, req.CallbackURL) + + savePaths, err := common.SaveUploadedFilesFromCtx(ctx) + if err != nil || len(savePaths) < 2 { + return nil, fmt.Errorf("至少需要2个视频,当前%d个", len(savePaths)) + } + defer service.CleanupConcat(savePaths) + + if req.Method == "" { + req.Method = "auto" + } + + taskID, taskErr := service.Concat.CreateAsyncTaskWithFiles(ctx, savePaths, req.Method, req.Upload, req.CallbackURL) + if taskErr != nil { + return nil, taskErr + } + return &dto.CreateConcatTaskRes{TaskID: taskID}, nil +} + +// GetConcatTask 查询异步拼接任务结果 GET /video/concat/task/{taskId} +func (c *video) GetConcatTask(ctx context.Context, req *dto.GetConcatTaskReq) (res *dto.GetConcatTaskRes, err error) { + ctx = withUser(ctx) + return service.Concat.GetTaskResult(ctx, req.TaskID) +} + // withUser 为 context 注入默认用户(无认证基础设施时使用) func withUser(ctx context.Context) context.Context { if ctx.Value("user") == nil { diff --git a/dao/video/concat_task_dao.go b/dao/video/concat_task_dao.go new file mode 100644 index 0000000..cccd4e3 --- /dev/null +++ b/dao/video/concat_task_dao.go @@ -0,0 +1,112 @@ +package video + +import ( + "context" + "time" + + dto "media/model/dto/video" + entity "media/model/entity/video" + + "gitea.com/red-future/common/db/gfdb" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" +) + +var ConcatTask = new(concatTaskDao) + +type concatTaskDao struct{} + +const concatTaskTable = "concat_task" + +// Insert 创建任务 +func (d *concatTaskDao) Insert(ctx context.Context, data *entity.ConcatTask) (id int64, err error) { + r, err := gfdb.DB(ctx).Model(ctx, concatTaskTable). + Data(data). + Insert() + if err != nil { + return 0, err + } + return r.LastInsertId() +} + +// GetByTaskID 根据taskId查询任务 +func (d *concatTaskDao) GetByTaskID(ctx context.Context, taskID string) (res *entity.ConcatTask, err error) { + r, err := gfdb.DB(ctx).Model(ctx, concatTaskTable). + Where(entity.ConcatTaskCols.TaskID, taskID). + One() + if err != nil { + return nil, err + } + if r == nil { + return nil, nil + } + err = r.Struct(&res) + return +} + +// UpdateRunning 更新为运行中 +func (d *concatTaskDao) UpdateRunning(ctx context.Context, taskID string) error { + _, err := gfdb.DB(ctx).Model(ctx, concatTaskTable). + Data(g.Map{ + entity.ConcatTaskCols.Status: "running", + }). + Where(entity.ConcatTaskCols.TaskID, taskID). + Update() + return err +} + +// UpdateSuccess 更新为成功 +func (d *concatTaskDao) UpdateSuccess(ctx context.Context, taskID string, fileURL string, fileSize int64, fileName, fileFormat, fileAddrPrefix, methodUsed, durationStr string) error { + _, err := gfdb.DB(ctx).Model(ctx, concatTaskTable). + Data(g.Map{ + entity.ConcatTaskCols.Status: "success", + entity.ConcatTaskCols.FileURL: fileURL, + entity.ConcatTaskCols.FileSize: fileSize, + entity.ConcatTaskCols.FileName: fileName, + entity.ConcatTaskCols.FileFormat: fileFormat, + entity.ConcatTaskCols.FileAddressPrefix: fileAddrPrefix, + entity.ConcatTaskCols.MethodUsed: methodUsed, + entity.ConcatTaskCols.DurationStr: durationStr, + entity.ConcatTaskCols.ErrorMessage: "", + }). + Where(entity.ConcatTaskCols.TaskID, taskID). + Update() + return err +} + +// UpdateError 更新为失败 +func (d *concatTaskDao) UpdateError(ctx context.Context, taskID string, errMsg string) error { + _, err := gfdb.DB(ctx).Model(ctx, concatTaskTable). + Data(g.Map{ + entity.ConcatTaskCols.Status: "failed", + entity.ConcatTaskCols.ErrorMessage: errMsg, + }). + Where(entity.ConcatTaskCols.TaskID, taskID). + Update() + return err +} + +// EntityToTaskRes 实体转DTO +func EntityToTaskRes(e *entity.ConcatTask) *dto.GetConcatTaskRes { + res := &dto.GetConcatTaskRes{ + TaskID: e.TaskID, + Status: e.Status, + CreatedAt: gconv.Int64(e.CreatedAt.Timestamp()), + } + if e.CreatedAt == nil { + res.CreatedAt = time.Now().UnixMilli() + } + if e.Status == "success" { + res.FileURL = e.FileURL + res.FileSize = e.FileSize + res.FileName = e.FileName + res.FileFormat = e.FileFormat + res.FileAddressPrefix = e.FileAddressPrefix + res.MethodUsed = e.MethodUsed + res.DurationStr = e.DurationStr + } + if e.Status == "failed" { + res.ErrorMessage = e.ErrorMessage + } + return res +} diff --git a/model/dto/video/video_dto.go b/model/dto/video/video_dto.go index 66cfbdd..06238db 100644 --- a/model/dto/video/video_dto.go +++ b/model/dto/video/video_dto.go @@ -10,6 +10,15 @@ type ConcatReq struct { Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"` } +// ConcatAsyncReq 视频拼接-异步请求(URL模式) +type ConcatAsyncReq struct { + g.Meta `path:"/concat/async" method:"post" tags:"视频拼接" summary:"视频拼接-异步(URL模式)" dc:"异步拼接视频,立即返回taskId,完成后通过callback_url通知结果"` + VideoURLs []string `json:"video_urls" v:"required#视频URL列表不能为空" dc:"视频URL列表(按此顺序拼接)"` + Method string `json:"method" dc:"拼接方式(auto/fast/reencode)" d:"auto"` + Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"` + CallbackURL string `json:"callback_url" v:"required#回调地址不能为空" dc:"回调地址,拼接完成后POST结果到该地址"` +} + // ConcatUploadReq 视频拼接请求(文件上传模式) type ConcatUploadReq struct { g.Meta `path:"/concat/upload" method:"post" tags:"视频拼接" summary:"视频拼接(文件上传)" dc:"上传视频文件并拼接(至少2个视频)"` @@ -17,6 +26,14 @@ type ConcatUploadReq struct { Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"` } +// ConcatUploadAsyncReq 视频拼接-异步请求(文件上传模式) +type ConcatUploadAsyncReq struct { + g.Meta `path:"/concat/upload/async" method:"post" tags:"视频拼接" summary:"视频拼接-异步(文件上传)" dc:"异步拼接上传的视频,立即返回taskId,完成后通过callback_url通知结果"` + Method string `json:"method" dc:"拼接方式(auto/fast/reencode)" d:"auto"` + Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"` + CallbackURL string `json:"callback_url" v:"required#回调地址不能为空" dc:"回调地址,拼接完成后POST结果到该地址"` +} + // ConcatRes 视频拼接响应 type ConcatRes struct { OutputPath string `json:"outputPath" dc:"输出文件路径"` @@ -28,6 +45,36 @@ type ConcatRes struct { FileURL string `json:"fileURL" dc:"MinIO访问地址(上传后返回)"` } +// ---------- 异步拼接任务 ---------- + +// CreateConcatTaskRes 创建异步拼接任务响应 +type CreateConcatTaskRes struct { + TaskID string `json:"taskId" dc:"任务ID"` +} + +// GetConcatTaskReq 查询异步拼接任务请求 +type GetConcatTaskReq struct { + g.Meta `path:"/concat/task/{taskId}" method:"get" tags:"视频拼接" summary:"查询拼接任务结果" dc:"根据taskId查询异步拼接任务的结果"` + TaskID string `json:"taskId" dc:"任务ID"` +} + +// GetConcatTaskRes 查询异步拼接任务响应 +type GetConcatTaskRes struct { + TaskID string `json:"taskId" dc:"任务ID"` + Status string `json:"status" dc:"状态: pending/running/success/failed"` + FileURL string `json:"fileURL,omitempty" dc:"MinIO文件访问路径"` + FileSize int64 `json:"fileSize,omitempty" dc:"文件大小(字节)"` + FileName string `json:"fileName,omitempty" dc:"文件名"` + FileFormat string `json:"fileFormat,omitempty" dc:"文件格式"` + FileAddressPrefix string `json:"fileAddressPrefix,omitempty" dc:"MinIO地址前缀"` + MethodUsed string `json:"methodUsed,omitempty" dc:"实际使用的拼接方式"` + DurationStr string `json:"durationStr,omitempty" dc:"拼接后时长"` + ErrorMessage string `json:"errorMessage,omitempty" dc:"错误信息"` + CreatedAt int64 `json:"createdAt" dc:"创建时间戳"` +} + +// ---------- 上传工具 ---------- + // UploadFileBytesReq 上传文件请求(字节流) type UploadFileBytesReq struct { FileName string `json:"fileName" dc:"文件名"` diff --git a/model/entity/video/concat_task.go b/model/entity/video/concat_task.go new file mode 100644 index 0000000..6d8abb9 --- /dev/null +++ b/model/entity/video/concat_task.go @@ -0,0 +1,48 @@ +package video + +import "gitea.com/red-future/common/beans" + +// ConcatTask 视频拼接异步任务实体 +type ConcatTask struct { + beans.SQLBaseDO `orm:",inherit"` + TaskID string `orm:"task_id" json:"taskId" description:"任务唯一标识"` + Status string `orm:"status" json:"status" description:"任务状态:pending/running/success/failed"` + FileURL string `orm:"file_url" json:"fileUrl" description:"MinIO文件访问路径"` + FileSize int64 `orm:"file_size" json:"fileSize" description:"文件大小(字节)"` + FileName string `orm:"file_name" json:"fileName" description:"文件名"` + FileFormat string `orm:"file_format" json:"fileFormat" description:"文件格式"` + FileAddressPrefix string `orm:"file_address_prefix" json:"fileAddressPrefix" description:"MinIO地址前缀"` + MethodUsed string `orm:"method_used" json:"methodUsed" description:"实际使用的拼接方式"` + DurationStr string `orm:"duration_str" json:"durationStr" description:"拼接后时长"` + ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"` +} + +// ConcatTaskCol 字段定义 +type ConcatTaskCol struct { + beans.SQLBaseCol + TaskID string + Status string + FileURL string + FileSize string + FileName string + FileFormat string + FileAddressPrefix string + MethodUsed string + DurationStr string + ErrorMessage string +} + +// ConcatTaskCols 字段常量 +var ConcatTaskCols = ConcatTaskCol{ + SQLBaseCol: beans.DefSQLBaseCol, + TaskID: "task_id", + Status: "status", + FileURL: "file_url", + FileSize: "file_size", + FileName: "file_name", + FileFormat: "file_format", + FileAddressPrefix: "file_address_prefix", + MethodUsed: "method_used", + DurationStr: "duration_str", + ErrorMessage: "error_message", +} diff --git a/service/video/concat_service.go b/service/video/concat_service.go index 11622c8..8d555ea 100644 --- a/service/video/concat_service.go +++ b/service/video/concat_service.go @@ -8,17 +8,23 @@ import ( "io" "mime/multipart" "net/http" + "net/url" "os" "os/exec" "path/filepath" "strings" "time" + dao "media/dao/video" + dto "media/model/dto/video" + entity "media/model/entity/video" + "gitea.com/red-future/common/beans" commonHttp "gitea.com/red-future/common/http" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/util/guid" ) type concatService struct{} @@ -47,6 +53,9 @@ type ConcatRes struct { // Concat 拼接多个视频为一个 func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *ConcatRes, err error) { + g.Log().Infof(ctx, "[Concat] 服务层收到请求: videoPaths=%v, method=%s, upload=%v", + req.VideoPaths, req.Method, req.Upload) + if len(req.VideoPaths) < 2 { return nil, fmt.Errorf("至少需要2个视频才能拼接") } @@ -119,9 +128,10 @@ func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *Concat InputFiles: len(req.VideoPaths), } - // 如果需要上传到 MinIO + // 如果需要上传到 MinIO(用独立 context,避免 HTTP 断开后 ctx 被取消) if req.Upload { - uploadRes, uploadErr := s.UploadToMinIO(ctx, outputPath) + uploadCtx := context.Background() + uploadRes, uploadErr := s.UploadToMinIO(uploadCtx, outputPath) if uploadErr != nil { return nil, fmt.Errorf("上传到MinIO失败: %v", uploadErr) } @@ -153,7 +163,9 @@ func (s *concatService) concatByDemuxer(ctx context.Context, ffmpegPath string, output, } - cmd := exec.CommandContext(ctx, ffmpegPath, args...) + // 使用独立 context,避免 HTTP 请求超时导致 ffmpeg 被 SIGKILL + bgCtx := context.Background() + cmd := exec.CommandContext(bgCtx, ffmpegPath, args...) outputBytes, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("ffmpeg demuxer 失败: %v\n%s", err, string(outputBytes)) @@ -232,7 +244,9 @@ func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, i os.WriteFile(filterFile, []byte(filterStr), 0644) defer os.Remove(filterFile) - cmd := exec.CommandContext(ctx, ffmpegPath, args...) + // 使用独立 context,避免 HTTP 请求超时导致 ffmpeg 被 SIGKILL + bgCtx := context.Background() + cmd := exec.CommandContext(bgCtx, ffmpegPath, args...) outputBytes, err := cmd.CombinedOutput() if err != nil { return fmt.Errorf("ffmpeg filter 失败: %v\n日志:\n%s", err, string(outputBytes)) @@ -405,3 +419,263 @@ func CleanupConcat(paths []string) { os.Remove(p) } } + +// ---------- 异步拼接任务管理 ---------- + +// CreateAsyncTask 创建异步拼接任务(URL模式),返回 taskId,后台处理 +func (s *concatService) CreateAsyncTask(ctx context.Context, videoURLs []string, method string, upload bool, callbackURL string) (string, error) { + if len(videoURLs) < 2 { + return "", fmt.Errorf("至少需要2个视频才能拼接") + } + + taskID := "concat_" + guid.S() + task := &entity.ConcatTask{ + TaskID: taskID, + Status: "pending", + MethodUsed: method, + } + if _, err := dao.ConcatTask.Insert(ctx, task); err != nil { + return "", fmt.Errorf("创建任务失败: %v", err) + } + + g.Log().Infof(ctx, "[异步拼接] 创建任务 %s, 视频数=%d, 回调=%s", taskID, len(videoURLs), callbackURL) + + // 异步处理:先下载再拼接 + go s.processAsyncTask(taskID, videoURLs, method, upload, callbackURL) + + return taskID, nil +} + +// CreateAsyncTaskWithFiles 创建异步拼接任务(文件上传模式),直接处理本地文件 +func (s *concatService) CreateAsyncTaskWithFiles(ctx context.Context, filePaths []string, method string, upload bool, callbackURL string) (string, error) { + if len(filePaths) < 2 { + return "", fmt.Errorf("至少需要2个视频才能拼接") + } + + taskID := "concat_" + guid.S() + task := &entity.ConcatTask{ + TaskID: taskID, + Status: "pending", + MethodUsed: method, + } + if _, err := dao.ConcatTask.Insert(ctx, task); err != nil { + return "", fmt.Errorf("创建任务失败: %v", err) + } + + g.Log().Infof(ctx, "[异步拼接-文件] 创建任务 %s, 文件数=%d, 回调=%s", taskID, len(filePaths), callbackURL) + + // 异步处理:已有本地文件,直接拼接 + go s.processAsyncTaskWithFiles(taskID, filePaths, method, upload, callbackURL) + + return taskID, nil +} + +// GetTaskResult 查询异步任务结果 +func (s *concatService) GetTaskResult(ctx context.Context, taskID string) (*dto.GetConcatTaskRes, error) { + task, err := dao.ConcatTask.GetByTaskID(ctx, taskID) + if err != nil { + return nil, fmt.Errorf("查询任务失败: %v", err) + } + if task == nil { + return nil, fmt.Errorf("任务不存在: %s", taskID) + } + + return dao.EntityToTaskRes(task), nil +} + +// processAsyncTaskWithFiles 后台处理异步拼接任务(文件上传模式,文件已在本地) +func (s *concatService) processAsyncTaskWithFiles(taskID string, filePaths []string, method string, upload bool, callbackURL string) { + bgCtx := context.Background() + bgCtx = context.WithValue(bgCtx, "user", &beans.User{UserName: "admin", TenantId: 1}) + + dao.ConcatTask.UpdateRunning(bgCtx, taskID) + + defer func() { + if r := recover(); r != nil { + errMsg := fmt.Sprintf("异步拼接异常: %v", r) + g.Log().Errorf(bgCtx, "[异步拼接 %s] %s", taskID, errMsg) + dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg) + s.concatCallback(bgCtx, taskID, callbackURL) + } + }() + + concatErr := s.executeConcat(bgCtx, taskID, filePaths, method, upload) + if concatErr != nil { + dao.ConcatTask.UpdateError(bgCtx, taskID, concatErr.Error()) + s.concatCallback(bgCtx, taskID, callbackURL) + return + } + + g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID) + + if callbackURL != "" { + s.concatCallback(bgCtx, taskID, callbackURL) + } +} + +// processAsyncTask 后台处理异步拼接任务(URL模式,需要先下载) +func (s *concatService) processAsyncTask(taskID string, videoURLs []string, method string, upload bool, callbackURL string) { + bgCtx := context.Background() + bgCtx = context.WithValue(bgCtx, "user", &beans.User{UserName: "admin", TenantId: 1}) + + dao.ConcatTask.UpdateRunning(bgCtx, taskID) + + defer func() { + if r := recover(); r != nil { + errMsg := fmt.Sprintf("异步拼接异常: %v", r) + g.Log().Errorf(bgCtx, "[异步拼接 %s] %s", taskID, errMsg) + dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg) + s.concatCallback(bgCtx, taskID, callbackURL) + } + }() + + // 下载视频 + var savePaths []string + tempDir := g.Cfg().MustGet(bgCtx, "ffmpeg.temp_dir", "resource/temp").String() + os.MkdirAll(tempDir, 0755) + + for _, videoURL := range videoURLs { + savePath, dlErr := downloadFile(bgCtx, videoURL, tempDir) + if dlErr != nil { + g.Log().Warningf(bgCtx, "[异步拼接 %s] 下载失败 %s: %v", taskID, videoURL, dlErr) + continue + } + savePaths = append(savePaths, savePath) + } + + if len(savePaths) < 2 { + errMsg := fmt.Sprintf("成功下载的视频不足2个(共%d)", len(videoURLs)) + dao.ConcatTask.UpdateError(bgCtx, taskID, errMsg) + CleanupConcat(savePaths) + s.concatCallback(bgCtx, taskID, callbackURL) + return + } + + // 执行拼接 + concatErr := s.executeConcat(bgCtx, taskID, savePaths, method, upload) + CleanupConcat(savePaths) + + if concatErr != nil { + dao.ConcatTask.UpdateError(bgCtx, taskID, concatErr.Error()) + s.concatCallback(bgCtx, taskID, callbackURL) + return + } + + g.Log().Infof(bgCtx, "[异步拼接 %s] 完成", taskID) + + if callbackURL != "" { + s.concatCallback(bgCtx, taskID, callbackURL) + } +} + +// executeConcat 执行拼接并更新任务状态,返回输出路径 +func (s *concatService) executeConcat(ctx context.Context, taskID string, filePaths []string, method string, upload bool) error { + tempDir := filepath.Dir(filePaths[0]) + outputPath := filepath.Join(tempDir, + fmt.Sprintf("concat_%s_x%d_%s.mp4", taskID, len(filePaths), time.Now().Format("150405"))) + + res, concatErr := s.Concat(ctx, &ConcatReq{ + VideoPaths: filePaths, + OutputPath: outputPath, + Method: method, + Upload: upload, + }) + if concatErr != nil { + os.Remove(outputPath) + return concatErr + } + + // 更新数据库为成功 + fileName := filepath.Base(outputPath) + fileFormat := "" + if idx := strings.LastIndex(fileName, "."); idx > 0 { + fileFormat = fileName[idx+1:] + } + dao.ConcatTask.UpdateSuccess(ctx, taskID, + res.FileURL, res.FileSize, fileName, fileFormat, + "", res.MethodUsed, res.DurationStr) + + os.Remove(outputPath) + return nil +} + +// concatCallback 回调通知(从数据库读取任务结果发送) +func (s *concatService) concatCallback(ctx context.Context, taskID, callbackURL string) { + if callbackURL == "" { + return + } + + task, err := dao.ConcatTask.GetByTaskID(ctx, taskID) + if err != nil || task == nil { + g.Log().Errorf(ctx, "[异步拼接回调 %s] 查询任务失败: %v", taskID, err) + return + } + + payload := map[string]interface{}{ + "taskId": task.TaskID, + "status": task.Status, + } + if task.Status == "success" { + payload["fileURL"] = task.FileURL + payload["fileSize"] = task.FileSize + } + if task.Status == "failed" { + payload["errorMessage"] = task.ErrorMessage + } + + body, _ := json.Marshal(payload) + g.Log().Infof(ctx, "[异步拼接回调 %s] 状态=%s, 目标=%s", taskID, task.Status, callbackURL) + + req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) + req.Header.Set("X-User-Info", string(userJSON)) + + client := &http.Client{Timeout: 30 * time.Second} + resp, reqErr := client.Do(req) + if reqErr != nil { + g.Log().Errorf(ctx, "[异步拼接回调 %s] 请求失败: %v", taskID, reqErr) + return + } + defer resp.Body.Close() + respBody, _ := io.ReadAll(resp.Body) + g.Log().Infof(ctx, "[异步拼接回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody)) +} + +// downloadFile 下载文件到临时目录 +func downloadFile(ctx context.Context, rawURL, tempDir string) (string, error) { + parsedURL, err := url.Parse(rawURL) + if err != nil { + return "", err + } + segments := strings.Split(parsedURL.Path, "/") + fileName := segments[len(segments)-1] + if fileName == "" { + fileName = fmt.Sprintf("video_%d.mp4", time.Now().UnixMilli()) + } + savePath := filepath.Join(tempDir, fmt.Sprintf("%d_%s", time.Now().UnixMilli(), fileName)) + + client := &http.Client{Timeout: 10 * time.Minute} + resp, err := client.Get(rawURL) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("HTTP %d", resp.StatusCode) + } + + out, err := os.Create(savePath) + if err != nil { + return "", err + } + defer out.Close() + + _, err = io.Copy(out, resp.Body) + if err != nil { + os.Remove(savePath) + return "", err + } + return savePath, nil +} diff --git a/sql/concat_task.sql b/sql/concat_task.sql new file mode 100644 index 0000000..0e907d5 --- /dev/null +++ b/sql/concat_task.sql @@ -0,0 +1,39 @@ +-- concat_task 视频拼接异步任务表 +CREATE TABLE IF NOT EXISTS concat_task ( + id BIGSERIAL NOT NULL, + task_id VARCHAR(64) NOT NULL, + tenant_id BIGINT NOT NULL DEFAULT 0, + status VARCHAR(20) NOT NULL DEFAULT 'pending', + file_url TEXT NOT NULL DEFAULT '', + file_size BIGINT NOT NULL DEFAULT 0, + file_name VARCHAR(255) NOT NULL DEFAULT '', + file_format VARCHAR(32) NOT NULL DEFAULT '', + file_address_prefix TEXT NOT NULL DEFAULT '', + method_used VARCHAR(64) NOT NULL DEFAULT '', + duration_str VARCHAR(32) NOT NULL DEFAULT '', + error_message TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + deleted_at TIMESTAMP WITH TIME ZONE, + PRIMARY KEY (id) +); + +COMMENT ON TABLE concat_task IS '视频拼接异步任务表'; +COMMENT ON COLUMN concat_task.task_id IS '任务唯一标识'; +COMMENT ON COLUMN concat_task.tenant_id IS '租户ID'; +COMMENT ON COLUMN concat_task.status IS '任务状态:pending/running/success/failed'; +COMMENT ON COLUMN concat_task.file_url IS 'MinIO文件访问路径'; +COMMENT ON COLUMN concat_task.file_size IS '文件大小(字节)'; +COMMENT ON COLUMN concat_task.file_name IS '文件名'; +COMMENT ON COLUMN concat_task.file_format IS '文件格式'; +COMMENT ON COLUMN concat_task.file_address_prefix IS 'MinIO地址前缀'; +COMMENT ON COLUMN concat_task.method_used IS '实际使用的拼接方式'; +COMMENT ON COLUMN concat_task.duration_str IS '拼接后时长'; +COMMENT ON COLUMN concat_task.error_message IS '错误信息'; +COMMENT ON COLUMN concat_task.created_at IS '创建时间'; +COMMENT ON COLUMN concat_task.updated_at IS '更新时间'; +COMMENT ON COLUMN concat_task.deleted_at IS '删除时间(软删除)'; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_concat_task_task_id ON concat_task(task_id); +CREATE INDEX IF NOT EXISTS idx_concat_task_status ON concat_task(status); +CREATE INDEX IF NOT EXISTS idx_concat_task_created_at ON concat_task(created_at);