Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8fb99094af | |||
| 4e6b98b7d3 | |||
| f6c70a451e |
96
README.md
96
README.md
@@ -13,8 +13,8 @@
|
||||
- 支持配置:
|
||||
- 请求地址:`base_url + route`
|
||||
- 请求方式:`http_method`(GET/POST)
|
||||
- 请求密钥:`api_key`(以请求头注入,示例:`TTS_API_KEY:your-key`)
|
||||
- 超时:`timeout_ms`
|
||||
- 请求密钥:`api_key`(以请求头注入,支持多个 header)
|
||||
- 超时:`timeout_seconds`
|
||||
- 并发:`max_concurrency`(按租户+模型的 Redis 分布式信号量限流)
|
||||
- 重试:`retry_times`(失败后最多再重试 N 次)
|
||||
- 保留:`auto_clean_seconds`(任务被业务领取到 `state=4` 后的保留秒数,到期清理)
|
||||
@@ -26,38 +26,73 @@
|
||||
- 调用模型服务(GET/POST)
|
||||
- 结果上传 OSS(调用你们的 OSS 文件服务 `oss/file/uploadFile`,透传 `Authorization/X-User-Info`)
|
||||
- 批量领取结果:批量查询 `task_id` 列表,返回 `task_id/state/oss_file`,并把成功的任务从 `state=2` 更新为 `state=4`
|
||||
- 自动重试:失败 `state=3` 会由清理器按 `retry_times` 重新入队到队尾(保证先来后到)
|
||||
- 自动重试:失败 `state=3` 会由清理器按 `retry_times` 重新入队到队尾
|
||||
- 自动清理:
|
||||
- `state=4` 且 `expire_at` 到期 → 硬删除任务(并尝试删除 OSS)
|
||||
- 失败重试耗尽仍失败 → 硬删除任务(并尝试删除 OSS)
|
||||
- `state=4` 且 `expire_at` 到期 → 硬删除任务
|
||||
- 失败重试耗尽仍失败 → 硬删除任务
|
||||
- `state=0/1` 超时 → 标记失败(防止卡死)
|
||||
|
||||
### 1.3 统计(asynch_model_stat)
|
||||
- 按天统计:`day + tenant_id + creator + model_name -> request_count`
|
||||
- 统计口径:仅在 Worker 真正调用模型服务时计数(OSS 重试不计数)
|
||||
- 用途:给其他服务提供全局限流/监控依据
|
||||
|
||||
---
|
||||
|
||||
## 2. 使用流程(业务方如何接入)
|
||||
|
||||
### 2.1 第一步:配置模型
|
||||
调用“模型管理”接口新增模型配置(例:TTS):
|
||||
- `model_name=tts`
|
||||
- `base_url=http://xxx:port`
|
||||
- `route=/tts`
|
||||
- `http_method=POST`
|
||||
- `api_key=TTS_API_KEY:your-key`(可选)
|
||||
### 第一步:创建模型配置
|
||||
业务方(或运维)先在中间件里创建/更新模型配置(`model_name` 为唯一键),例如:
|
||||
- `POST /model/createModel`(或 `/model/updateModel`)
|
||||
|
||||
### 2.2 第二步:创建任务拿 task_id
|
||||
业务方调用 `CreateTask`,传 `modelName + requestPayload`,中间件返回 `task_id`。
|
||||
业务方把 `task_id` 落到自己的业务表(状态=生成中)。
|
||||
请求示例(JSON):
|
||||
```json
|
||||
{
|
||||
"modelName": "model-service",
|
||||
"baseUrl": "http://127.0.0.1:8000",
|
||||
"route": "/api/v1/chat",
|
||||
"httpMethod": "POST",
|
||||
"apiKey": "API_KEY:model-key,API_STATE:true,API_NUM:123",
|
||||
"enabled": 1,
|
||||
"maxConcurrency": 5,
|
||||
"queueLimit": 20,
|
||||
"timeoutSeconds": 1800,
|
||||
"retryTimes": 3,
|
||||
"retryQueueMaxSeconds": 600,
|
||||
"autoCleanSeconds": 3600,
|
||||
"remark": "Model-Service 模型服务"
|
||||
}
|
||||
```
|
||||
|
||||
### 2.3 第三步:业务方领取结果(推荐批量)
|
||||
业务方在自己的“定时任务服务/轮询器”中,批量把 `task_id` 列表传给中间件:
|
||||
- 返回每个任务的 `state + oss_file`
|
||||
- 对 `state=2(成功)` 的任务,中间件会更新为 `state=4(已下载)` 并写入 `expire_at = now + auto_clean_seconds`
|
||||
参数说明:
|
||||
- `modelName`:模型名称(唯一标识/路由键)
|
||||
- `baseUrl`:模型服务地址(Base URL)
|
||||
- `route`:模型服务路由(拼接到 baseUrl 后)
|
||||
- `httpMethod`:请求方式(GET/POST)
|
||||
- `apiKey`:请求头绑定(支持多个 header,逗号分隔,格式 `Key:Value`;布尔/数字也会以字符串形式注入 header)
|
||||
- `enabled`:是否启用(0禁用/1启用)
|
||||
- `maxConcurrency`:单模型最大并发(按租户+模型维度限流)
|
||||
- `queueLimit`:排队上限(近似控制,超过则拒绝创建)
|
||||
- `timeoutSeconds`:调用模型服务超时(秒)
|
||||
- `retryTimes`:失败后最多再重试 N 次(不含首次)
|
||||
- `retryQueueMaxSeconds`:失败重试最大排队时间(秒);0 表示重试插队到队首;>0 表示排队超过该时间后插队,否则仍到队尾
|
||||
- `autoCleanSeconds`:任务被领取到 `state=4` 后的保留时间(秒),到期清理
|
||||
- `remark`:备注说明
|
||||
|
||||
业务方拿到 `oss_file` 后做“业务转移”:
|
||||
- 方案 A:直接在业务表保存 `oss_file` 作为最终资源地址
|
||||
- 方案 B:业务侧下载后重新上传到业务自己的资产域,再保存新地址
|
||||
### 第二步:创建任务拿到 task_id
|
||||
业务方发起推理请求时调用:
|
||||
- `POST /task/createTask`(传 `modelName + requestPayload (+ bizName)`)
|
||||
- 中间件返回 `task_id`
|
||||
- 业务方将 `task_id` 落到自己的业务表,并把业务状态置为「生成中」
|
||||
|
||||
> `state=4` 的数据允许重复获取,避免业务侧偶发中断导致“领取不到结果”。
|
||||
### 第三步:同步任务进度(推荐批量)
|
||||
业务方通过轮询/定时任务同步进度:
|
||||
- 推荐:`POST /task/getTaskBatch`(批量传 `taskIds`,返回每个任务的 `state + oss_file`)
|
||||
- 或单条:`GET /task/getTaskResult?taskId=...`
|
||||
|
||||
业务侧拿到 `oss_file` 后自行做资源处理(直接保存或转存),并把业务状态更新为「成功/失败」。
|
||||
|
||||
> 说明:批量接口对 `state=2(成功)` 的任务会自动标记为 `state=4(已下载)` 并写入 `expire_at`,用于后续清理。
|
||||
|
||||
---
|
||||
|
||||
@@ -93,21 +128,13 @@
|
||||
|
||||
---
|
||||
|
||||
## 5. 数据库初始化/升级
|
||||
## 5. 数据库初始化
|
||||
|
||||
项目根目录提供 `update.sql`:
|
||||
- 首次部署:执行建表 SQL
|
||||
- 升级:执行 `ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...` 的增量语句
|
||||
项目根目录提供 `update.sql`:首次部署执行建表 SQL。
|
||||
|
||||
---
|
||||
|
||||
## 6. 接口文档
|
||||
|
||||
更详细接口示例见:`docs/api.md`(包含模型管理、任务接口、批量领取接口、字段说明)。
|
||||
|
||||
---
|
||||
|
||||
## 7. 开发与发布建议(Git)
|
||||
## 6. 开发与发布建议(Git)
|
||||
|
||||
- `dev`:日常开发与联调
|
||||
- `main`:线上稳定分支
|
||||
@@ -115,4 +142,3 @@
|
||||
1) 从 `main` 拉出 `dev`
|
||||
2) 功能完成后提 MR/PR 合并回 `main`
|
||||
3) 打 tag / 发布镜像
|
||||
|
||||
|
||||
21
config.yml
21
config.yml
@@ -31,29 +31,22 @@ database:
|
||||
asynch:
|
||||
worker:
|
||||
enabled: true # 是否启用后台 worker(开发环境可关闭避免刷DB错误)
|
||||
pollInterval: "5s" # 轮询间隔(DB抢占 pending 任务)
|
||||
batchSize: 5 # 每次抢占任务数量
|
||||
pollInterval: "10s" # 轮询间隔(DB抢占 pending 任务)
|
||||
batchSize: 10 # 每次抢占任务数量
|
||||
goroutines: 1 # worker 并发数(每个 goroutine 串行处理)
|
||||
taskTimeout: "5m" # state=0/1 超时自动失败
|
||||
cleaner:
|
||||
enabled: true # 是否启用自动清理器(可选)
|
||||
interval: "5s" # 清理任务扫描间隔
|
||||
|
||||
interval: "10s" # 清理任务扫描间隔
|
||||
|
||||
redis:
|
||||
default:
|
||||
address: 116.204.74.41:6379
|
||||
address: 192.168.3.30:6379
|
||||
db: 0
|
||||
|
||||
consul:
|
||||
address: 116.204.74.41:8500
|
||||
address: 192.168.3.30:8500
|
||||
|
||||
jaeger:
|
||||
addr: 116.204.74.41:4318
|
||||
|
||||
# OSS 文件服务
|
||||
# 当前实现:通过 common/http 的服务发现直接调用:
|
||||
# POST oss/file/uploadFile (multipart/form-data)
|
||||
# 鉴权:透传 Authorization / X-User-Info
|
||||
oss:
|
||||
addr: "116.204.74.41:9000"
|
||||
addr: 192.168.3.30:4318
|
||||
|
||||
|
||||
@@ -3,4 +3,6 @@ package public
|
||||
const (
|
||||
TableNameModel = "asynch_models" // 异步模型表
|
||||
TableNameTask = "asynch_task" // 异步任务表
|
||||
TableNameOpLog = "asynch_op_log" // 异步操作日志表
|
||||
TableNameStat = "asynch_model_stat" // 按天统计表(请求次数)
|
||||
)
|
||||
|
||||
@@ -56,7 +56,11 @@ func (c *model) ListModel(ctx context.Context, req *dto.ListModelReq) (res *dto.
|
||||
pageSize = int(req.Page.PageSize)
|
||||
}
|
||||
}
|
||||
list, total, err := service.Model.List(ctx, pageNum, pageSize)
|
||||
modelName := ""
|
||||
if req != nil {
|
||||
modelName = req.ModelName
|
||||
}
|
||||
list, total, err := service.Model.List(ctx, pageNum, pageSize, modelName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
20
controller/stat_controller.go
Normal file
20
controller/stat_controller.go
Normal file
@@ -0,0 +1,20 @@
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"model-asynch/model/dto"
|
||||
"model-asynch/service"
|
||||
)
|
||||
|
||||
type stat struct{}
|
||||
|
||||
// Stat 统计控制器
|
||||
var Stat = new(stat)
|
||||
|
||||
// ListModelStat 统计列表
|
||||
func (c *stat) ListModelStat(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) {
|
||||
ctx = ensureUser(ctx)
|
||||
return service.Stat.List(ctx, req)
|
||||
}
|
||||
|
||||
@@ -29,3 +29,9 @@ func (c *task) GetTaskBatch(ctx context.Context, req *dto.GetTaskBatchReq) (res
|
||||
ctx = ensureUser(ctx)
|
||||
return service.Task.GetBatch(ctx, req)
|
||||
}
|
||||
|
||||
// ListTask 任务列表分页查询
|
||||
func (c *task) ListTask(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) {
|
||||
ctx = ensureUser(ctx)
|
||||
return service.Task.List(ctx, req)
|
||||
}
|
||||
|
||||
@@ -73,8 +73,11 @@ func (d *modelDao) GetByID(ctx context.Context, id int64) (m *entity.AsynchModel
|
||||
return
|
||||
}
|
||||
|
||||
func (d *modelDao) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) {
|
||||
model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).OrderDesc(entity.AsynchModelCol.CreatedAt)
|
||||
func (d *modelDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) {
|
||||
model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).Where("deleted_at IS NULL").OrderDesc(entity.AsynchModelCol.CreatedAt)
|
||||
if modelNameLike != "" {
|
||||
model = model.WhereLike(entity.AsynchModelCol.ModelName, "%"+modelNameLike+"%")
|
||||
}
|
||||
if pageNum > 0 && pageSize > 0 {
|
||||
model = model.Page(pageNum, pageSize)
|
||||
}
|
||||
@@ -86,4 +89,3 @@ func (d *modelDao) List(ctx context.Context, pageNum, pageSize int) (list []*ent
|
||||
err = r.Structs(&list)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
22
dao/op_log_dao.go
Normal file
22
dao/op_log_dao.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"model-asynch/consts/public"
|
||||
"model-asynch/model/entity"
|
||||
|
||||
"gitea.com/red-future/common/db/gfdb"
|
||||
)
|
||||
|
||||
type opLogDao struct{}
|
||||
|
||||
var OpLog = &opLogDao{}
|
||||
|
||||
func (d *opLogDao) Insert(ctx context.Context, log *entity.AsynchOpLog) (id int64, err error) {
|
||||
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameOpLog).Data(log).Insert()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return r.LastInsertId()
|
||||
}
|
||||
61
dao/stat_dao.go
Normal file
61
dao/stat_dao.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package dao
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"model-asynch/consts/public"
|
||||
"model-asynch/model/entity"
|
||||
|
||||
"gitea.com/red-future/common/db/gfdb"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
)
|
||||
|
||||
type statDao struct{}
|
||||
|
||||
var Stat = &statDao{}
|
||||
|
||||
// IncRequestCount 原子累加(支持分布式/多协程):按天+租户+创建人+模型 +1
|
||||
func (d *statDao) IncRequestCount(ctx context.Context, day time.Time, tenantId int64, creator, modelName string) error {
|
||||
sql := fmt.Sprintf(`
|
||||
INSERT INTO %s(day, tenant_id, creator, model_name, request_count, created_at, updated_at)
|
||||
VALUES(?, ?, ?, ?, 1, NOW(), NOW())
|
||||
ON CONFLICT (day, tenant_id, creator, model_name)
|
||||
DO UPDATE SET request_count = %s.request_count + 1, updated_at = NOW()`,
|
||||
public.TableNameStat, public.TableNameStat,
|
||||
)
|
||||
_, err := gfdb.DB(ctx).Exec(ctx, sql, gtime.New(day).Format("Y-m-d"), tenantId, creator, modelName)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *statDao) List(ctx context.Context, pageNum, pageSize int, startDay, endDay string, tenantId *int64, creator, modelName string) (list []*entity.AsynchModelStat, total int64, err error) {
|
||||
m := gfdb.DB(ctx).Model(ctx, public.TableNameStat).Where("1=1")
|
||||
if startDay != "" {
|
||||
m = m.Where("day >= ?", startDay)
|
||||
}
|
||||
if endDay != "" {
|
||||
m = m.Where("day <= ?", endDay)
|
||||
}
|
||||
if tenantId != nil {
|
||||
m = m.Where("tenant_id = ?", *tenantId)
|
||||
}
|
||||
if creator != "" {
|
||||
m = m.WhereLike("creator", "%"+creator+"%")
|
||||
}
|
||||
if modelName != "" {
|
||||
m = m.WhereLike("model_name", "%"+modelName+"%")
|
||||
}
|
||||
m = m.OrderDesc("day").OrderDesc("request_count")
|
||||
if pageNum > 0 && pageSize > 0 {
|
||||
m = m.Page(pageNum, pageSize)
|
||||
}
|
||||
r, totalInt, err := m.AllAndCount(false)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
total = int64(totalInt)
|
||||
err = r.Structs(&list)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"gitea.com/red-future/common/db/gfdb"
|
||||
"github.com/gogf/gf/v2/database/gdb"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
var Task = &taskDao{}
|
||||
@@ -136,6 +137,31 @@ func (d *taskDao) CountActiveByModel(ctx context.Context, modelName string) (int
|
||||
return int64(n), err
|
||||
}
|
||||
|
||||
// List 任务分页查询(受 gfdb 租户 Hook 影响)
|
||||
func (d *taskDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike, taskIDLike string, state *int) (list []*entity.AsynchTask, total int64, err error) {
|
||||
m := gfdb.DB(ctx).Model(ctx, public.TableNameTask).Where("deleted_at IS NULL")
|
||||
if modelNameLike != "" {
|
||||
m = m.WhereLike(entity.AsynchTaskCol.ModelName, "%"+modelNameLike+"%")
|
||||
}
|
||||
if taskIDLike != "" {
|
||||
m = m.WhereLike(entity.AsynchTaskCol.TaskID, "%"+taskIDLike+"%")
|
||||
}
|
||||
if state != nil {
|
||||
m = m.Where(entity.AsynchTaskCol.State, *state)
|
||||
}
|
||||
m = m.OrderDesc(entity.AsynchTaskCol.CreatedAt)
|
||||
if pageNum > 0 && pageSize > 0 {
|
||||
m = m.Page(pageNum, pageSize)
|
||||
}
|
||||
r, totalInt, err := m.AllAndCount(false)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
total = gconv.Int64(totalInt)
|
||||
err = r.Structs(&list)
|
||||
return
|
||||
}
|
||||
|
||||
// ClaimPending 抢占 pending 任务(state=0),并在同一事务中更新为 running(state=1)
|
||||
// 使用 PostgreSQL: FOR UPDATE SKIP LOCKED 避免多 worker 重复消费
|
||||
func (d *taskDao) ClaimPending(ctx context.Context, batchSize int) (tasks []*entity.AsynchTask, err error) {
|
||||
|
||||
@@ -20,7 +20,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks
|
||||
}
|
||||
err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
|
||||
sql := fmt.Sprintf(
|
||||
`SELECT id, tenant_id, model_name, task_id, input_ref, request_payload
|
||||
`SELECT id, tenant_id, creator, model_name, task_id, input_ref, request_payload, phase, tmp_file
|
||||
FROM %s
|
||||
WHERE deleted_at IS NULL AND state = 0
|
||||
ORDER BY enqueue_at ASC
|
||||
@@ -58,7 +58,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks
|
||||
func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fileType string, fileSize int64, expireAt *gtime.Time) error {
|
||||
now := gtime.Now()
|
||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||
fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, updated_at=? WHERE id=?`, public.TableNameTask),
|
||||
fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask),
|
||||
ossFile, fileType, fileSize, now, now, id,
|
||||
)
|
||||
return err
|
||||
@@ -67,12 +67,31 @@ func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fi
|
||||
func (d *taskDao) UpdateFailedGlobal(ctx context.Context, id int64, errorMsg string) error {
|
||||
now := gtime.Now()
|
||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, updated_at=? WHERE id=?`, public.TableNameTask),
|
||||
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask),
|
||||
errorMsg, now, now, id,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateFailedKeepTmpGlobal OSS 上传失败:保留 phase/tmp_file,下一轮仅重试 OSS 上传
|
||||
func (d *taskDao) UpdateFailedKeepTmpGlobal(ctx context.Context, id int64, errorMsg string) error {
|
||||
now := gtime.Now()
|
||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=1, updated_at=? WHERE id=?`, public.TableNameTask),
|
||||
errorMsg, now, now, id,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// UpdateTmpAfterModelGlobal 模型调用成功后,写入临时文件路径并标记 phase=1
|
||||
func (d *taskDao) UpdateTmpAfterModelGlobal(ctx context.Context, id int64, tmpFile string) error {
|
||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||
fmt.Sprintf(`UPDATE %s SET phase=1, tmp_file=?, updated_at=NOW() WHERE id=?`, public.TableNameTask),
|
||||
tmpFile, id,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *taskDao) SoftDeleteByTaskIDGlobal(ctx context.Context, taskID string) error {
|
||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||
fmt.Sprintf(`UPDATE %s SET deleted_at=NOW(), updated_at=NOW() WHERE task_id=? AND deleted_at IS NULL`, public.TableNameTask),
|
||||
@@ -113,7 +132,8 @@ func (d *taskDao) ListFailedRetryableGlobal(ctx context.Context, limit int) (lis
|
||||
}
|
||||
r, err := gfdb.DB(ctx).GetAll(ctx,
|
||||
fmt.Sprintf(`
|
||||
SELECT t.*
|
||||
SELECT t.*,
|
||||
m.retry_queue_max_seconds AS retry_queue_max_seconds
|
||||
FROM %s t
|
||||
JOIN %s m
|
||||
ON t.tenant_id = m.tenant_id
|
||||
@@ -132,11 +152,13 @@ SELECT t.*
|
||||
return
|
||||
}
|
||||
|
||||
// RequeueForRetryGlobal 将任务重新入队(state=0,enqueue_at=now),并将 retry_count +1
|
||||
func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64) error {
|
||||
// RequeueForRetryGlobal 将任务重新入队(state=0),并将 retry_count +1
|
||||
// enqueueAt 用于控制重试任务在队列中的位置:
|
||||
// - enqueueAt 越早,越靠前(ClaimPendingGlobal 按 enqueue_at ASC 抢占)
|
||||
func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64, enqueueAt time.Time) error {
|
||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||
fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=NOW(), updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask),
|
||||
id,
|
||||
fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=?, updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask),
|
||||
enqueueAt, id,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
1
main.go
1
main.go
@@ -27,6 +27,7 @@ func main() {
|
||||
http.RouteRegister([]interface{}{
|
||||
controller.Model,
|
||||
controller.Task,
|
||||
controller.Stat,
|
||||
})
|
||||
|
||||
// 启动后台任务:worker + 清理器
|
||||
|
||||
@@ -7,19 +7,20 @@ import (
|
||||
|
||||
// CreateModelReq 添加模型配置
|
||||
type CreateModelReq struct {
|
||||
g.Meta `path:"/createModel" method:"post" tags:"模型管理" summary:"创建模型配置" dc:"添加新的模型配置"`
|
||||
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称(唯一标识)"`
|
||||
BaseURL string `p:"baseUrl" json:"baseUrl" v:"required#baseUrl不能为空" dc:"模型服务基础地址(如 http(s)://host:port)"`
|
||||
Route string `p:"route" json:"route" dc:"路由/路径(拼接到 BaseURL 之后的可选路径)"`
|
||||
HttpMethod string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(默认POST)"`
|
||||
APIKey string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头),示例:TTS_API_KEY:your-key"`
|
||||
Enabled int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用"`
|
||||
MaxConcurrency int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数"`
|
||||
QueueLimit int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(超过则拒绝/限流)"`
|
||||
TimeoutMs int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(毫秒)"`
|
||||
RetryTimes int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数"`
|
||||
AutoCleanSeconds int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(如清理超时任务/队列)"`
|
||||
Remark string `p:"remark" json:"remark" dc:"备注说明"`
|
||||
g.Meta `path:"/createModel" method:"post" tags:"模型管理" summary:"创建模型配置" dc:"添加新的模型配置"`
|
||||
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称(唯一标识)"`
|
||||
BaseURL string `p:"baseUrl" json:"baseUrl" v:"required#baseUrl不能为空" dc:"模型服务基础地址(如 http(s)://host:port)"`
|
||||
Route string `p:"route" json:"route" dc:"路由/路径(拼接到 BaseURL 之后的可选路径)"`
|
||||
HttpMethod string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(默认POST)"`
|
||||
APIKey string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头),示例:TTS_API_KEY:your-key"`
|
||||
Enabled int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用"`
|
||||
MaxConcurrency int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数"`
|
||||
QueueLimit int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(超过则拒绝/限流)"`
|
||||
TimeoutSeconds int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)"`
|
||||
RetryTimes int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数"`
|
||||
RetryQueueMaxSeconds int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间(秒);0表示失败重试插队到队首;>0表示排队超过该时间后插队,否则仍到队尾"`
|
||||
AutoCleanSeconds int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(如清理超时任务/队列)"`
|
||||
Remark string `p:"remark" json:"remark" dc:"备注说明"`
|
||||
}
|
||||
|
||||
type CreateModelRes struct {
|
||||
@@ -28,19 +29,20 @@ type CreateModelRes struct {
|
||||
|
||||
// UpdateModelReq 更新模型配置
|
||||
type UpdateModelReq struct {
|
||||
g.Meta `path:"/updateModel" method:"put" tags:"模型管理" summary:"更新模型配置" dc:"更新指定ID的模型配置"`
|
||||
ID int64 `p:"id" json:"id,string" v:"required#id不能为空" dc:"配置ID"`
|
||||
BaseURL string `p:"baseUrl" json:"baseUrl" dc:"模型服务基础地址"`
|
||||
Route string `p:"route" json:"route" dc:"路由/路径"`
|
||||
HttpMethod *string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(可选更新)"`
|
||||
APIKey *string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头)(可选更新)"`
|
||||
Enabled *int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用(可选更新)"`
|
||||
MaxConcurrency *int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数(可选更新)"`
|
||||
QueueLimit *int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(可选更新)"`
|
||||
TimeoutMs *int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(毫秒)(可选更新)"`
|
||||
RetryTimes *int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数(可选更新)"`
|
||||
AutoCleanSeconds *int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(可选更新)"`
|
||||
Remark *string `p:"remark" json:"remark" dc:"备注说明(可选更新)"`
|
||||
g.Meta `path:"/updateModel" method:"put" tags:"模型管理" summary:"更新模型配置" dc:"更新指定ID的模型配置"`
|
||||
ID int64 `p:"id" json:"id,string" v:"required#id不能为空" dc:"配置ID"`
|
||||
BaseURL string `p:"baseUrl" json:"baseUrl" dc:"模型服务基础地址"`
|
||||
Route string `p:"route" json:"route" dc:"路由/路径"`
|
||||
HttpMethod *string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(可选更新)"`
|
||||
APIKey *string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头)(可选更新)"`
|
||||
Enabled *int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用(可选更新)"`
|
||||
MaxConcurrency *int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数(可选更新)"`
|
||||
QueueLimit *int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(可选更新)"`
|
||||
TimeoutSeconds *int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)(可选更新)"`
|
||||
RetryTimes *int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数(可选更新)"`
|
||||
RetryQueueMaxSeconds *int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间(秒)(可选更新)"`
|
||||
AutoCleanSeconds *int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(可选更新)"`
|
||||
Remark *string `p:"remark" json:"remark" dc:"备注说明(可选更新)"`
|
||||
}
|
||||
|
||||
// DeleteModelReq 删除模型配置
|
||||
@@ -61,8 +63,9 @@ type GetModelRes struct {
|
||||
|
||||
// ListModelReq 配置列表
|
||||
type ListModelReq struct {
|
||||
g.Meta `path:"/listModel" method:"post" tags:"模型管理" summary:"模型配置列表" dc:"分页获取模型配置列表"`
|
||||
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
|
||||
g.Meta `path:"/listModel" method:"post" tags:"模型管理" summary:"模型配置列表" dc:"分页获取模型配置列表"`
|
||||
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
|
||||
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊查询,可选)"`
|
||||
}
|
||||
|
||||
type ListModelRes struct {
|
||||
|
||||
23
model/dto/stat_dto.go
Normal file
23
model/dto/stat_dto.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package dto
|
||||
|
||||
import (
|
||||
"gitea.com/red-future/common/beans"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
)
|
||||
|
||||
// ListModelStatReq 统计列表
|
||||
type ListModelStatReq struct {
|
||||
g.Meta `path:"/listModelStat" method:"post" tags:"统计" summary:"模型请求统计列表" dc:"按天统计模型请求次数,支持分页与条件筛选"`
|
||||
Page *beans.Page `p:"page" json:"page" dc:"分页参数(默认10条)"`
|
||||
StartDay string `p:"startDay" json:"startDay" dc:"开始日期(YYYY-MM-DD,可选)"`
|
||||
EndDay string `p:"endDay" json:"endDay" dc:"结束日期(YYYY-MM-DD,可选)"`
|
||||
TenantID *int64 `p:"tenantId" json:"tenantId" dc:"租户ID(可选)"`
|
||||
Creator string `p:"creator" json:"creator" dc:"创建人(可选,模糊匹配)"`
|
||||
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(可选,模糊匹配)"`
|
||||
}
|
||||
|
||||
type ListModelStatRes struct {
|
||||
List any `json:"list" dc:"列表数据"`
|
||||
Total int64 `json:"total" dc:"总数"`
|
||||
}
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
package dto
|
||||
|
||||
import "github.com/gogf/gf/v2/frame/g"
|
||||
import (
|
||||
"gitea.com/red-future/common/beans"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
)
|
||||
|
||||
// CreateTaskReq 创建异步任务
|
||||
type CreateTaskReq struct {
|
||||
g.Meta `path:"/createTask" method:"post" tags:"任务管理" summary:"创建异步任务" dc:"创建异步任务并返回任务ID"`
|
||||
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称"`
|
||||
BizName string `p:"bizName" json:"bizName" dc:"业务名称(调用方模块/系统,用于统计)"`
|
||||
InputRef string `p:"inputRef" json:"inputRef" dc:"输入引用(如OSS/文件引用等)"`
|
||||
RequestPayload any `p:"requestPayload" json:"requestPayload" dc:"请求负载(透传给模型服务)"`
|
||||
}
|
||||
@@ -40,3 +44,17 @@ type GetTaskBatchItem struct {
|
||||
type GetTaskBatchRes struct {
|
||||
List []GetTaskBatchItem `json:"list" dc:"任务列表"`
|
||||
}
|
||||
|
||||
// ListTaskReq 任务列表分页查询
|
||||
type ListTaskReq struct {
|
||||
g.Meta `path:"/listTask" method:"post" tags:"任务管理" summary:"任务列表" dc:"分页查询任务列表,支持按状态/模型名称/task_id过滤"`
|
||||
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
|
||||
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊匹配)"`
|
||||
TaskID string `p:"taskId" json:"taskId" dc:"任务ID(模糊匹配)"`
|
||||
State *int `p:"state" json:"state" dc:"任务状态(0/1/2/3/4,可选)"`
|
||||
}
|
||||
|
||||
type ListTaskRes struct {
|
||||
List any `json:"list" dc:"列表数据"`
|
||||
Total int64 `json:"total" dc:"总数"`
|
||||
}
|
||||
|
||||
@@ -12,8 +12,9 @@ type asynchModelCol struct {
|
||||
Enabled string
|
||||
MaxConcurrency string
|
||||
QueueLimit string
|
||||
TimeoutMs string
|
||||
TimeoutSeconds string
|
||||
RetryTimes string
|
||||
RetryQueueMaxSecs string
|
||||
AutoCleanSeconds string
|
||||
Remark string
|
||||
}
|
||||
@@ -28,25 +29,27 @@ var AsynchModelCol = asynchModelCol{
|
||||
Enabled: "enabled",
|
||||
MaxConcurrency: "max_concurrency",
|
||||
QueueLimit: "queue_limit",
|
||||
TimeoutMs: "timeout_ms",
|
||||
TimeoutSeconds: "timeout_seconds",
|
||||
RetryTimes: "retry_times",
|
||||
RetryQueueMaxSecs: "retry_queue_max_seconds",
|
||||
AutoCleanSeconds: "auto_clean_seconds",
|
||||
Remark: "remark",
|
||||
}
|
||||
|
||||
// AsynchModel 异步模型配置
|
||||
type AsynchModel struct {
|
||||
beans.SQLBaseDO `orm:",inline"`
|
||||
ModelName string `orm:"model_name" json:"modelName"`
|
||||
BaseURL string `orm:"base_url" json:"baseUrl"`
|
||||
Route string `orm:"route" json:"route"`
|
||||
HttpMethod string `orm:"http_method" json:"httpMethod"`
|
||||
APIKey string `orm:"api_key" json:"apiKey"`
|
||||
Enabled int `orm:"enabled" json:"enabled"`
|
||||
MaxConcurrency int `orm:"max_concurrency" json:"maxConcurrency"`
|
||||
QueueLimit int `orm:"queue_limit" json:"queueLimit"`
|
||||
TimeoutMs int `orm:"timeout_ms" json:"timeoutMs"`
|
||||
RetryTimes int `orm:"retry_times" json:"retryTimes"`
|
||||
AutoCleanSeconds int `orm:"auto_clean_seconds" json:"autoCleanSeconds"`
|
||||
Remark string `orm:"remark" json:"remark"`
|
||||
beans.SQLBaseDO `orm:",inline"`
|
||||
ModelName string `orm:"model_name" json:"modelName"`
|
||||
BaseURL string `orm:"base_url" json:"baseUrl"`
|
||||
Route string `orm:"route" json:"route"`
|
||||
HttpMethod string `orm:"http_method" json:"httpMethod"`
|
||||
APIKey string `orm:"api_key" json:"apiKey"`
|
||||
Enabled int `orm:"enabled" json:"enabled"`
|
||||
MaxConcurrency int `orm:"max_concurrency" json:"maxConcurrency"`
|
||||
QueueLimit int `orm:"queue_limit" json:"queueLimit"`
|
||||
TimeoutSeconds int `orm:"timeout_seconds" json:"timeoutSeconds"`
|
||||
RetryTimes int `orm:"retry_times" json:"retryTimes"`
|
||||
RetryQueueMaxSecs int `orm:"retry_queue_max_seconds" json:"retryQueueMaxSeconds"`
|
||||
AutoCleanSeconds int `orm:"auto_clean_seconds" json:"autoCleanSeconds"`
|
||||
Remark string `orm:"remark" json:"remark"`
|
||||
}
|
||||
|
||||
16
model/entity/asynch_model_stat.go
Normal file
16
model/entity/asynch_model_stat.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package entity
|
||||
|
||||
import "github.com/gogf/gf/v2/os/gtime"
|
||||
|
||||
// AsynchModelStat 按天统计:某天/租户/创建人/模型的请求次数
|
||||
// 注:这里不走通用 SQLBaseDO,采用联合唯一键(day,tenant_id,creator,model_name)做 UPSERT 原子累加。
|
||||
type AsynchModelStat struct {
|
||||
Day *gtime.Time `orm:"day" json:"day"` // 日期(建议仅使用日期部分)
|
||||
TenantId int64 `orm:"tenant_id" json:"tenantId,string"`
|
||||
Creator string `orm:"creator" json:"creator"`
|
||||
ModelName string `orm:"model_name" json:"modelName"`
|
||||
RequestCount int64 `orm:"request_count" json:"requestCount"`
|
||||
CreatedAt *gtime.Time `orm:"created_at" json:"createdAt"`
|
||||
UpdatedAt *gtime.Time `orm:"updated_at" json:"updatedAt"`
|
||||
}
|
||||
|
||||
57
model/entity/asynch_op_log.go
Normal file
57
model/entity/asynch_op_log.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package entity
|
||||
|
||||
import (
|
||||
"gitea.com/red-future/common/beans"
|
||||
)
|
||||
|
||||
type asynchOpLogCol struct {
|
||||
beans.SQLBaseCol
|
||||
IP string
|
||||
UserAgent string
|
||||
APIPath string
|
||||
HttpMethod string
|
||||
BizName string
|
||||
ModelName string
|
||||
TaskID string
|
||||
OpType string
|
||||
Success string
|
||||
ErrorMsg string
|
||||
CostMs string
|
||||
RequestPayload string
|
||||
ResponsePayload string
|
||||
}
|
||||
|
||||
var AsynchOpLogCol = asynchOpLogCol{
|
||||
SQLBaseCol: beans.DefSQLBaseCol,
|
||||
IP: "ip",
|
||||
UserAgent: "user_agent",
|
||||
APIPath: "api_path",
|
||||
HttpMethod: "http_method",
|
||||
BizName: "biz_name",
|
||||
ModelName: "model_name",
|
||||
TaskID: "task_id",
|
||||
OpType: "op_type",
|
||||
Success: "success",
|
||||
ErrorMsg: "error_msg",
|
||||
CostMs: "cost_ms",
|
||||
RequestPayload: "request_payload",
|
||||
ResponsePayload: "response_payload",
|
||||
}
|
||||
|
||||
// AsynchOpLog 操作日志(创建任务等)
|
||||
type AsynchOpLog struct {
|
||||
beans.SQLBaseDO `orm:",inline"`
|
||||
IP string `orm:"ip" json:"ip"`
|
||||
UserAgent string `orm:"user_agent" json:"userAgent"`
|
||||
APIPath string `orm:"api_path" json:"apiPath"`
|
||||
HttpMethod string `orm:"http_method" json:"httpMethod"`
|
||||
BizName string `orm:"biz_name" json:"bizName"`
|
||||
ModelName string `orm:"model_name" json:"modelName"`
|
||||
TaskID string `orm:"task_id" json:"taskId"`
|
||||
OpType string `orm:"op_type" json:"opType"`
|
||||
Success int `orm:"success" json:"success"`
|
||||
ErrorMsg string `orm:"error_msg" json:"errorMsg"`
|
||||
CostMs int64 `orm:"cost_ms" json:"costMs"`
|
||||
RequestPayload any `orm:"request_payload" json:"requestPayload"`
|
||||
ResponsePayload any `orm:"response_payload" json:"responsePayload"`
|
||||
}
|
||||
@@ -19,6 +19,8 @@ type asynchTaskCol struct {
|
||||
ExpireAt string
|
||||
RetryCount string
|
||||
EnqueueAt string
|
||||
Phase string
|
||||
TmpFile string
|
||||
InputRef string
|
||||
RequestPayload string
|
||||
}
|
||||
@@ -37,6 +39,8 @@ var AsynchTaskCol = asynchTaskCol{
|
||||
ExpireAt: "expire_at",
|
||||
RetryCount: "retry_count",
|
||||
EnqueueAt: "enqueue_at",
|
||||
Phase: "phase",
|
||||
TmpFile: "tmp_file",
|
||||
InputRef: "input_ref",
|
||||
RequestPayload: "request_payload",
|
||||
}
|
||||
@@ -56,6 +60,10 @@ type AsynchTask struct {
|
||||
ExpireAt *gtime.Time `orm:"expire_at" json:"expireAt"` // 已下载(state=4)后的过期时间
|
||||
RetryCount int `orm:"retry_count" json:"retryCount"`
|
||||
EnqueueAt *gtime.Time `orm:"enqueue_at" json:"enqueueAt"`
|
||||
Phase int `orm:"phase" json:"phase"` // 0模型阶段/1OSS阶段
|
||||
TmpFile string `orm:"tmp_file" json:"tmpFile"` // 临时结果文件路径
|
||||
// RetryQueueMaxSeconds 为 ListFailedRetryableGlobal 的 join 字段(非任务表字段)
|
||||
RetryQueueMaxSeconds int `orm:"retry_queue_max_seconds" json:"-"`
|
||||
InputRef string `orm:"input_ref" json:"inputRef"`
|
||||
RequestPayload any `orm:"request_payload" json:"requestPayload"`
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ func (c *cleaner) runOnce(ctx context.Context) {
|
||||
g.Log().Errorf(ctx, "[cleaner] list expired(downloaded) error: %v", err)
|
||||
} else {
|
||||
for _, t := range expired {
|
||||
deleteTmpResult(t.TmpFile)
|
||||
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
|
||||
}
|
||||
g.Log().Infof(ctx, "[cleaner] expired(downloaded) cleaned, count=%d", len(expired))
|
||||
@@ -71,7 +72,20 @@ func (c *cleaner) runOnce(ctx context.Context) {
|
||||
g.Log().Errorf(ctx, "[cleaner] list failed retryable error: %v", err)
|
||||
} else {
|
||||
for _, t := range retryable {
|
||||
_ = dao.Task.RequeueForRetryGlobal(ctx, t.Id)
|
||||
// retry_queue_max_seconds 控制失败重试的排队策略:
|
||||
// - =0:失败重试插队到队首
|
||||
// - >0:当任务从创建到现在的排队时长 >= maxSeconds,则插队到队首;否则仍放到队尾
|
||||
now := time.Now()
|
||||
enqueueAt := now
|
||||
maxSeconds := t.RetryQueueMaxSeconds
|
||||
if maxSeconds == 0 {
|
||||
enqueueAt = now.Add(-100 * 365 * 24 * time.Hour)
|
||||
} else if maxSeconds > 0 && t.CreatedAt != nil {
|
||||
if now.Sub(t.CreatedAt.Time) >= time.Duration(maxSeconds)*time.Second {
|
||||
enqueueAt = now.Add(-100 * 365 * 24 * time.Hour)
|
||||
}
|
||||
}
|
||||
_ = dao.Task.RequeueForRetryGlobal(ctx, t.Id, enqueueAt)
|
||||
}
|
||||
g.Log().Infof(ctx, "[cleaner] failed retryable cleaned, count=%d", len(retryable))
|
||||
}
|
||||
@@ -82,6 +96,7 @@ func (c *cleaner) runOnce(ctx context.Context) {
|
||||
g.Log().Errorf(ctx, "[cleaner] list failed exhausted error: %v", err)
|
||||
} else {
|
||||
for _, t := range exhausted {
|
||||
deleteTmpResult(t.TmpFile)
|
||||
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
|
||||
}
|
||||
g.Log().Infof(ctx, "[cleaner] failed exhausted cleaned, count=%d", len(exhausted))
|
||||
|
||||
@@ -14,24 +14,52 @@ import (
|
||||
"model-asynch/model/entity"
|
||||
)
|
||||
|
||||
func parseAPIKeyHeader(apiKey string) (k, v string) {
|
||||
// parseAPIKeyHeaders 支持多个 header 绑定,逗号分隔:
|
||||
// 示例:
|
||||
// - X-API-Key:qwen3-tts-key,operation:true,count:123
|
||||
// - X-API-Key:"qwen3-tts-key",operation:"true"
|
||||
//
|
||||
// 说明:
|
||||
// - HTTP Header 最终都是字符串,这里做的是“值的字符串化表达”。
|
||||
// - 若 value 用双引号包裹,会去掉外层引号再注入,便于在配置中区分字符串/布尔/数字等表达(以及避免值中包含特殊字符时歧义)。
|
||||
func parseAPIKeyHeaders(apiKey string) map[string]string {
|
||||
apiKey = strings.TrimSpace(apiKey)
|
||||
if apiKey == "" {
|
||||
return "", ""
|
||||
return nil
|
||||
}
|
||||
// 支持两种写法:
|
||||
// 1) HeaderName:HeaderValue(推荐)
|
||||
// 2) HeaderName=HeaderValue(兼容)
|
||||
if strings.Contains(apiKey, ":") {
|
||||
parts := strings.SplitN(apiKey, ":", 2)
|
||||
return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
|
||||
out := map[string]string{}
|
||||
parts := strings.Split(apiKey, ",")
|
||||
for _, p := range parts {
|
||||
p = strings.TrimSpace(p)
|
||||
if p == "" {
|
||||
continue
|
||||
}
|
||||
// HeaderName:HeaderValue(推荐) / HeaderName=HeaderValue(兼容)
|
||||
if strings.Contains(p, ":") {
|
||||
kv := strings.SplitN(p, ":", 2)
|
||||
k := strings.TrimSpace(kv[0])
|
||||
v := strings.TrimSpace(kv[1])
|
||||
v = strings.Trim(v, "\"")
|
||||
if k != "" && v != "" {
|
||||
out[k] = v
|
||||
}
|
||||
continue
|
||||
}
|
||||
if strings.Contains(p, "=") {
|
||||
kv := strings.SplitN(p, "=", 2)
|
||||
k := strings.TrimSpace(kv[0])
|
||||
v := strings.TrimSpace(kv[1])
|
||||
v = strings.Trim(v, "\"")
|
||||
if k != "" && v != "" {
|
||||
out[k] = v
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
if strings.Contains(apiKey, "=") {
|
||||
parts := strings.SplitN(apiKey, "=", 2)
|
||||
return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
|
||||
if len(out) == 0 {
|
||||
return nil
|
||||
}
|
||||
// 只给了 value:不做注入(避免注入非法 header)
|
||||
return "", ""
|
||||
return out
|
||||
}
|
||||
|
||||
func payloadToQuery(payload any) (url.Values, error) {
|
||||
@@ -76,7 +104,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt
|
||||
url = strings.TrimRight(m.BaseURL, "/")
|
||||
}
|
||||
|
||||
timeout := time.Duration(m.TimeoutMs) * time.Millisecond
|
||||
timeout := time.Duration(m.TimeoutSeconds) * time.Second
|
||||
if timeout <= 0 {
|
||||
timeout = 60 * time.Second
|
||||
}
|
||||
@@ -122,7 +150,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
}
|
||||
if hk, hv := parseAPIKeyHeader(m.APIKey); hk != "" && hv != "" {
|
||||
for hk, hv := range parseAPIKeyHeaders(m.APIKey) {
|
||||
req.Header.Set(hk, hv)
|
||||
}
|
||||
if method != http.MethodGet {
|
||||
|
||||
@@ -15,18 +15,19 @@ type modelService struct{}
|
||||
|
||||
func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res *dto.CreateModelRes, err error) {
|
||||
m := &entity.AsynchModel{
|
||||
ModelName: req.ModelName,
|
||||
BaseURL: req.BaseURL,
|
||||
Route: req.Route,
|
||||
HttpMethod: req.HttpMethod,
|
||||
APIKey: req.APIKey,
|
||||
Enabled: req.Enabled,
|
||||
MaxConcurrency: req.MaxConcurrency,
|
||||
QueueLimit: req.QueueLimit,
|
||||
TimeoutMs: req.TimeoutMs,
|
||||
RetryTimes: req.RetryTimes,
|
||||
AutoCleanSeconds: req.AutoCleanSeconds,
|
||||
Remark: req.Remark,
|
||||
ModelName: req.ModelName,
|
||||
BaseURL: req.BaseURL,
|
||||
Route: req.Route,
|
||||
HttpMethod: req.HttpMethod,
|
||||
APIKey: req.APIKey,
|
||||
Enabled: req.Enabled,
|
||||
MaxConcurrency: req.MaxConcurrency,
|
||||
QueueLimit: req.QueueLimit,
|
||||
TimeoutSeconds: req.TimeoutSeconds,
|
||||
RetryTimes: req.RetryTimes,
|
||||
RetryQueueMaxSecs: req.RetryQueueMaxSeconds,
|
||||
AutoCleanSeconds: req.AutoCleanSeconds,
|
||||
Remark: req.Remark,
|
||||
}
|
||||
if m.HttpMethod == "" {
|
||||
m.HttpMethod = "POST"
|
||||
@@ -40,8 +41,8 @@ func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res
|
||||
if m.QueueLimit <= 0 {
|
||||
m.QueueLimit = 1000
|
||||
}
|
||||
if m.TimeoutMs <= 0 {
|
||||
m.TimeoutMs = 60000
|
||||
if m.TimeoutSeconds <= 0 {
|
||||
m.TimeoutSeconds = 60
|
||||
}
|
||||
if m.AutoCleanSeconds <= 0 {
|
||||
m.AutoCleanSeconds = 86400
|
||||
@@ -76,12 +77,15 @@ func (s *modelService) Update(ctx context.Context, req *dto.UpdateModelReq) erro
|
||||
if req.QueueLimit != nil {
|
||||
data[entity.AsynchModelCol.QueueLimit] = *req.QueueLimit
|
||||
}
|
||||
if req.TimeoutMs != nil {
|
||||
data[entity.AsynchModelCol.TimeoutMs] = *req.TimeoutMs
|
||||
if req.TimeoutSeconds != nil {
|
||||
data[entity.AsynchModelCol.TimeoutSeconds] = *req.TimeoutSeconds
|
||||
}
|
||||
if req.RetryTimes != nil {
|
||||
data[entity.AsynchModelCol.RetryTimes] = *req.RetryTimes
|
||||
}
|
||||
if req.RetryQueueMaxSeconds != nil {
|
||||
data[entity.AsynchModelCol.RetryQueueMaxSecs] = *req.RetryQueueMaxSeconds
|
||||
}
|
||||
if req.AutoCleanSeconds != nil {
|
||||
data[entity.AsynchModelCol.AutoCleanSeconds] = *req.AutoCleanSeconds
|
||||
}
|
||||
@@ -104,6 +108,6 @@ func (s *modelService) Get(ctx context.Context, id int64) (*entity.AsynchModel,
|
||||
return dao.Model.GetByID(ctx, id)
|
||||
}
|
||||
|
||||
func (s *modelService) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) {
|
||||
return dao.Model.List(ctx, pageNum, pageSize)
|
||||
func (s *modelService) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) {
|
||||
return dao.Model.List(ctx, pageNum, pageSize, modelNameLike)
|
||||
}
|
||||
|
||||
40
service/stat_service.go
Normal file
40
service/stat_service.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"model-asynch/dao"
|
||||
"model-asynch/model/dto"
|
||||
)
|
||||
|
||||
type statService struct{}
|
||||
|
||||
var Stat = &statService{}
|
||||
|
||||
func (s *statService) List(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) {
|
||||
pageNum, pageSize := 1, 10
|
||||
if req != nil && req.Page != nil {
|
||||
if req.Page.PageNum > 0 {
|
||||
pageNum = int(req.Page.PageNum)
|
||||
}
|
||||
if req.Page.PageSize > 0 {
|
||||
pageSize = int(req.Page.PageSize)
|
||||
}
|
||||
}
|
||||
startDay, endDay := "", ""
|
||||
var tenantID *int64
|
||||
creator, modelName := "", ""
|
||||
if req != nil {
|
||||
startDay = req.StartDay
|
||||
endDay = req.EndDay
|
||||
tenantID = req.TenantID
|
||||
creator = req.Creator
|
||||
modelName = req.ModelName
|
||||
}
|
||||
list, total, err := dao.Stat.List(ctx, pageNum, pageSize, startDay, endDay, tenantID, creator, modelName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &dto.ListModelStatRes{List: list, Total: total}, nil
|
||||
}
|
||||
|
||||
@@ -5,12 +5,14 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"mime/multipart"
|
||||
"time"
|
||||
|
||||
"model-asynch/model/entity"
|
||||
|
||||
commonHttp "gitea.com/red-future/common/http"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
"github.com/gogf/gf/v2/util/guid"
|
||||
)
|
||||
|
||||
// 对接你们的 oss 文件服务:POST oss/file/uploadFile (multipart/form-data)
|
||||
@@ -25,8 +27,6 @@ type uploadFileResponse struct {
|
||||
}
|
||||
|
||||
func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, data []byte, fileExt string, _ string) (ossURL string, err error) {
|
||||
// ossUrl := g.Cfg().MustGet(ctx, "oss.addr", "192.168.3.30:9000").String()
|
||||
|
||||
// multipart
|
||||
body := &bytes.Buffer{}
|
||||
writer := multipart.NewWriter(body)
|
||||
@@ -39,7 +39,8 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat
|
||||
ext = "." + ext
|
||||
}
|
||||
|
||||
part, err := writer.CreateFormFile("file", "")
|
||||
filename := fmt.Sprintf("asynch_%d_%s%s", time.Now().Unix(), guid.S(), ext)
|
||||
part, err := writer.CreateFormFile("file", filename)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -55,16 +56,14 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat
|
||||
headers["Content-Type"] = contentType
|
||||
|
||||
fullURL := "oss/file/uploadFile"
|
||||
g.Log().Infof(ctx, "[OSS] upload start url=%s size=%d", fullURL, len(data))
|
||||
g.Log().Infof(ctx, "[OSS] upload start url=%s filename=%s size=%d", fullURL, filename, len(data))
|
||||
|
||||
var resp uploadFileResponse
|
||||
if err := commonHttp.Post(ctx, fullURL, headers, &resp, body.Bytes()); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if resp.FileURL == "" {
|
||||
return "", fmt.Errorf("OSS服务返回错误: 上传失败")
|
||||
}
|
||||
g.Log().Infof(ctx, "[OSS] upload success url=%s filename=%s size=%d format=%s", resp.FileURL, resp.FileName, resp.FileSize, resp.FileFormat)
|
||||
fmt.Println("打印结果 resp:", resp)
|
||||
g.Log().Infof(ctx, "[OSS] upload success url=%s size=%d format=%s", resp.FileURL, resp.FileSize, resp.FileFormat)
|
||||
return resp.FileURL, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"model-asynch/model/dto"
|
||||
"model-asynch/model/entity"
|
||||
|
||||
"github.com/gogf/gf/v2/database/gdb"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/gogf/gf/v2/os/gtime"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
@@ -18,6 +20,7 @@ var Task = &taskService{}
|
||||
type taskService struct{}
|
||||
|
||||
func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *dto.CreateTaskRes, err error) {
|
||||
startAt := time.Now()
|
||||
// 固化 token/user 等信息
|
||||
ctx = asyncCtx(ctx)
|
||||
|
||||
@@ -59,6 +62,35 @@ func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 3) 写操作日志(尽量不影响主流程,失败忽略)
|
||||
ip := ""
|
||||
ua := ""
|
||||
apiPath := "/task/createTask"
|
||||
httpMethod := "POST"
|
||||
if r := g.RequestFromCtx(ctx); r != nil {
|
||||
ip = r.GetClientIp()
|
||||
ua = r.UserAgent()
|
||||
apiPath = r.URL.Path
|
||||
httpMethod = r.Method
|
||||
}
|
||||
_, _ = dao.OpLog.Insert(ctx, &entity.AsynchOpLog{
|
||||
IP: ip,
|
||||
UserAgent: ua,
|
||||
APIPath: apiPath,
|
||||
HttpMethod: httpMethod,
|
||||
BizName: req.BizName,
|
||||
ModelName: req.ModelName,
|
||||
TaskID: taskID,
|
||||
OpType: "createTask",
|
||||
Success: 1,
|
||||
ErrorMsg: "",
|
||||
CostMs: time.Since(startAt).Milliseconds(),
|
||||
RequestPayload: storedPayload,
|
||||
ResponsePayload: gdb.Map{
|
||||
"taskId": taskID,
|
||||
},
|
||||
})
|
||||
return &dto.CreateTaskRes{TaskID: taskID}, nil
|
||||
}
|
||||
|
||||
@@ -127,3 +159,28 @@ func (s *taskService) GetBatch(ctx context.Context, req *dto.GetTaskBatchReq) (r
|
||||
}
|
||||
return &dto.GetTaskBatchRes{List: items}, nil
|
||||
}
|
||||
|
||||
func (s *taskService) List(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) {
|
||||
pageNum, pageSize := 1, 10
|
||||
if req != nil && req.Page != nil {
|
||||
if req.Page.PageNum > 0 {
|
||||
pageNum = int(req.Page.PageNum)
|
||||
}
|
||||
if req.Page.PageSize > 0 {
|
||||
pageSize = int(req.Page.PageSize)
|
||||
}
|
||||
}
|
||||
modelName := ""
|
||||
taskID := ""
|
||||
var state *int
|
||||
if req != nil {
|
||||
modelName = req.ModelName
|
||||
taskID = req.TaskID
|
||||
state = req.State
|
||||
}
|
||||
list, total, err := dao.Task.List(ctx, pageNum, pageSize, modelName, taskID, state)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &dto.ListTaskRes{List: list, Total: total}, nil
|
||||
}
|
||||
|
||||
38
service/tmp_store.go
Normal file
38
service/tmp_store.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// saveTmpResult 将模型输出写入临时文件,用于 OSS 上传失败后的“仅重试 OSS”。
|
||||
func saveTmpResult(taskID string, data []byte, ext string) (string, error) {
|
||||
dir := filepath.Join(os.TempDir(), "model-asynch")
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if ext == "" {
|
||||
ext = ".bin"
|
||||
}
|
||||
if ext[0] != '.' {
|
||||
ext = "." + ext
|
||||
}
|
||||
path := filepath.Join(dir, fmt.Sprintf("%s%s", taskID, ext))
|
||||
if err := os.WriteFile(path, data, 0o644); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return path, nil
|
||||
}
|
||||
|
||||
func loadTmpResult(path string) ([]byte, error) {
|
||||
return os.ReadFile(path)
|
||||
}
|
||||
|
||||
func deleteTmpResult(path string) {
|
||||
if path == "" {
|
||||
return
|
||||
}
|
||||
_ = os.Remove(path)
|
||||
}
|
||||
|
||||
@@ -146,17 +146,46 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) {
|
||||
"inputRef": t.InputRef,
|
||||
}
|
||||
}
|
||||
data, err := InvokeModel(ctx, m, payload)
|
||||
if err != nil {
|
||||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||||
return
|
||||
var (
|
||||
data []byte
|
||||
contentType string
|
||||
ext string
|
||||
)
|
||||
|
||||
// phase=1 表示模型已成功但 OSS 上传失败:优先从临时文件加载,避免重复跑模型
|
||||
if t.Phase == 1 && strings.TrimSpace(t.TmpFile) != "" {
|
||||
data, err = loadTmpResult(t.TmpFile)
|
||||
if err == nil && len(data) > 0 {
|
||||
contentType, ext = DetectFileType(data)
|
||||
} else {
|
||||
// 临时文件不可用:回退重新调用模型
|
||||
data = nil
|
||||
}
|
||||
}
|
||||
if data == nil {
|
||||
// 统计:仅在真正请求模型时 +1(OSS 重试不计入)
|
||||
_ = dao.Stat.IncRequestCount(ctx, time.Now(), int64(t.TenantId), t.Creator, t.ModelName)
|
||||
|
||||
data, err = InvokeModel(ctx, m, payload)
|
||||
if err != nil {
|
||||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||||
return
|
||||
}
|
||||
contentType, ext = DetectFileType(data)
|
||||
// 将模型输出写入临时文件,后续若 OSS 失败可只重试 OSS
|
||||
tmpPath, err := saveTmpResult(t.TaskID, data, ext)
|
||||
if err == nil && tmpPath != "" {
|
||||
t.TmpFile = tmpPath
|
||||
t.Phase = 1
|
||||
_ = dao.Task.UpdateTmpAfterModelGlobal(ctx, t.Id, tmpPath)
|
||||
}
|
||||
}
|
||||
|
||||
// 4) 存储 OSS/MinIO
|
||||
contentType, ext := DetectFileType(data)
|
||||
// 4) 存储 OSS
|
||||
ossURL, err := Storage.UploadByTask(ctx, t, data, ext, contentType)
|
||||
if err != nil {
|
||||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||||
// OSS 阶段失败:保留临时文件,下一轮仅重试 OSS
|
||||
_ = dao.Task.UpdateFailedKeepTmpGlobal(ctx, t.Id, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -170,6 +199,8 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) {
|
||||
g.Log().Errorf(ctx, "[worker] update success failed: %v", err)
|
||||
return
|
||||
}
|
||||
// 成功后清理临时文件
|
||||
deleteTmpResult(t.TmpFile)
|
||||
}
|
||||
|
||||
func (w *asyncWorker) rollbackToPending(ctx context.Context, id int64) error {
|
||||
|
||||
244
update.sql
244
update.sql
@@ -1,138 +1,194 @@
|
||||
-- -----------------------王立钊2026-04-22 10:00:00-----------------------
|
||||
-- model-asynch 三张核心表(pgsql)
|
||||
-- 1) asynch_models:模型配置
|
||||
-- 2) asynch_task:异步任务
|
||||
-- 3) asynch_op_log:操作日志(统计用)
|
||||
-- 4) asynch_model_stat:按天模型请求统计(限流/监控用)
|
||||
|
||||
--------------------pgsql创建 asynch_models / asynch_task 表语句---------------------------
|
||||
|
||||
-- 异步模型表
|
||||
-- =========================
|
||||
-- 1) asynch_models
|
||||
-- =========================
|
||||
CREATE TABLE IF NOT EXISTS asynch_models (
|
||||
-- 基础字段(与现有表保持一致)
|
||||
id BIGINT PRIMARY KEY, -- 主键ID(非自增)
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID
|
||||
id BIGINT PRIMARY KEY, -- 主键ID(非自增)
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID
|
||||
creator VARCHAR(64) NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updater VARCHAR(64) NOT NULL,
|
||||
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at timestamp(6),
|
||||
deleted_at TIMESTAMP(6),
|
||||
|
||||
-- 业务字段
|
||||
model_name VARCHAR(128) NOT NULL, -- 模型名称(路由键)
|
||||
base_url VARCHAR(256) NOT NULL, -- 模型服务基础地址(如 http://1.2.3.4:8080)
|
||||
route VARCHAR(256) NOT NULL DEFAULT '', -- 模型服务路由(如 /v1/infer)
|
||||
model_name VARCHAR(128) NOT NULL, -- 模型名称(路由键)
|
||||
base_url VARCHAR(256) NOT NULL, -- 模型服务基础地址(如 http://1.2.3.4:8080)
|
||||
route VARCHAR(256) NOT NULL DEFAULT '',-- 模型服务路由(如 /v1/infer)
|
||||
http_method VARCHAR(8) NOT NULL DEFAULT 'POST', -- 请求方式:GET/POST
|
||||
api_key VARCHAR(256) DEFAULT '', -- 请求密钥绑定(请求头),示例:TTS_API_KEY:your-key
|
||||
enabled SMALLINT NOT NULL DEFAULT 1, -- 是否启用:1启用/0停用
|
||||
max_concurrency INT NOT NULL DEFAULT 10, -- 单模型最大并发(worker/队列消费侧应遵守)
|
||||
queue_limit INT NOT NULL DEFAULT 1000, -- 单模型排队上限(防堆积,可选)
|
||||
timeout_ms INT NOT NULL DEFAULT 60000, -- 调用模型服务超时(毫秒)
|
||||
retry_times SMALLINT NOT NULL DEFAULT 0, -- 失败重试次数(0表示不重试)
|
||||
auto_clean_seconds INT NOT NULL DEFAULT 86400, -- 已下载(state=4)后的保留时间(秒),到期后清理
|
||||
remark TEXT DEFAULT '' -- 备注
|
||||
api_key VARCHAR(1024) DEFAULT '', -- 请求头绑定(支持多个,逗号分隔):X-API-Key:xxx,operation:true
|
||||
|
||||
enabled SMALLINT NOT NULL DEFAULT 1, -- 是否启用:1启用/0停用
|
||||
max_concurrency INT NOT NULL DEFAULT 10, -- 单模型最大并发
|
||||
queue_limit INT NOT NULL DEFAULT 1000, -- 排队上限(近似控制)
|
||||
timeout_seconds INT NOT NULL DEFAULT 60, -- 调用模型服务超时(秒)
|
||||
|
||||
retry_times SMALLINT NOT NULL DEFAULT 0, -- 失败后最多再重试 N 次(不含首次)
|
||||
retry_queue_max_seconds INT NOT NULL DEFAULT 0, -- 失败重试最大排队时间(秒):0=插队到队首;>0=排队超过该时间后插队,否则仍到队尾
|
||||
|
||||
auto_clean_seconds INT NOT NULL DEFAULT 86400, -- 已下载(state=4)后的保留时间(秒)
|
||||
remark TEXT DEFAULT '' -- 备注
|
||||
);
|
||||
|
||||
-- 索引/约束
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name ON asynch_models(tenant_id, model_name);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name
|
||||
ON asynch_models(tenant_id, model_name);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_tenant_id ON asynch_models(tenant_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_model_name ON asynch_models(model_name);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_enabled ON asynch_models(enabled);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_deleted_at ON asynch_models(deleted_at);
|
||||
|
||||
-- 表和字段注释
|
||||
COMMENT ON TABLE asynch_models IS '异步模型表(模型服务配置)';
|
||||
COMMENT ON COLUMN asynch_models.id IS '主键ID(非自增)';
|
||||
COMMENT ON COLUMN asynch_models.tenant_id IS '租户ID';
|
||||
COMMENT ON COLUMN asynch_models.creator IS '创建人';
|
||||
COMMENT ON COLUMN asynch_models.created_at IS '创建时间';
|
||||
COMMENT ON COLUMN asynch_models.updater IS '更新人';
|
||||
COMMENT ON COLUMN asynch_models.updated_at IS '更新时间';
|
||||
COMMENT ON COLUMN asynch_models.deleted_at IS '删除时间(软删)';
|
||||
COMMENT ON COLUMN asynch_models.model_name IS '模型名称(路由键)';
|
||||
COMMENT ON COLUMN asynch_models.base_url IS '模型服务基础地址';
|
||||
COMMENT ON COLUMN asynch_models.route IS '模型服务路由';
|
||||
COMMENT ON COLUMN asynch_models.http_method IS '请求方式:GET/POST';
|
||||
COMMENT ON COLUMN asynch_models.api_key IS '请求密钥绑定(请求头),示例:TTS_API_KEY:your-key';
|
||||
COMMENT ON COLUMN asynch_models.enabled IS '是否启用:1启用/0停用';
|
||||
COMMENT ON COLUMN asynch_models.max_concurrency IS '单模型最大并发';
|
||||
COMMENT ON COLUMN asynch_models.queue_limit IS '单模型排队上限';
|
||||
COMMENT ON COLUMN asynch_models.timeout_ms IS '调用模型服务超时(毫秒)';
|
||||
COMMENT ON COLUMN asynch_models.retry_times IS '失败重试次数';
|
||||
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理';
|
||||
COMMENT ON COLUMN asynch_models.remark IS '备注';
|
||||
|
||||
-- 兼容已有库:更新字段注释
|
||||
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理';
|
||||
COMMENT ON COLUMN asynch_models.api_key IS '请求头绑定(支持多个,逗号分隔):X-API-Key:xxx,operation:true';
|
||||
COMMENT ON COLUMN asynch_models.retry_times IS '失败后最多再重试 N 次(不含首次)';
|
||||
COMMENT ON COLUMN asynch_models.retry_queue_max_seconds IS '失败重试最大排队时间(秒):0=插队到队首;>0=排队超过该时间后插队,否则仍到队尾';
|
||||
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期清理';
|
||||
|
||||
|
||||
-- 异步任务表
|
||||
-- =========================
|
||||
-- 2) asynch_task
|
||||
-- =========================
|
||||
CREATE TABLE IF NOT EXISTS asynch_task (
|
||||
-- 基础字段(与现有表保持一致)
|
||||
id BIGINT PRIMARY KEY, -- 主键ID(非自增)
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID
|
||||
id BIGINT PRIMARY KEY, -- 主键ID(非自增)
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID
|
||||
creator VARCHAR(64) NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updater VARCHAR(64) NOT NULL,
|
||||
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at timestamp(6),
|
||||
deleted_at TIMESTAMP(6),
|
||||
|
||||
-- 任务核心字段
|
||||
model_name VARCHAR(128) NOT NULL, -- 模型名称
|
||||
task_id VARCHAR(64) NOT NULL, -- 任务ID(对外返回)
|
||||
state SMALLINT NOT NULL DEFAULT 0, -- 状态:0排队中/1执行中/2成功/3失败/4已下载
|
||||
oss_file VARCHAR(512) DEFAULT '', -- OSS文件URL/Key
|
||||
file_type VARCHAR(32) DEFAULT '', -- 文件类型(如 mp3/mp4/png/pdf/...)
|
||||
file_size BIGINT NOT NULL DEFAULT 0, -- 文件大小(字节)
|
||||
error_msg TEXT DEFAULT '', -- 错误信息
|
||||
model_name VARCHAR(128) NOT NULL, -- 模型名称
|
||||
task_id VARCHAR(64) NOT NULL, -- 任务ID(对外返回)
|
||||
state SMALLINT NOT NULL DEFAULT 0, -- 0排队中/1执行中/2成功/3失败/4已下载
|
||||
|
||||
-- 执行与清理字段
|
||||
started_at TIMESTAMP, -- 开始执行时间
|
||||
finished_at TIMESTAMP, -- 执行结束时间
|
||||
expire_at TIMESTAMP, -- 过期清理时间(已下载后写入:updated_at + auto_clean_seconds)
|
||||
retry_count INT NOT NULL DEFAULT 0, -- 已重试次数(不含首次)
|
||||
enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 入队时间(用于排队顺序,重试会更新时间到队尾)
|
||||
oss_file VARCHAR(512) DEFAULT '', -- 结果文件OSS地址
|
||||
file_type VARCHAR(32) DEFAULT '', -- 文件类型(mp3/mp4/png/...)
|
||||
file_size BIGINT NOT NULL DEFAULT 0, -- 文件大小(字节)
|
||||
error_msg TEXT DEFAULT '', -- 错误信息
|
||||
|
||||
-- 输入信息(用于排障/重放;如不需要可不写入或置空)
|
||||
input_ref TEXT DEFAULT '', -- 输入引用(如上传文件URL/OSS key/业务侧资源ID)
|
||||
request_payload JSONB -- 请求参数(可选)
|
||||
started_at TIMESTAMP, -- 开始执行时间
|
||||
finished_at TIMESTAMP, -- 执行结束时间
|
||||
|
||||
expire_at TIMESTAMP, -- state=4 后写入,用于清理
|
||||
|
||||
-- 重试/排队
|
||||
retry_count INT NOT NULL DEFAULT 0, -- 已重试次数(不含首次)
|
||||
enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- 入队时间(用于排队顺序)
|
||||
|
||||
-- 任务执行阶段:用于区分“重试模型”与“仅重试 OSS”
|
||||
phase SMALLINT NOT NULL DEFAULT 0, -- 0模型阶段/1OSS阶段
|
||||
tmp_file TEXT DEFAULT '', -- 临时结果文件路径(phase=1 时仅重试 OSS 上传)
|
||||
|
||||
-- 输入信息(可选)
|
||||
input_ref TEXT DEFAULT '', -- 输入引用(如OSS/业务资源ID等)
|
||||
request_payload JSONB -- 请求参数(可选)
|
||||
);
|
||||
|
||||
-- 索引/约束
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id ON asynch_task(tenant_id, task_id);
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id
|
||||
ON asynch_task(tenant_id, task_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_task_tenant_id ON asynch_task(tenant_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_task_model_name ON asynch_task(model_name);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_task_state ON asynch_task(state);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_task_enqueue_at ON asynch_task(enqueue_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_task_updated_at ON asynch_task(updated_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_task_expire_at ON asynch_task(expire_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_task_deleted_at ON asynch_task(deleted_at);
|
||||
|
||||
-- 表和字段注释
|
||||
COMMENT ON TABLE asynch_task IS '异步任务表';
|
||||
COMMENT ON COLUMN asynch_task.id IS '主键ID(非自增)';
|
||||
COMMENT ON COLUMN asynch_task.tenant_id IS '租户ID';
|
||||
COMMENT ON COLUMN asynch_task.creator IS '创建人';
|
||||
COMMENT ON COLUMN asynch_task.created_at IS '创建时间';
|
||||
COMMENT ON COLUMN asynch_task.updater IS '更新人';
|
||||
COMMENT ON COLUMN asynch_task.updated_at IS '更新时间';
|
||||
COMMENT ON COLUMN asynch_task.deleted_at IS '删除时间(软删)';
|
||||
COMMENT ON COLUMN asynch_task.model_name IS '模型名称';
|
||||
COMMENT ON COLUMN asynch_task.task_id IS '任务ID(对外返回)';
|
||||
COMMENT ON COLUMN asynch_task.state IS '状态:0排队中/1执行中/2成功/3失败/4已下载';
|
||||
COMMENT ON COLUMN asynch_task.oss_file IS 'OSS文件URL/Key';
|
||||
COMMENT ON COLUMN asynch_task.file_type IS '文件类型';
|
||||
COMMENT ON COLUMN asynch_task.file_size IS '文件大小(字节)';
|
||||
COMMENT ON COLUMN asynch_task.error_msg IS '错误信息';
|
||||
COMMENT ON COLUMN asynch_task.started_at IS '开始执行时间';
|
||||
COMMENT ON COLUMN asynch_task.finished_at IS '执行结束时间';
|
||||
COMMENT ON COLUMN asynch_task.expire_at IS '过期清理时间(已下载后保留到期)';
|
||||
COMMENT ON COLUMN asynch_task.state IS '0排队中/1执行中/2成功/3失败/4已下载';
|
||||
COMMENT ON COLUMN asynch_task.retry_count IS '已重试次数(不含首次)';
|
||||
COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序,重试会更新时间到队尾)';
|
||||
COMMENT ON COLUMN asynch_task.input_ref IS '输入引用(如上传文件URL/OSS key/业务侧资源ID)';
|
||||
COMMENT ON COLUMN asynch_task.request_payload IS '请求参数(JSON)';
|
||||
COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序)';
|
||||
COMMENT ON COLUMN asynch_task.phase IS '执行阶段:0模型阶段/1OSS阶段(模型已成功,等待上传OSS)';
|
||||
COMMENT ON COLUMN asynch_task.tmp_file IS '临时结果文件路径(phase=1 时仅重试 OSS 上传)';
|
||||
|
||||
-- 兼容已有库:增量加字段(PostgreSQL 支持 IF NOT EXISTS)
|
||||
ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS retry_count INT NOT NULL DEFAULT 0;
|
||||
ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;
|
||||
COMMENT ON COLUMN asynch_task.retry_count IS '已重试次数(不含首次)';
|
||||
COMMENT ON COLUMN asynch_task.enqueue_at IS '入队时间(用于排队顺序,重试会更新时间到队尾)';
|
||||
COMMENT ON COLUMN asynch_task.state IS '状态:0排队中/1执行中/2成功/3失败/4已下载';
|
||||
COMMENT ON COLUMN asynch_task.expire_at IS '过期清理时间(已下载后保留到期)';
|
||||
|
||||
-- 对存量数据进行入队时间回填(只在 enqueue_at 为空时)
|
||||
UPDATE asynch_task SET enqueue_at = created_at WHERE enqueue_at IS NULL;
|
||||
-- =========================
|
||||
-- 3) asynch_op_log
|
||||
-- =========================
|
||||
CREATE TABLE IF NOT EXISTS asynch_op_log (
|
||||
-- 基础字段(与现有表保持一致)
|
||||
id BIGINT PRIMARY KEY,
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0,
|
||||
creator VARCHAR(64) NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updater VARCHAR(64) NOT NULL,
|
||||
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
deleted_at TIMESTAMP(6),
|
||||
|
||||
-- 基础审计信息
|
||||
ip VARCHAR(64) DEFAULT '',
|
||||
user_agent VARCHAR(256) DEFAULT '',
|
||||
api_path VARCHAR(256) DEFAULT '',
|
||||
http_method VARCHAR(16) DEFAULT '',
|
||||
|
||||
-- 业务信息
|
||||
biz_name VARCHAR(128) NOT NULL DEFAULT '', -- 调用方业务模块/系统
|
||||
model_name VARCHAR(128) NOT NULL DEFAULT '',
|
||||
task_id VARCHAR(64) NOT NULL DEFAULT '',
|
||||
|
||||
-- 统计字段
|
||||
op_type VARCHAR(64) NOT NULL DEFAULT 'createTask', -- 操作类型(默认创建任务)
|
||||
success SMALLINT NOT NULL DEFAULT 1, -- 1成功/0失败
|
||||
error_msg TEXT DEFAULT '',
|
||||
cost_ms BIGINT NOT NULL DEFAULT 0, -- 耗时(毫秒)
|
||||
|
||||
-- 请求/响应 JSON(用于后期统计分析)
|
||||
request_payload JSONB,
|
||||
response_payload JSONB
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_tenant_time ON asynch_op_log(tenant_id, created_at);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_model_name ON asynch_op_log(model_name);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_biz_name ON asynch_op_log(biz_name);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_task_id ON asynch_op_log(task_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_op_type ON asynch_op_log(op_type);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_op_log_deleted_at ON asynch_op_log(deleted_at);
|
||||
|
||||
COMMENT ON TABLE asynch_op_log IS '操作记录日志表(创建任务等,用于统计)';
|
||||
COMMENT ON COLUMN asynch_op_log.biz_name IS '业务名称(调用方模块/系统)';
|
||||
COMMENT ON COLUMN asynch_op_log.model_name IS '模型名称';
|
||||
COMMENT ON COLUMN asynch_op_log.task_id IS '任务ID';
|
||||
COMMENT ON COLUMN asynch_op_log.op_type IS '操作类型(如 createTask/getTaskResult/getTaskBatch 等)';
|
||||
COMMENT ON COLUMN asynch_op_log.success IS '是否成功:1成功/0失败';
|
||||
COMMENT ON COLUMN asynch_op_log.error_msg IS '错误信息(失败时)';
|
||||
COMMENT ON COLUMN asynch_op_log.cost_ms IS '耗时(毫秒)';
|
||||
COMMENT ON COLUMN asynch_op_log.request_payload IS '请求 JSON';
|
||||
COMMENT ON COLUMN asynch_op_log.response_payload IS '响应 JSON';
|
||||
|
||||
|
||||
-- =========================
|
||||
-- 4) asynch_model_stat
|
||||
-- =========================
|
||||
CREATE TABLE IF NOT EXISTS asynch_model_stat (
|
||||
day DATE NOT NULL, -- 天(YYYY-MM-DD)
|
||||
tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID
|
||||
creator VARCHAR(64) NOT NULL DEFAULT '', -- 创建人
|
||||
model_name VARCHAR(128) NOT NULL DEFAULT '', -- 模型名称
|
||||
request_count BIGINT NOT NULL DEFAULT 0, -- 请求次数
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY(day, tenant_id, creator, model_name)
|
||||
);
|
||||
|
||||
-- 便于时间段/租户/人/模型过滤
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_tenant_day ON asynch_model_stat(tenant_id, day);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_day ON asynch_model_stat(day);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_model_name ON asynch_model_stat(model_name);
|
||||
CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_creator ON asynch_model_stat(creator);
|
||||
|
||||
COMMENT ON TABLE asynch_model_stat IS '按天模型请求统计(用于限流/监控)';
|
||||
COMMENT ON COLUMN asynch_model_stat.day IS '天(YYYY-MM-DD)';
|
||||
COMMENT ON COLUMN asynch_model_stat.tenant_id IS '租户ID';
|
||||
COMMENT ON COLUMN asynch_model_stat.creator IS '创建人';
|
||||
COMMENT ON COLUMN asynch_model_stat.model_name IS '模型名称';
|
||||
COMMENT ON COLUMN asynch_model_stat.request_count IS '请求次数';
|
||||
COMMENT ON COLUMN asynch_model_stat.created_at IS '创建时间';
|
||||
COMMENT ON COLUMN asynch_model_stat.updated_at IS '更新时间';
|
||||
|
||||
Reference in New Issue
Block a user