From f6c70a451ec58ba9e6ca36fe8dbb129790385465 Mon Sep 17 00:00:00 2001 From: WangLiZhao <1838393649@qq.com> Date: Sat, 25 Apr 2026 10:42:21 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E6=93=8D=E4=BD=9C?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E3=80=81=E4=BB=BB=E5=8A=A1=E5=88=86=E9=A1=B5?= =?UTF-8?q?=E6=9F=A5=E8=AF=A2=E4=B8=8E=E6=A8=A1=E5=9E=8B=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增操作日志表(asynch_op_log)及对应DAO,记录任务创建等操作的审计信息 - 新增任务分页查询接口(ListTask)及对应DTO、Service和DAO方法 - 优化模型调用失败重试逻辑:支持配置重试排队策略(插队到队首或队尾) - 新增临时文件存储机制,当模型调用成功但OSS上传失败时,下次仅重试OSS上传 - 模型配置新增retry_queue_max_seconds字段,控制失败重试排队策略 - 更新数据库表结构(asynch_models、asynch_task、新增asynch_op_log)及同步更新SQL - 配置文件调整:超时单位改为秒,更新服务地址和轮询间隔 - 修复模型列表查询支持按名称模糊搜索 --- README.md | 2 +- config.yml | 21 ++-- consts/public/table_name.go | 1 + controller/model_controller.go | 6 +- controller/task_controller.go | 6 + dao/model_dao.go | 8 +- dao/op_log_dao.go | 22 ++++ dao/task_dao.go | 26 ++++ dao/task_dao_bg.go | 38 ++++-- model/dto/model_dto.go | 59 ++++----- model/dto/task_dto.go | 20 +++- model/entity/asynch_model.go | 33 ++--- model/entity/asynch_op_log.go | 57 +++++++++ model/entity/asynch_task.go | 8 ++ service/cleaner.go | 17 ++- service/model_invoker.go | 58 ++++++--- service/model_service.go | 40 ++++--- service/storage_oss.go | 15 ++- service/task_service.go | 57 +++++++++ service/tmp_store.go | 38 ++++++ service/worker.go | 42 +++++-- update.sql | 213 ++++++++++++++++++--------------- 22 files changed, 573 insertions(+), 214 deletions(-) create mode 100644 dao/op_log_dao.go create mode 100644 model/entity/asynch_op_log.go create mode 100644 service/tmp_store.go diff --git a/README.md b/README.md index e504340..881494f 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ - 请求地址:`base_url + route` - 请求方式:`http_method`(GET/POST) - 请求密钥:`api_key`(以请求头注入,示例:`TTS_API_KEY:your-key`) - - 超时:`timeout_ms` + - 超时:`timeout_seconds` - 并发:`max_concurrency`(按租户+模型的 Redis 分布式信号量限流) - 重试:`retry_times`(失败后最多再重试 N 次) - 保留:`auto_clean_seconds`(任务被业务领取到 `state=4` 后的保留秒数,到期清理) diff --git a/config.yml b/config.yml index 22ddea5..36b854c 100644 --- a/config.yml +++ b/config.yml @@ -31,29 +31,22 @@ database: asynch: worker: enabled: true # 是否启用后台 worker(开发环境可关闭避免刷DB错误) - pollInterval: "5s" # 轮询间隔(DB抢占 pending 任务) - batchSize: 5 # 每次抢占任务数量 + pollInterval: "10s" # 轮询间隔(DB抢占 pending 任务) + batchSize: 10 # 每次抢占任务数量 goroutines: 1 # worker 并发数(每个 goroutine 串行处理) taskTimeout: "5m" # state=0/1 超时自动失败 cleaner: enabled: true # 是否启用自动清理器(可选) - interval: "5s" # 清理任务扫描间隔 - + interval: "10s" # 清理任务扫描间隔 + redis: default: - address: 116.204.74.41:6379 + address: 192.168.3.30:6379 db: 0 consul: - address: 116.204.74.41:8500 + address: 192.168.3.30:8500 jaeger: - addr: 116.204.74.41:4318 - -# OSS 文件服务 -# 当前实现:通过 common/http 的服务发现直接调用: -# POST oss/file/uploadFile (multipart/form-data) -# 鉴权:透传 Authorization / X-User-Info -oss: - addr: "116.204.74.41:9000" + addr: 192.168.3.30:4318 diff --git a/consts/public/table_name.go b/consts/public/table_name.go index 2e23a86..24e2570 100644 --- a/consts/public/table_name.go +++ b/consts/public/table_name.go @@ -3,4 +3,5 @@ package public const ( TableNameModel = "asynch_models" // 异步模型表 TableNameTask = "asynch_task" // 异步任务表 + TableNameOpLog = "asynch_op_log" // 异步操作日志表 ) diff --git a/controller/model_controller.go b/controller/model_controller.go index adcabea..d7810e2 100644 --- a/controller/model_controller.go +++ b/controller/model_controller.go @@ -56,7 +56,11 @@ func (c *model) ListModel(ctx context.Context, req *dto.ListModelReq) (res *dto. pageSize = int(req.Page.PageSize) } } - list, total, err := service.Model.List(ctx, pageNum, pageSize) + modelName := "" + if req != nil { + modelName = req.ModelName + } + list, total, err := service.Model.List(ctx, pageNum, pageSize, modelName) if err != nil { return nil, err } diff --git a/controller/task_controller.go b/controller/task_controller.go index ecfbfbd..551caad 100644 --- a/controller/task_controller.go +++ b/controller/task_controller.go @@ -29,3 +29,9 @@ func (c *task) GetTaskBatch(ctx context.Context, req *dto.GetTaskBatchReq) (res ctx = ensureUser(ctx) return service.Task.GetBatch(ctx, req) } + +// ListTask 任务列表分页查询 +func (c *task) ListTask(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) { + ctx = ensureUser(ctx) + return service.Task.List(ctx, req) +} diff --git a/dao/model_dao.go b/dao/model_dao.go index e6fbaee..4c0141b 100644 --- a/dao/model_dao.go +++ b/dao/model_dao.go @@ -73,8 +73,11 @@ func (d *modelDao) GetByID(ctx context.Context, id int64) (m *entity.AsynchModel return } -func (d *modelDao) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).OrderDesc(entity.AsynchModelCol.CreatedAt) +func (d *modelDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) { + model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).Where("deleted_at IS NULL").OrderDesc(entity.AsynchModelCol.CreatedAt) + if modelNameLike != "" { + model = model.WhereLike(entity.AsynchModelCol.ModelName, "%"+modelNameLike+"%") + } if pageNum > 0 && pageSize > 0 { model = model.Page(pageNum, pageSize) } @@ -86,4 +89,3 @@ func (d *modelDao) List(ctx context.Context, pageNum, pageSize int) (list []*ent err = r.Structs(&list) return } - diff --git a/dao/op_log_dao.go b/dao/op_log_dao.go new file mode 100644 index 0000000..3293cd7 --- /dev/null +++ b/dao/op_log_dao.go @@ -0,0 +1,22 @@ +package dao + +import ( + "context" + + "model-asynch/consts/public" + "model-asynch/model/entity" + + "gitea.com/red-future/common/db/gfdb" +) + +type opLogDao struct{} + +var OpLog = &opLogDao{} + +func (d *opLogDao) Insert(ctx context.Context, log *entity.AsynchOpLog) (id int64, err error) { + r, err := gfdb.DB(ctx).Model(ctx, public.TableNameOpLog).Data(log).Insert() + if err != nil { + return 0, err + } + return r.LastInsertId() +} diff --git a/dao/task_dao.go b/dao/task_dao.go index dba2194..5f4a7d0 100644 --- a/dao/task_dao.go +++ b/dao/task_dao.go @@ -11,6 +11,7 @@ import ( "gitea.com/red-future/common/db/gfdb" "github.com/gogf/gf/v2/database/gdb" "github.com/gogf/gf/v2/os/gtime" + "github.com/gogf/gf/v2/util/gconv" ) var Task = &taskDao{} @@ -136,6 +137,31 @@ func (d *taskDao) CountActiveByModel(ctx context.Context, modelName string) (int return int64(n), err } +// List 任务分页查询(受 gfdb 租户 Hook 影响) +func (d *taskDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike, taskIDLike string, state *int) (list []*entity.AsynchTask, total int64, err error) { + m := gfdb.DB(ctx).Model(ctx, public.TableNameTask).Where("deleted_at IS NULL") + if modelNameLike != "" { + m = m.WhereLike(entity.AsynchTaskCol.ModelName, "%"+modelNameLike+"%") + } + if taskIDLike != "" { + m = m.WhereLike(entity.AsynchTaskCol.TaskID, "%"+taskIDLike+"%") + } + if state != nil { + m = m.Where(entity.AsynchTaskCol.State, *state) + } + m = m.OrderDesc(entity.AsynchTaskCol.CreatedAt) + if pageNum > 0 && pageSize > 0 { + m = m.Page(pageNum, pageSize) + } + r, totalInt, err := m.AllAndCount(false) + if err != nil { + return nil, 0, err + } + total = gconv.Int64(totalInt) + err = r.Structs(&list) + return +} + // ClaimPending 抢占 pending 任务(state=0),并在同一事务中更新为 running(state=1) // 使用 PostgreSQL: FOR UPDATE SKIP LOCKED 避免多 worker 重复消费 func (d *taskDao) ClaimPending(ctx context.Context, batchSize int) (tasks []*entity.AsynchTask, err error) { diff --git a/dao/task_dao_bg.go b/dao/task_dao_bg.go index b6ca963..5159139 100644 --- a/dao/task_dao_bg.go +++ b/dao/task_dao_bg.go @@ -20,7 +20,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks } err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { sql := fmt.Sprintf( - `SELECT id, tenant_id, model_name, task_id, input_ref, request_payload + `SELECT id, tenant_id, model_name, task_id, input_ref, request_payload, phase, tmp_file FROM %s WHERE deleted_at IS NULL AND state = 0 ORDER BY enqueue_at ASC @@ -58,7 +58,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fileType string, fileSize int64, expireAt *gtime.Time) error { now := gtime.Now() _, err := gfdb.DB(ctx).Exec(ctx, - fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, updated_at=? WHERE id=?`, public.TableNameTask), + fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask), ossFile, fileType, fileSize, now, now, id, ) return err @@ -67,12 +67,31 @@ func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fi func (d *taskDao) UpdateFailedGlobal(ctx context.Context, id int64, errorMsg string) error { now := gtime.Now() _, err := gfdb.DB(ctx).Exec(ctx, - fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, updated_at=? WHERE id=?`, public.TableNameTask), + fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask), errorMsg, now, now, id, ) return err } +// UpdateFailedKeepTmpGlobal OSS 上传失败:保留 phase/tmp_file,下一轮仅重试 OSS 上传 +func (d *taskDao) UpdateFailedKeepTmpGlobal(ctx context.Context, id int64, errorMsg string) error { + now := gtime.Now() + _, err := gfdb.DB(ctx).Exec(ctx, + fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=1, updated_at=? WHERE id=?`, public.TableNameTask), + errorMsg, now, now, id, + ) + return err +} + +// UpdateTmpAfterModelGlobal 模型调用成功后,写入临时文件路径并标记 phase=1 +func (d *taskDao) UpdateTmpAfterModelGlobal(ctx context.Context, id int64, tmpFile string) error { + _, err := gfdb.DB(ctx).Exec(ctx, + fmt.Sprintf(`UPDATE %s SET phase=1, tmp_file=?, updated_at=NOW() WHERE id=?`, public.TableNameTask), + tmpFile, id, + ) + return err +} + func (d *taskDao) SoftDeleteByTaskIDGlobal(ctx context.Context, taskID string) error { _, err := gfdb.DB(ctx).Exec(ctx, fmt.Sprintf(`UPDATE %s SET deleted_at=NOW(), updated_at=NOW() WHERE task_id=? AND deleted_at IS NULL`, public.TableNameTask), @@ -113,7 +132,8 @@ func (d *taskDao) ListFailedRetryableGlobal(ctx context.Context, limit int) (lis } r, err := gfdb.DB(ctx).GetAll(ctx, fmt.Sprintf(` -SELECT t.* +SELECT t.*, + m.retry_queue_max_seconds AS retry_queue_max_seconds FROM %s t JOIN %s m ON t.tenant_id = m.tenant_id @@ -132,11 +152,13 @@ SELECT t.* return } -// RequeueForRetryGlobal 将任务重新入队(state=0,enqueue_at=now),并将 retry_count +1 -func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64) error { +// RequeueForRetryGlobal 将任务重新入队(state=0),并将 retry_count +1 +// enqueueAt 用于控制重试任务在队列中的位置: +// - enqueueAt 越早,越靠前(ClaimPendingGlobal 按 enqueue_at ASC 抢占) +func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64, enqueueAt time.Time) error { _, err := gfdb.DB(ctx).Exec(ctx, - fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=NOW(), updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask), - id, + fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=?, updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask), + enqueueAt, id, ) return err } diff --git a/model/dto/model_dto.go b/model/dto/model_dto.go index 1434a6b..3dbe929 100644 --- a/model/dto/model_dto.go +++ b/model/dto/model_dto.go @@ -7,19 +7,20 @@ import ( // CreateModelReq 添加模型配置 type CreateModelReq struct { - g.Meta `path:"/createModel" method:"post" tags:"模型管理" summary:"创建模型配置" dc:"添加新的模型配置"` - ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称(唯一标识)"` - BaseURL string `p:"baseUrl" json:"baseUrl" v:"required#baseUrl不能为空" dc:"模型服务基础地址(如 http(s)://host:port)"` - Route string `p:"route" json:"route" dc:"路由/路径(拼接到 BaseURL 之后的可选路径)"` - HttpMethod string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(默认POST)"` - APIKey string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头),示例:TTS_API_KEY:your-key"` - Enabled int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用"` - MaxConcurrency int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数"` - QueueLimit int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(超过则拒绝/限流)"` - TimeoutMs int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(毫秒)"` - RetryTimes int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数"` - AutoCleanSeconds int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(如清理超时任务/队列)"` - Remark string `p:"remark" json:"remark" dc:"备注说明"` + g.Meta `path:"/createModel" method:"post" tags:"模型管理" summary:"创建模型配置" dc:"添加新的模型配置"` + ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称(唯一标识)"` + BaseURL string `p:"baseUrl" json:"baseUrl" v:"required#baseUrl不能为空" dc:"模型服务基础地址(如 http(s)://host:port)"` + Route string `p:"route" json:"route" dc:"路由/路径(拼接到 BaseURL 之后的可选路径)"` + HttpMethod string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(默认POST)"` + APIKey string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头),示例:TTS_API_KEY:your-key"` + Enabled int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用"` + MaxConcurrency int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数"` + QueueLimit int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(超过则拒绝/限流)"` + TimeoutSeconds int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)"` + RetryTimes int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数"` + RetryQueueMaxSeconds int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间(秒);0表示失败重试插队到队首;>0表示排队超过该时间后插队,否则仍到队尾"` + AutoCleanSeconds int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(如清理超时任务/队列)"` + Remark string `p:"remark" json:"remark" dc:"备注说明"` } type CreateModelRes struct { @@ -28,19 +29,20 @@ type CreateModelRes struct { // UpdateModelReq 更新模型配置 type UpdateModelReq struct { - g.Meta `path:"/updateModel" method:"put" tags:"模型管理" summary:"更新模型配置" dc:"更新指定ID的模型配置"` - ID int64 `p:"id" json:"id,string" v:"required#id不能为空" dc:"配置ID"` - BaseURL string `p:"baseUrl" json:"baseUrl" dc:"模型服务基础地址"` - Route string `p:"route" json:"route" dc:"路由/路径"` - HttpMethod *string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(可选更新)"` - APIKey *string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头)(可选更新)"` - Enabled *int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用(可选更新)"` - MaxConcurrency *int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数(可选更新)"` - QueueLimit *int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(可选更新)"` - TimeoutMs *int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(毫秒)(可选更新)"` - RetryTimes *int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数(可选更新)"` - AutoCleanSeconds *int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(可选更新)"` - Remark *string `p:"remark" json:"remark" dc:"备注说明(可选更新)"` + g.Meta `path:"/updateModel" method:"put" tags:"模型管理" summary:"更新模型配置" dc:"更新指定ID的模型配置"` + ID int64 `p:"id" json:"id,string" v:"required#id不能为空" dc:"配置ID"` + BaseURL string `p:"baseUrl" json:"baseUrl" dc:"模型服务基础地址"` + Route string `p:"route" json:"route" dc:"路由/路径"` + HttpMethod *string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(可选更新)"` + APIKey *string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头)(可选更新)"` + Enabled *int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用(可选更新)"` + MaxConcurrency *int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数(可选更新)"` + QueueLimit *int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(可选更新)"` + TimeoutSeconds *int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)(可选更新)"` + RetryTimes *int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数(可选更新)"` + RetryQueueMaxSeconds *int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间(秒)(可选更新)"` + AutoCleanSeconds *int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(可选更新)"` + Remark *string `p:"remark" json:"remark" dc:"备注说明(可选更新)"` } // DeleteModelReq 删除模型配置 @@ -61,8 +63,9 @@ type GetModelRes struct { // ListModelReq 配置列表 type ListModelReq struct { - g.Meta `path:"/listModel" method:"post" tags:"模型管理" summary:"模型配置列表" dc:"分页获取模型配置列表"` - Page *beans.Page `p:"page" json:"page" dc:"分页参数"` + g.Meta `path:"/listModel" method:"post" tags:"模型管理" summary:"模型配置列表" dc:"分页获取模型配置列表"` + Page *beans.Page `p:"page" json:"page" dc:"分页参数"` + ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊查询,可选)"` } type ListModelRes struct { diff --git a/model/dto/task_dto.go b/model/dto/task_dto.go index 88732a6..fa2e390 100644 --- a/model/dto/task_dto.go +++ b/model/dto/task_dto.go @@ -1,11 +1,15 @@ package dto -import "github.com/gogf/gf/v2/frame/g" +import ( + "gitea.com/red-future/common/beans" + "github.com/gogf/gf/v2/frame/g" +) // CreateTaskReq 创建异步任务 type CreateTaskReq struct { g.Meta `path:"/createTask" method:"post" tags:"任务管理" summary:"创建异步任务" dc:"创建异步任务并返回任务ID"` ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称"` + BizName string `p:"bizName" json:"bizName" dc:"业务名称(调用方模块/系统,用于统计)"` InputRef string `p:"inputRef" json:"inputRef" dc:"输入引用(如OSS/文件引用等)"` RequestPayload any `p:"requestPayload" json:"requestPayload" dc:"请求负载(透传给模型服务)"` } @@ -40,3 +44,17 @@ type GetTaskBatchItem struct { type GetTaskBatchRes struct { List []GetTaskBatchItem `json:"list" dc:"任务列表"` } + +// ListTaskReq 任务列表分页查询 +type ListTaskReq struct { + g.Meta `path:"/listTask" method:"post" tags:"任务管理" summary:"任务列表" dc:"分页查询任务列表,支持按状态/模型名称/task_id过滤"` + Page *beans.Page `p:"page" json:"page" dc:"分页参数"` + ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊匹配)"` + TaskID string `p:"taskId" json:"taskId" dc:"任务ID(模糊匹配)"` + State *int `p:"state" json:"state" dc:"任务状态(0/1/2/3/4,可选)"` +} + +type ListTaskRes struct { + List any `json:"list" dc:"列表数据"` + Total int64 `json:"total" dc:"总数"` +} diff --git a/model/entity/asynch_model.go b/model/entity/asynch_model.go index 8ac9d24..2e142cc 100644 --- a/model/entity/asynch_model.go +++ b/model/entity/asynch_model.go @@ -12,8 +12,9 @@ type asynchModelCol struct { Enabled string MaxConcurrency string QueueLimit string - TimeoutMs string + TimeoutSeconds string RetryTimes string + RetryQueueMaxSecs string AutoCleanSeconds string Remark string } @@ -28,25 +29,27 @@ var AsynchModelCol = asynchModelCol{ Enabled: "enabled", MaxConcurrency: "max_concurrency", QueueLimit: "queue_limit", - TimeoutMs: "timeout_ms", + TimeoutSeconds: "timeout_seconds", RetryTimes: "retry_times", + RetryQueueMaxSecs: "retry_queue_max_seconds", AutoCleanSeconds: "auto_clean_seconds", Remark: "remark", } // AsynchModel 异步模型配置 type AsynchModel struct { - beans.SQLBaseDO `orm:",inline"` - ModelName string `orm:"model_name" json:"modelName"` - BaseURL string `orm:"base_url" json:"baseUrl"` - Route string `orm:"route" json:"route"` - HttpMethod string `orm:"http_method" json:"httpMethod"` - APIKey string `orm:"api_key" json:"apiKey"` - Enabled int `orm:"enabled" json:"enabled"` - MaxConcurrency int `orm:"max_concurrency" json:"maxConcurrency"` - QueueLimit int `orm:"queue_limit" json:"queueLimit"` - TimeoutMs int `orm:"timeout_ms" json:"timeoutMs"` - RetryTimes int `orm:"retry_times" json:"retryTimes"` - AutoCleanSeconds int `orm:"auto_clean_seconds" json:"autoCleanSeconds"` - Remark string `orm:"remark" json:"remark"` + beans.SQLBaseDO `orm:",inline"` + ModelName string `orm:"model_name" json:"modelName"` + BaseURL string `orm:"base_url" json:"baseUrl"` + Route string `orm:"route" json:"route"` + HttpMethod string `orm:"http_method" json:"httpMethod"` + APIKey string `orm:"api_key" json:"apiKey"` + Enabled int `orm:"enabled" json:"enabled"` + MaxConcurrency int `orm:"max_concurrency" json:"maxConcurrency"` + QueueLimit int `orm:"queue_limit" json:"queueLimit"` + TimeoutSeconds int `orm:"timeout_seconds" json:"timeoutSeconds"` + RetryTimes int `orm:"retry_times" json:"retryTimes"` + RetryQueueMaxSecs int `orm:"retry_queue_max_seconds" json:"retryQueueMaxSeconds"` + AutoCleanSeconds int `orm:"auto_clean_seconds" json:"autoCleanSeconds"` + Remark string `orm:"remark" json:"remark"` } diff --git a/model/entity/asynch_op_log.go b/model/entity/asynch_op_log.go new file mode 100644 index 0000000..0b821a4 --- /dev/null +++ b/model/entity/asynch_op_log.go @@ -0,0 +1,57 @@ +package entity + +import ( + "gitea.com/red-future/common/beans" +) + +type asynchOpLogCol struct { + beans.SQLBaseCol + IP string + UserAgent string + APIPath string + HttpMethod string + BizName string + ModelName string + TaskID string + OpType string + Success string + ErrorMsg string + CostMs string + RequestPayload string + ResponsePayload string +} + +var AsynchOpLogCol = asynchOpLogCol{ + SQLBaseCol: beans.DefSQLBaseCol, + IP: "ip", + UserAgent: "user_agent", + APIPath: "api_path", + HttpMethod: "http_method", + BizName: "biz_name", + ModelName: "model_name", + TaskID: "task_id", + OpType: "op_type", + Success: "success", + ErrorMsg: "error_msg", + CostMs: "cost_ms", + RequestPayload: "request_payload", + ResponsePayload: "response_payload", +} + +// AsynchOpLog 操作日志(创建任务等) +type AsynchOpLog struct { + beans.SQLBaseDO `orm:",inline"` + IP string `orm:"ip" json:"ip"` + UserAgent string `orm:"user_agent" json:"userAgent"` + APIPath string `orm:"api_path" json:"apiPath"` + HttpMethod string `orm:"http_method" json:"httpMethod"` + BizName string `orm:"biz_name" json:"bizName"` + ModelName string `orm:"model_name" json:"modelName"` + TaskID string `orm:"task_id" json:"taskId"` + OpType string `orm:"op_type" json:"opType"` + Success int `orm:"success" json:"success"` + ErrorMsg string `orm:"error_msg" json:"errorMsg"` + CostMs int64 `orm:"cost_ms" json:"costMs"` + RequestPayload any `orm:"request_payload" json:"requestPayload"` + ResponsePayload any `orm:"response_payload" json:"responsePayload"` +} diff --git a/model/entity/asynch_task.go b/model/entity/asynch_task.go index 4160f00..5a49f80 100644 --- a/model/entity/asynch_task.go +++ b/model/entity/asynch_task.go @@ -19,6 +19,8 @@ type asynchTaskCol struct { ExpireAt string RetryCount string EnqueueAt string + Phase string + TmpFile string InputRef string RequestPayload string } @@ -37,6 +39,8 @@ var AsynchTaskCol = asynchTaskCol{ ExpireAt: "expire_at", RetryCount: "retry_count", EnqueueAt: "enqueue_at", + Phase: "phase", + TmpFile: "tmp_file", InputRef: "input_ref", RequestPayload: "request_payload", } @@ -56,6 +60,10 @@ type AsynchTask struct { ExpireAt *gtime.Time `orm:"expire_at" json:"expireAt"` // 已下载(state=4)后的过期时间 RetryCount int `orm:"retry_count" json:"retryCount"` EnqueueAt *gtime.Time `orm:"enqueue_at" json:"enqueueAt"` + Phase int `orm:"phase" json:"phase"` // 0模型阶段/1OSS阶段 + TmpFile string `orm:"tmp_file" json:"tmpFile"` // 临时结果文件路径 + // RetryQueueMaxSeconds 为 ListFailedRetryableGlobal 的 join 字段(非任务表字段) + RetryQueueMaxSeconds int `orm:"retry_queue_max_seconds" json:"-"` InputRef string `orm:"input_ref" json:"inputRef"` RequestPayload any `orm:"request_payload" json:"requestPayload"` } diff --git a/service/cleaner.go b/service/cleaner.go index 75f8856..2f761c2 100644 --- a/service/cleaner.go +++ b/service/cleaner.go @@ -45,6 +45,7 @@ func (c *cleaner) runOnce(ctx context.Context) { g.Log().Errorf(ctx, "[cleaner] list expired(downloaded) error: %v", err) } else { for _, t := range expired { + deleteTmpResult(t.TmpFile) _ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id) } g.Log().Infof(ctx, "[cleaner] expired(downloaded) cleaned, count=%d", len(expired)) @@ -71,7 +72,20 @@ func (c *cleaner) runOnce(ctx context.Context) { g.Log().Errorf(ctx, "[cleaner] list failed retryable error: %v", err) } else { for _, t := range retryable { - _ = dao.Task.RequeueForRetryGlobal(ctx, t.Id) + // retry_queue_max_seconds 控制失败重试的排队策略: + // - =0:失败重试插队到队首 + // - >0:当任务从创建到现在的排队时长 >= maxSeconds,则插队到队首;否则仍放到队尾 + now := time.Now() + enqueueAt := now + maxSeconds := t.RetryQueueMaxSeconds + if maxSeconds == 0 { + enqueueAt = now.Add(-100 * 365 * 24 * time.Hour) + } else if maxSeconds > 0 && t.CreatedAt != nil { + if now.Sub(t.CreatedAt.Time) >= time.Duration(maxSeconds)*time.Second { + enqueueAt = now.Add(-100 * 365 * 24 * time.Hour) + } + } + _ = dao.Task.RequeueForRetryGlobal(ctx, t.Id, enqueueAt) } g.Log().Infof(ctx, "[cleaner] failed retryable cleaned, count=%d", len(retryable)) } @@ -82,6 +96,7 @@ func (c *cleaner) runOnce(ctx context.Context) { g.Log().Errorf(ctx, "[cleaner] list failed exhausted error: %v", err) } else { for _, t := range exhausted { + deleteTmpResult(t.TmpFile) _ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id) } g.Log().Infof(ctx, "[cleaner] failed exhausted cleaned, count=%d", len(exhausted)) diff --git a/service/model_invoker.go b/service/model_invoker.go index 69159d1..41d0af9 100644 --- a/service/model_invoker.go +++ b/service/model_invoker.go @@ -14,24 +14,52 @@ import ( "model-asynch/model/entity" ) -func parseAPIKeyHeader(apiKey string) (k, v string) { +// parseAPIKeyHeaders 支持多个 header 绑定,逗号分隔: +// 示例: +// - X-API-Key:qwen3-tts-key,operation:true,count:123 +// - X-API-Key:"qwen3-tts-key",operation:"true" +// +// 说明: +// - HTTP Header 最终都是字符串,这里做的是“值的字符串化表达”。 +// - 若 value 用双引号包裹,会去掉外层引号再注入,便于在配置中区分字符串/布尔/数字等表达(以及避免值中包含特殊字符时歧义)。 +func parseAPIKeyHeaders(apiKey string) map[string]string { apiKey = strings.TrimSpace(apiKey) if apiKey == "" { - return "", "" + return nil } - // 支持两种写法: - // 1) HeaderName:HeaderValue(推荐) - // 2) HeaderName=HeaderValue(兼容) - if strings.Contains(apiKey, ":") { - parts := strings.SplitN(apiKey, ":", 2) - return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) + out := map[string]string{} + parts := strings.Split(apiKey, ",") + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + continue + } + // HeaderName:HeaderValue(推荐) / HeaderName=HeaderValue(兼容) + if strings.Contains(p, ":") { + kv := strings.SplitN(p, ":", 2) + k := strings.TrimSpace(kv[0]) + v := strings.TrimSpace(kv[1]) + v = strings.Trim(v, "\"") + if k != "" && v != "" { + out[k] = v + } + continue + } + if strings.Contains(p, "=") { + kv := strings.SplitN(p, "=", 2) + k := strings.TrimSpace(kv[0]) + v := strings.TrimSpace(kv[1]) + v = strings.Trim(v, "\"") + if k != "" && v != "" { + out[k] = v + } + continue + } } - if strings.Contains(apiKey, "=") { - parts := strings.SplitN(apiKey, "=", 2) - return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1]) + if len(out) == 0 { + return nil } - // 只给了 value:不做注入(避免注入非法 header) - return "", "" + return out } func payloadToQuery(payload any) (url.Values, error) { @@ -76,7 +104,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt url = strings.TrimRight(m.BaseURL, "/") } - timeout := time.Duration(m.TimeoutMs) * time.Millisecond + timeout := time.Duration(m.TimeoutSeconds) * time.Second if timeout <= 0 { timeout = 60 * time.Second } @@ -122,7 +150,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt req.Header.Set(k, v) } } - if hk, hv := parseAPIKeyHeader(m.APIKey); hk != "" && hv != "" { + for hk, hv := range parseAPIKeyHeaders(m.APIKey) { req.Header.Set(hk, hv) } if method != http.MethodGet { diff --git a/service/model_service.go b/service/model_service.go index 345b691..ac5f569 100644 --- a/service/model_service.go +++ b/service/model_service.go @@ -15,18 +15,19 @@ type modelService struct{} func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res *dto.CreateModelRes, err error) { m := &entity.AsynchModel{ - ModelName: req.ModelName, - BaseURL: req.BaseURL, - Route: req.Route, - HttpMethod: req.HttpMethod, - APIKey: req.APIKey, - Enabled: req.Enabled, - MaxConcurrency: req.MaxConcurrency, - QueueLimit: req.QueueLimit, - TimeoutMs: req.TimeoutMs, - RetryTimes: req.RetryTimes, - AutoCleanSeconds: req.AutoCleanSeconds, - Remark: req.Remark, + ModelName: req.ModelName, + BaseURL: req.BaseURL, + Route: req.Route, + HttpMethod: req.HttpMethod, + APIKey: req.APIKey, + Enabled: req.Enabled, + MaxConcurrency: req.MaxConcurrency, + QueueLimit: req.QueueLimit, + TimeoutSeconds: req.TimeoutSeconds, + RetryTimes: req.RetryTimes, + RetryQueueMaxSecs: req.RetryQueueMaxSeconds, + AutoCleanSeconds: req.AutoCleanSeconds, + Remark: req.Remark, } if m.HttpMethod == "" { m.HttpMethod = "POST" @@ -40,8 +41,8 @@ func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res if m.QueueLimit <= 0 { m.QueueLimit = 1000 } - if m.TimeoutMs <= 0 { - m.TimeoutMs = 60000 + if m.TimeoutSeconds <= 0 { + m.TimeoutSeconds = 60 } if m.AutoCleanSeconds <= 0 { m.AutoCleanSeconds = 86400 @@ -76,12 +77,15 @@ func (s *modelService) Update(ctx context.Context, req *dto.UpdateModelReq) erro if req.QueueLimit != nil { data[entity.AsynchModelCol.QueueLimit] = *req.QueueLimit } - if req.TimeoutMs != nil { - data[entity.AsynchModelCol.TimeoutMs] = *req.TimeoutMs + if req.TimeoutSeconds != nil { + data[entity.AsynchModelCol.TimeoutSeconds] = *req.TimeoutSeconds } if req.RetryTimes != nil { data[entity.AsynchModelCol.RetryTimes] = *req.RetryTimes } + if req.RetryQueueMaxSeconds != nil { + data[entity.AsynchModelCol.RetryQueueMaxSecs] = *req.RetryQueueMaxSeconds + } if req.AutoCleanSeconds != nil { data[entity.AsynchModelCol.AutoCleanSeconds] = *req.AutoCleanSeconds } @@ -104,6 +108,6 @@ func (s *modelService) Get(ctx context.Context, id int64) (*entity.AsynchModel, return dao.Model.GetByID(ctx, id) } -func (s *modelService) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) { - return dao.Model.List(ctx, pageNum, pageSize) +func (s *modelService) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) { + return dao.Model.List(ctx, pageNum, pageSize, modelNameLike) } diff --git a/service/storage_oss.go b/service/storage_oss.go index 6d16a49..60b09f8 100644 --- a/service/storage_oss.go +++ b/service/storage_oss.go @@ -5,12 +5,14 @@ import ( "context" "fmt" "mime/multipart" + "time" "model-asynch/model/entity" commonHttp "gitea.com/red-future/common/http" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" + "github.com/gogf/gf/v2/util/guid" ) // 对接你们的 oss 文件服务:POST oss/file/uploadFile (multipart/form-data) @@ -25,8 +27,6 @@ type uploadFileResponse struct { } func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, data []byte, fileExt string, _ string) (ossURL string, err error) { - // ossUrl := g.Cfg().MustGet(ctx, "oss.addr", "192.168.3.30:9000").String() - // multipart body := &bytes.Buffer{} writer := multipart.NewWriter(body) @@ -39,7 +39,8 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat ext = "." + ext } - part, err := writer.CreateFormFile("file", "") + filename := fmt.Sprintf("asynch_%d_%s%s", time.Now().Unix(), guid.S(), ext) + part, err := writer.CreateFormFile("file", filename) if err != nil { return "", err } @@ -55,16 +56,14 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat headers["Content-Type"] = contentType fullURL := "oss/file/uploadFile" - g.Log().Infof(ctx, "[OSS] upload start url=%s size=%d", fullURL, len(data)) + g.Log().Infof(ctx, "[OSS] upload start url=%s filename=%s size=%d", fullURL, filename, len(data)) var resp uploadFileResponse if err := commonHttp.Post(ctx, fullURL, headers, &resp, body.Bytes()); err != nil { return "", err } - if resp.FileURL == "" { - return "", fmt.Errorf("OSS服务返回错误: 上传失败") - } - g.Log().Infof(ctx, "[OSS] upload success url=%s filename=%s size=%d format=%s", resp.FileURL, resp.FileName, resp.FileSize, resp.FileFormat) + fmt.Println("打印结果 resp:", resp) + g.Log().Infof(ctx, "[OSS] upload success url=%s size=%d format=%s", resp.FileURL, resp.FileSize, resp.FileFormat) return resp.FileURL, nil } diff --git a/service/task_service.go b/service/task_service.go index c9021cc..4972b4c 100644 --- a/service/task_service.go +++ b/service/task_service.go @@ -9,6 +9,8 @@ import ( "model-asynch/model/dto" "model-asynch/model/entity" + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gtime" "github.com/google/uuid" ) @@ -18,6 +20,7 @@ var Task = &taskService{} type taskService struct{} func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *dto.CreateTaskRes, err error) { + startAt := time.Now() // 固化 token/user 等信息 ctx = asyncCtx(ctx) @@ -59,6 +62,35 @@ func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res * if err != nil { return nil, err } + + // 3) 写操作日志(尽量不影响主流程,失败忽略) + ip := "" + ua := "" + apiPath := "/task/createTask" + httpMethod := "POST" + if r := g.RequestFromCtx(ctx); r != nil { + ip = r.GetClientIp() + ua = r.UserAgent() + apiPath = r.URL.Path + httpMethod = r.Method + } + _, _ = dao.OpLog.Insert(ctx, &entity.AsynchOpLog{ + IP: ip, + UserAgent: ua, + APIPath: apiPath, + HttpMethod: httpMethod, + BizName: req.BizName, + ModelName: req.ModelName, + TaskID: taskID, + OpType: "createTask", + Success: 1, + ErrorMsg: "", + CostMs: time.Since(startAt).Milliseconds(), + RequestPayload: storedPayload, + ResponsePayload: gdb.Map{ + "taskId": taskID, + }, + }) return &dto.CreateTaskRes{TaskID: taskID}, nil } @@ -127,3 +159,28 @@ func (s *taskService) GetBatch(ctx context.Context, req *dto.GetTaskBatchReq) (r } return &dto.GetTaskBatchRes{List: items}, nil } + +func (s *taskService) List(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) { + pageNum, pageSize := 1, 10 + if req != nil && req.Page != nil { + if req.Page.PageNum > 0 { + pageNum = int(req.Page.PageNum) + } + if req.Page.PageSize > 0 { + pageSize = int(req.Page.PageSize) + } + } + modelName := "" + taskID := "" + var state *int + if req != nil { + modelName = req.ModelName + taskID = req.TaskID + state = req.State + } + list, total, err := dao.Task.List(ctx, pageNum, pageSize, modelName, taskID, state) + if err != nil { + return nil, err + } + return &dto.ListTaskRes{List: list, Total: total}, nil +} diff --git a/service/tmp_store.go b/service/tmp_store.go new file mode 100644 index 0000000..9dea56a --- /dev/null +++ b/service/tmp_store.go @@ -0,0 +1,38 @@ +package service + +import ( + "fmt" + "os" + "path/filepath" +) + +// saveTmpResult 将模型输出写入临时文件,用于 OSS 上传失败后的“仅重试 OSS”。 +func saveTmpResult(taskID string, data []byte, ext string) (string, error) { + dir := filepath.Join(os.TempDir(), "model-asynch") + if err := os.MkdirAll(dir, 0o755); err != nil { + return "", err + } + if ext == "" { + ext = ".bin" + } + if ext[0] != '.' { + ext = "." + ext + } + path := filepath.Join(dir, fmt.Sprintf("%s%s", taskID, ext)) + if err := os.WriteFile(path, data, 0o644); err != nil { + return "", err + } + return path, nil +} + +func loadTmpResult(path string) ([]byte, error) { + return os.ReadFile(path) +} + +func deleteTmpResult(path string) { + if path == "" { + return + } + _ = os.Remove(path) +} + diff --git a/service/worker.go b/service/worker.go index ec67578..8107829 100644 --- a/service/worker.go +++ b/service/worker.go @@ -146,17 +146,43 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) { "inputRef": t.InputRef, } } - data, err := InvokeModel(ctx, m, payload) - if err != nil { - _ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error()) - return + var ( + data []byte + contentType string + ext string + ) + + // phase=1 表示模型已成功但 OSS 上传失败:优先从临时文件加载,避免重复跑模型 + if t.Phase == 1 && strings.TrimSpace(t.TmpFile) != "" { + data, err = loadTmpResult(t.TmpFile) + if err == nil && len(data) > 0 { + contentType, ext = DetectFileType(data) + } else { + // 临时文件不可用:回退重新调用模型 + data = nil + } + } + if data == nil { + data, err = InvokeModel(ctx, m, payload) + if err != nil { + _ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error()) + return + } + contentType, ext = DetectFileType(data) + // 将模型输出写入临时文件,后续若 OSS 失败可只重试 OSS + tmpPath, err := saveTmpResult(t.TaskID, data, ext) + if err == nil && tmpPath != "" { + t.TmpFile = tmpPath + t.Phase = 1 + _ = dao.Task.UpdateTmpAfterModelGlobal(ctx, t.Id, tmpPath) + } } - // 4) 存储 OSS/MinIO - contentType, ext := DetectFileType(data) + // 4) 存储 OSS ossURL, err := Storage.UploadByTask(ctx, t, data, ext, contentType) if err != nil { - _ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error()) + // OSS 阶段失败:保留临时文件,下一轮仅重试 OSS + _ = dao.Task.UpdateFailedKeepTmpGlobal(ctx, t.Id, err.Error()) return } @@ -170,6 +196,8 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) { g.Log().Errorf(ctx, "[worker] update success failed: %v", err) return } + // 成功后清理临时文件 + deleteTmpResult(t.TmpFile) } func (w *asyncWorker) rollbackToPending(ctx context.Context, id int64) error { diff --git a/update.sql b/update.sql index 150ed0f..18c7e09 100644 --- a/update.sql +++ b/update.sql @@ -1,138 +1,163 @@ --- -----------------------王立钊2026-04-22 10:00:00----------------------- +-- model-asynch 三张核心表(pgsql) +-- 1) asynch_models:模型配置 +-- 2) asynch_task:异步任务 +-- 3) asynch_op_log:操作日志(统计用) ---------------------pgsql创建 asynch_models / asynch_task 表语句--------------------------- - --- 异步模型表 +-- ========================= +-- 1) asynch_models +-- ========================= CREATE TABLE IF NOT EXISTS asynch_models ( -- 基础字段(与现有表保持一致) - id BIGINT PRIMARY KEY, -- 主键ID(非自增) - tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID + id BIGINT PRIMARY KEY, -- 主键ID(非自增) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID creator VARCHAR(64) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updater VARCHAR(64) NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - deleted_at timestamp(6), + deleted_at TIMESTAMP(6), -- 业务字段 - model_name VARCHAR(128) NOT NULL, -- 模型名称(路由键) - base_url VARCHAR(256) NOT NULL, -- 模型服务基础地址(如 http://1.2.3.4:8080) - route VARCHAR(256) NOT NULL DEFAULT '', -- 模型服务路由(如 /v1/infer) + model_name VARCHAR(128) NOT NULL, -- 模型名称(路由键) + base_url VARCHAR(256) NOT NULL, -- 模型服务基础地址(如 http://1.2.3.4:8080) + route VARCHAR(256) NOT NULL DEFAULT '',-- 模型服务路由(如 /v1/infer) http_method VARCHAR(8) NOT NULL DEFAULT 'POST', -- 请求方式:GET/POST - api_key VARCHAR(256) DEFAULT '', -- 请求密钥绑定(请求头),示例:TTS_API_KEY:your-key - enabled SMALLINT NOT NULL DEFAULT 1, -- 是否启用:1启用/0停用 - max_concurrency INT NOT NULL DEFAULT 10, -- 单模型最大并发(worker/队列消费侧应遵守) - queue_limit INT NOT NULL DEFAULT 1000, -- 单模型排队上限(防堆积,可选) - timeout_ms INT NOT NULL DEFAULT 60000, -- 调用模型服务超时(毫秒) - retry_times SMALLINT NOT NULL DEFAULT 0, -- 失败重试次数(0表示不重试) - auto_clean_seconds INT NOT NULL DEFAULT 86400, -- 已下载(state=4)后的保留时间(秒),到期后清理 - remark TEXT DEFAULT '' -- 备注 + api_key VARCHAR(1024) DEFAULT '', -- 请求头绑定(支持多个,逗号分隔):X-API-Key:xxx,operation:true + + enabled SMALLINT NOT NULL DEFAULT 1, -- 是否启用:1启用/0停用 + max_concurrency INT NOT NULL DEFAULT 10, -- 单模型最大并发 + queue_limit INT NOT NULL DEFAULT 1000, -- 排队上限(近似控制) + timeout_seconds INT NOT NULL DEFAULT 60, -- 调用模型服务超时(秒) + + retry_times SMALLINT NOT NULL DEFAULT 0, -- 失败后最多再重试 N 次(不含首次) + retry_queue_max_seconds INT NOT NULL DEFAULT 0, -- 失败重试最大排队时间(秒):0=插队到队首;>0=排队超过该时间后插队,否则仍到队尾 + + auto_clean_seconds INT NOT NULL DEFAULT 86400, -- 已下载(state=4)后的保留时间(秒) + remark TEXT DEFAULT '' -- 备注 ); --- 索引/约束 -CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name ON asynch_models(tenant_id, model_name); +CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name + ON asynch_models(tenant_id, model_name); CREATE INDEX IF NOT EXISTS idx_asynch_models_tenant_id ON asynch_models(tenant_id); CREATE INDEX IF NOT EXISTS idx_asynch_models_model_name ON asynch_models(model_name); CREATE INDEX IF NOT EXISTS idx_asynch_models_enabled ON asynch_models(enabled); CREATE INDEX IF NOT EXISTS idx_asynch_models_deleted_at ON asynch_models(deleted_at); --- 表和字段注释 COMMENT ON TABLE asynch_models IS '异步模型表(模型服务配置)'; -COMMENT ON COLUMN asynch_models.id IS '主键ID(非自增)'; -COMMENT ON COLUMN asynch_models.tenant_id IS '租户ID'; -COMMENT ON COLUMN asynch_models.creator IS '创建人'; -COMMENT ON COLUMN asynch_models.created_at IS '创建时间'; -COMMENT ON COLUMN asynch_models.updater IS '更新人'; -COMMENT ON COLUMN asynch_models.updated_at IS '更新时间'; -COMMENT ON COLUMN asynch_models.deleted_at IS '删除时间(软删)'; COMMENT ON COLUMN asynch_models.model_name IS '模型名称(路由键)'; -COMMENT ON COLUMN asynch_models.base_url IS '模型服务基础地址'; -COMMENT ON COLUMN asynch_models.route IS '模型服务路由'; -COMMENT ON COLUMN asynch_models.http_method IS '请求方式:GET/POST'; -COMMENT ON COLUMN asynch_models.api_key IS '请求密钥绑定(请求头),示例:TTS_API_KEY:your-key'; -COMMENT ON COLUMN asynch_models.enabled IS '是否启用:1启用/0停用'; -COMMENT ON COLUMN asynch_models.max_concurrency IS '单模型最大并发'; -COMMENT ON COLUMN asynch_models.queue_limit IS '单模型排队上限'; -COMMENT ON COLUMN asynch_models.timeout_ms IS '调用模型服务超时(毫秒)'; -COMMENT ON COLUMN asynch_models.retry_times IS '失败重试次数'; -COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理'; -COMMENT ON COLUMN asynch_models.remark IS '备注'; - --- 兼容已有库:更新字段注释 -COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理'; +COMMENT ON COLUMN asynch_models.api_key IS '请求头绑定(支持多个,逗号分隔):X-API-Key:xxx,operation:true'; +COMMENT ON COLUMN asynch_models.retry_times IS '失败后最多再重试 N 次(不含首次)'; +COMMENT ON COLUMN asynch_models.retry_queue_max_seconds IS '失败重试最大排队时间(秒):0=插队到队首;>0=排队超过该时间后插队,否则仍到队尾'; +COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期清理'; --- 异步任务表 +-- ========================= +-- 2) asynch_task +-- ========================= CREATE TABLE IF NOT EXISTS asynch_task ( -- 基础字段(与现有表保持一致) - id BIGINT PRIMARY KEY, -- 主键ID(非自增) - tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID + id BIGINT PRIMARY KEY, -- 主键ID(非自增) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID creator VARCHAR(64) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updater VARCHAR(64) NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - deleted_at timestamp(6), + deleted_at TIMESTAMP(6), -- 任务核心字段 - model_name VARCHAR(128) NOT NULL, -- 模型名称 - task_id VARCHAR(64) NOT NULL, -- 任务ID(对外返回) - state SMALLINT NOT NULL DEFAULT 0, -- 状态:0排队中/1执行中/2成功/3失败/4已下载 - oss_file VARCHAR(512) DEFAULT '', -- OSS文件URL/Key - file_type VARCHAR(32) DEFAULT '', -- 文件类型(如 mp3/mp4/png/pdf/...) - file_size BIGINT NOT NULL DEFAULT 0, -- 文件大小(字节) - error_msg TEXT DEFAULT '', -- 错误信息 + model_name VARCHAR(128) NOT NULL, -- 模型名称 + task_id VARCHAR(64) NOT NULL, -- 任务ID(对外返回) + state SMALLINT NOT NULL DEFAULT 0, -- 0排队中/1执行中/2成功/3失败/4已下载 - -- 执行与清理字段 - started_at TIMESTAMP, -- 开始执行时间 - finished_at TIMESTAMP, -- 执行结束时间 - expire_at TIMESTAMP, -- 过期清理时间(已下载后写入:updated_at + auto_clean_seconds) - retry_count INT NOT NULL DEFAULT 0, -- 已重试次数(不含首次) - enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 入队时间(用于排队顺序,重试会更新时间到队尾) + oss_file VARCHAR(512) DEFAULT '', -- 结果文件OSS地址 + file_type VARCHAR(32) DEFAULT '', -- 文件类型(mp3/mp4/png/...) + file_size BIGINT NOT NULL DEFAULT 0, -- 文件大小(字节) + error_msg TEXT DEFAULT '', -- 错误信息 - -- 输入信息(用于排障/重放;如不需要可不写入或置空) - input_ref TEXT DEFAULT '', -- 输入引用(如上传文件URL/OSS key/业务侧资源ID) - request_payload JSONB -- 请求参数(可选) + started_at TIMESTAMP, -- 开始执行时间 + finished_at TIMESTAMP, -- 执行结束时间 + + expire_at TIMESTAMP, -- state=4 后写入,用于清理 + + -- 重试/排队 + retry_count INT NOT NULL DEFAULT 0, -- 已重试次数(不含首次) + enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 入队时间(用于排队顺序) + + -- 任务执行阶段:用于区分“重试模型”与“仅重试 OSS” + phase SMALLINT NOT NULL DEFAULT 0, -- 0模型阶段/1OSS阶段 + tmp_file TEXT DEFAULT '', -- 临时结果文件路径(phase=1 时仅重试 OSS 上传) + + -- 输入信息(可选) + input_ref TEXT DEFAULT '', -- 输入引用(如OSS/业务资源ID等) + request_payload JSONB -- 请求参数(可选) ); --- 索引/约束 -CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id ON asynch_task(tenant_id, task_id); +CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id + ON asynch_task(tenant_id, task_id); CREATE INDEX IF NOT EXISTS idx_asynch_task_tenant_id ON asynch_task(tenant_id); CREATE INDEX IF NOT EXISTS idx_asynch_task_model_name ON asynch_task(model_name); CREATE INDEX IF NOT EXISTS idx_asynch_task_state ON asynch_task(state); +CREATE INDEX IF NOT EXISTS idx_asynch_task_enqueue_at ON asynch_task(enqueue_at); CREATE INDEX IF NOT EXISTS idx_asynch_task_updated_at ON asynch_task(updated_at); CREATE INDEX IF NOT EXISTS idx_asynch_task_expire_at ON asynch_task(expire_at); CREATE INDEX IF NOT EXISTS idx_asynch_task_deleted_at ON asynch_task(deleted_at); --- 表和字段注释 COMMENT ON TABLE asynch_task IS '异步任务表'; -COMMENT ON COLUMN asynch_task.id IS '主键ID(非自增)'; -COMMENT ON COLUMN asynch_task.tenant_id IS '租户ID'; -COMMENT ON COLUMN asynch_task.creator IS '创建人'; -COMMENT ON COLUMN asynch_task.created_at IS '创建时间'; -COMMENT ON COLUMN asynch_task.updater IS '更新人'; -COMMENT ON COLUMN asynch_task.updated_at IS '更新时间'; -COMMENT ON COLUMN asynch_task.deleted_at IS '删除时间(软删)'; -COMMENT ON COLUMN asynch_task.model_name IS '模型名称'; -COMMENT ON COLUMN asynch_task.task_id IS '任务ID(对外返回)'; -COMMENT ON COLUMN asynch_task.state IS '状态:0排队中/1执行中/2成功/3失败/4已下载'; -COMMENT ON COLUMN asynch_task.oss_file IS 'OSS文件URL/Key'; -COMMENT ON COLUMN asynch_task.file_type IS '文件类型'; -COMMENT ON COLUMN asynch_task.file_size IS '文件大小(字节)'; -COMMENT ON COLUMN asynch_task.error_msg IS '错误信息'; -COMMENT ON COLUMN asynch_task.started_at IS '开始执行时间'; -COMMENT ON COLUMN asynch_task.finished_at IS '执行结束时间'; -COMMENT ON COLUMN asynch_task.expire_at IS '过期清理时间(已下载后保留到期)'; +COMMENT ON COLUMN asynch_task.state IS '0排队中/1执行中/2成功/3失败/4已下载'; COMMENT ON COLUMN asynch_task.retry_count IS '已重试次数(不含首次)'; -COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序,重试会更新时间到队尾)'; -COMMENT ON COLUMN asynch_task.input_ref IS '输入引用(如上传文件URL/OSS key/业务侧资源ID)'; -COMMENT ON COLUMN asynch_task.request_payload IS '请求参数(JSON)'; +COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序)'; +COMMENT ON COLUMN asynch_task.phase IS '执行阶段:0模型阶段/1OSS阶段(模型已成功,等待上传OSS)'; +COMMENT ON COLUMN asynch_task.tmp_file IS '临时结果文件路径(phase=1 时仅重试 OSS 上传)'; --- 兼容已有库:增量加字段(PostgreSQL 支持 IF NOT EXISTS) -ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS retry_count INT NOT NULL DEFAULT 0; -ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP; -COMMENT ON COLUMN asynch_task.retry_count IS '已重试次数(不含首次)'; -COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序,重试会更新时间到队尾)'; -COMMENT ON COLUMN asynch_task.state IS '状态:0排队中/1执行中/2成功/3失败/4已下载'; -COMMENT ON COLUMN asynch_task.expire_at IS '过期清理时间(已下载后保留到期)'; --- 对存量数据进行入队时间回填(只在 enqueue_at 为空时) -UPDATE asynch_task SET enqueue_at = created_at WHERE enqueue_at IS NULL; +-- ========================= +-- 3) asynch_op_log +-- ========================= +CREATE TABLE IF NOT EXISTS asynch_op_log ( + -- 基础字段(与现有表保持一致) + id BIGINT PRIMARY KEY, + tenant_id BIGINT NOT NULL DEFAULT 0, + creator VARCHAR(64) NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updater VARCHAR(64) NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at TIMESTAMP(6), + + -- 基础审计信息 + ip VARCHAR(64) DEFAULT '', + user_agent VARCHAR(256) DEFAULT '', + api_path VARCHAR(256) DEFAULT '', + http_method VARCHAR(16) DEFAULT '', + + -- 业务信息 + biz_name VARCHAR(128) NOT NULL DEFAULT '', -- 调用方业务模块/系统 + model_name VARCHAR(128) NOT NULL DEFAULT '', + task_id VARCHAR(64) NOT NULL DEFAULT '', + + -- 统计字段 + op_type VARCHAR(64) NOT NULL DEFAULT 'createTask', -- 操作类型(默认创建任务) + success SMALLINT NOT NULL DEFAULT 1, -- 1成功/0失败 + error_msg TEXT DEFAULT '', + cost_ms BIGINT NOT NULL DEFAULT 0, -- 耗时(毫秒) + + -- 请求/响应 JSON(用于后期统计分析) + request_payload JSONB, + response_payload JSONB +); + +CREATE INDEX IF NOT EXISTS idx_asynch_op_log_tenant_time ON asynch_op_log(tenant_id, created_at); +CREATE INDEX IF NOT EXISTS idx_asynch_op_log_model_name ON asynch_op_log(model_name); +CREATE INDEX IF NOT EXISTS idx_asynch_op_log_biz_name ON asynch_op_log(biz_name); +CREATE INDEX IF NOT EXISTS idx_asynch_op_log_task_id ON asynch_op_log(task_id); +CREATE INDEX IF NOT EXISTS idx_asynch_op_log_op_type ON asynch_op_log(op_type); +CREATE INDEX IF NOT EXISTS idx_asynch_op_log_deleted_at ON asynch_op_log(deleted_at); + +COMMENT ON TABLE asynch_op_log IS '操作记录日志表(创建任务等,用于统计)'; +COMMENT ON COLUMN asynch_op_log.biz_name IS '业务名称(调用方模块/系统)'; +COMMENT ON COLUMN asynch_op_log.model_name IS '模型名称'; +COMMENT ON COLUMN asynch_op_log.task_id IS '任务ID'; +COMMENT ON COLUMN asynch_op_log.op_type IS '操作类型(如 createTask/getTaskResult/getTaskBatch 等)'; +COMMENT ON COLUMN asynch_op_log.success IS '是否成功:1成功/0失败'; +COMMENT ON COLUMN asynch_op_log.error_msg IS '错误信息(失败时)'; +COMMENT ON COLUMN asynch_op_log.cost_ms IS '耗时(毫秒)'; +COMMENT ON COLUMN asynch_op_log.request_payload IS '请求 JSON'; +COMMENT ON COLUMN asynch_op_log.response_payload IS '响应 JSON';