diff --git a/Dockerfile b/Dockerfile index d8bff7b..dda0021 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,7 +27,12 @@ RUN go build -ldflags="-s -w" -o main ./main.go # 阶段2: 运行 FROM alpine:3.19 -RUN apk add --no-cache ca-certificates tzdata +# Runtime deps: ca-certificates (HTTPS), tzdata (time zones), ffmpeg (audio/video processing) +RUN apk add --no-cache ca-certificates tzdata ffmpeg bash +# Python3 + pip for whisper speech recognition. Alpine's python3 marks the system env as externally managed (PEP 668), hence --break-system-packages; install errors are left visible so a failed install fails the build. NOTE(review): openai-whisper depends on torch, which publishes no musl (Alpine) wheels - confirm this step builds. +RUN apk add --no-cache python3 py3-pip && \ + pip3 install --no-cache-dir --break-system-packages \ + openai-whisper ENV TZ=Asia/Shanghai RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone diff --git a/config.yml b/config.yml index 3baac4b..a774796 100644 --- a/config.yml +++ b/config.yml @@ -34,11 +34,23 @@ database: deletedAt: "deleted_at" timeMaintainDisabled: false +redis: + # Single-node mode (one address; cluster mode lists several comma-separated addresses). NOTE(review): this Redis is on a public IP with no pass configured - add auth or use a private address. + default: + address: 116.204.74.41:6379 + db: 0 + idleTimeout: "60s" # max idle time of a connection; duration string, e.g. 30s/1m/1d + maxConnLifetime: "90s" # max lifetime of a connection; duration string, e.g. 30s/1m/1d + waitTimeout: "60s" # max wait for a connection from the pool; duration string, e.g. 30s/1m/1d + dialTimeout: "30s" # TCP connect timeout; duration string, e.g. 30s/1m/1d + readTimeout: "30s" # TCP read timeout; duration string, e.g. 30s/1m/1d + writeTimeout: "30s" # TCP write timeout; duration string, e.g. 
30s/1m/1d + maxActive: 100 consul: address: 192.168.3.30:8500 jaeger: - addr: 116.204.74.41:4318 + addr: 192.168.3.30:4318 # FFmpeg 配置(视频音频提取) ffmpeg: @@ -47,6 +59,9 @@ ffmpeg: # 临时文件目录(上传的视频和提取的音频) temp_dir: "resource/temp" +# OSS/MinIO file upload settings +filePrefix: "http://116.204.74.41:9000" + # Whisper 语音识别配置 whisper: # whisper 可执行文件路径,留空则自动查找 diff --git a/consts/audio/task.go b/consts/audio/task.go new file mode 100644 index 0000000..f2b3cfc --- /dev/null +++ b/consts/audio/task.go @@ -0,0 +1,20 @@ +package audio + +// Task status values. +const ( + TaskStatusPending = "pending" // waiting to be processed + TaskStatusRunning = "running" // being processed + TaskStatusSuccess = "success" // finished successfully + TaskStatusFailed = "failed" // finished with an error +) + +// Input type values. +const ( + InputTypeURL = "url" // input is a list of URLs +) + 
+// 表名常量 +const ( + TranscribeTaskTable = "transcribe_task" + TranscribeTaskDetailTable = "transcribe_task_detail" +) diff --git a/controller/audio/audio_extract_controller.go b/controller/audio/audio_extract_controller.go index 4aa9769..0a3aaf3 100644 --- a/controller/audio/audio_extract_controller.go +++ b/controller/audio/audio_extract_controller.go @@ -2,105 +2,64 @@ package audio import ( "context" - "encoding/json" + "strings" - common "media/controller/common" dto "media/model/dto/audio" service "media/service/asr" "gitea.com/red-future/common/beans" "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/net/ghttp" ) type audio struct{} var AudioExtract = new(audio) -// safeResult 对外输出的识别结果(隐藏内部路径) -type safeResult struct { - Text string `json:"text"` - Model string `json:"model"` - Language string `json:"language"` - AudioSize int64 `json:"audioSize"` - AudioDuration string `json:"audioDuration"` - Scenes *dto.SceneSummaryDTO `json:"scenes,omitempty"` -} - -// safeItem 对外输出的单视频结果 -type safeItem struct { - FileName string `json:"fileName"` - Result *safeResult `json:"result,omitempty"` - Error string `json:"error,omitempty"` -} - -// TranscribeHandler 语音转文字+分镜分析 -// 支持两种入参方式: -// 1. JSON body: {"video_urls":[...], "model":"medium", "language":"zh", "threshold":0.3} -// 2. 
文件上传: files 参数(兼容单/多文件) -func (c *audio) TranscribeHandler(r *ghttp.Request) { - ctx := r.Context() - ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"}) - - // 优先尝试 JSON body(URL 列表模式) - body := r.GetBody() - if len(body) > 0 && body[0] == '{' { - var req dto.TranscribeReq - if json.Unmarshal(body, &req) == nil && len(req.VideoURLs) > 0 { - // 填充默认值 - if req.Model == "" { - req.Model = g.Cfg().MustGet(ctx, "whisper.model", "medium").String() - } - if req.Language == "" { - req.Language = g.Cfg().MustGet(ctx, "whisper.language", "zh").String() - } - if req.Threshold <= 0 { - req.Threshold = 0.3 - } - - res, svcErr := service.VideoTranscribe.TranscribeWithURLs(ctx, &req) - if svcErr != nil { - r.Response.WriteJson(g.Map{"code": 500, "message": svcErr.Error()}) - return - } - r.Response.WriteJson(g.Map{"code": 200, "message": "success", "data": toSafeItems(res.Results)}) - return - } +// Create 创建语音转文字异步任务 POST /audio/transcribe +func (c *audio) Create(ctx context.Context, req *dto.TranscribeReq) (res *dto.CreateTaskRes, err error) { + ctx = withUser(ctx) + fileNames := make([]string, len(req.VideoURLs)) + for i, u := range req.VideoURLs { + parts := strings.Split(u, "/") + fileNames[i] = parts[len(parts)-1] } - // 文件上传模式 - savePaths, err := common.SaveUploadedFiles(r) - if err != nil || len(savePaths) == 0 { - r.Response.WriteJson(g.Map{"code": 400, "message": "请上传视频文件( multipart )或提供 video_urls( JSON )"}) - return + g.Log().Infof(ctx, "收到转写请求, 回调URL: %s", req.CallbackURL) + + params := &service.CreateTaskParams{ + InputData: req.VideoURLs, + FileNames: fileNames, + Model: req.Model, + Language: req.Language, + Threshold: req.Threshold, + CallbackURL: req.CallbackURL, } - results := service.VideoTranscribe.TranscribeUpload(ctx, savePaths, - r.Get("model", g.Cfg().MustGet(ctx, "whisper.model", "medium").String()).String(), - r.Get("language", g.Cfg().MustGet(ctx, "whisper.language", "zh").String()).String(), - r.Get("threshold", 0.3).Float64()) - - 
r.Response.WriteJson(g.Map{"code": 200, "message": "success", "data": toSafeItems(results)}) + return service.AudioTask.Create(ctx, params) } -// toSafeItems 将结果转为安全的响应格式(移除 audioPath 等内部路径) -func toSafeItems(results []dto.TranscribeItem) []safeItem { - var items []safeItem - for _, item := range results { - si := safeItem{FileName: item.FileName, Error: item.Error} - if item.Result != nil { - if r, ok := item.Result.(*dto.TranscribeResult); ok { - si.Result = &safeResult{ - Text: r.Text, - Model: r.Model, - Language: r.Language, - AudioSize: r.AudioSize, - AudioDuration: r.AudioDuration, - Scenes: r.Scenes, - } - } - } - items = append(items, si) +// GetTask 获取任务详情 GET /audio/task/{taskId} +func (c *audio) GetTask(ctx context.Context, req *dto.GetTaskReq) (res *dto.GetTaskRes, err error) { + ctx = withUser(ctx) + return service.AudioTask.GetTask(ctx, req) +} + +// GetProgress 获取任务进度 GET /audio/task/{taskId}/progress +func (c *audio) GetProgress(ctx context.Context, req *dto.GetProgressReq) (res *dto.GetProgressRes, err error) { + ctx = withUser(ctx) + return service.AudioTask.GetProgress(ctx, req) +} + +// ListTasks 获取任务列表 GET /audio/tasks +func (c *audio) ListTasks(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) { + ctx = withUser(ctx) + return service.AudioTask.ListTasks(ctx, req) +} + +// withUser 为 context 注入默认用户(无认证基础设施时使用) +func withUser(ctx context.Context) context.Context { + if ctx.Value("user") == nil { + ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) } - return items + return ctx } diff --git a/controller/common/upload.go b/controller/common/upload.go index 4442c97..8144694 100644 --- a/controller/common/upload.go +++ b/controller/common/upload.go @@ -55,6 +55,15 @@ func SaveUploadedFiles(r *ghttp.Request) ([]string, error) { return saved, nil } +// SaveUploadedFilesFromCtx 从请求上下文中获取上传文件并保存 +func SaveUploadedFilesFromCtx(ctx context.Context) ([]string, error) { + r := g.RequestFromCtx(ctx) 
+ if r == nil { + return nil, fmt.Errorf("无法获取请求上下文") + } + return SaveUploadedFiles(r) +} + func getTempDir(ctx context.Context) string { tempDir := g.Cfg().MustGet(ctx, "ffmpeg.temp_dir", "resource/temp").String() if tempDir == "" { diff --git a/controller/video/concat_controller.go b/controller/video/concat_controller.go index a58bdc5..b62590f 100644 --- a/controller/video/concat_controller.go +++ b/controller/video/concat_controller.go @@ -2,7 +2,6 @@ package video import ( "context" - "encoding/json" "fmt" "io" "net/http" @@ -18,99 +17,98 @@ import ( "gitea.com/red-future/common/beans" "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/net/ghttp" ) type video struct{} var Concat = new(video) -// ConcatVideosHandler 视频拼接 -// 支持两种入参方式: -// 1. JSON body: {"video_urls":[...], "method":"auto"} -// 2. 文件上传: files 参数(至少2个视频) -func (c *video) ConcatVideosHandler(r *ghttp.Request) { - ctx := r.Context() - ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"}) - - // 优先尝试 JSON body(URL 列表模式) - body := r.GetBody() - if len(body) > 0 && body[0] == '{' { - var req dto.ConcatReq - if json.Unmarshal(body, &req) == nil && len(req.VideoURLs) >= 2 { - if req.Method == "" { - req.Method = "auto" - } - - tempDir := g.Cfg().MustGet(ctx, "ffmpeg.temp_dir", "resource/temp").String() - if !filepath.IsAbs(tempDir) { - absDir, _ := filepath.Abs(tempDir) - tempDir = absDir - } - os.MkdirAll(tempDir, 0755) - - var savePaths []string - for _, videoURL := range req.VideoURLs { - savePath, dlErr := downloadFromURL(ctx, videoURL, tempDir) - if dlErr != nil { - continue - } - savePaths = append(savePaths, savePath) - } - if len(savePaths) < 2 { - cleanupConcat(savePaths) - r.Response.WriteJson(g.Map{"code": 400, "message": "成功下载的视频不足2个"}) - return - } - - svcRes, svcErr := service.Concat.Concat(ctx, &service.ConcatReq{ - VideoPaths: savePaths, - Method: req.Method, - }) - cleanupConcat(savePaths) - if svcErr != nil { - r.Response.WriteJson(g.Map{"code": 500, "message": 
"视频拼接失败: " + svcErr.Error()}) - return - } - - r.Response.WriteJson(g.Map{ - "code": 200, - "message": "success", - "data": g.Map{ - "outputPath": svcRes.OutputPath, - "fileSize": svcRes.FileSize, - "duration": svcRes.Duration, - "durationStr": svcRes.DurationStr, - "methodUsed": svcRes.MethodUsed, - "inputFiles": svcRes.InputFiles, - }, - }) - return - } +// Concat 视频拼接(URL模式) POST /video/concat +func (c *video) Concat(ctx context.Context, req *dto.ConcatReq) (res *dto.ConcatRes, err error) { + ctx = withUser(ctx) + if req.Method == "" { + req.Method = "auto" } - // 文件上传模式 - savePaths, err := common.SaveUploadedFiles(r) - if err != nil || len(savePaths) < 2 { - r.Response.WriteJson(g.Map{"code": 400, "message": fmt.Sprintf("至少需要2个视频,当前%d个", len(savePaths))}) - return + savePaths, err := downloadVideos(ctx, req.VideoURLs) + if err != nil { + return nil, err } + defer cleanupConcat(savePaths) - svcRes, svcErr := service.Concat.Concat(ctx, &service.ConcatReq{ + svcRes, err := service.Concat.Concat(ctx, &service.ConcatReq{ VideoPaths: savePaths, - Method: r.Get("method", "auto").String(), + Method: req.Method, + Upload: req.Upload, }) - service.CleanupConcat(savePaths) - if svcErr != nil { - r.Response.WriteJson(g.Map{"code": 500, "message": "视频拼接失败: " + svcErr.Error()}) - return + if err != nil { + return nil, err + } + return toDTORes(svcRes), nil +} + +// ConcatUpload 视频拼接(文件上传模式) POST /video/concat/upload +func (c *video) ConcatUpload(ctx context.Context, req *dto.ConcatUploadReq) (res *dto.ConcatRes, err error) { + ctx = withUser(ctx) + savePaths, err := common.SaveUploadedFilesFromCtx(ctx) + if err != nil || len(savePaths) < 2 { + return nil, fmt.Errorf("至少需要2个视频,当前%d个", len(savePaths)) + } + defer service.CleanupConcat(savePaths) + + if req.Method == "" { + req.Method = "auto" } - r.Response.ServeFile(svcRes.OutputPath) - go func(path string) { - time.Sleep(5 * time.Second) - os.Remove(path) - }(svcRes.OutputPath) + svcRes, err := service.Concat.Concat(ctx, 
&service.ConcatReq{ + VideoPaths: savePaths, + Method: req.Method, + Upload: req.Upload, + }) + if err != nil { + return nil, err + } + return toDTORes(svcRes), nil +} + +// withUser 为 context 注入默认用户(无认证基础设施时使用) +func withUser(ctx context.Context) context.Context { + if ctx.Value("user") == nil { + ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) + } + return ctx +} + +// toDTORes 将 Service 内部响应类型转换为 DTO 响应类型 +func toDTORes(svcRes *service.ConcatRes) *dto.ConcatRes { + return &dto.ConcatRes{ + OutputPath: svcRes.OutputPath, + FileSize: svcRes.FileSize, + Duration: svcRes.Duration, + DurationStr: svcRes.DurationStr, + MethodUsed: svcRes.MethodUsed, + InputFiles: svcRes.InputFiles, + FileURL: svcRes.FileURL, + } +} + +// downloadVideos 下载视频URL列表 +func downloadVideos(ctx context.Context, videoURLs []string) ([]string, error) { + tempDir := getTempDir(ctx) + os.MkdirAll(tempDir, 0755) + + var savePaths []string + for _, videoURL := range videoURLs { + savePath, dlErr := downloadFromURL(ctx, videoURL, tempDir) + if dlErr != nil { + continue + } + savePaths = append(savePaths, savePath) + } + if len(savePaths) < 2 { + return savePaths, fmt.Errorf("成功下载的视频不足2个") + } + return savePaths, nil } func downloadFromURL(ctx context.Context, rawURL, tempDir string) (string, error) { @@ -154,3 +152,15 @@ func cleanupConcat(paths []string) { os.Remove(p) } } + +func getTempDir(ctx context.Context) string { + tempDir := g.Cfg().MustGet(ctx, "ffmpeg.temp_dir", "resource/temp").String() + if tempDir == "" { + tempDir = "resource/temp" + } + if !filepath.IsAbs(tempDir) { + absDir, _ := filepath.Abs(tempDir) + tempDir = absDir + } + return tempDir +} diff --git a/dao/audio/transcribe_task_dao.go b/dao/audio/transcribe_task_dao.go new file mode 100644 index 0000000..f97954a --- /dev/null +++ b/dao/audio/transcribe_task_dao.go @@ -0,0 +1,188 @@ +package audio + +import ( + "context" + consts "media/consts/audio" + dto "media/model/dto/audio" + entity 
"media/model/entity/audio" + + "gitea.com/red-future/common/db/gfdb" + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" +) + +var TranscribeTask = new(transcribeTaskDao) + +type transcribeTaskDao struct{} + +// Insert 创建任务 +func (d *transcribeTaskDao) Insert(ctx context.Context, data *entity.TranscribeTask) (id int64, err error) { + // FieldsEx 排除空 result 字段(JSONB 列不支持空串 '') + r, err := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskTable). + Data(data). + FieldsEx(entity.TranscribeTaskCols.Result). + Insert() + if err != nil { + return 0, err + } + return r.LastInsertId() +} + +// GetByTaskID 根据taskId获取任务 +func (d *transcribeTaskDao) GetByTaskID(ctx context.Context, taskID string) (res *entity.TranscribeTask, err error) { + r, err := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskTable). + Where(entity.TranscribeTaskCols.TaskID, taskID). + One() + if err != nil { + return nil, err + } + if r == nil { + return nil, nil + } + err = r.Struct(&res) + return +} + +// UpdateProgress 更新任务进度 +func (d *transcribeTaskDao) UpdateProgress(ctx context.Context, taskID string, progress int) (rows int64, err error) { + r, err := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskTable). + Data(g.Map{ + entity.TranscribeTaskCols.Progress: progress, + }). + Where(entity.TranscribeTaskCols.TaskID, taskID). + Update() + if err != nil { + return 0, err + } + return r.RowsAffected() +} + +// UpdateTaskRunning 将任务更新为运行中 +func (d *transcribeTaskDao) UpdateTaskRunning(ctx context.Context, taskID string, progress int) (rows int64, err error) { + r, err := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskTable). + Data(g.Map{ + entity.TranscribeTaskCols.Status: consts.TaskStatusRunning, + entity.TranscribeTaskCols.Progress: progress, + }). + Where(entity.TranscribeTaskCols.TaskID, taskID). 
+ Update() + if err != nil { + return 0, err + } + return r.RowsAffected() +} + +// UpdateResult 更新任务成功状态(result: 完整结果JSON) +func (d *transcribeTaskDao) UpdateResult(ctx context.Context, taskID, result string, successCount, failCount int) (rows int64, err error) { + r, err := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskTable). + Data(g.Map{ + entity.TranscribeTaskCols.Status: consts.TaskStatusSuccess, + entity.TranscribeTaskCols.Progress: 100, + entity.TranscribeTaskCols.Result: result, + entity.TranscribeTaskCols.SuccessFiles: successCount, + entity.TranscribeTaskCols.FailFiles: failCount, + entity.TranscribeTaskCols.ErrorMessage: "", + }). + Where(entity.TranscribeTaskCols.TaskID, taskID). + Update() + if err != nil { + return 0, err + } + return r.RowsAffected() +} + +// UpdateError 更新任务错误(失败后) +func (d *transcribeTaskDao) UpdateError(ctx context.Context, taskID string, errMsg string) (rows int64, err error) { + r, err := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskTable). + Data(g.Map{ + entity.TranscribeTaskCols.Status: consts.TaskStatusFailed, + entity.TranscribeTaskCols.ErrorMessage: errMsg, + }). + Where(entity.TranscribeTaskCols.TaskID, taskID). 
+ Update() + if err != nil { + return 0, err + } + return r.RowsAffected() +} + +// List 获取任务列表 +func (d *transcribeTaskDao) List(ctx context.Context, req *dto.ListTaskReq) (res []entity.TranscribeTask, total int, err error) { + model := d.buildListFilter(ctx, req) + model.OrderDesc(entity.TranscribeTaskCols.CreatedAt) + if req.Page != nil { + model.Page(int(req.Page.PageNum), int(req.Page.PageSize)) + } + r, total, err := model.AllAndCount(false) + if err != nil { + return + } + err = r.Structs(&res) + return +} + +// buildListFilter 构建列表过滤 +func (d *transcribeTaskDao) buildListFilter(ctx context.Context, req *dto.ListTaskReq) *gdb.Model { + model := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskTable).Model + model.Where(entity.TranscribeTaskCols.Status, req.Status) + model.OmitEmptyWhere() + return model +} + +// EntityToItem 将实体转为DTO项 +// 注意: 不返回 result 字段(数据在 detailList 结构化返回中,避免与 result JSON 重复) +// +// result 仅在回调通知时直接使用 task.Result +func EntityToItem(e *entity.TranscribeTask) dto.TranscribeTaskItem { + item := dto.TranscribeTaskItem{ + ID: e.Id, + TaskID: e.TaskID, + Status: e.Status, + Progress: e.Progress, + TotalFiles: e.TotalFiles, + SuccessFiles: e.SuccessFiles, + FailFiles: e.FailFiles, + Model: e.Model, + Language: e.Language, + Threshold: e.Threshold, + InputType: e.InputType, + InputData: e.InputData, + FileNames: e.FileNames, + CallbackURL: e.CallbackURL, + ErrorMessage: e.ErrorMessage, + } + if e.CreatedAt != nil { + item.CreatedAt = gconv.Int64(e.CreatedAt.Timestamp()) + } + if e.UpdatedAt != nil { + item.UpdatedAt = gconv.Int64(e.UpdatedAt.Timestamp()) + } + return item +} + +// EntityToProgress 将实体转为进度DTO +func EntityToProgress(e *entity.TranscribeTask) dto.GetProgressRes { + return dto.GetProgressRes{ + TaskID: e.TaskID, + Status: e.Status, + Progress: e.Progress, + } +} + +// DetailEntityToItem 将明细实体转为DTO项 +func DetailEntityToItem(e *entity.TranscribeTaskDetail) dto.TranscribeTaskDetailItem { + return dto.TranscribeTaskDetailItem{ + ID: e.Id, 
+ TaskID: e.TaskID, + FileIndex: e.FileIndex, + FileName: e.FileName, + TranscribedText: e.TranscribedText, + Scenes: e.Scenes, + AudioSize: e.AudioSize, + AudioDuration: e.AudioDuration, + Model: e.Model, + Language: e.Language, + ErrorMessage: e.ErrorMessage, + } +} diff --git a/dao/audio/transcribe_task_detail_dao.go b/dao/audio/transcribe_task_detail_dao.go new file mode 100644 index 0000000..0d78ebd --- /dev/null +++ b/dao/audio/transcribe_task_detail_dao.go @@ -0,0 +1,35 @@ +package audio + +import ( + "context" + consts "media/consts/audio" + entity "media/model/entity/audio" + + "gitea.com/red-future/common/db/gfdb" +) + +var TranscribeTaskDetail = new(transcribeTaskDetailDao) + +type transcribeTaskDetailDao struct{} + +// Insert 插入明细 +func (d *transcribeTaskDetailDao) Insert(ctx context.Context, data *entity.TranscribeTaskDetail) (id int64, err error) { + // FieldsEx 排除空 scenes 字段(JSONB 列不支持空串 '') + r, err := gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskDetailTable). + Data(data). + FieldsEx(entity.TranscribeTaskDetailCols.Scenes). + Insert() + if err != nil { + return 0, err + } + return r.LastInsertId() +} + +// ListByTaskID 根据taskId查询明细列表(按file_index升序) +func (d *transcribeTaskDetailDao) ListByTaskID(ctx context.Context, taskID string) (res []entity.TranscribeTaskDetail, err error) { + err = gfdb.DB(ctx).Model(ctx, consts.TranscribeTaskDetailTable). + Where(entity.TranscribeTaskDetailCols.TaskID, taskID). + OrderAsc(entity.TranscribeTaskDetailCols.FileIndex). 
+ Scan(&res) + return +} diff --git a/main.go b/main.go index 64f9813..617eb65 100644 --- a/main.go +++ b/main.go @@ -4,70 +4,20 @@ import ( "context" controllerAudio "media/controller/audio" controllerVideo "media/controller/video" - serviceSetup "media/service/setup" - "os" - "path/filepath" - "time" _ "gitea.com/red-future/common/consul" "gitea.com/red-future/common/http" "gitea.com/red-future/common/jaeger" - "github.com/gogf/gf/v2/frame/g" + _ "github.com/gogf/gf/contrib/drivers/pgsql/v2" ) func main() { ctx := context.Background() defer jaeger.ShutDown(ctx) - loc, err := time.LoadLocation("Asia/Shanghai") - if err == nil { - time.Local = loc - } - os.Setenv("TZ", "Asia/Shanghai") - - serviceSetup.EnsureDependencies(ctx) - - // 清理旧 temp 文件(防止异常中断残留) - cleanupTempDir(ctx) - - // 文件上传路由(在 RouteRegister 启动服务器之前注册) - http.Httpserver.BindHandler("/audio/transcribe", controllerAudio.AudioExtract.TranscribeHandler) - http.Httpserver.BindHandler("/video/concat", controllerVideo.Concat.ConcatVideosHandler) - - // 启动服务器(无需 g.Meta 自动注册) - http.RouteRegister(nil) - - port := g.Cfg().MustGet(ctx, "server.address", ":3001").String() - g.Log().Info(ctx, "============================================") - g.Log().Infof(ctx, "服务启动: http://localhost%s", port) - g.Log().Infof(ctx, " POST %s/audio/transcribe - 语音转文字+分镜分析(文件上传,参数名 files)", port) - g.Log().Infof(ctx, " POST %s/video/concat - 视频拼接(文件上传,参数名 files,至少2个视频)", port) - g.Log().Info(ctx, "============================================") - + http.RouteRegister([]interface{}{ + controllerAudio.AudioExtract, + controllerVideo.Concat, + }) select {} } - -// cleanupTempDir 清理临时文件目录,防止旧运行残留 -func cleanupTempDir(ctx context.Context) { - tempDir := g.Cfg().MustGet(ctx, "ffmpeg.temp_dir", "resource/temp").String() - if tempDir == "" { - tempDir = "resource/temp" - } - if !filepath.IsAbs(tempDir) { - absDir, err := filepath.Abs(tempDir) - if err != nil { - return - } - tempDir = absDir - } - - entries, err := os.ReadDir(tempDir) - if 
err != nil { - return - } - for _, entry := range entries { - fullPath := filepath.Join(tempDir, entry.Name()) - os.RemoveAll(fullPath) - } - g.Log().Infof(ctx, "临时目录已清理: %s", tempDir) -} diff --git a/model/dto/audio/audio_dto.go b/model/dto/audio/audio_dto.go index c8fbc5f..bb26e71 100644 --- a/model/dto/audio/audio_dto.go +++ b/model/dto/audio/audio_dto.go @@ -1,11 +1,15 @@ package audio +import "github.com/gogf/gf/v2/frame/g" + // TranscribeReq 语音转文字请求(JSON body / URL 方式) type TranscribeReq struct { - VideoURLs []string `json:"video_urls" v:"required#视频URL列表不能为空" dc:"视频URL列表"` - Model string `json:"model" dc:"whisper模型(tiny/base/small/medium)" d:"medium"` - Language string `json:"language" dc:"语言(zh/en/ja)" d:"zh"` - Threshold float64 `json:"threshold" dc:"场景检测阈值(0.1-0.5)" d:"0.3"` + g.Meta `path:"/transcribe" method:"post" tags:"音频转写" summary:"语音转文字(异步)" dc:"创建异步语音转文字任务,返回taskId"` + VideoURLs []string `json:"video_urls" v:"required#视频URL列表不能为空" dc:"视频URL列表"` + Model string `json:"model" dc:"whisper模型(tiny/base/small/medium)" d:"medium"` + Language string `json:"language" dc:"语言(zh/en/ja)" d:"zh"` + Threshold float64 `json:"threshold" dc:"场景检测阈值(0.1-0.5)" d:"0.3"` + CallbackURL string `json:"callback_url" dc:"任务完成后的回调地址(可选),成功后POST结果到此URL"` } // TranscribeRes 语音转文字响应 diff --git a/model/dto/audio/task_dto.go b/model/dto/audio/task_dto.go new file mode 100644 index 0000000..d88d2e5 --- /dev/null +++ b/model/dto/audio/task_dto.go @@ -0,0 +1,130 @@ +package audio + +import ( + "gitea.com/red-future/common/beans" + "github.com/gogf/gf/v2/frame/g" +) + +// CreateTaskRes 创建任务响应 +type CreateTaskRes struct { + TaskID string `json:"taskId" dc:"任务ID"` +} + +// ---------- 获取任务详情 ---------- + +// GetTaskReq 获取任务详情请求 +type GetTaskReq struct { + g.Meta `path:"/{taskId}" method:"get" tags:"音频转写" summary:"查询任务详情" dc:"根据taskId查询任务详情和明细"` + TaskID string `json:"taskId" dc:"任务ID"` +} + +// GetTaskRes 获取任务详情响应 +type GetTaskRes struct { + TaskInfo TranscribeTaskItem `json:"taskInfo" 
dc:"任务信息"` + DetailList []TranscribeTaskDetailItem `json:"detailList" dc:"明细列表(每视频一条)"` +} + +// TranscribeTaskItem 任务批次项 +type TranscribeTaskItem struct { + ID int64 `json:"id,string" dc:"数据库ID"` + TaskID string `json:"taskId" dc:"任务ID"` + Status string `json:"status" dc:"任务状态"` + Progress int `json:"progress" dc:"进度0-100"` + TotalFiles int `json:"totalFiles" dc:"文件总数"` + SuccessFiles int `json:"successFiles" dc:"成功文件数"` + FailFiles int `json:"failFiles" dc:"失败文件数"` + Model string `json:"model" dc:"whisper模型"` + Language string `json:"language" dc:"语言"` + Threshold float64 `json:"threshold" dc:"场景检测阈值"` + InputType string `json:"inputType" dc:"输入类型"` + InputData string `json:"inputData" dc:"输入数据"` + FileNames string `json:"fileNames" dc:"文件名列表"` + CallbackURL string `json:"callbackUrl" dc:"回调地址"` + Result string `json:"result,omitempty" dc:"完整的处理结果JSON(成功后返回)"` + ErrorMessage string `json:"errorMessage" dc:"错误信息(失败后返回)"` + CreatedAt int64 `json:"createdAt" dc:"创建时间戳"` + UpdatedAt int64 `json:"updatedAt" dc:"更新时间戳"` +} + +// TranscribeTaskDetailItem 任务明细项(每视频) +type TranscribeTaskDetailItem struct { + ID int64 `json:"id,string" dc:"明细ID"` + TaskID string `json:"taskId" dc:"任务ID"` + FileIndex int `json:"fileIndex" dc:"文件序号"` + FileName string `json:"fileName" dc:"文件名"` + TranscribedText string `json:"transcribedText" dc:"语音识别文字"` + Scenes string `json:"scenes" dc:"分镜分析JSON"` + AudioSize int64 `json:"audioSize" dc:"音频文件大小"` + AudioDuration string `json:"audioDuration" dc:"音频时长"` + Model string `json:"model" dc:"whisper模型"` + Language string `json:"language" dc:"语言代码"` + ErrorMessage string `json:"errorMessage" dc:"错误信息"` +} + +// ---------- 获取任务进度 ---------- + +// GetProgressReq 获取任务进度请求 +type GetProgressReq struct { + g.Meta `path:"/{taskId}/progress" method:"get" tags:"音频转写" summary:"查询任务进度" dc:"查询任务的当前处理进度"` + TaskID string `json:"taskId" dc:"任务ID"` +} + +// GetProgressRes 获取任务进度响应 +type GetProgressRes struct { + TaskID string `json:"taskId" dc:"任务ID"` + Status 
string `json:"status" dc:"任务状态"` + Progress int `json:"progress" dc:"进度0-100"` +} + +// ---------- 任务列表 ---------- + +// ListTaskReq 获取任务列表请求 +type ListTaskReq struct { + g.Meta `path:"/tasks" method:"get" tags:"音频转写" summary:"查询任务列表" dc:"分页查询任务列表,可按状态筛选"` + *beans.Page + Status string `json:"status" dc:"按状态筛选"` +} + +// ListTaskRes 获取任务列表响应 +type ListTaskRes struct { + List []TranscribeTaskItem `json:"list" dc:"任务列表"` + Total int `json:"total" dc:"总数"` +} + +// ---------- 回调通知结构 ---------- + +// CallbackPayload 回调通知内容 +type CallbackPayload struct { + TaskID string `json:"taskId" dc:"任务ID"` + Status string `json:"status" dc:"任务状态"` + TotalFiles int `json:"totalFiles" dc:"文件总数"` + SuccessFiles int `json:"successFiles" dc:"成功文件数"` + FailFiles int `json:"failFiles" dc:"失败文件数"` + Result string `json:"result,omitempty" dc:"完整的处理结果JSON"` + ErrorMessage string `json:"errorMessage,omitempty" dc:"错误信息"` + DetailList []TranscribeTaskDetailItem `json:"detailList" dc:"明细列表"` +} + +// ---------- 任务处理结果结构(用于result JSONB) ---------- + +// TaskResult 单任务处理结果 +type TaskResult struct { + Results []TaskResultItem `json:"results" dc:"处理结果列表"` +} + +// TaskResultItem 单视频处理结果 +type TaskResultItem struct { + FileName string `json:"fileName" dc:"文件名"` + Result *TaskResultDTO `json:"result,omitempty" dc:"识别结果"` + Error string `json:"error,omitempty" dc:"错误信息"` +} + +// TaskResultDTO 识别结果详情(对外输出,隐藏内部路径) +type TaskResultDTO struct { + Text string `json:"text" dc:"识别文本"` + Model string `json:"model" dc:"使用的模型"` + Language string `json:"language" dc:"语言"` + AudioSize int64 `json:"audioSize" dc:"音频文件大小(字节)"` + AudioDuration string `json:"audioDuration" dc:"音频时长"` + Scenes *SceneSummaryDTO `json:"scenes,omitempty" dc:"分镜分析"` +} diff --git a/model/dto/video/video_dto.go b/model/dto/video/video_dto.go index 216ce29..66cfbdd 100644 --- a/model/dto/video/video_dto.go +++ b/model/dto/video/video_dto.go @@ -1,9 +1,20 @@ package video +import "github.com/gogf/gf/v2/frame/g" + // ConcatReq 视频拼接请求(JSON 
body / URL 方式) type ConcatReq struct { + g.Meta `path:"/concat" method:"post" tags:"视频拼接" summary:"视频拼接(URL模式)" dc:"从视频URL下载并拼接"` VideoURLs []string `json:"video_urls" v:"required#视频URL列表不能为空" dc:"视频URL列表(按此顺序拼接)"` Method string `json:"method" dc:"拼接方式(auto/fast/reencode)" d:"auto"` + Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"` +} + +// ConcatUploadReq 视频拼接请求(文件上传模式) +type ConcatUploadReq struct { + g.Meta `path:"/concat/upload" method:"post" tags:"视频拼接" summary:"视频拼接(文件上传)" dc:"上传视频文件并拼接(至少2个视频)"` + Method string `json:"method" dc:"拼接方式(auto/fast/reencode)" d:"auto"` + Upload bool `json:"upload" dc:"是否上传到MinIO" d:"false"` } // ConcatRes 视频拼接响应 @@ -14,4 +25,21 @@ type ConcatRes struct { DurationStr string `json:"durationStr" dc:"可读时长"` MethodUsed string `json:"methodUsed" dc:"实际使用的拼接方式"` InputFiles int `json:"inputFiles" dc:"输入文件数"` + FileURL string `json:"fileURL" dc:"MinIO访问地址(上传后返回)"` +} + +// UploadFileBytesReq 上传文件请求(字节流) +type UploadFileBytesReq struct { + FileName string `json:"fileName" dc:"文件名"` + FileBytes []byte `json:"fileBytes" dc:"文件字节流"` + FileStoreURL string `json:"fileStoreURL" dc:"文件存储路径"` +} + +// UploadFileBytesRes 上传文件响应 +type UploadFileBytesRes struct { + FileURL string `json:"fileURL" dc:"上传地址"` + FileSize int `json:"fileSize" dc:"文件大小"` + FileName string `json:"fileName" dc:"文件名称"` + FileFormat string `json:"fileFormat" dc:"文件格式"` + FileAddressPrefix string `json:"fileAddressPrefix"` } diff --git a/model/entity/audio/transcribe_task.go b/model/entity/audio/transcribe_task.go new file mode 100644 index 0000000..4a92238 --- /dev/null +++ b/model/entity/audio/transcribe_task.go @@ -0,0 +1,63 @@ +package audio + +import "gitea.com/red-future/common/beans" + +// TranscribeTask 语音转文字任务批次头实体 +type TranscribeTask struct { + beans.SQLBaseDO `orm:",inherit"` + TaskID string `orm:"task_id" json:"taskId" description:"任务批次唯一标识"` + Status string `orm:"status" json:"status" description:"任务状态:pending/running/success/failed"` + Progress int 
`orm:"progress" json:"progress" description:"进度0-100"` + TotalFiles int `orm:"total_files" json:"totalFiles" description:"文件总数"` + SuccessFiles int `orm:"success_files" json:"successFiles" description:"成功文件数"` + FailFiles int `orm:"fail_files" json:"failFiles" description:"失败文件数"` + Model string `orm:"model" json:"model" description:"whisper模型"` + Language string `orm:"language" json:"language" description:"语言"` + Threshold float64 `orm:"threshold" json:"threshold" description:"场景检测阈值"` + InputType string `orm:"input_type" json:"inputType" description:"输入类型:upload/url"` + InputData string `orm:"input_data" json:"inputData" description:"输入数据(文件路径/URL列表JSON)"` + FileNames string `orm:"file_names" json:"fileNames" description:"文件名列表JSON"` + CallbackURL string `orm:"callback_url" json:"callbackUrl" description:"任务完成后的回调地址"` + Result string `orm:"result" json:"result" description:"完整的处理结果JSON"` + ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"` +} + +// TranscribeTaskCol 字段定义 +type TranscribeTaskCol struct { + beans.SQLBaseCol + TaskID string + Status string + Progress string + TotalFiles string + SuccessFiles string + FailFiles string + Model string + Language string + Threshold string + InputType string + InputData string + FileNames string + CallbackURL string + Result string + ErrorMessage string +} + +// TranscribeTaskCols 字段常量 +var TranscribeTaskCols = TranscribeTaskCol{ + SQLBaseCol: beans.DefSQLBaseCol, + TaskID: "task_id", + Status: "status", + Progress: "progress", + TotalFiles: "total_files", + SuccessFiles: "success_files", + FailFiles: "fail_files", + Model: "model", + Language: "language", + Threshold: "threshold", + InputType: "input_type", + InputData: "input_data", + FileNames: "file_names", + CallbackURL: "callback_url", + Result: "result", + ErrorMessage: "error_message", +} diff --git a/model/entity/audio/transcribe_task_detail.go b/model/entity/audio/transcribe_task_detail.go new file mode 100644 index 
0000000..ea661da --- /dev/null +++ b/model/entity/audio/transcribe_task_detail.go @@ -0,0 +1,48 @@ +package audio + +import "gitea.com/red-future/common/beans" + +// TranscribeTaskDetail 语音转文字任务明细实体(每视频一条) +type TranscribeTaskDetail struct { + beans.SQLBaseDO `orm:",inherit"` + TaskID string `orm:"task_id" json:"taskId" description:"所属任务批次ID"` + FileIndex int `orm:"file_index" json:"fileIndex" description:"文件序号(从0开始)"` + FileName string `orm:"file_name" json:"fileName" description:"文件名"` + TranscribedText string `orm:"transcribed_text" json:"transcribedText" description:"语音识别文字"` + Scenes string `orm:"scenes" json:"scenes" description:"分镜分析JSON"` + AudioSize int64 `orm:"audio_size" json:"audioSize" description:"音频文件大小(字节)"` + AudioDuration string `orm:"audio_duration" json:"audioDuration" description:"音频时长"` + Model string `orm:"model" json:"model" description:"whisper模型"` + Language string `orm:"language" json:"language" description:"语言代码"` + ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"` +} + +// TranscribeTaskDetailCol 字段定义 +type TranscribeTaskDetailCol struct { + beans.SQLBaseCol + TaskID string + FileIndex string + FileName string + TranscribedText string + Scenes string + AudioSize string + AudioDuration string + Model string + Language string + ErrorMessage string +} + +// TranscribeTaskDetailCols 字段常量 +var TranscribeTaskDetailCols = TranscribeTaskDetailCol{ + SQLBaseCol: beans.DefSQLBaseCol, + TaskID: "task_id", + FileIndex: "file_index", + FileName: "file_name", + TranscribedText: "transcribed_text", + Scenes: "scenes", + AudioSize: "audio_size", + AudioDuration: "audio_duration", + Model: "model", + Language: "language", + ErrorMessage: "error_message", +} diff --git a/service/asr/task_service.go b/service/asr/task_service.go new file mode 100644 index 0000000..b379768 --- /dev/null +++ b/service/asr/task_service.go @@ -0,0 +1,449 @@ +package asr + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + 
"net/http" + "os" + "path/filepath" + "strings" + "time" + + consts "media/consts/audio" + dao "media/dao/audio" + dto "media/model/dto/audio" + entity "media/model/entity/audio" + serviceScene "media/service/scene" + + "gitea.com/red-future/common/beans" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" + "github.com/gogf/gf/v2/util/guid" +) + +var AudioTask = new(audioTaskService) + +type audioTaskService struct{} + +// CreateTaskParams 创建任务参数 +type CreateTaskParams struct { + InputData []string // URL列表 + FileNames []string // 文件名列表 + Model string + Language string + Threshold float64 + CallbackURL string // 任务完成回调地址(完整URL,含ip+端口+路径) +} + +// Create 创建转写任务并立即返回taskId +func (s *audioTaskService) Create(ctx context.Context, params *CreateTaskParams) (res *dto.CreateTaskRes, err error) { + taskID := "tsk_" + guid.S() + + if params.Model == "" { + params.Model = g.Cfg().MustGet(ctx, "whisper.model", "medium").String() + } + if params.Language == "" { + params.Language = g.Cfg().MustGet(ctx, "whisper.language", "zh").String() + } + if params.Threshold <= 0 { + params.Threshold = 0.3 + } + + inputBytes, _ := json.Marshal(params.InputData) + fnBytes, _ := json.Marshal(params.FileNames) + + now := time.Now() + task := &entity.TranscribeTask{ + TaskID: taskID, + Status: consts.TaskStatusPending, + Progress: 0, + TotalFiles: len(params.InputData), + InputType: consts.InputTypeURL, + Model: params.Model, + Language: params.Language, + Threshold: params.Threshold, + InputData: string(inputBytes), + FileNames: string(fnBytes), + CallbackURL: params.CallbackURL, + } + task.CreatedAt = gconv.GTime(now) + task.UpdatedAt = gconv.GTime(now) + + if _, daoErr := dao.TranscribeTask.Insert(ctx, task); daoErr != nil { + return nil, fmt.Errorf("创建任务失败: %v", daoErr) + } + + g.Log().Infof(ctx, "[创建任务 %s] 文件数=%d, 模型=%s, 语言=%s, 回调=%s", + taskID, len(params.InputData), params.Model, params.Language, params.CallbackURL) + + // 异步处理 + go s.processTask(taskID, 
params.InputData, params.Model, params.Language, params.Threshold, params.CallbackURL) + + return &dto.CreateTaskRes{TaskID: taskID}, nil +} + +// processTask 异步处理所有URL,每个文件生成一条明细 +func (s *audioTaskService) processTask(taskID string, urls []string, model, language string, threshold float64, callbackURL string) { + ctx := context.Background() + ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) + + defer func() { + if r := recover(); r != nil { + errMsg := fmt.Sprintf("任务处理异常: %v", r) + g.Log().Errorf(ctx, "[任务 %s] %s, 将通过回调通知调用方", taskID, errMsg) + dao.TranscribeTask.UpdateError(ctx, taskID, errMsg) + g.Log().Infof(ctx, "[任务 %s] 触发失败回调(panic恢复)", taskID) + s.callback(ctx, taskID, consts.TaskStatusFailed, errMsg, callbackURL) + } + }() + + g.Log().Infof(ctx, "[任务 %s] 开始处理, 共%d个URL, 回调地址=%s", taskID, len(urls), callbackURL) + dao.TranscribeTask.UpdateTaskRunning(ctx, taskID, 5) + g.Log().Infof(ctx, "[任务 %s] 状态已更新为 running, 进度=5", taskID) + + tempDir := getTempDir(ctx) + os.MkdirAll(tempDir, 0755) + + var results []dto.TranscribeItem + successCount, failCount := 0, 0 + + total := len(urls) + for i, videoURL := range urls { + g.Log().Infof(ctx, "[任务 %s] 下载 %d/%d: %s", taskID, i+1, total, videoURL) + + progress := 10 + (i*70)/total + dao.TranscribeTask.UpdateProgress(ctx, taskID, progress) + g.Log().Debugf(ctx, "[任务 %s] 进度更新: %d/%d → %d%%", taskID, i+1, total, progress) + + savePath, dlErr := downloadFromURL(ctx, videoURL, tempDir) + if dlErr != nil { + g.Log().Warningf(ctx, "[任务 %s] 文件%d/%d 下载失败: %v", taskID, i+1, total, dlErr) + s.saveDetail(ctx, taskID, i, fmt.Sprintf("url_%d.mp4", i+1), + "", "", 0, "", model, language, dlErr.Error()) + results = append(results, dto.TranscribeItem{ + FileName: fmt.Sprintf("url_%d.mp4", i+1), + Error: dlErr.Error(), + }) + failCount++ + continue + } + + fileName := filepath.Base(savePath) + result := s.processSingleVideo(ctx, taskID, savePath, i, fileName, model, language, threshold) + results = 
append(results, *result) + if result.Error != "" { + g.Log().Warningf(ctx, "[任务 %s] 文件%d/%d 处理失败: %s - %s", taskID, i+1, total, fileName, result.Error) + failCount++ + } else { + g.Log().Infof(ctx, "[任务 %s] 文件%d/%d 处理成功: %s", taskID, i+1, total, fileName) + successCount++ + } + } + + g.Log().Infof(ctx, "[任务 %s] 所有文件处理完毕, 成功=%d 失败=%d, 开始构建结果JSON", taskID, successCount, failCount) + + // 构建完整结果JSON + progress := 95 + dao.TranscribeTask.UpdateProgress(ctx, taskID, progress) + g.Log().Debugf(ctx, "[任务 %s] 进度更新: 95%% (结果构建中)", taskID) + + resultObj := dto.TaskResult{Results: make([]dto.TaskResultItem, len(results))} + for i, item := range results { + itemDTO := dto.TaskResultItem{ + FileName: item.FileName, + Error: item.Error, + } + if item.Result != nil { + if r, ok := item.Result.(*dto.TranscribeResult); ok { + itemDTO.Result = &dto.TaskResultDTO{ + Text: r.Text, + Model: r.Model, + Language: r.Language, + AudioSize: r.AudioSize, + AudioDuration: r.AudioDuration, + Scenes: r.Scenes, + } + } + } + resultObj.Results[i] = itemDTO + } + + resultJSON, marshalErr := json.Marshal(resultObj) + if marshalErr != nil { + errMsg := "结果序列化失败: " + marshalErr.Error() + g.Log().Errorf(ctx, "[任务 %s] %s", taskID, errMsg) + dao.TranscribeTask.UpdateError(ctx, taskID, errMsg) + s.callback(ctx, taskID, consts.TaskStatusFailed, errMsg, callbackURL) + return + } + + resultSize := len(resultJSON) + g.Log().Infof(ctx, "[任务 %s] 结果JSON序列化完成, 大小=%d字节", taskID, resultSize) + + if _, err := dao.TranscribeTask.UpdateResult(ctx, taskID, string(resultJSON), successCount, failCount); err != nil { + g.Log().Errorf(ctx, "[任务 %s] 保存结果失败: %v, 通过回调发送结果", taskID, err) + s.callback(ctx, taskID, consts.TaskStatusFailed, fmt.Sprintf("保存结果失败: %v", err), callbackURL) + return + } + g.Log().Infof(ctx, "[任务 %s] 结果已入库, 成功=%d 失败=%d, 触发成功回调", taskID, successCount, failCount) + + if callbackURL != "" { + s.callback(ctx, taskID, consts.TaskStatusSuccess, "", callbackURL) + } + g.Log().Infof(ctx, "[任务 %s] 全部处理流程结束", 
taskID) +} + +// callback 向回调地址 POST 任务结果 +func (s *audioTaskService) callback(ctx context.Context, taskID, status, errMsg, callbackURL string) { + if callbackURL == "" { + return + } + + task, _ := dao.TranscribeTask.GetByTaskID(ctx, taskID) + if task == nil { + g.Log().Errorf(ctx, "[回调 %s] 任务不存在", taskID) + return + } + + detailList, _ := dao.TranscribeTaskDetail.ListByTaskID(ctx, taskID) + detailItems := make([]dto.TranscribeTaskDetailItem, 0, len(detailList)) + for i := range detailList { + detailItems = append(detailItems, dao.DetailEntityToItem(&detailList[i])) + } + + payload := dto.CallbackPayload{ + TaskID: taskID, + Status: status, + TotalFiles: task.TotalFiles, + SuccessFiles: task.SuccessFiles, + FailFiles: task.FailFiles, + ErrorMessage: errMsg, + Result: task.Result, + DetailList: detailItems, + } + + body, _ := json.Marshal(payload) + g.Log().Infof(ctx, "[回调 %s] 触发回调, 状态=%s, 成功=%d 失败=%d, 错误=%s, 目标=%s", + taskID, status, payload.SuccessFiles, payload.FailFiles, errMsg, callbackURL) + g.Log().Debugf(ctx, "[回调 %s] 回调载荷长度=%d字节, 明细条数=%d", + taskID, len(body), len(detailItems)) + + req, _ := http.NewRequest("POST", callbackURL, bytes.NewReader(body)) + req.Header.Set("Content-Type", "application/json") + // 透传调用方的用户信息,供回调方 GetUserInfo 从 X-User-Info 头获取 + userJSON, _ := json.Marshal(beans.User{UserName: "admin", TenantId: 1}) + req.Header.Set("X-User-Info", string(userJSON)) + + resp, reqErr := http.DefaultClient.Do(req) + if reqErr != nil { + g.Log().Errorf(ctx, "[回调 %s] 请求失败: %v", taskID, reqErr) + return + } + defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) + g.Log().Infof(ctx, "[回调 %s] 响应 status=%d, body=%s", taskID, resp.StatusCode, string(respBody)) +} + +// processSingleVideo 处理单个文件,同时写入明细表 +func (s *audioTaskService) processSingleVideo(ctx context.Context, taskID, savePath string, fileIndex int, fileName, model, language string, threshold float64) *dto.TranscribeItem { + if idx := strings.Index(fileName, "_"); idx > 0 { + fileName = 
fileName[idx+1:] + } + + g.Log().Infof(ctx, "[任务 %s] 开始处理文件(fileIndex=%d): %s", taskID, fileIndex, fileName) + + var scenes *dto.SceneSummaryDTO + sceneRes, sceneErr := serviceScene.SceneAnalyzer.Analyze(ctx, &serviceScene.SceneAnalyzeReq{ + VideoPaths: []string{savePath}, + Threshold: threshold, + ExtractKeyframes: false, + }) + if sceneErr != nil { + g.Log().Warningf(ctx, "[任务 %s] 文件 %s 分镜分析失败: %v", taskID, fileName, sceneErr) + } else if len(sceneRes.Analyses) > 0 { + scenes = toSceneDTO(&sceneRes.Analyses[0]) + g.Log().Infof(ctx, "[任务 %s] 文件 %s 分镜分析完成, 场景数=%d", taskID, fileName, scenes.TotalScenes) + } else { + g.Log().Infof(ctx, "[任务 %s] 文件 %s 分镜分析无结果", taskID, fileName) + } + + g.Log().Infof(ctx, "[任务 %s] 文件 %s 开始语音识别, 模型=%s, 语言=%s", taskID, fileName, model, language) + transRes, transErr := VideoTranscribe.TranscribeVideo(ctx, &VideoTranscribeReq{ + VideoPath: savePath, + Model: model, + Language: language, + }) + if transErr != nil { + g.Log().Errorf(ctx, "[任务 %s] 文件 %s 语音识别失败: %v", taskID, fileName, transErr) + os.Remove(savePath) + s.saveDetail(ctx, taskID, fileIndex, fileName, + "", "", 0, "", model, language, transErr.Error()) + return &dto.TranscribeItem{ + FileName: fileName, + Error: transErr.Error(), + } + } + + g.Log().Infof(ctx, "[任务 %s] 文件 %s 语音识别成功, 文本长度=%d, 音频时长=%s, 大小=%d", + taskID, fileName, len(transRes.Text), transRes.AudioDuration, transRes.AudioSize) + + var scenesJSON string + if scenes != nil { + sb, _ := json.Marshal(scenes) + scenesJSON = string(sb) + } + + s.saveDetail(ctx, taskID, fileIndex, fileName, + transRes.Text, scenesJSON, transRes.AudioSize, transRes.AudioDuration, model, language, "") + + return &dto.TranscribeItem{ + FileName: fileName, + Result: &dto.TranscribeResult{ + Text: transRes.Text, + Model: transRes.Model, + Language: transRes.Language, + AudioPath: transRes.AudioPath, + AudioSize: transRes.AudioSize, + AudioDuration: transRes.AudioDuration, + Scenes: scenes, + }, + } +} + +// saveDetail 保存单文件明细到 
transcribe_task_detail +func (s *audioTaskService) saveDetail(ctx context.Context, taskID string, fileIndex int, fileName, text, scenes string, audioSize int64, audioDuration, model, language, errMsg string) { + detail := &entity.TranscribeTaskDetail{ + TaskID: taskID, + FileIndex: fileIndex, + FileName: fileName, + TranscribedText: text, + Scenes: scenes, + AudioSize: audioSize, + AudioDuration: audioDuration, + Model: model, + Language: language, + ErrorMessage: errMsg, + } + if _, daoErr := dao.TranscribeTaskDetail.Insert(ctx, detail); daoErr != nil { + g.Log().Errorf(ctx, "[任务 %s] 保存明细失败(fileIndex=%d): %v", taskID, fileIndex, daoErr) + } else { + g.Log().Debugf(ctx, "[任务 %s] 明细已保存(fileIndex=%d, fileName=%s, 有错误=%v)", + taskID, fileIndex, fileName, errMsg != "") + } +} + +// ---------- 查询任务 ---------- + +func (s *audioTaskService) GetTask(ctx context.Context, req *dto.GetTaskReq) (res *dto.GetTaskRes, err error) { + if req.TaskID == "" { + return nil, fmt.Errorf("taskId不能为空") + } + task, err := dao.TranscribeTask.GetByTaskID(ctx, req.TaskID) + if err != nil { + return nil, fmt.Errorf("查询任务失败: %v", err) + } + if task == nil { + return nil, fmt.Errorf("任务不存在: %s", req.TaskID) + } + + detailList, err := dao.TranscribeTaskDetail.ListByTaskID(ctx, req.TaskID) + if err != nil { + g.Log().Warningf(ctx, "[任务 %s] 查询明细失败: %v", req.TaskID, err) + } + + g.Log().Infof(ctx, "[查询任务] taskId=%s, 状态=%s, 进度=%d", req.TaskID, task.Status, task.Progress) + + item := dao.EntityToItem(task) + detailItems := make([]dto.TranscribeTaskDetailItem, 0, len(detailList)) + for i := range detailList { + detailItems = append(detailItems, dao.DetailEntityToItem(&detailList[i])) + } + + // 兼容历史数据: 若 detail.scenes 为空但有 result JSON, 从 result 中提取 scenes 补上 + detailItems = enrichDetailsFromResult(task.Result, detailItems) + + return &dto.GetTaskRes{ + TaskInfo: item, + DetailList: detailItems, + }, nil +} + +// enrichDetailsFromResult 从 result JSON 中补全明细中的 scenes 等字段 +func 
enrichDetailsFromResult(resultJSON string, details []dto.TranscribeTaskDetailItem) []dto.TranscribeTaskDetailItem { + if resultJSON == "" || len(details) == 0 { + return details + } + + var taskResult dto.TaskResult + if err := json.Unmarshal([]byte(resultJSON), &taskResult); err != nil { + return details + } + + for i, d := range details { + if d.Scenes != "" { + continue // 已有 scenes,不需要补 + } + for _, r := range taskResult.Results { + if r.Result == nil || r.Result.Scenes == nil { + continue + } + if r.FileName == d.FileName { + sb, _ := json.Marshal(r.Result.Scenes) + details[i].Scenes = string(sb) + // 同时补全其他可能缺失的字段 + if d.AudioDuration == "" { + details[i].AudioDuration = r.Result.AudioDuration + } + if d.AudioSize == 0 { + details[i].AudioSize = r.Result.AudioSize + } + if d.Model == "" { + details[i].Model = r.Result.Model + } + if d.Language == "" { + details[i].Language = r.Result.Language + } + break + } + } + } + return details +} + +func (s *audioTaskService) GetProgress(ctx context.Context, req *dto.GetProgressReq) (res *dto.GetProgressRes, err error) { + if req.TaskID == "" { + return nil, fmt.Errorf("taskId不能为空") + } + task, err := dao.TranscribeTask.GetByTaskID(ctx, req.TaskID) + if err != nil { + return nil, fmt.Errorf("查询任务失败: %v", err) + } + if task == nil { + return nil, fmt.Errorf("任务不存在: %s", req.TaskID) + } + p := dao.EntityToProgress(task) + g.Log().Infof(ctx, "[查询进度] taskId=%s, 状态=%s, 进度=%d", req.TaskID, p.Status, p.Progress) + return &p, nil +} + +func (s *audioTaskService) ListTasks(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) { + // 确保分页参数不为 nil + if req.Page == nil { + req.Page = &beans.Page{PageNum: 1, PageSize: 10} + } + list, total, err := dao.TranscribeTask.List(ctx, req) + if err != nil { + return nil, fmt.Errorf("查询任务列表失败: %v", err) + } + items := make([]dto.TranscribeTaskItem, len(list)) + for i, task := range list { + items[i] = dao.EntityToItem(&task) + } + g.Log().Infof(ctx, "[查询列表] status=%s, 
pageNum=%d, pageSize=%d, 命中=%d/总量=%d", + req.Status, req.Page.PageNum, req.Page.PageSize, len(items), total) + return &dto.ListTaskRes{List: items, Total: total}, nil +} diff --git a/service/asr/whisper_service.go b/service/asr/whisper_service.go index 2ae664b..c90f974 100644 --- a/service/asr/whisper_service.go +++ b/service/asr/whisper_service.go @@ -9,6 +9,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strings" "time" @@ -24,11 +25,10 @@ const ( backendCpp // whisper.cpp (whisper-cpp) ) -// WhisperService 语音识别服务 -type WhisperService struct{} +type whisperService struct{} // Whisper 语音识别服务单例 -var Whisper = new(WhisperService) +var Whisper = new(whisperService) // TranscribeReq 语音识别请求 type TranscribeReq struct { @@ -54,7 +54,7 @@ type Segment struct { } // Transcribe 对音频文件进行语音识别(自动检测后端,自动降级) -func (s *WhisperService) Transcribe(ctx context.Context, req *TranscribeReq) (res *TranscribeRes, err error) { +func (s *whisperService) Transcribe(ctx context.Context, req *TranscribeReq) (res *TranscribeRes, err error) { // 1. 
校验音频文件 if _, err = os.Stat(req.AudioPath); os.IsNotExist(err) { return nil, fmt.Errorf("音频文件不存在: %s", req.AudioPath) @@ -94,7 +94,7 @@ func (s *WhisperService) Transcribe(ctx context.Context, req *TranscribeReq) (re } // transcribeWithCLI 使用 whisper CLI 命令 -func (s *WhisperService) transcribeWithCLI(ctx context.Context, req *TranscribeReq, whisperPath, model, language string) (res *TranscribeRes, err error) { +func (s *whisperService) transcribeWithCLI(ctx context.Context, req *TranscribeReq, whisperPath, model, language string) (res *TranscribeRes, err error) { outputDir := filepath.Dir(req.AudioPath) modelDir := g.Cfg().MustGet(ctx, "whisper.model_dir", "").String() threads := g.Cfg().MustGet(ctx, "whisper.threads", 2).Int() @@ -122,7 +122,7 @@ func (s *WhisperService) transcribeWithCLI(ctx context.Context, req *TranscribeR } // transcribeWithPython 使用 python -m whisper -func (s *WhisperService) transcribeWithPython(ctx context.Context, req *TranscribeReq, model, language string) (res *TranscribeRes, err error) { +func (s *whisperService) transcribeWithPython(ctx context.Context, req *TranscribeReq, model, language string) (res *TranscribeRes, err error) { // 查找 python pythonPath, err := exec.LookPath("python3") if err != nil { @@ -160,7 +160,7 @@ func (s *WhisperService) transcribeWithPython(ctx context.Context, req *Transcri } // readTxtResult 读取 whisper 输出的 txt 文件 -func (s *WhisperService) readTxtResult(outputDir, audioPath, model string) (res *TranscribeRes, err error) { +func (s *whisperService) readTxtResult(outputDir, audioPath, model string) (res *TranscribeRes, err error) { baseName := strings.TrimSuffix(filepath.Base(audioPath), filepath.Ext(audioPath)) txtPaths := []string{ filepath.Join(outputDir, baseName+".txt"), @@ -201,7 +201,7 @@ func cleanTranscript(text string) string { } // detectBackend 检测可用的 whisper 后端,返回后端类型和可执行路径 -func (s *WhisperService) detectBackend() (WhisperBackend, string) { +func (s *whisperService) detectBackend() (WhisperBackend, 
string) { // 1. 优先检测 C++ 版 whisper.cpp(最快,但参数格式不同) for _, name := range []string{"whisper-cpp", "whisper-cli"} { if path, err := exec.LookPath(name); err == nil { @@ -242,7 +242,7 @@ func (s *WhisperService) detectBackend() (WhisperBackend, string) { } // resolveCppModelPath 查找或下载 whisper.cpp 模型文件 -func (s *WhisperService) resolveCppModelPath(model string) string { +func (s *whisperService) resolveCppModelPath(model string) string { modelName := strings.TrimPrefix(model, "ggml-") modelName = strings.TrimSuffix(modelName, ".bin") @@ -262,8 +262,20 @@ func (s *WhisperService) resolveCppModelPath(model string) string { altPaths := []string{ cppModelName, filepath.Join(home, ".cache", "whisper", "ggml-"+modelName+"-q5_0.bin"), - "/opt/homebrew/share/whisper-cpp/models/" + cppModelName, - "/usr/local/share/whisper-cpp/models/" + cppModelName, + } + // macOS: Homebrew 安装的 whisper.cpp 模型路径 + if runtime.GOOS == "darwin" { + altPaths = append(altPaths, + "/opt/homebrew/share/whisper-cpp/models/"+cppModelName, + "/usr/local/share/whisper-cpp/models/"+cppModelName, + ) + } + // Linux: 常见系统安装路径 + if runtime.GOOS == "linux" { + altPaths = append(altPaths, + "/usr/share/whisper-cpp/models/"+cppModelName, + "/usr/local/share/whisper-cpp/models/"+cppModelName, + ) } for _, p := range altPaths { if _, err := os.Stat(p); err == nil { @@ -310,7 +322,7 @@ func (s *WhisperService) resolveCppModelPath(model string) string { } // downloadFile 下载文件到指定路径(支持超时) -func (s *WhisperService) downloadFile(url, destPath string, timeout time.Duration) error { +func (s *whisperService) downloadFile(url, destPath string, timeout time.Duration) error { tmpPath := destPath + ".tmp" out, err := os.Create(tmpPath) if err != nil { @@ -346,7 +358,7 @@ func (s *WhisperService) downloadFile(url, destPath string, timeout time.Duratio } // transcribeWithCpp 使用 whisper.cpp(C++ 版,参数格式不同) -func (s *WhisperService) transcribeWithCpp(ctx context.Context, req *TranscribeReq, binaryPath, model, language string) (res 
*TranscribeRes, err error) { +func (s *whisperService) transcribeWithCpp(ctx context.Context, req *TranscribeReq, binaryPath, model, language string) (res *TranscribeRes, err error) { outputDir := filepath.Dir(req.AudioPath) baseName := strings.TrimSuffix(filepath.Base(req.AudioPath), filepath.Ext(req.AudioPath)) outputPrefix := filepath.Join(outputDir, baseName) diff --git a/service/audio/audio_extract_service.go b/service/audio/audio_extract_service.go index 93bc693..b11fa8a 100644 --- a/service/audio/audio_extract_service.go +++ b/service/audio/audio_extract_service.go @@ -13,10 +13,10 @@ import ( ) // AudioExtractService 音频提取服务 -type AudioExtractService struct{} +type audioExtractService struct{} // AudioExtract 音频提取服务单例 -var AudioExtract = new(AudioExtractService) +var AudioExtract = new(audioExtractService) // ExtractAudioReq 提取音频请求 type ExtractAudioReq struct { @@ -32,7 +32,7 @@ type ExtractAudioRes struct { } // Extract 从视频中提取音频 -func (s *AudioExtractService) Extract(ctx context.Context, req *ExtractAudioReq) (res *ExtractAudioRes, err error) { +func (s *audioExtractService) Extract(ctx context.Context, req *ExtractAudioReq) (res *ExtractAudioRes, err error) { // 1. 校验视频文件存在 if _, err = os.Stat(req.VideoPath); os.IsNotExist(err) { return nil, fmt.Errorf("视频文件不存在: %s", req.VideoPath) @@ -117,7 +117,7 @@ func (s *AudioExtractService) Extract(ctx context.Context, req *ExtractAudioReq) } // getFFmpegPath 获取 ffmpeg 可执行路径 -func (s *AudioExtractService) getFFmpegPath() (string, error) { +func (s *audioExtractService) getFFmpegPath() (string, error) { // 1. 
优先从配置读取 ffmpegPath := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String() if ffmpegPath != "" { @@ -135,7 +135,7 @@ func (s *AudioExtractService) getFFmpegPath() (string, error) { } // getAudioDuration 获取音频时长 -func (s *AudioExtractService) getAudioDuration(ctx context.Context, ffmpegPath string, audioPath string) (string, error) { +func (s *audioExtractService) getAudioDuration(ctx context.Context, ffmpegPath string, audioPath string) (string, error) { // 使用 ffprobe 获取时长 // 先尝试查找 ffprobe ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") @@ -168,7 +168,7 @@ func (s *AudioExtractService) getAudioDuration(ctx context.Context, ffmpegPath s } // ExtractAndCleanup 提取音频并清理临时视频文件 -func (s *AudioExtractService) ExtractAndCleanup(ctx context.Context, req *ExtractAudioReq) (res *ExtractAudioRes, err error) { +func (s *audioExtractService) ExtractAndCleanup(ctx context.Context, req *ExtractAudioReq) (res *ExtractAudioRes, err error) { res, err = s.Extract(ctx, req) if err != nil { return nil, err diff --git a/service/setup/setup_service.go b/service/setup/setup_service.go index 862ec5f..0174b83 100644 --- a/service/setup/setup_service.go +++ b/service/setup/setup_service.go @@ -19,10 +19,18 @@ var ( DetectedWhisperPath string ) -// EnsureDependencies 启动时检查并安装 ffmpeg 和 whisper -func EnsureDependencies(ctx context.Context) { +func init() { + ensureDependencies() +} + +// ensureDependencies 启动时检查并安装 ffmpeg 和 whisper +func ensureDependencies() { + ctx := context.Background() g.Log().Info(ctx, "========== 检查依赖环境 ==========") + // 打印当前运行环境信息 + g.Log().Infof(ctx, "平台: %s/%s, Docker: %v", runtime.GOOS, runtime.GOARCH, isRunningInContainer()) + ensureFFmpeg(ctx) ensureWhisper(ctx) resolveWhisperPath(ctx) @@ -35,6 +43,26 @@ func EnsureDependencies(ctx context.Context) { g.Log().Info(ctx, "===================================") } +// isRunningInContainer 检测是否运行在 Docker 容器中 +func isRunningInContainer() bool { + // 方法1: 检查 /.dockerenv 文件 + if _, err := 
os.Stat("/.dockerenv"); err == nil { + return true + } + // 方法2: 检查 /proc/1/cgroup 是否包含 docker 关键字 + if data, err := os.ReadFile("/proc/1/cgroup"); err == nil { + if strings.Contains(string(data), "docker") || + strings.Contains(string(data), "kubepods") || + strings.Contains(string(data), "containerd") { + return true + } + } + return false +} + +// inContainer 是否为容器环境(简化调用) +var inContainer = isRunningInContainer() + // ensureFFmpeg 确保 ffmpeg 可用 func ensureFFmpeg(ctx context.Context) { if _, err := exec.LookPath("ffmpeg"); err == nil { @@ -46,52 +74,147 @@ func ensureFFmpeg(ctx context.Context) { switch runtime.GOOS { case "darwin": - // 检查是否安装了 Homebrew - if _, err := exec.LookPath("brew"); err != nil { - g.Log().Warningf(ctx, "[ffmpeg] ⚠ 未检测到 Homebrew,请手动安装:\n brew install ffmpeg") - return - } - cmd := exec.CommandContext(ctx, "brew", "install", "ffmpeg") - output, err := cmd.CombinedOutput() - if err != nil { - g.Log().Errorf(ctx, "[ffmpeg] ❌ 安装失败: %v\n%s", err, string(output)) - return - } - g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") + installFFmpegOnMac(ctx) case "linux": - // 尝试 apt - if _, err := exec.LookPath("apt"); err == nil { - cmd := exec.CommandContext(ctx, "sudo", "apt", "install", "-y", "ffmpeg") - output, err := cmd.CombinedOutput() - if err != nil { - g.Log().Errorf(ctx, "[ffmpeg] ❌ apt 安装失败: %v\n%s", err, string(output)) - return - } - g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") - return - } - // 尝试 yum - if _, err := exec.LookPath("yum"); err == nil { - cmd := exec.CommandContext(ctx, "sudo", "yum", "install", "-y", "ffmpeg") - output, err := cmd.CombinedOutput() - if err != nil { - g.Log().Errorf(ctx, "[ffmpeg] ❌ yum 安装失败: %v\n%s", err, string(output)) - return - } - g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") - return - } - g.Log().Warningf(ctx, "[ffmpeg] ⚠ 请手动安装: sudo apt install ffmpeg") + installFFmpegOnLinux(ctx) + + case "windows": + installFFmpegOnWindows(ctx) default: g.Log().Warningf(ctx, "[ffmpeg] ⚠ 不支持的平台(%s),请手动安装 ffmpeg", runtime.GOOS) } } +// 
installFFmpegOnMac 通过 Homebrew 安装 ffmpeg +func installFFmpegOnMac(ctx context.Context) { + if _, err := exec.LookPath("brew"); err != nil { + g.Log().Warningf(ctx, "[ffmpeg] ⚠ 未检测到 Homebrew,请手动安装:\n brew install ffmpeg") + return + } + cmd := exec.CommandContext(ctx, "brew", "install", "ffmpeg") + output, err := cmd.CombinedOutput() + if err != nil { + g.Log().Errorf(ctx, "[ffmpeg] ❌ 安装失败: %v\n%s", err, string(output)) + return + } + g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") +} + +// installFFmpegOnLinux 在 Linux(含 Docker)上安装 ffmpeg +func installFFmpegOnLinux(ctx context.Context) { + // Docker 容器通常以 root 运行,不需要 sudo + sudoPrefix := "" + if !inContainer { + // 非容器环境,检查是否需要 sudo + if _, err := exec.LookPath("sudo"); err == nil { + sudoPrefix = "sudo" + } + } + + // 1. 尝试 apt (Debian/Ubuntu) + if _, err := exec.LookPath("apt-get"); err == nil { + args := []string{"install", "-y", "ffmpeg"} + if sudoPrefix != "" { + args = append([]string{sudoPrefix}, args...) + } + cmd := exec.CommandContext(ctx, "apt-get", args...) + output, err := cmd.CombinedOutput() + if err != nil { + g.Log().Errorf(ctx, "[ffmpeg] ❌ apt-get 安装失败: %v\n%s", err, string(output)) + return + } + g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") + // 更新库缓存(Debian/Ubuntu 会用 ldconfig 更新) + return + } + + // 2. 尝试 apk (Alpine Linux,常见于 Docker 精简镜像) + if _, err := exec.LookPath("apk"); err == nil { + // Alpine 的 apk 不需要 sudo(默认以 root 运行) + cmd := exec.CommandContext(ctx, "apk", "add", "ffmpeg") + output, err := cmd.CombinedOutput() + if err != nil { + g.Log().Errorf(ctx, "[ffmpeg] ❌ apk 安装失败: %v\n%s", err, string(output)) + return + } + g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") + return + } + + // 3. 尝试 yum (CentOS/RHEL) + if _, err := exec.LookPath("yum"); err == nil { + args := []string{"install", "-y", "ffmpeg"} + if sudoPrefix != "" { + args = append([]string{sudoPrefix}, args...) + } + cmd := exec.CommandContext(ctx, "yum", args...) 
+ output, err := cmd.CombinedOutput() + if err != nil { + g.Log().Errorf(ctx, "[ffmpeg] ❌ yum 安装失败: %v\n%s", err, string(output)) + return + } + g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") + return + } + + if inContainer { + g.Log().Warningf(ctx, "[ffmpeg] ⚠ 容器中未找到 apt-get/apk/yum,请将 ffmpeg 预装在 Docker 镜像中") + } else { + g.Log().Warningf(ctx, "[ffmpeg] ⚠ 请手动安装: sudo apt-get install ffmpeg") + } +} + +// installFFmpegOnWindows 在 Windows 上安装 ffmpeg +func installFFmpegOnWindows(ctx context.Context) { + // 1. 尝试 winget (Windows 10/11 内置) + if _, err := exec.LookPath("winget"); err == nil { + g.Log().Infof(ctx, "[ffmpeg] 通过 winget 安装...") + cmd := exec.CommandContext(ctx, "winget", "install", "--id", "FFmpeg.FFmpeg", "-e", "--accept-package-agreements") + output, err := cmd.CombinedOutput() + if err == nil { + g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") + return + } + g.Log().Warningf(ctx, "[ffmpeg] ⚠ winget 安装失败: %v\n%s", err, string(output)) + } + + // 2. 尝试 choco (Chocolatey) + if _, err := exec.LookPath("choco"); err == nil { + // choco 安装可能需要管理员权限 + g.Log().Infof(ctx, "[ffmpeg] 通过 choco 安装...") + cmd := exec.CommandContext(ctx, "choco", "install", "ffmpeg", "-y") + output, err := cmd.CombinedOutput() + if err == nil { + g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") + return + } + g.Log().Warningf(ctx, "[ffmpeg] ⚠ choco 安装失败: %v\n%s", err, string(output)) + } + + // 3. 尝试 scoop + if _, err := exec.LookPath("scoop"); err == nil { + g.Log().Infof(ctx, "[ffmpeg] 通过 scoop 安装...") + cmd := exec.CommandContext(ctx, "scoop", "install", "ffmpeg") + output, err := cmd.CombinedOutput() + if err == nil { + g.Log().Info(ctx, "[ffmpeg] ✔ 安装成功") + return + } + g.Log().Warningf(ctx, "[ffmpeg] ⚠ scoop 安装失败: %v\n%s", err, string(output)) + } + + g.Log().Warningf(ctx, `[ffmpeg] ⚠ 请手动安装 ffmpeg,推荐方式: + 1. winget install --id FFmpeg.FFmpeg -e + 2. choco install ffmpeg -y + 3. 
从 https://ffmpeg.org/download.html 下载并加入 PATH`) +} + // ensureWhisper 确保 whisper 可用(优先安装 C++ 版,速度更快) func ensureWhisper(ctx context.Context) { // 1. 检查是否已有 whisper-cpp(C++ 版,最快) + // exec.LookPath 在 Windows 上会自动查找 .exe 后缀 if path, err := exec.LookPath("whisper-cpp"); err == nil { g.Log().Infof(ctx, "[whisper] ✔ C++ 版已安装: %s", path) return @@ -101,16 +224,19 @@ func ensureWhisper(ctx context.Context) { return } - // 2. 检查 Homebrew 安装目录(即使不在 PATH 也能找到) - if p := findHomebrewWhisperCpp(); p != "" { - DetectedWhisperPath = p - // 自动添加到 PATH 环境变量 - addToShellPath(ctx, filepath.Dir(p)) - g.Log().Infof(ctx, "[whisper] ✔ C++ 版已安装(自动检测): %s", p) - return + // 2. 仅在 macOS 上检查 Homebrew 安装目录(即使不在 PATH 也能找到) + if runtime.GOOS == "darwin" { + if p := findHomebrewWhisperCpp(); p != "" { + DetectedWhisperPath = p + if !inContainer { + addToShellPath(ctx, filepath.Dir(p)) + } + g.Log().Infof(ctx, "[whisper] ✔ C++ 版已安装(自动检测): %s", p) + return + } } - // 3. 尝试安装 whisper-cpp(C++ 版) + // 3. 仅在 macOS 上尝试使用 Homebrew 安装 C++ 版 if runtime.GOOS == "darwin" { if _, err := exec.LookPath("brew"); err == nil { g.Log().Infof(ctx, "[whisper] 安装 C++ 版 (brew install whisper-cpp)...") @@ -118,9 +244,9 @@ func ensureWhisper(ctx context.Context) { output, err := cmd.CombinedOutput() if err == nil { g.Log().Info(ctx, "[whisper] ✔ C++ 版安装成功") - // 装好后把 Homebrew bin 加到 PATH - addToShellPath(ctx, getHomebrewBinDir()) - // 检测安装路径 + if !inContainer { + addToShellPath(ctx, getHomebrewBinDir()) + } if p := findHomebrewWhisperCpp(); p != "" { DetectedWhisperPath = p } @@ -151,16 +277,25 @@ func ensureWhisper(ctx context.Context) { pipCmd = "pip" } - cmd := exec.CommandContext(ctx, pipCmd, "install", "--user", "openai-whisper") + // pip install --user 可能在某些环境下不兼容,尝试先不加 --user,失败后再加 + cmd := exec.CommandContext(ctx, pipCmd, "install", "openai-whisper") output, err := cmd.CombinedOutput() if err != nil { - g.Log().Errorf(ctx, "[whisper] ❌ pip 安装失败: %v\n%s", err, string(output)) - return + // 尝试 --user 模式 + 
g.Log().Warningf(ctx, "[whisper] pip 全局安装失败: %v,尝试 --user 模式...", err) + cmd = exec.CommandContext(ctx, pipCmd, "install", "--user", "openai-whisper") + output, err = cmd.CombinedOutput() + if err != nil { + g.Log().Errorf(ctx, "[whisper] ❌ pip 安装失败: %v\n%s", err, string(output)) + return + } } g.Log().Info(ctx, "[whisper] ✔ Python 版安装成功") - // 安装后自动配置 PATH - configureWhisperPath(ctx) + // 安装后自动配置 PATH(仅在非容器、非 Windows 环境) + if !inContainer && runtime.GOOS != "windows" { + configureWhisperPath(ctx) + } } // resolveWhisperPath 自动找到 whisper 二进制路径并存储 @@ -174,6 +309,7 @@ func resolveWhisperPath(ctx context.Context) { } // 1. 优先检测 C++ 版本(快 3-5 倍) + // exec.LookPath 在 Windows 上自动查找 .exe 后缀 for _, name := range []string{"whisper-cpp", "whisper-cli"} { if path, err := exec.LookPath(name); err == nil { DetectedWhisperPath = path @@ -182,11 +318,13 @@ func resolveWhisperPath(ctx context.Context) { } } - // 2. 在 Homebrew 目录查找 C++ 版本 - if p := findHomebrewWhisperCpp(); p != "" { - DetectedWhisperPath = p - g.Log().Infof(ctx, "[whisper] ✔ C++ 版(自动检测): %s", p) - return + // 2. 仅在 macOS 上查找 Homebrew 目录下的 C++ 版本 + if runtime.GOOS == "darwin" { + if p := findHomebrewWhisperCpp(); p != "" { + DetectedWhisperPath = p + g.Log().Infof(ctx, "[whisper] ✔ C++ 版(自动检测): %s", p) + return + } } // 3. 
从 PATH 查找 Python 版 whisper @@ -215,6 +353,10 @@ func getWhisperCandidates() []string { // 通过 python 探针获取 user-site bin 目录 if p := getUserPythonBin(); p != "" { candidates = append(candidates, filepath.Join(p, "whisper")) + // Windows 上 pip 安装的可执行文件是 .exe + if runtime.GOOS == "windows" { + candidates = append(candidates, filepath.Join(p, "whisper.exe")) + } } // 常见 pip user base 路径 @@ -233,6 +375,22 @@ func getWhisperCandidates() []string { candidates = append(candidates, filepath.Join(userHome, ".local", "bin", "whisper"), ) + case "windows": + // Windows 上 pip --user 安装的脚本路径 + candidates = append(candidates, + filepath.Join(userHome, "AppData", "Roaming", "Python", "Scripts", "whisper.exe"), + filepath.Join(userHome, "AppData", "Roaming", "Python", "Scripts", "whisper"), + filepath.Join(userHome, "AppData", "Local", "Programs", "Python", "Scripts", "whisper.exe"), + filepath.Join(userHome, "AppData", "Local", "Programs", "Python", "Scripts", "whisper"), + ) + // Python 版本特定路径 + pythonVersions := []string{"39", "310", "311", "312", "313"} + for _, ver := range pythonVersions { + candidates = append(candidates, + filepath.Join(userHome, "AppData", "Roaming", "Python", "Python"+ver, "Scripts", "whisper.exe"), + filepath.Join(userHome, "AppData", "Roaming", "Python", "Python"+ver, "Scripts", "whisper"), + ) + } } return candidates @@ -361,6 +519,17 @@ func addToShellPath(ctx context.Context, dir string) { return } + // 容器环境不修改 shell 配置(无意义) + if inContainer { + return + } + + // Windows 环境不修改 shell rc 文件(使用系统环境变量) + if runtime.GOOS == "windows" { + g.Log().Infof(ctx, "[setup] Windows 环境,请手动将 %s 添加到系统 PATH 环境变量", dir) + return + } + // 检查是否已在 PATH 中 currentPath := os.Getenv("PATH") if strings.Contains(currentPath, dir) { diff --git a/service/video/concat_service.go b/service/video/concat_service.go index 2d39383..e8aaa33 100644 --- a/service/video/concat_service.go +++ b/service/video/concat_service.go @@ -1,27 +1,34 @@ package video import ( + "bytes" "context" + 
"encoding/json" "fmt" + "io" + "mime/multipart" "os" "os/exec" "path/filepath" "strings" + commonHttp "gitea.com/red-future/common/http" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" ) -// ConcatService 视频拼接服务 -type ConcatService struct{} +type concatService struct{} // Concat 视频拼接服务单例 -var Concat = new(ConcatService) +var Concat = new(concatService) // ConcatReq 视频拼接请求 type ConcatReq struct { VideoPaths []string // 视频文件路径列表(按此顺序拼接) OutputPath string // 输出视频文件路径,空则自动生成 Method string // 拼接方式: auto/fast/reencode,默认 auto + Upload bool // 是否上传到MinIO } // ConcatRes 视频拼接响应 @@ -32,10 +39,11 @@ type ConcatRes struct { DurationStr string `json:"durationStr"` // 可读时长 MethodUsed string `json:"methodUsed"` // 实际使用的拼接方式 InputFiles int `json:"inputFiles"` // 输入文件数 + FileURL string `json:"fileURL"` // MinIO访问地址(上传后返回) } // Concat 拼接多个视频为一个 -func (s *ConcatService) Concat(ctx context.Context, req *ConcatReq) (res *ConcatRes, err error) { +func (s *concatService) Concat(ctx context.Context, req *ConcatReq) (res *ConcatRes, err error) { if len(req.VideoPaths) < 2 { return nil, fmt.Errorf("至少需要2个视频才能拼接") } @@ -98,11 +106,21 @@ func (s *ConcatService) Concat(ctx context.Context, req *ConcatReq) (res *Concat MethodUsed: methodUsed, InputFiles: len(req.VideoPaths), } + + // 如果需要上传到 MinIO + if req.Upload { + uploadRes, uploadErr := s.UploadToMinIO(ctx, outputPath) + if uploadErr != nil { + return nil, fmt.Errorf("上传到MinIO失败: %v", uploadErr) + } + res.FileURL = uploadRes.FileURL + } + return } // concatByDemuxer 使用 concat demuxer 无损拼接(要求同编码参数) -func (s *ConcatService) concatByDemuxer(ctx context.Context, ffmpegPath string, inputs []string, output string) error { +func (s *concatService) concatByDemuxer(ctx context.Context, ffmpegPath string, inputs []string, output string) error { // 创建文件列表 fileListPath := filepath.Join(filepath.Dir(output), "concat_list.txt") var lines []string @@ -132,7 +150,7 @@ func (s *ConcatService) concatByDemuxer(ctx context.Context, 
ffmpegPath string, } // concatByFilter 使用 concat filter 重编码拼接(自动归一化分辨率/音频参数) -func (s *ConcatService) concatByFilter(ctx context.Context, ffmpegPath string, inputs []string, output string) error { +func (s *concatService) concatByFilter(ctx context.Context, ffmpegPath string, inputs []string, output string) error { n := len(inputs) // 1. 获取所有视频的分辨率,确定统一输出尺寸 @@ -211,7 +229,7 @@ func (s *ConcatService) concatByFilter(ctx context.Context, ffmpegPath string, i } // getVideoResolution 获取视频分辨率 -func (s *ConcatService) getVideoResolution(ctx context.Context, ffmpegPath, videoPath string) (width, height int, err error) { +func (s *concatService) getVideoResolution(ctx context.Context, ffmpegPath, videoPath string) (width, height int, err error) { ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { ffprobePath = "ffprobe" @@ -233,7 +251,7 @@ func (s *ConcatService) getVideoResolution(ctx context.Context, ffmpegPath, vide } // getVideoDuration 获取视频时长 -func (s *ConcatService) getVideoDuration(ctx context.Context, ffmpegPath, videoPath string) (float64, error) { +func (s *concatService) getVideoDuration(ctx context.Context, ffmpegPath, videoPath string) (float64, error) { ffprobePath := filepath.Join(filepath.Dir(ffmpegPath), "ffprobe") if _, err := os.Stat(ffprobePath); os.IsNotExist(err) { ffprobePath = "ffprobe" @@ -256,7 +274,7 @@ func (s *ConcatService) getVideoDuration(ctx context.Context, ffmpegPath, videoP return duration, nil } -func (s *ConcatService) getFFmpegPath() (string, error) { +func (s *concatService) getFFmpegPath() (string, error) { ffmpegPath := g.Cfg().MustGet(context.Background(), "ffmpeg.path", "").String() if ffmpegPath != "" { if _, err := os.Stat(ffmpegPath); err == nil { @@ -277,6 +295,93 @@ func formatDuration(seconds float64) string { return fmt.Sprintf("%02d:%02d:%02d", h, m, s) } +// uploadFileRes is the "data" payload of the OSS upload response. +type uploadFileRes struct { + FileURL string `json:"fileURL" dc:"上传地址"`
+ FileSize int `json:"fileSize" dc:"文件大小"` + FileName string `json:"fileName" dc:"文件名称"` + FileFormat string `json:"fileFormat" dc:"文件格式"` + FileAddressPrefix string `json:"fileAddressPrefix"` +} + +// UploadToMinIO uploads the file at localFilePath to MinIO through the OSS service's +// multipart upload endpoint (oss/file/uploadFile) and returns the "data" part of its response. +func (s *concatService) UploadToMinIO(ctx context.Context, localFilePath string) (*uploadFileRes, error) { + // Build the multipart/form-data body in memory. + var buf bytes.Buffer + mw := multipart.NewWriter(&buf) + + file, err := os.Open(localFilePath) + if err != nil { + return nil, fmt.Errorf("打开文件失败: %w", err) + } + defer file.Close() + + fw, err := mw.CreateFormFile("file", filepath.Base(localFilePath)) + if err != nil { + return nil, fmt.Errorf("创建表单文件字段失败: %w", err) + } + if _, err = io.Copy(fw, file); err != nil { + return nil, fmt.Errorf("写入文件内容失败: %w", err) + } + // Close writes the terminating boundary; if it fails the request body is incomplete. + if err = mw.Close(); err != nil { + return nil, fmt.Errorf("结束表单写入失败: %w", err) + } + + client := commonHttp.Httpclient.Clone() + + // Forward the inbound request's headers (first value of each), e.g. auth tokens. + if r := g.RequestFromCtx(ctx); r != nil { + for k, v := range r.Header { + client.SetHeader(k, v[0]) + } + } + // Set the multipart Content-Type (it carries the boundary), overriding any forwarded value. + contentType := mw.FormDataContentType() + g.Log().Debugf(ctx, "[UploadToMinIO] Content-Type: %s", contentType) + client.SetHeader("Content-Type", contentType) + + // Log request details. + postBytes := buf.Bytes() + g.Log().Debugf(ctx, "[UploadToMinIO] 请求URL: oss/file/uploadFile, 文件: %s, Body大小: %d bytes, Boundary: %s", + localFilePath, len(postBytes), mw.Boundary()) + + response, err := client.Post(ctx, "oss/file/uploadFile", postBytes) + if err != nil { + glog.Error(ctx, err) + return nil, fmt.Errorf("调用OSS上传服务失败: %w", err) + } + defer response.Close() + + body := response.ReadAll() + + // Debug: log the raw response body. + g.Log().Debugf(ctx, "[UploadToMinIO] OSS原始响应: %s", string(body)) + + // Decode the standard GoFrame response envelope {code, message, data}. + var apiResp struct { + Code int `json:"code"` + Message string `json:"message"` + Data *uploadFileRes `json:"data"` + } + if err = json.Unmarshal(body, &apiResp); err != nil { + return nil, fmt.Errorf("响应解析失败: %w", err) + } + + // Treat code 0 or 200 as success. + if
apiResp.Code != 200 && apiResp.Code != 0 { + return nil, fmt.Errorf("OSS上传失败: %s", apiResp.Message) + } + // A success code with a null or missing data field would otherwise cause a nil pointer dereference below. + if apiResp.Data == nil { + return nil, fmt.Errorf("OSS上传失败: 响应缺少data字段") + } + + g.Log().Infof(ctx, "[UploadToMinIO] 上传成功 fileURL=%s size=%d", apiResp.Data.FileURL, apiResp.Data.FileSize) + return apiResp.Data, nil +} + // CleanupConcat 清理输入视频文件 func CleanupConcat(paths []string) { for _, p := range paths { diff --git a/sql/transcribe_task.sql b/sql/transcribe_task.sql new file mode 100644 index 0000000..8d774a8 --- /dev/null +++ b/sql/transcribe_task.sql @@ -0,0 +1,86 @@ +-- transcribe_task 语音转文字任务表(批次头) +CREATE TABLE IF NOT EXISTS transcribe_task ( + id BIGSERIAL NOT NULL, + task_id VARCHAR(64) NOT NULL, + tenant_id BIGINT NOT NULL DEFAULT 0, + status VARCHAR(20) NOT NULL DEFAULT 'pending', + progress INT NOT NULL DEFAULT 0, + total_files INT NOT NULL DEFAULT 0, + success_files INT NOT NULL DEFAULT 0, + fail_files INT NOT NULL DEFAULT 0, + model VARCHAR(32) NOT NULL DEFAULT 'medium', + language VARCHAR(10) NOT NULL DEFAULT 'zh', + threshold DECIMAL(4,2) NOT NULL DEFAULT 0.30, + input_type VARCHAR(10) NOT NULL DEFAULT 'upload', + input_data TEXT, + file_names TEXT, + callback_url TEXT, + result TEXT, + error_message TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + deleted_at TIMESTAMP WITH TIME ZONE, + PRIMARY KEY (id) +); + +COMMENT ON TABLE transcribe_task IS '语音转文字异步任务批次头表'; +COMMENT ON COLUMN transcribe_task.id IS '主键ID'; +COMMENT ON COLUMN transcribe_task.task_id IS '任务批次唯一标识'; +COMMENT ON COLUMN transcribe_task.tenant_id IS '租户ID'; +COMMENT ON COLUMN transcribe_task.status IS '任务状态:pending等待/running处理中/success成功/failed失败'; +COMMENT ON COLUMN transcribe_task.progress IS '处理进度(0-100)'; +COMMENT ON COLUMN transcribe_task.total_files IS '文件总数'; +COMMENT ON COLUMN transcribe_task.success_files IS '成功文件数'; +COMMENT ON COLUMN transcribe_task.fail_files IS '失败文件数'; +COMMENT ON COLUMN transcribe_task.model IS 'whisper模型:tiny/base/small/medium/large'; +COMMENT ON COLUMN
transcribe_task.language IS '语言代码:zh/en/ja等'; +COMMENT ON COLUMN transcribe_task.threshold IS '场景检测阈值(0.1-0.5)'; +COMMENT ON COLUMN transcribe_task.input_type IS '输入类型:upload文件上传/url远程URL'; +COMMENT ON COLUMN transcribe_task.input_data IS '输入数据JSON:上传模式存文件路径列表,URL模式存URL列表'; +COMMENT ON COLUMN transcribe_task.file_names IS '文件名列表JSON,用于展示'; +COMMENT ON COLUMN transcribe_task.callback_url IS '任务完成后的回调地址(可选)'; +COMMENT ON COLUMN transcribe_task.result IS '完整的处理结果JSON(成功后填充)'; +COMMENT ON COLUMN transcribe_task.error_message IS '错误信息,失败后填充'; +COMMENT ON COLUMN transcribe_task.created_at IS '创建时间'; +COMMENT ON COLUMN transcribe_task.updated_at IS '更新时间'; +COMMENT ON COLUMN transcribe_task.deleted_at IS '删除时间(软删除)'; + +CREATE UNIQUE INDEX IF NOT EXISTS idx_transcribe_task_task_id ON transcribe_task(task_id); +CREATE INDEX IF NOT EXISTS idx_transcribe_task_status ON transcribe_task(status); +CREATE INDEX IF NOT EXISTS idx_transcribe_task_created_at ON transcribe_task(created_at); + +-- transcribe_task_detail 语音转文字任务明细表(每个视频一条) +CREATE TABLE IF NOT EXISTS transcribe_task_detail ( + id BIGSERIAL NOT NULL, + task_id VARCHAR(64) NOT NULL, + tenant_id BIGINT NOT NULL DEFAULT 0, + file_index INT NOT NULL DEFAULT 0, + file_name VARCHAR(255) NOT NULL DEFAULT '', + transcribed_text TEXT, + scenes TEXT, + audio_size BIGINT DEFAULT 0, + audio_duration VARCHAR(32) DEFAULT '', + model VARCHAR(32) DEFAULT '', + language VARCHAR(10) DEFAULT '', + error_message TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + PRIMARY KEY (id) +); + +COMMENT ON TABLE transcribe_task_detail IS '语音转文字任务明细表(每视频一条)'; +COMMENT ON COLUMN transcribe_task_detail.id IS '主键ID'; +COMMENT ON COLUMN transcribe_task_detail.task_id IS '所属任务批次ID'; +COMMENT ON COLUMN transcribe_task_detail.tenant_id IS '租户ID'; +COMMENT ON COLUMN transcribe_task_detail.file_index IS '文件在批次中的序号(从0开始)'; +COMMENT ON COLUMN transcribe_task_detail.file_name IS '文件名'; +COMMENT ON COLUMN transcribe_task_detail.transcribed_text IS 
'语音识别文字'; +COMMENT ON COLUMN transcribe_task_detail.scenes IS '分镜分析JSON'; +COMMENT ON COLUMN transcribe_task_detail.audio_size IS '音频文件大小(字节)'; +COMMENT ON COLUMN transcribe_task_detail.audio_duration IS '音频时长'; +COMMENT ON COLUMN transcribe_task_detail.model IS 'whisper模型'; +COMMENT ON COLUMN transcribe_task_detail.language IS '语言代码'; +COMMENT ON COLUMN transcribe_task_detail.error_message IS '错误信息,失败后填充'; +COMMENT ON COLUMN transcribe_task_detail.created_at IS '创建时间'; + +CREATE INDEX IF NOT EXISTS idx_transcribe_task_detail_task_id ON transcribe_task_detail(task_id); +CREATE INDEX IF NOT EXISTS idx_transcribe_task_detail_file_idx ON transcribe_task_detail(task_id, file_index);