Compare commits

3 Commits
main ... dev

Author SHA1 Message Date
8fb99094af docs: 更新 README 以反映 API Key 格式和流程优化 2026-04-27 10:57:05 +08:00
4e6b98b7d3 feat(stat): 添加模型请求按天统计功能
- 新增统计控制器、服务层与数据访问层,提供按天统计接口
- 在 worker 处理任务时原子累加请求计数(仅实际调用模型时计数)
- 更新数据库表结构,添加 asynch_model_stat 表及索引
- 更新文档说明统计功能的使用方式与统计口径
2026-04-27 10:42:42 +08:00
f6c70a451e feat: 新增操作日志、任务分页查询与模型失败重试优化
- 新增操作日志表(asynch_op_log)及对应DAO,记录任务创建等操作的审计信息
- 新增任务分页查询接口(ListTask)及对应DTO、Service和DAO方法
- 优化模型调用失败重试逻辑:支持配置重试排队策略(插队到队首或队尾)
- 新增临时文件存储机制,当模型调用成功但OSS上传失败时,下次仅重试OSS上传
- 模型配置新增retry_queue_max_seconds字段,控制失败重试排队策略
- 更新数据库表结构(asynch_models、asynch_task、新增asynch_op_log)及同步更新SQL
- 配置文件调整:超时单位改为秒,更新服务地址和轮询间隔
- 修复模型列表查询支持按名称模糊搜索
2026-04-25 10:42:21 +08:00
28 changed files with 829 additions and 248 deletions

View File

@@ -13,8 +13,8 @@
- 支持配置:
- 请求地址:`base_url + route`
- 请求方式:`http_method`GET/POST
- 请求密钥:`api_key`(以请求头注入,示例:`TTS_API_KEY:your-key`
- 超时:`timeout_ms`
- 请求密钥:`api_key`(以请求头注入,支持多个 header
- 超时:`timeout_seconds`
- 并发:`max_concurrency`(按租户+模型的 Redis 分布式信号量限流)
- 重试:`retry_times`(失败后最多再重试 N 次)
- 保留:`auto_clean_seconds`(任务被业务领取到 `state=4` 后的保留秒数,到期清理)
@@ -26,38 +26,73 @@
- 调用模型服务GET/POST
- 结果上传 OSS调用你们的 OSS 文件服务 `oss/file/uploadFile`,透传 `Authorization/X-User-Info`
- 批量领取结果:批量查询 `task_id` 列表,返回 `task_id/state/oss_file`,并把成功的任务从 `state=2` 更新为 `state=4`
- 自动重试:失败 `state=3` 会由清理器按 `retry_times` 重新入队到队尾(保证先来后到)
- 自动重试:失败 `state=3` 会由清理器按 `retry_times` 重新入队到队尾
- 自动清理:
- `state=4``expire_at` 到期 → 硬删除任务(并尝试删除 OSS
- 失败重试耗尽仍失败 → 硬删除任务(并尝试删除 OSS
- `state=4``expire_at` 到期 → 硬删除任务
- 失败重试耗尽仍失败 → 硬删除任务
- `state=0/1` 超时 → 标记失败(防止卡死)
### 1.3 统计asynch_model_stat
- 按天统计:`day + tenant_id + creator + model_name -> request_count`
- 统计口径:仅在 Worker 真正调用模型服务时计数OSS 重试不计数)
- 用途:给其他服务提供全局限流/监控依据
---
## 2. 使用流程(业务方如何接入)
### 2.1 第一步:配置模型
调用“模型管理”接口新增模型配置TTS
- `model_name=tts`
- `base_url=http://xxx:port`
- `route=/tts`
- `http_method=POST`
- `api_key=TTS_API_KEY:your-key`(可选)
### 第一步:创建模型配置
业务方(或运维)先在中间件里创建/更新模型配置(`model_name` 为唯一键),例如
- `POST /model/createModel`(或 `/model/updateModel`
### 2.2 第二步:创建任务拿 task_id
业务方调用 `CreateTask`,传 `modelName + requestPayload`,中间件返回 `task_id`
业务方把 `task_id` 落到自己的业务表(状态=生成中)。
请求示例JSON
```json
{
"modelName": "model-service",
"baseUrl": "http://127.0.0.1:8000",
"route": "/api/v1/chat",
"httpMethod": "POST",
"apiKey": "API_KEY:model-key,API_STATE:true,API_NUM:123",
"enabled": 1,
"maxConcurrency": 5,
"queueLimit": 20,
"timeoutSeconds": 1800,
"retryTimes": 3,
"retryQueueMaxSeconds": 600,
"autoCleanSeconds": 3600,
"remark": "Model-Service 模型服务"
}
```
### 2.3 第三步:业务方领取结果(推荐批量)
业务方在自己的“定时任务服务/轮询器”中,批量把 `task_id` 列表传给中间件:
- 返回每个任务的 `state + oss_file`
- `state=2(成功)` 的任务,中间件会更新为 `state=4(已下载)` 并写入 `expire_at = now + auto_clean_seconds`
参数说明:
- `modelName`:模型名称(唯一标识/路由键)
- `baseUrl`模型服务地址Base URL
- `route`:模型服务路由(拼接到 baseUrl 后)
- `httpMethod`请求方式GET/POST
- `apiKey`:请求头绑定(支持多个 header逗号分隔格式 `Key:Value`;布尔/数字也会以字符串形式注入 header
- `enabled`是否启用0禁用/1启用
- `maxConcurrency`:单模型最大并发(按租户+模型维度限流)
- `queueLimit`:排队上限(近似控制,超过则拒绝创建)
- `timeoutSeconds`:调用模型服务超时(秒)
- `retryTimes`:失败后最多再重试 N 次(不含首次)
- `retryQueueMaxSeconds`失败重试最大排队时间0 表示重试插队到队首;>0 表示排队超过该时间后插队,否则仍到队尾
- `autoCleanSeconds`:任务被领取到 `state=4` 后的保留时间(秒),到期清理
- `remark`:备注说明
业务方拿到 `oss_file` 后做“业务转移”:
- 方案 A直接在业务表保存 `oss_file` 作为最终资源地址
- 方案 B业务侧下载后重新上传到业务自己的资产域再保存新地址
### 第二步:创建任务拿到 task_id
业务方发起推理请求时调用:
- `POST /task/createTask`(传 `modelName + requestPayload (+ bizName)`
- 中间件返回 `task_id`
- 业务方将 `task_id` 落到自己的业务表,并把业务状态置为「生成中」
> `state=4` 的数据允许重复获取,避免业务侧偶发中断导致“领取不到结果”。
### 第三步:同步任务进度(推荐批量)
业务方通过轮询/定时任务同步进度:
- 推荐:`POST /task/getTaskBatch`(批量传 `taskIds`,返回每个任务的 `state + oss_file`
- 或单条:`GET /task/getTaskResult?taskId=...`
业务侧拿到 `oss_file` 后自行做资源处理(直接保存或转存),并把业务状态更新为「成功/失败」。
> 说明:批量接口对 `state=2(成功)` 的任务会自动标记为 `state=4(已下载)` 并写入 `expire_at`,用于后续清理。
---
@@ -93,21 +128,13 @@
---
## 5. 数据库初始化/升级
## 5. 数据库初始化
项目根目录提供 `update.sql`
- 首次部署:执行建表 SQL
- 升级:执行 `ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...` 的增量语句
项目根目录提供 `update.sql`首次部署执行建表 SQL。
---
## 6. 接口文档
更详细接口示例见:`docs/api.md`(包含模型管理、任务接口、批量领取接口、字段说明)。
---
## 7. 开发与发布建议Git
## 6. 开发与发布建议Git
- `dev`:日常开发与联调
- `main`:线上稳定分支
@@ -115,4 +142,3 @@
1) 从 `main` 拉出 `dev`
2) 功能完成后提 MR/PR 合并回 `main`
3) 打 tag / 发布镜像

View File

@@ -31,29 +31,22 @@ database:
asynch:
worker:
enabled: true # 是否启用后台 worker开发环境可关闭避免刷DB错误
pollInterval: "5s" # 轮询间隔DB抢占 pending 任务)
batchSize: 5 # 每次抢占任务数量
pollInterval: "10s" # 轮询间隔DB抢占 pending 任务)
batchSize: 10 # 每次抢占任务数量
goroutines: 1 # worker 并发数(每个 goroutine 串行处理)
taskTimeout: "5m" # state=0/1 超时自动失败
cleaner:
enabled: true # 是否启用自动清理器(可选)
interval: "5s" # 清理任务扫描间隔
interval: "10s" # 清理任务扫描间隔
redis:
default:
address: 116.204.74.41:6379
address: 192.168.3.30:6379
db: 0
consul:
address: 116.204.74.41:8500
address: 192.168.3.30:8500
jaeger:
addr: 116.204.74.41:4318
# OSS 文件服务
# 当前实现:通过 common/http 的服务发现直接调用:
# POST oss/file/uploadFile (multipart/form-data)
# 鉴权:透传 Authorization / X-User-Info
oss:
addr: "116.204.74.41:9000"
addr: 192.168.3.30:4318

View File

@@ -3,4 +3,6 @@ package public
const (
TableNameModel = "asynch_models" // 异步模型表
TableNameTask = "asynch_task" // 异步任务表
TableNameOpLog = "asynch_op_log" // 异步操作日志表
TableNameStat = "asynch_model_stat" // 按天统计表(请求次数)
)

View File

@@ -56,7 +56,11 @@ func (c *model) ListModel(ctx context.Context, req *dto.ListModelReq) (res *dto.
pageSize = int(req.Page.PageSize)
}
}
list, total, err := service.Model.List(ctx, pageNum, pageSize)
modelName := ""
if req != nil {
modelName = req.ModelName
}
list, total, err := service.Model.List(ctx, pageNum, pageSize, modelName)
if err != nil {
return nil, err
}

View File

@@ -0,0 +1,20 @@
package controller
import (
"context"
"model-asynch/model/dto"
"model-asynch/service"
)
type stat struct{}
// Stat 统计控制器
var Stat = new(stat)
// ListModelStat 统计列表
func (c *stat) ListModelStat(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) {
ctx = ensureUser(ctx)
return service.Stat.List(ctx, req)
}

View File

@@ -29,3 +29,9 @@ func (c *task) GetTaskBatch(ctx context.Context, req *dto.GetTaskBatchReq) (res
ctx = ensureUser(ctx)
return service.Task.GetBatch(ctx, req)
}
// ListTask 任务列表分页查询
func (c *task) ListTask(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) {
ctx = ensureUser(ctx)
return service.Task.List(ctx, req)
}

View File

@@ -73,8 +73,11 @@ func (d *modelDao) GetByID(ctx context.Context, id int64) (m *entity.AsynchModel
return
}
func (d *modelDao) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).OrderDesc(entity.AsynchModelCol.CreatedAt)
func (d *modelDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) {
model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).Where("deleted_at IS NULL").OrderDesc(entity.AsynchModelCol.CreatedAt)
if modelNameLike != "" {
model = model.WhereLike(entity.AsynchModelCol.ModelName, "%"+modelNameLike+"%")
}
if pageNum > 0 && pageSize > 0 {
model = model.Page(pageNum, pageSize)
}
@@ -86,4 +89,3 @@ func (d *modelDao) List(ctx context.Context, pageNum, pageSize int) (list []*ent
err = r.Structs(&list)
return
}

22
dao/op_log_dao.go Normal file
View File

@@ -0,0 +1,22 @@
package dao
import (
"context"
"model-asynch/consts/public"
"model-asynch/model/entity"
"gitea.com/red-future/common/db/gfdb"
)
type opLogDao struct{}
var OpLog = &opLogDao{}
func (d *opLogDao) Insert(ctx context.Context, log *entity.AsynchOpLog) (id int64, err error) {
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameOpLog).Data(log).Insert()
if err != nil {
return 0, err
}
return r.LastInsertId()
}

61
dao/stat_dao.go Normal file
View File

@@ -0,0 +1,61 @@
package dao
import (
"context"
"fmt"
"time"
"model-asynch/consts/public"
"model-asynch/model/entity"
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/os/gtime"
)
type statDao struct{}
var Stat = &statDao{}
// IncRequestCount 原子累加(支持分布式/多协程):按天+租户+创建人+模型 +1
func (d *statDao) IncRequestCount(ctx context.Context, day time.Time, tenantId int64, creator, modelName string) error {
sql := fmt.Sprintf(`
INSERT INTO %s(day, tenant_id, creator, model_name, request_count, created_at, updated_at)
VALUES(?, ?, ?, ?, 1, NOW(), NOW())
ON CONFLICT (day, tenant_id, creator, model_name)
DO UPDATE SET request_count = %s.request_count + 1, updated_at = NOW()`,
public.TableNameStat, public.TableNameStat,
)
_, err := gfdb.DB(ctx).Exec(ctx, sql, gtime.New(day).Format("Y-m-d"), tenantId, creator, modelName)
return err
}
func (d *statDao) List(ctx context.Context, pageNum, pageSize int, startDay, endDay string, tenantId *int64, creator, modelName string) (list []*entity.AsynchModelStat, total int64, err error) {
m := gfdb.DB(ctx).Model(ctx, public.TableNameStat).Where("1=1")
if startDay != "" {
m = m.Where("day >= ?", startDay)
}
if endDay != "" {
m = m.Where("day <= ?", endDay)
}
if tenantId != nil {
m = m.Where("tenant_id = ?", *tenantId)
}
if creator != "" {
m = m.WhereLike("creator", "%"+creator+"%")
}
if modelName != "" {
m = m.WhereLike("model_name", "%"+modelName+"%")
}
m = m.OrderDesc("day").OrderDesc("request_count")
if pageNum > 0 && pageSize > 0 {
m = m.Page(pageNum, pageSize)
}
r, totalInt, err := m.AllAndCount(false)
if err != nil {
return nil, 0, err
}
total = int64(totalInt)
err = r.Structs(&list)
return
}

View File

@@ -11,6 +11,7 @@ import (
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
)
var Task = &taskDao{}
@@ -136,6 +137,31 @@ func (d *taskDao) CountActiveByModel(ctx context.Context, modelName string) (int
return int64(n), err
}
// List 任务分页查询(受 gfdb 租户 Hook 影响)
func (d *taskDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike, taskIDLike string, state *int) (list []*entity.AsynchTask, total int64, err error) {
m := gfdb.DB(ctx).Model(ctx, public.TableNameTask).Where("deleted_at IS NULL")
if modelNameLike != "" {
m = m.WhereLike(entity.AsynchTaskCol.ModelName, "%"+modelNameLike+"%")
}
if taskIDLike != "" {
m = m.WhereLike(entity.AsynchTaskCol.TaskID, "%"+taskIDLike+"%")
}
if state != nil {
m = m.Where(entity.AsynchTaskCol.State, *state)
}
m = m.OrderDesc(entity.AsynchTaskCol.CreatedAt)
if pageNum > 0 && pageSize > 0 {
m = m.Page(pageNum, pageSize)
}
r, totalInt, err := m.AllAndCount(false)
if err != nil {
return nil, 0, err
}
total = gconv.Int64(totalInt)
err = r.Structs(&list)
return
}
// ClaimPending 抢占 pending 任务state=0并在同一事务中更新为 runningstate=1
// 使用 PostgreSQL: FOR UPDATE SKIP LOCKED 避免多 worker 重复消费
func (d *taskDao) ClaimPending(ctx context.Context, batchSize int) (tasks []*entity.AsynchTask, err error) {

View File

@@ -20,7 +20,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks
}
err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
sql := fmt.Sprintf(
`SELECT id, tenant_id, model_name, task_id, input_ref, request_payload
`SELECT id, tenant_id, creator, model_name, task_id, input_ref, request_payload, phase, tmp_file
FROM %s
WHERE deleted_at IS NULL AND state = 0
ORDER BY enqueue_at ASC
@@ -58,7 +58,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks
func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fileType string, fileSize int64, expireAt *gtime.Time) error {
now := gtime.Now()
_, err := gfdb.DB(ctx).Exec(ctx,
fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, updated_at=? WHERE id=?`, public.TableNameTask),
fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask),
ossFile, fileType, fileSize, now, now, id,
)
return err
@@ -67,12 +67,31 @@ func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fi
func (d *taskDao) UpdateFailedGlobal(ctx context.Context, id int64, errorMsg string) error {
now := gtime.Now()
_, err := gfdb.DB(ctx).Exec(ctx,
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, updated_at=? WHERE id=?`, public.TableNameTask),
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask),
errorMsg, now, now, id,
)
return err
}
// UpdateFailedKeepTmpGlobal OSS 上传失败:保留 phase/tmp_file下一轮仅重试 OSS 上传
func (d *taskDao) UpdateFailedKeepTmpGlobal(ctx context.Context, id int64, errorMsg string) error {
now := gtime.Now()
_, err := gfdb.DB(ctx).Exec(ctx,
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=1, updated_at=? WHERE id=?`, public.TableNameTask),
errorMsg, now, now, id,
)
return err
}
// UpdateTmpAfterModelGlobal 模型调用成功后,写入临时文件路径并标记 phase=1
func (d *taskDao) UpdateTmpAfterModelGlobal(ctx context.Context, id int64, tmpFile string) error {
_, err := gfdb.DB(ctx).Exec(ctx,
fmt.Sprintf(`UPDATE %s SET phase=1, tmp_file=?, updated_at=NOW() WHERE id=?`, public.TableNameTask),
tmpFile, id,
)
return err
}
func (d *taskDao) SoftDeleteByTaskIDGlobal(ctx context.Context, taskID string) error {
_, err := gfdb.DB(ctx).Exec(ctx,
fmt.Sprintf(`UPDATE %s SET deleted_at=NOW(), updated_at=NOW() WHERE task_id=? AND deleted_at IS NULL`, public.TableNameTask),
@@ -113,7 +132,8 @@ func (d *taskDao) ListFailedRetryableGlobal(ctx context.Context, limit int) (lis
}
r, err := gfdb.DB(ctx).GetAll(ctx,
fmt.Sprintf(`
SELECT t.*
SELECT t.*,
m.retry_queue_max_seconds AS retry_queue_max_seconds
FROM %s t
JOIN %s m
ON t.tenant_id = m.tenant_id
@@ -132,11 +152,13 @@ SELECT t.*
return
}
// RequeueForRetryGlobal 将任务重新入队state=0enqueue_at=now),并将 retry_count +1
func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64) error {
// RequeueForRetryGlobal 将任务重新入队state=0并将 retry_count +1
// enqueueAt 用于控制重试任务在队列中的位置:
// - enqueueAt 越早越靠前ClaimPendingGlobal 按 enqueue_at ASC 抢占)
func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64, enqueueAt time.Time) error {
_, err := gfdb.DB(ctx).Exec(ctx,
fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=NOW(), updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask),
id,
fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=?, updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask),
enqueueAt, id,
)
return err
}

View File

@@ -27,6 +27,7 @@ func main() {
http.RouteRegister([]interface{}{
controller.Model,
controller.Task,
controller.Stat,
})
// 启动后台任务worker + 清理器

View File

@@ -16,8 +16,9 @@ type CreateModelReq struct {
Enabled int `p:"enabled" json:"enabled" dc:"是否启用0-禁用1-启用"`
MaxConcurrency int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数"`
QueueLimit int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(超过则拒绝/限流)"`
TimeoutMs int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(秒)"`
TimeoutSeconds int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)"`
RetryTimes int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数"`
RetryQueueMaxSeconds int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间0表示失败重试插队到队首>0表示排队超过该时间后插队否则仍到队尾"`
AutoCleanSeconds int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(如清理超时任务/队列)"`
Remark string `p:"remark" json:"remark" dc:"备注说明"`
}
@@ -37,8 +38,9 @@ type UpdateModelReq struct {
Enabled *int `p:"enabled" json:"enabled" dc:"是否启用0-禁用1-启用(可选更新)"`
MaxConcurrency *int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数(可选更新)"`
QueueLimit *int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(可选更新)"`
TimeoutMs *int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(秒)(可选更新)"`
TimeoutSeconds *int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)(可选更新)"`
RetryTimes *int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数(可选更新)"`
RetryQueueMaxSeconds *int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间(秒)(可选更新)"`
AutoCleanSeconds *int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(可选更新)"`
Remark *string `p:"remark" json:"remark" dc:"备注说明(可选更新)"`
}
@@ -63,6 +65,7 @@ type GetModelRes struct {
type ListModelReq struct {
g.Meta `path:"/listModel" method:"post" tags:"模型管理" summary:"模型配置列表" dc:"分页获取模型配置列表"`
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊查询,可选)"`
}
type ListModelRes struct {

23
model/dto/stat_dto.go Normal file
View File

@@ -0,0 +1,23 @@
package dto
import (
"gitea.com/red-future/common/beans"
"github.com/gogf/gf/v2/frame/g"
)
// ListModelStatReq 统计列表
type ListModelStatReq struct {
g.Meta `path:"/listModelStat" method:"post" tags:"统计" summary:"模型请求统计列表" dc:"按天统计模型请求次数,支持分页与条件筛选"`
Page *beans.Page `p:"page" json:"page" dc:"分页参数默认10条"`
StartDay string `p:"startDay" json:"startDay" dc:"开始日期YYYY-MM-DD可选"`
EndDay string `p:"endDay" json:"endDay" dc:"结束日期YYYY-MM-DD可选"`
TenantID *int64 `p:"tenantId" json:"tenantId" dc:"租户ID可选"`
Creator string `p:"creator" json:"creator" dc:"创建人(可选,模糊匹配)"`
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(可选,模糊匹配)"`
}
type ListModelStatRes struct {
List any `json:"list" dc:"列表数据"`
Total int64 `json:"total" dc:"总数"`
}

View File

@@ -1,11 +1,15 @@
package dto
import "github.com/gogf/gf/v2/frame/g"
import (
"gitea.com/red-future/common/beans"
"github.com/gogf/gf/v2/frame/g"
)
// CreateTaskReq 创建异步任务
type CreateTaskReq struct {
g.Meta `path:"/createTask" method:"post" tags:"任务管理" summary:"创建异步任务" dc:"创建异步任务并返回任务ID"`
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称"`
BizName string `p:"bizName" json:"bizName" dc:"业务名称(调用方模块/系统,用于统计)"`
InputRef string `p:"inputRef" json:"inputRef" dc:"输入引用如OSS/文件引用等)"`
RequestPayload any `p:"requestPayload" json:"requestPayload" dc:"请求负载(透传给模型服务)"`
}
@@ -40,3 +44,17 @@ type GetTaskBatchItem struct {
type GetTaskBatchRes struct {
List []GetTaskBatchItem `json:"list" dc:"任务列表"`
}
// ListTaskReq 任务列表分页查询
type ListTaskReq struct {
g.Meta `path:"/listTask" method:"post" tags:"任务管理" summary:"任务列表" dc:"分页查询任务列表,支持按状态/模型名称/task_id过滤"`
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊匹配)"`
TaskID string `p:"taskId" json:"taskId" dc:"任务ID模糊匹配"`
State *int `p:"state" json:"state" dc:"任务状态0/1/2/3/4可选"`
}
type ListTaskRes struct {
List any `json:"list" dc:"列表数据"`
Total int64 `json:"total" dc:"总数"`
}

View File

@@ -12,8 +12,9 @@ type asynchModelCol struct {
Enabled string
MaxConcurrency string
QueueLimit string
TimeoutMs string
TimeoutSeconds string
RetryTimes string
RetryQueueMaxSecs string
AutoCleanSeconds string
Remark string
}
@@ -28,8 +29,9 @@ var AsynchModelCol = asynchModelCol{
Enabled: "enabled",
MaxConcurrency: "max_concurrency",
QueueLimit: "queue_limit",
TimeoutMs: "timeout_ms",
TimeoutSeconds: "timeout_seconds",
RetryTimes: "retry_times",
RetryQueueMaxSecs: "retry_queue_max_seconds",
AutoCleanSeconds: "auto_clean_seconds",
Remark: "remark",
}
@@ -45,8 +47,9 @@ type AsynchModel struct {
Enabled int `orm:"enabled" json:"enabled"`
MaxConcurrency int `orm:"max_concurrency" json:"maxConcurrency"`
QueueLimit int `orm:"queue_limit" json:"queueLimit"`
TimeoutMs int `orm:"timeout_ms" json:"timeoutMs"`
TimeoutSeconds int `orm:"timeout_seconds" json:"timeoutSeconds"`
RetryTimes int `orm:"retry_times" json:"retryTimes"`
RetryQueueMaxSecs int `orm:"retry_queue_max_seconds" json:"retryQueueMaxSeconds"`
AutoCleanSeconds int `orm:"auto_clean_seconds" json:"autoCleanSeconds"`
Remark string `orm:"remark" json:"remark"`
}

View File

@@ -0,0 +1,16 @@
package entity
import "github.com/gogf/gf/v2/os/gtime"
// AsynchModelStat 按天统计:某天/租户/创建人/模型的请求次数
// 注:这里不走通用 SQLBaseDO采用联合唯一键day,tenant_id,creator,model_name做 UPSERT 原子累加。
type AsynchModelStat struct {
Day *gtime.Time `orm:"day" json:"day"` // 日期(建议仅使用日期部分)
TenantId int64 `orm:"tenant_id" json:"tenantId,string"`
Creator string `orm:"creator" json:"creator"`
ModelName string `orm:"model_name" json:"modelName"`
RequestCount int64 `orm:"request_count" json:"requestCount"`
CreatedAt *gtime.Time `orm:"created_at" json:"createdAt"`
UpdatedAt *gtime.Time `orm:"updated_at" json:"updatedAt"`
}

View File

@@ -0,0 +1,57 @@
package entity
import (
"gitea.com/red-future/common/beans"
)
type asynchOpLogCol struct {
beans.SQLBaseCol
IP string
UserAgent string
APIPath string
HttpMethod string
BizName string
ModelName string
TaskID string
OpType string
Success string
ErrorMsg string
CostMs string
RequestPayload string
ResponsePayload string
}
var AsynchOpLogCol = asynchOpLogCol{
SQLBaseCol: beans.DefSQLBaseCol,
IP: "ip",
UserAgent: "user_agent",
APIPath: "api_path",
HttpMethod: "http_method",
BizName: "biz_name",
ModelName: "model_name",
TaskID: "task_id",
OpType: "op_type",
Success: "success",
ErrorMsg: "error_msg",
CostMs: "cost_ms",
RequestPayload: "request_payload",
ResponsePayload: "response_payload",
}
// AsynchOpLog 操作日志(创建任务等)
type AsynchOpLog struct {
beans.SQLBaseDO `orm:",inline"`
IP string `orm:"ip" json:"ip"`
UserAgent string `orm:"user_agent" json:"userAgent"`
APIPath string `orm:"api_path" json:"apiPath"`
HttpMethod string `orm:"http_method" json:"httpMethod"`
BizName string `orm:"biz_name" json:"bizName"`
ModelName string `orm:"model_name" json:"modelName"`
TaskID string `orm:"task_id" json:"taskId"`
OpType string `orm:"op_type" json:"opType"`
Success int `orm:"success" json:"success"`
ErrorMsg string `orm:"error_msg" json:"errorMsg"`
CostMs int64 `orm:"cost_ms" json:"costMs"`
RequestPayload any `orm:"request_payload" json:"requestPayload"`
ResponsePayload any `orm:"response_payload" json:"responsePayload"`
}

View File

@@ -19,6 +19,8 @@ type asynchTaskCol struct {
ExpireAt string
RetryCount string
EnqueueAt string
Phase string
TmpFile string
InputRef string
RequestPayload string
}
@@ -37,6 +39,8 @@ var AsynchTaskCol = asynchTaskCol{
ExpireAt: "expire_at",
RetryCount: "retry_count",
EnqueueAt: "enqueue_at",
Phase: "phase",
TmpFile: "tmp_file",
InputRef: "input_ref",
RequestPayload: "request_payload",
}
@@ -56,6 +60,10 @@ type AsynchTask struct {
ExpireAt *gtime.Time `orm:"expire_at" json:"expireAt"` // 已下载(state=4)后的过期时间
RetryCount int `orm:"retry_count" json:"retryCount"`
EnqueueAt *gtime.Time `orm:"enqueue_at" json:"enqueueAt"`
Phase int `orm:"phase" json:"phase"` // 0模型阶段/1OSS阶段
TmpFile string `orm:"tmp_file" json:"tmpFile"` // 临时结果文件路径
// RetryQueueMaxSeconds 为 ListFailedRetryableGlobal 的 join 字段(非任务表字段)
RetryQueueMaxSeconds int `orm:"retry_queue_max_seconds" json:"-"`
InputRef string `orm:"input_ref" json:"inputRef"`
RequestPayload any `orm:"request_payload" json:"requestPayload"`
}

View File

@@ -45,6 +45,7 @@ func (c *cleaner) runOnce(ctx context.Context) {
g.Log().Errorf(ctx, "[cleaner] list expired(downloaded) error: %v", err)
} else {
for _, t := range expired {
deleteTmpResult(t.TmpFile)
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
}
g.Log().Infof(ctx, "[cleaner] expired(downloaded) cleaned, count=%d", len(expired))
@@ -71,7 +72,20 @@ func (c *cleaner) runOnce(ctx context.Context) {
g.Log().Errorf(ctx, "[cleaner] list failed retryable error: %v", err)
} else {
for _, t := range retryable {
_ = dao.Task.RequeueForRetryGlobal(ctx, t.Id)
// retry_queue_max_seconds 控制失败重试的排队策略:
// - =0失败重试插队到队首
// - >0当任务从创建到现在的排队时长 >= maxSeconds则插队到队首否则仍放到队尾
now := time.Now()
enqueueAt := now
maxSeconds := t.RetryQueueMaxSeconds
if maxSeconds == 0 {
enqueueAt = now.Add(-100 * 365 * 24 * time.Hour)
} else if maxSeconds > 0 && t.CreatedAt != nil {
if now.Sub(t.CreatedAt.Time) >= time.Duration(maxSeconds)*time.Second {
enqueueAt = now.Add(-100 * 365 * 24 * time.Hour)
}
}
_ = dao.Task.RequeueForRetryGlobal(ctx, t.Id, enqueueAt)
}
g.Log().Infof(ctx, "[cleaner] failed retryable cleaned, count=%d", len(retryable))
}
@@ -82,6 +96,7 @@ func (c *cleaner) runOnce(ctx context.Context) {
g.Log().Errorf(ctx, "[cleaner] list failed exhausted error: %v", err)
} else {
for _, t := range exhausted {
deleteTmpResult(t.TmpFile)
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
}
g.Log().Infof(ctx, "[cleaner] failed exhausted cleaned, count=%d", len(exhausted))

View File

@@ -14,24 +14,52 @@ import (
"model-asynch/model/entity"
)
func parseAPIKeyHeader(apiKey string) (k, v string) {
// parseAPIKeyHeaders 支持多个 header 绑定,逗号分隔:
// 示例:
// - X-API-Key:qwen3-tts-key,operation:true,count:123
// - X-API-Key:"qwen3-tts-key",operation:"true"
//
// 说明:
// - HTTP Header 最终都是字符串,这里做的是“值的字符串化表达”。
// - 若 value 用双引号包裹,会去掉外层引号再注入,便于在配置中区分字符串/布尔/数字等表达(以及避免值中包含特殊字符时歧义)。
func parseAPIKeyHeaders(apiKey string) map[string]string {
apiKey = strings.TrimSpace(apiKey)
if apiKey == "" {
return "", ""
return nil
}
// 支持两种写法:
// 1) HeaderName:HeaderValue推荐
// 2) HeaderName=HeaderValue兼容
if strings.Contains(apiKey, ":") {
parts := strings.SplitN(apiKey, ":", 2)
return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
out := map[string]string{}
parts := strings.Split(apiKey, ",")
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
if strings.Contains(apiKey, "=") {
parts := strings.SplitN(apiKey, "=", 2)
return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
// HeaderName:HeaderValue推荐 / HeaderName=HeaderValue兼容
if strings.Contains(p, ":") {
kv := strings.SplitN(p, ":", 2)
k := strings.TrimSpace(kv[0])
v := strings.TrimSpace(kv[1])
v = strings.Trim(v, "\"")
if k != "" && v != "" {
out[k] = v
}
// 只给了 value不做注入避免注入非法 header
return "", ""
continue
}
if strings.Contains(p, "=") {
kv := strings.SplitN(p, "=", 2)
k := strings.TrimSpace(kv[0])
v := strings.TrimSpace(kv[1])
v = strings.Trim(v, "\"")
if k != "" && v != "" {
out[k] = v
}
continue
}
}
if len(out) == 0 {
return nil
}
return out
}
func payloadToQuery(payload any) (url.Values, error) {
@@ -76,7 +104,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt
url = strings.TrimRight(m.BaseURL, "/")
}
timeout := time.Duration(m.TimeoutMs) * time.Millisecond
timeout := time.Duration(m.TimeoutSeconds) * time.Second
if timeout <= 0 {
timeout = 60 * time.Second
}
@@ -122,7 +150,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt
req.Header.Set(k, v)
}
}
if hk, hv := parseAPIKeyHeader(m.APIKey); hk != "" && hv != "" {
for hk, hv := range parseAPIKeyHeaders(m.APIKey) {
req.Header.Set(hk, hv)
}
if method != http.MethodGet {

View File

@@ -23,8 +23,9 @@ func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res
Enabled: req.Enabled,
MaxConcurrency: req.MaxConcurrency,
QueueLimit: req.QueueLimit,
TimeoutMs: req.TimeoutMs,
TimeoutSeconds: req.TimeoutSeconds,
RetryTimes: req.RetryTimes,
RetryQueueMaxSecs: req.RetryQueueMaxSeconds,
AutoCleanSeconds: req.AutoCleanSeconds,
Remark: req.Remark,
}
@@ -40,8 +41,8 @@ func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res
if m.QueueLimit <= 0 {
m.QueueLimit = 1000
}
if m.TimeoutMs <= 0 {
m.TimeoutMs = 60000
if m.TimeoutSeconds <= 0 {
m.TimeoutSeconds = 60
}
if m.AutoCleanSeconds <= 0 {
m.AutoCleanSeconds = 86400
@@ -76,12 +77,15 @@ func (s *modelService) Update(ctx context.Context, req *dto.UpdateModelReq) erro
if req.QueueLimit != nil {
data[entity.AsynchModelCol.QueueLimit] = *req.QueueLimit
}
if req.TimeoutMs != nil {
data[entity.AsynchModelCol.TimeoutMs] = *req.TimeoutMs
if req.TimeoutSeconds != nil {
data[entity.AsynchModelCol.TimeoutSeconds] = *req.TimeoutSeconds
}
if req.RetryTimes != nil {
data[entity.AsynchModelCol.RetryTimes] = *req.RetryTimes
}
if req.RetryQueueMaxSeconds != nil {
data[entity.AsynchModelCol.RetryQueueMaxSecs] = *req.RetryQueueMaxSeconds
}
if req.AutoCleanSeconds != nil {
data[entity.AsynchModelCol.AutoCleanSeconds] = *req.AutoCleanSeconds
}
@@ -104,6 +108,6 @@ func (s *modelService) Get(ctx context.Context, id int64) (*entity.AsynchModel,
return dao.Model.GetByID(ctx, id)
}
func (s *modelService) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) {
return dao.Model.List(ctx, pageNum, pageSize)
func (s *modelService) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) {
return dao.Model.List(ctx, pageNum, pageSize, modelNameLike)
}

40
service/stat_service.go Normal file
View File

@@ -0,0 +1,40 @@
package service
import (
"context"
"model-asynch/dao"
"model-asynch/model/dto"
)
type statService struct{}
var Stat = &statService{}
func (s *statService) List(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) {
pageNum, pageSize := 1, 10
if req != nil && req.Page != nil {
if req.Page.PageNum > 0 {
pageNum = int(req.Page.PageNum)
}
if req.Page.PageSize > 0 {
pageSize = int(req.Page.PageSize)
}
}
startDay, endDay := "", ""
var tenantID *int64
creator, modelName := "", ""
if req != nil {
startDay = req.StartDay
endDay = req.EndDay
tenantID = req.TenantID
creator = req.Creator
modelName = req.ModelName
}
list, total, err := dao.Stat.List(ctx, pageNum, pageSize, startDay, endDay, tenantID, creator, modelName)
if err != nil {
return nil, err
}
return &dto.ListModelStatRes{List: list, Total: total}, nil
}

View File

@@ -5,12 +5,14 @@ import (
"context"
"fmt"
"mime/multipart"
"time"
"model-asynch/model/entity"
commonHttp "gitea.com/red-future/common/http"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
"github.com/gogf/gf/v2/util/guid"
)
// 对接你们的 oss 文件服务POST oss/file/uploadFile (multipart/form-data)
@@ -25,8 +27,6 @@ type uploadFileResponse struct {
}
func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, data []byte, fileExt string, _ string) (ossURL string, err error) {
// ossUrl := g.Cfg().MustGet(ctx, "oss.addr", "192.168.3.30:9000").String()
// multipart
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
@@ -39,7 +39,8 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat
ext = "." + ext
}
part, err := writer.CreateFormFile("file", "")
filename := fmt.Sprintf("asynch_%d_%s%s", time.Now().Unix(), guid.S(), ext)
part, err := writer.CreateFormFile("file", filename)
if err != nil {
return "", err
}
@@ -55,16 +56,14 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat
headers["Content-Type"] = contentType
fullURL := "oss/file/uploadFile"
g.Log().Infof(ctx, "[OSS] upload start url=%s size=%d", fullURL, len(data))
g.Log().Infof(ctx, "[OSS] upload start url=%s filename=%s size=%d", fullURL, filename, len(data))
var resp uploadFileResponse
if err := commonHttp.Post(ctx, fullURL, headers, &resp, body.Bytes()); err != nil {
return "", err
}
if resp.FileURL == "" {
return "", fmt.Errorf("OSS服务返回错误: 上传失败")
}
g.Log().Infof(ctx, "[OSS] upload success url=%s filename=%s size=%d format=%s", resp.FileURL, resp.FileName, resp.FileSize, resp.FileFormat)
fmt.Println("打印结果 resp:", resp)
g.Log().Infof(ctx, "[OSS] upload success url=%s size=%d format=%s", resp.FileURL, resp.FileSize, resp.FileFormat)
return resp.FileURL, nil
}

View File

@@ -9,6 +9,8 @@ import (
"model-asynch/model/dto"
"model-asynch/model/entity"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gtime"
"github.com/google/uuid"
)
@@ -18,6 +20,7 @@ var Task = &taskService{}
type taskService struct{}
func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *dto.CreateTaskRes, err error) {
startAt := time.Now()
// 固化 token/user 等信息
ctx = asyncCtx(ctx)
@@ -59,6 +62,35 @@ func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *
if err != nil {
return nil, err
}
// 3) 写操作日志(尽量不影响主流程,失败忽略)
ip := ""
ua := ""
apiPath := "/task/createTask"
httpMethod := "POST"
if r := g.RequestFromCtx(ctx); r != nil {
ip = r.GetClientIp()
ua = r.UserAgent()
apiPath = r.URL.Path
httpMethod = r.Method
}
_, _ = dao.OpLog.Insert(ctx, &entity.AsynchOpLog{
IP: ip,
UserAgent: ua,
APIPath: apiPath,
HttpMethod: httpMethod,
BizName: req.BizName,
ModelName: req.ModelName,
TaskID: taskID,
OpType: "createTask",
Success: 1,
ErrorMsg: "",
CostMs: time.Since(startAt).Milliseconds(),
RequestPayload: storedPayload,
ResponsePayload: gdb.Map{
"taskId": taskID,
},
})
return &dto.CreateTaskRes{TaskID: taskID}, nil
}
@@ -127,3 +159,28 @@ func (s *taskService) GetBatch(ctx context.Context, req *dto.GetTaskBatchReq) (r
}
return &dto.GetTaskBatchRes{List: items}, nil
}
func (s *taskService) List(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) {
pageNum, pageSize := 1, 10
if req != nil && req.Page != nil {
if req.Page.PageNum > 0 {
pageNum = int(req.Page.PageNum)
}
if req.Page.PageSize > 0 {
pageSize = int(req.Page.PageSize)
}
}
modelName := ""
taskID := ""
var state *int
if req != nil {
modelName = req.ModelName
taskID = req.TaskID
state = req.State
}
list, total, err := dao.Task.List(ctx, pageNum, pageSize, modelName, taskID, state)
if err != nil {
return nil, err
}
return &dto.ListTaskRes{List: list, Total: total}, nil
}

38
service/tmp_store.go Normal file
View File

@@ -0,0 +1,38 @@
package service
import (
"fmt"
"os"
"path/filepath"
)
// saveTmpResult 将模型输出写入临时文件,用于 OSS 上传失败后的“仅重试 OSS”。
func saveTmpResult(taskID string, data []byte, ext string) (string, error) {
dir := filepath.Join(os.TempDir(), "model-asynch")
if err := os.MkdirAll(dir, 0o755); err != nil {
return "", err
}
if ext == "" {
ext = ".bin"
}
if ext[0] != '.' {
ext = "." + ext
}
path := filepath.Join(dir, fmt.Sprintf("%s%s", taskID, ext))
if err := os.WriteFile(path, data, 0o644); err != nil {
return "", err
}
return path, nil
}
func loadTmpResult(path string) ([]byte, error) {
return os.ReadFile(path)
}
func deleteTmpResult(path string) {
if path == "" {
return
}
_ = os.Remove(path)
}

View File

@@ -146,17 +146,46 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) {
"inputRef": t.InputRef,
}
}
data, err := InvokeModel(ctx, m, payload)
var (
data []byte
contentType string
ext string
)
// phase=1 表示模型已成功但 OSS 上传失败:优先从临时文件加载,避免重复跑模型
if t.Phase == 1 && strings.TrimSpace(t.TmpFile) != "" {
data, err = loadTmpResult(t.TmpFile)
if err == nil && len(data) > 0 {
contentType, ext = DetectFileType(data)
} else {
// 临时文件不可用:回退重新调用模型
data = nil
}
}
if data == nil {
// 统计:仅在真正请求模型时 +1OSS 重试不计入)
_ = dao.Stat.IncRequestCount(ctx, time.Now(), int64(t.TenantId), t.Creator, t.ModelName)
data, err = InvokeModel(ctx, m, payload)
if err != nil {
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
return
}
contentType, ext = DetectFileType(data)
// 将模型输出写入临时文件,后续若 OSS 失败可只重试 OSS
tmpPath, err := saveTmpResult(t.TaskID, data, ext)
if err == nil && tmpPath != "" {
t.TmpFile = tmpPath
t.Phase = 1
_ = dao.Task.UpdateTmpAfterModelGlobal(ctx, t.Id, tmpPath)
}
}
// 4) 存储 OSS/MinIO
contentType, ext := DetectFileType(data)
// 4) 存储 OSS
ossURL, err := Storage.UploadByTask(ctx, t, data, ext, contentType)
if err != nil {
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
// OSS 阶段失败:保留临时文件,下一轮仅重试 OSS
_ = dao.Task.UpdateFailedKeepTmpGlobal(ctx, t.Id, err.Error())
return
}
@@ -170,6 +199,8 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) {
g.Log().Errorf(ctx, "[worker] update success failed: %v", err)
return
}
// 成功后清理临时文件
deleteTmpResult(t.TmpFile)
}
func (w *asyncWorker) rollbackToPending(ctx context.Context, id int64) error {

View File

@@ -1,8 +1,12 @@
-- -----------------------王立钊2026-04-22 10:00:00-----------------------
-- model-asynch 三张核心表pgsql
-- 1) asynch_models模型配置
-- 2) asynch_task异步任务
-- 3) asynch_op_log操作日志统计用
-- 4) asynch_model_stat按天模型请求统计限流/监控用)
--------------------pgsql创建 asynch_models / asynch_task ---------------------------
-- 异步模型表
-- =========================
-- 1) asynch_models
-- =========================
CREATE TABLE IF NOT EXISTS asynch_models (
-- 基础字段(与现有表保持一致)
id BIGINT PRIMARY KEY, -- 主键ID非自增
@@ -11,57 +15,45 @@ CREATE TABLE IF NOT EXISTS asynch_models (
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at timestamp(6),
deleted_at TIMESTAMP(6),
-- 业务字段
model_name VARCHAR(128) NOT NULL, -- 模型名称(路由键)
base_url VARCHAR(256) NOT NULL, -- 模型服务基础地址(如 http://1.2.3.4:8080
route VARCHAR(256) NOT NULL DEFAULT '', -- 模型服务路由(如 /v1/infer
route VARCHAR(256) NOT NULL DEFAULT '',-- 模型服务路由(如 /v1/infer
http_method VARCHAR(8) NOT NULL DEFAULT 'POST', -- 请求方式GET/POST
api_key VARCHAR(256) DEFAULT '', -- 请求密钥绑定(请求头示例TTS_API_KEY:your-key
api_key VARCHAR(1024) DEFAULT '', -- 请求绑定(支持多个逗号分隔X-API-Key:xxx,operation:true
enabled SMALLINT NOT NULL DEFAULT 1, -- 是否启用1启用/0停用
max_concurrency INT NOT NULL DEFAULT 10, -- 单模型最大并发worker/队列消费侧应遵守)
queue_limit INT NOT NULL DEFAULT 1000, -- 单模型排队上限(防堆积,可选
timeout_ms INT NOT NULL DEFAULT 60000, -- 调用模型服务超时(秒)
retry_times SMALLINT NOT NULL DEFAULT 0, -- 失败重试次数0表示不重试
auto_clean_seconds INT NOT NULL DEFAULT 86400, -- 已下载(state=4)后的保留时间(秒),到期后清理
max_concurrency INT NOT NULL DEFAULT 10, -- 单模型最大并发
queue_limit INT NOT NULL DEFAULT 1000, -- 排队上限(近似控制
timeout_seconds INT NOT NULL DEFAULT 60, -- 调用模型服务超时(秒)
retry_times SMALLINT NOT NULL DEFAULT 0, -- 失败后最多再重试 N 次(不含首次)
retry_queue_max_seconds INT NOT NULL DEFAULT 0, -- 失败重试最大排队时间0=插队到队首;>0=排队超过该时间后插队,否则仍到队尾
auto_clean_seconds INT NOT NULL DEFAULT 86400, -- 已下载(state=4)后的保留时间(秒)
remark TEXT DEFAULT '' -- 备注
);
-- 索引/约束
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name ON asynch_models(tenant_id, model_name);
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name
ON asynch_models(tenant_id, model_name);
CREATE INDEX IF NOT EXISTS idx_asynch_models_tenant_id ON asynch_models(tenant_id);
CREATE INDEX IF NOT EXISTS idx_asynch_models_model_name ON asynch_models(model_name);
CREATE INDEX IF NOT EXISTS idx_asynch_models_enabled ON asynch_models(enabled);
CREATE INDEX IF NOT EXISTS idx_asynch_models_deleted_at ON asynch_models(deleted_at);
-- 表和字段注释
COMMENT ON TABLE asynch_models IS '异步模型表(模型服务配置)';
COMMENT ON COLUMN asynch_models.id IS '主键ID非自增';
COMMENT ON COLUMN asynch_models.tenant_id IS '租户ID';
COMMENT ON COLUMN asynch_models.creator IS '创建人';
COMMENT ON COLUMN asynch_models.created_at IS '创建时间';
COMMENT ON COLUMN asynch_models.updater IS '更新人';
COMMENT ON COLUMN asynch_models.updated_at IS '更新时间';
COMMENT ON COLUMN asynch_models.deleted_at IS '删除时间(软删)';
COMMENT ON COLUMN asynch_models.model_name IS '模型名称(路由键)';
COMMENT ON COLUMN asynch_models.base_url IS '模型服务基础地址';
COMMENT ON COLUMN asynch_models.route IS '模型服务路由';
COMMENT ON COLUMN asynch_models.http_method IS '请求方式GET/POST';
COMMENT ON COLUMN asynch_models.api_key IS '请求密钥绑定请求头示例TTS_API_KEY:your-key';
COMMENT ON COLUMN asynch_models.enabled IS '是否启用1启用/0停用';
COMMENT ON COLUMN asynch_models.max_concurrency IS '单模型最大并发';
COMMENT ON COLUMN asynch_models.queue_limit IS '单模型排队上限';
COMMENT ON COLUMN asynch_models.timeout_ms IS '调用模型服务超时(毫秒)';
COMMENT ON COLUMN asynch_models.retry_times IS '失败重试次数';
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理';
COMMENT ON COLUMN asynch_models.remark IS '备注';
-- 兼容已有库:更新字段注释
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理';
COMMENT ON COLUMN asynch_models.api_key IS '请求头绑定支持多个逗号分隔X-API-Key:xxx,operation:true';
COMMENT ON COLUMN asynch_models.retry_times IS '失败后最多再重试 N 次(不含首次)';
COMMENT ON COLUMN asynch_models.retry_queue_max_seconds IS '失败重试最大排队时间0=插队到队首;>0=排队超过该时间后插队,否则仍到队尾';
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期清理';
-- 异步任务表
-- =========================
-- 2) asynch_task
-- =========================
CREATE TABLE IF NOT EXISTS asynch_task (
-- 基础字段(与现有表保持一致)
id BIGINT PRIMARY KEY, -- 主键ID非自增
@@ -70,69 +62,133 @@ CREATE TABLE IF NOT EXISTS asynch_task (
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at timestamp(6),
deleted_at TIMESTAMP(6),
-- 任务核心字段
model_name VARCHAR(128) NOT NULL, -- 模型名称
task_id VARCHAR(64) NOT NULL, -- 任务ID对外返回
state SMALLINT NOT NULL DEFAULT 0, -- 状态:0排队中/1执行中/2成功/3失败/4已下载
oss_file VARCHAR(512) DEFAULT '', -- OSS文件URL/Key
file_type VARCHAR(32) DEFAULT '', -- 文件类型(如 mp3/mp4/png/pdf/...
state SMALLINT NOT NULL DEFAULT 0, -- 0排队中/1执行中/2成功/3失败/4已下载
oss_file VARCHAR(512) DEFAULT '', -- 结果文件OSS地址
file_type VARCHAR(32) DEFAULT '', -- 文件类型mp3/mp4/png/...
file_size BIGINT NOT NULL DEFAULT 0, -- 文件大小(字节)
error_msg TEXT DEFAULT '', -- 错误信息
-- 执行与清理字段
started_at TIMESTAMP, -- 开始执行时间
finished_at TIMESTAMP, -- 执行结束时间
expire_at TIMESTAMP, -- 过期清理时间已下载后写入updated_at + auto_clean_seconds
retry_count INT NOT NULL DEFAULT 0, -- 已重试次数(不含首次)
enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 入队时间(用于排队顺序,重试会更新时间到队尾)
-- 输入信息(用于排障/重放;如不需要可不写入或置空)
input_ref TEXT DEFAULT '', -- 输入引用如上传文件URL/OSS key/业务侧资源ID
expire_at TIMESTAMP, -- state=4 后写入,用于清理
-- 重试/排队
retry_count INT NOT NULL DEFAULT 0, -- 已重试次数(不含首次)
enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 入队时间(用于排队顺序)
-- 任务执行阶段:用于区分“重试模型”与“仅重试 OSS”
phase SMALLINT NOT NULL DEFAULT 0, -- 0模型阶段/1OSS阶段
tmp_file TEXT DEFAULT '', -- 临时结果文件路径phase=1 时仅重试 OSS 上传)
-- 输入信息(可选)
input_ref TEXT DEFAULT '', -- 输入引用如OSS/业务资源ID等
request_payload JSONB -- 请求参数(可选)
);
-- 索引/约束
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id ON asynch_task(tenant_id, task_id);
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id
ON asynch_task(tenant_id, task_id);
CREATE INDEX IF NOT EXISTS idx_asynch_task_tenant_id ON asynch_task(tenant_id);
CREATE INDEX IF NOT EXISTS idx_asynch_task_model_name ON asynch_task(model_name);
CREATE INDEX IF NOT EXISTS idx_asynch_task_state ON asynch_task(state);
CREATE INDEX IF NOT EXISTS idx_asynch_task_enqueue_at ON asynch_task(enqueue_at);
CREATE INDEX IF NOT EXISTS idx_asynch_task_updated_at ON asynch_task(updated_at);
CREATE INDEX IF NOT EXISTS idx_asynch_task_expire_at ON asynch_task(expire_at);
CREATE INDEX IF NOT EXISTS idx_asynch_task_deleted_at ON asynch_task(deleted_at);
-- 表和字段注释
COMMENT ON TABLE asynch_task IS '异步任务表';
COMMENT ON COLUMN asynch_task.id IS '主键ID非自增';
COMMENT ON COLUMN asynch_task.tenant_id IS '租户ID';
COMMENT ON COLUMN asynch_task.creator IS '创建人';
COMMENT ON COLUMN asynch_task.created_at IS '创建时间';
COMMENT ON COLUMN asynch_task.updater IS '更新人';
COMMENT ON COLUMN asynch_task.updated_at IS '更新时间';
COMMENT ON COLUMN asynch_task.deleted_at IS '删除时间(软删)';
COMMENT ON COLUMN asynch_task.model_name IS '模型名称';
COMMENT ON COLUMN asynch_task.task_id IS '任务ID对外返回';
COMMENT ON COLUMN asynch_task.state IS '状态0排队中/1执行中/2成功/3失败/4已下载';
COMMENT ON COLUMN asynch_task.oss_file IS 'OSS文件URL/Key';
COMMENT ON COLUMN asynch_task.file_type IS '文件类型';
COMMENT ON COLUMN asynch_task.file_size IS '文件大小(字节)';
COMMENT ON COLUMN asynch_task.error_msg IS '错误信息';
COMMENT ON COLUMN asynch_task.started_at IS '开始执行时间';
COMMENT ON COLUMN asynch_task.finished_at IS '执行结束时间';
COMMENT ON COLUMN asynch_task.expire_at IS '过期清理时间(已下载后保留到期)';
COMMENT ON COLUMN asynch_task.state IS '0排队中/1执行中/2成功/3失败/4已下载';
COMMENT ON COLUMN asynch_task.retry_count IS '已重试次数(不含首次)';
COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序,重试会更新时间到队尾';
COMMENT ON COLUMN asynch_task.input_ref IS '输入引用如上传文件URL/OSS key/业务侧资源ID';
COMMENT ON COLUMN asynch_task.request_payload IS '请求参数JSON';
COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序)';
COMMENT ON COLUMN asynch_task.phase IS '执行阶段0模型阶段/1OSS阶段模型已成功等待上传OSS';
COMMENT ON COLUMN asynch_task.tmp_file IS '临时结果文件路径phase=1 时仅重试 OSS 上传';
-- 兼容已有库增量加字段PostgreSQL 支持 IF NOT EXISTS
ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS retry_count INT NOT NULL DEFAULT 0;
ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;
COMMENT ON COLUMN asynch_task.retry_count IS '已重试次数(不含首次)';
COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序,重试会更新时间到队尾)';
COMMENT ON COLUMN asynch_task.state IS '状态0排队中/1执行中/2成功/3失败/4已下载';
COMMENT ON COLUMN asynch_task.expire_at IS '过期清理时间(已下载后保留到期)';
-- 对存量数据进行入队时间回填(只在 enqueue_at 为空时)
UPDATE asynch_task SET enqueue_at = created_at WHERE enqueue_at IS NULL;
-- =========================
-- 3) asynch_op_log
-- =========================
CREATE TABLE IF NOT EXISTS asynch_op_log (
-- 基础字段(与现有表保持一致)
id BIGINT PRIMARY KEY,
tenant_id BIGINT NOT NULL DEFAULT 0,
creator VARCHAR(64) NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updater VARCHAR(64) NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMP(6),
-- 基础审计信息
ip VARCHAR(64) DEFAULT '',
user_agent VARCHAR(256) DEFAULT '',
api_path VARCHAR(256) DEFAULT '',
http_method VARCHAR(16) DEFAULT '',
-- 业务信息
biz_name VARCHAR(128) NOT NULL DEFAULT '', -- 调用方业务模块/系统
model_name VARCHAR(128) NOT NULL DEFAULT '',
task_id VARCHAR(64) NOT NULL DEFAULT '',
-- 统计字段
op_type VARCHAR(64) NOT NULL DEFAULT 'createTask', -- 操作类型(默认创建任务)
success SMALLINT NOT NULL DEFAULT 1, -- 1成功/0失败
error_msg TEXT DEFAULT '',
cost_ms BIGINT NOT NULL DEFAULT 0, -- 耗时(毫秒)
-- 请求/响应 JSON用于后期统计分析
request_payload JSONB,
response_payload JSONB
);
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_tenant_time ON asynch_op_log(tenant_id, created_at);
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_model_name ON asynch_op_log(model_name);
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_biz_name ON asynch_op_log(biz_name);
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_task_id ON asynch_op_log(task_id);
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_op_type ON asynch_op_log(op_type);
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_deleted_at ON asynch_op_log(deleted_at);
COMMENT ON TABLE asynch_op_log IS '操作记录日志表(创建任务等,用于统计)';
COMMENT ON COLUMN asynch_op_log.biz_name IS '业务名称(调用方模块/系统)';
COMMENT ON COLUMN asynch_op_log.model_name IS '模型名称';
COMMENT ON COLUMN asynch_op_log.task_id IS '任务ID';
COMMENT ON COLUMN asynch_op_log.op_type IS '操作类型(如 createTask/getTaskResult/getTaskBatch 等)';
COMMENT ON COLUMN asynch_op_log.success IS '是否成功1成功/0失败';
COMMENT ON COLUMN asynch_op_log.error_msg IS '错误信息(失败时)';
COMMENT ON COLUMN asynch_op_log.cost_ms IS '耗时(毫秒)';
COMMENT ON COLUMN asynch_op_log.request_payload IS '请求 JSON';
COMMENT ON COLUMN asynch_op_log.response_payload IS '响应 JSON';
-- =========================
-- 4) asynch_model_stat
-- =========================
CREATE TABLE IF NOT EXISTS asynch_model_stat (
day DATE NOT NULL, -- 天YYYY-MM-DD
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID
creator VARCHAR(64) NOT NULL DEFAULT '', -- 创建人
model_name VARCHAR(128) NOT NULL DEFAULT '', -- 模型名称
request_count BIGINT NOT NULL DEFAULT 0, -- 请求次数
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(day, tenant_id, creator, model_name)
);
-- 便于时间段/租户/人/模型过滤
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_tenant_day ON asynch_model_stat(tenant_id, day);
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_day ON asynch_model_stat(day);
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_model_name ON asynch_model_stat(model_name);
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_creator ON asynch_model_stat(creator);
COMMENT ON TABLE asynch_model_stat IS '按天模型请求统计(用于限流/监控)';
COMMENT ON COLUMN asynch_model_stat.day IS 'YYYY-MM-DD';
COMMENT ON COLUMN asynch_model_stat.tenant_id IS '租户ID';
COMMENT ON COLUMN asynch_model_stat.creator IS '创建人';
COMMENT ON COLUMN asynch_model_stat.model_name IS '模型名称';
COMMENT ON COLUMN asynch_model_stat.request_count IS '请求次数';
COMMENT ON COLUMN asynch_model_stat.created_at IS '创建时间';
COMMENT ON COLUMN asynch_model_stat.updated_at IS '更新时间';