Compare commits: `dev...e79afc8fa3` (1 commit)

| Author | SHA1 | Date | |
|---|---|---|---|
| | e79afc8fa3 | | |
README.md (96 changes)
@@ -13,8 +13,8 @@
|
|||||||
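- Supported configuration:
|
- Supported configuration:
|
||||||
- Request URL: `base_url + route`
|
- Request URL: `base_url + route`
|
||||||
- Request method: `http_method` (GET/POST)
|
- Request method: `http_method` (GET/POST)
|
||||||
- Request key: `api_key` (injected as request headers; multiple headers supported)
|
- Request key: `api_key` (injected as a request header, e.g. `TTS_API_KEY:your-key`)
|
||||||
- Timeout: `timeout_seconds`
|
- Timeout: `timeout_ms`
|
||||||
- Concurrency: `max_concurrency` (limited per tenant + model by a Redis distributed semaphore)
|
- Concurrency: `max_concurrency` (limited per tenant + model by a Redis distributed semaphore)
|
||||||
- Retries: `retry_times` (at most N further attempts after a failure)
|
- Retries: `retry_times` (at most N further attempts after a failure)
|
||||||
- Retention: `auto_clean_seconds` (seconds a task is kept after the business side has claimed it into `state=4`; cleaned up on expiry)
|
- Retention: `auto_clean_seconds` (seconds a task is kept after the business side has claimed it into `state=4`; cleaned up on expiry)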
|
||||||
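The request URL above is described only as `base_url + route`. A minimal sketch of one way to join the two without doubling or dropping the slash follows; `joinURL` is a hypothetical helper written for illustration and is not taken from this repository.

```go
package main

import "strings"

// joinURL concatenates a model's base_url and route with exactly one slash
// between them. Hypothetical helper: the repository's own joining logic is
// not shown in this diff.
func joinURL(baseURL, route string) string {
	if route == "" {
		// route is optional; fall back to the bare base URL.
		return baseURL
	}
	return strings.TrimRight(baseURL, "/") + "/" + strings.TrimLeft(route, "/")
}
```

With this sketch, `joinURL("http://127.0.0.1:8000/", "/api/v1/chat")` and `joinURL("http://127.0.0.1:8000", "api/v1/chat")` both produce the same URL, so a trailing slash in the stored configuration does not change the result.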
@@ -26,73 +26,38 @@
|
|||||||
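- Call the model service (GET/POST)
|
- Call the model service (GET/POST)
|
||||||
- Upload the result to OSS (calls your OSS file service `oss/file/uploadFile`, passing `Authorization/X-User-Info` through)
|
- Upload the result to OSS (calls your OSS file service `oss/file/uploadFile`, passing `Authorization/X-User-Info` through)
|
||||||
- Batch result claiming: query a list of `task_id`s in one call, return `task_id/state/oss_file`, and move successful tasks from `state=2` to `state=4`
|
- Batch result claiming: query a list of `task_id`s in one call, return `task_id/state/oss_file`, and move successful tasks from `state=2` to `state=4`
|
||||||
- Automatic retry: failed tasks (`state=3`) are re-enqueued at the tail of the queue by the cleaner, up to `retry_times`
|
- Automatic retry: failed tasks (`state=3`) are re-enqueued at the tail of the queue by the cleaner, up to `retry_times` (preserving first-come, first-served order)
|
||||||
- Automatic cleanup:
|
- Automatic cleanup:
|
||||||
- `state=4` and `expire_at` has passed → hard-delete the task
|
- `state=4` and `expire_at` has passed → hard-delete the task (and attempt to delete the OSS file)
|
||||||
- Still failed after retries are exhausted → hard-delete the task
|
- Still failed after retries are exhausted → hard-delete the task (and attempt to delete the OSS file)
|
||||||
- `state=0/1` timed out → mark as failed (prevents stuck tasks)
|
- `state=0/1` timed out → mark as failed (prevents stuck tasks)
|
||||||
|
|
||||||
### 1.3 Statistics (asynch_model_stat)
|
|
||||||
- Daily statistics: `day + tenant_id + creator + model_name -> request_count`
|
|
||||||
- Counting rule: counted only when the Worker actually calls the model service (OSS retries are not counted)
|
|
||||||
- Purpose: gives other services a basis for global rate limiting and monitoring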
|
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
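## 2. Usage flow (how a business service integrates)
|
## 2. Usage flow (how a business service integrates)
|
||||||
|
|
||||||
### Step 1: Create the model configuration
|
### 2.1 Step 1: Configure the model
|
||||||
The business side (or ops) first creates or updates the model configuration in the middleware (`model_name` is the unique key), for example:
|
Call the "Model Management" API to add a model configuration (example: TTS):
|
||||||
- `POST /model/createModel` (or `/model/updateModel`)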
|
- `model_name=tts`
|
||||||
|
- `base_url=http://xxx:port`
|
||||||
|
- `route=/tts`
|
||||||
|
- `http_method=POST`
|
||||||
|
- `api_key=TTS_API_KEY:your-key` (optional)
|
||||||
|
|
||||||
Request example (JSON):
|
### 2.2 Step 2: Create a task and get a task_id
|
||||||
```json
|
The business side calls `CreateTask` with `modelName + requestPayload`, and the middleware returns a `task_id`.
|
||||||
{
|
The business side stores the `task_id` in its own business table (status = generating).
|
||||||
"modelName": "model-service",
|
|
||||||
"baseUrl": "http://127.0.0.1:8000",
|
|
||||||
"route": "/api/v1/chat",
|
|
||||||
"httpMethod": "POST",
|
|
||||||
"apiKey": "API_KEY:model-key,API_STATE:true,API_NUM:123",
|
|
||||||
"enabled": 1,
|
|
||||||
"maxConcurrency": 5,
|
|
||||||
"queueLimit": 20,
|
|
||||||
"timeoutSeconds": 1800,
|
|
||||||
"retryTimes": 3,
|
|
||||||
"retryQueueMaxSeconds": 600,
|
|
||||||
"autoCleanSeconds": 3600,
|
|
||||||
"remark": "Model-Service 模型服务"
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
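Parameters:
|
### 2.3 Step 3: The business side claims results (batch recommended)
|
||||||
- `modelName`: model name (unique identifier / routing key)
|
From its own scheduled-job service or poller, the business side passes a batch of `task_id`s to the middleware:
|
||||||
- `baseUrl`: model service address (base URL)
|
- Returns `state + oss_file` for each task
|
||||||
- `route`: model service route (appended to baseUrl)
|
- For tasks in `state=2 (success)`, the middleware updates them to `state=4 (downloaded)` and sets `expire_at = now + auto_clean_seconds`
|
||||||
- `httpMethod`: request method (GET/POST)
|
|
||||||
- `apiKey`: request header bindings (multiple headers supported, comma-separated, format `Key:Value`; booleans and numbers are also injected into headers as strings)
|
|
||||||
- `enabled`: whether the model is enabled (0 = disabled, 1 = enabled)
|
|
||||||
- `maxConcurrency`: maximum concurrency for a single model (limited per tenant + model)
|
|
||||||
- `queueLimit`: queue length limit (approximate; task creation is rejected once it is exceeded)
|
|
||||||
- `timeoutSeconds`: timeout for calling the model service (seconds)
|
|
||||||
- `retryTimes`: at most N further retries after a failure (not counting the first attempt)
|
|
||||||
- `retryQueueMaxSeconds`: maximum queueing time for failed retries (seconds); 0 means a retry jumps to the head of the queue; >0 means it jumps the queue once it has waited longer than this, otherwise it still goes to the tail
|
|
||||||
- `autoCleanSeconds`: how long (seconds) a task is kept after being claimed into `state=4`; cleaned up on expiry
|
|
||||||
- `remark`: remarks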
|
|
||||||
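The left-hand (removed) parameter list describes `apiKey` as comma-separated `Key:Value` pairs that are injected as request headers. A minimal sketch of that parsing rule, assuming whitespace around keys and values should be ignored and malformed pairs skipped; `parseHeaderBindings` is a hypothetical name, not a function from this repository.

```go
package main

import "strings"

// parseHeaderBindings turns "K1:V1,K2:V2" into a header map.
// Values stay strings, so "true" and "123" are sent as-is, matching the
// README note that booleans and numbers are injected as strings.
// Assumption: pairs without a colon or with an empty key are skipped.
func parseHeaderBindings(apiKey string) map[string]string {
	headers := make(map[string]string)
	for _, pair := range strings.Split(apiKey, ",") {
		// Cut at the first colon only, so values may themselves contain colons.
		key, value, ok := strings.Cut(strings.TrimSpace(pair), ":")
		if !ok {
			continue
		}
		key = strings.TrimSpace(key)
		if key == "" {
			continue
		}
		headers[key] = strings.TrimSpace(value)
	}
	return headers
}
```

Applied to the sample value `API_KEY:model-key,API_STATE:true,API_NUM:123` from the request example, this yields three headers. The right-hand side of the diff documents only a single `TTS_API_KEY:your-key` pair, which the same function also handles.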
|
|
||||||
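### Step 2: Create a task and get a task_id
|
Once it has the `oss_file`, the business side performs the "business transfer":
|
||||||
When issuing an inference request, the business side calls:
|
- Option A: store `oss_file` directly in the business table as the final resource URL
|
||||||
- `POST /task/createTask` (pass `modelName + requestPayload (+ bizName)`)
|
- Option B: download it on the business side, re-upload it to the business's own asset domain, then store the new URL
|
||||||
- The middleware returns a `task_id`
|
|
||||||
- The business side stores the `task_id` in its own business table and sets the business status to "generating"
|
|
||||||
|
|
||||||
### Step 3: Sync task progress (batch recommended)
|
> Tasks in `state=4` may be fetched repeatedly, so an occasional interruption on the business side does not leave a result unclaimable.
|
||||||
The business side syncs progress by polling or a scheduled job:
|
|
||||||
- Recommended: `POST /task/getTaskBatch` (pass `taskIds` in bulk; returns `state + oss_file` for each task)
|
|
||||||
- Or one at a time: `GET /task/getTaskResult?taskId=...`
|
|
||||||
|
|
||||||
Once it has the `oss_file`, the business side handles the resource itself (store it directly or re-host it) and updates the business status to "success" or "failed".
|
|
||||||
|
|
||||||
> Note: for tasks in `state=2 (success)`, the batch endpoint automatically marks them `state=4 (downloaded)` and sets `expire_at` for later cleanup.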
|
|
||||||
|
|
||||||
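The claim rule described on both sides of this hunk (a `state=2` task becomes `state=4` with `expire_at = now + auto_clean_seconds`, and `state=4` tasks can be fetched again) can be sketched on an in-memory struct. The `task` type and `claim` function are hypothetical stand-ins for illustration; the real update runs against the `asynch_task` table.

```go
package main

// Task states named in the README; only the two used here are listed.
const (
	stateSuccess    = 2 // result is ready
	stateDownloaded = 4 // result has been claimed by the business side
)

// task is a hypothetical in-memory stand-in for a row of asynch_task.
type task struct {
	State    int
	OssFile  string
	ExpireAt int64 // Unix seconds; 0 means "not set"
}

// claim applies the batch-claim rule to one task. Only state=2 tasks are
// transitioned; a task already in state=4 is left untouched, so repeated
// fetches return the same result without extending its retention.
func claim(tk *task, nowUnix, autoCleanSeconds int64) {
	if tk.State != stateSuccess {
		return
	}
	tk.State = stateDownloaded
	tk.ExpireAt = nowUnix + autoCleanSeconds
}
```

Whether a repeated fetch should refresh `expire_at` is not stated in the README; this sketch assumes it does not.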
---
|
---
|
||||||
|
|
||||||
@@ -128,13 +93,21 @@
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
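## 5. Database initialization
|
## 5. Database initialization / upgrade
|
||||||
|
|
||||||
The project root provides `update.sql`: run its table-creation SQL on first deployment.
|
The project root provides `update.sql`:
|
||||||
|
- First deployment: run the table-creation SQL
|
||||||
|
- Upgrade: run the incremental `ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...` statements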
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
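## 6. Development and release recommendations (Git)
|
## 6. API documentation
|
||||||
|
|
||||||
|
For more detailed API examples, see `docs/api.md` (covers model management, task endpoints, the batch claim endpoint, and field descriptions).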
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
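## 7. Development and release recommendations (Git)
|
||||||
|
|
||||||
- `dev`: day-to-day development and integration testing
|
- `dev`: day-to-day development and integration testing
|
||||||
- `main`: stable production branch
|
- `main`: stable production branch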
|
||||||
@@ -142,3 +115,4 @@
|
|||||||
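1) Branch `dev` off `main`
|
1) Branch `dev` off `main`
|
||||||
2) When a feature is done, open an MR/PR to merge it back into `main`
|
2) When a feature is done, open an MR/PR to merge it back into `main`
|
||||||
3) Tag / publish the image
|
3) Tag / publish the image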
|
||||||
|
|
||||||
|
|||||||
config.yml (19 changes)
@@ -31,22 +31,29 @@ database:
|
|||||||
asynch:
|
asynch:
|
||||||
worker:
|
worker:
|
||||||
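enabled: true # whether to enable the background worker (can be disabled in development to avoid a flood of DB errors)
|
enabled: true # whether to enable the background worker (can be disabled in development to avoid a flood of DB errors)
|
||||||
pollInterval: "10s" # polling interval (claims pending tasks from the DB)
|
pollInterval: "5s" # polling interval (claims pending tasks from the DB)
|
||||||
batchSize: 10 # number of tasks claimed per poll
|
batchSize: 5 # number of tasks claimed per poll
|
||||||
goroutines: 1 # worker concurrency (each goroutine processes tasks serially)
|
goroutines: 1 # worker concurrency (each goroutine processes tasks serially)
|
||||||
taskTimeout: "5m" # state=0/1 tasks fail automatically after this timeout
|
taskTimeout: "5m" # state=0/1 tasks fail automatically after this timeout
|
||||||
cleaner:
|
cleaner:
|
||||||
enabled: true # whether to enable the automatic cleaner (optional)
|
enabled: true # whether to enable the automatic cleaner (optional)
|
||||||
interval: "10s" # cleaner scan interval
|
interval: "5s" # cleaner scan interval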
|
||||||
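The interval values in this hunk (`"5s"`, `"5m"`) are in the format accepted by Go's `time.ParseDuration`. A minimal sketch of reading such a value with a fallback follows; how the service actually loads `config.yml` is not shown in this diff, so `parseInterval` is a hypothetical helper.

```go
package main

import "time"

// parseInterval converts a config string such as "5s" or "5m" into a
// time.Duration. Assumption: unparsable or non-positive values fall back
// to the supplied default instead of stopping the worker.
func parseInterval(raw string, fallback time.Duration) time.Duration {
	d, err := time.ParseDuration(raw)
	if err != nil || d <= 0 {
		return fallback
	}
	return d
}
```

For example, `parseInterval("5s", 10*time.Second)` gives the new polling interval from the right-hand side, while a typo such as `"5sec"` would fall back to ten seconds.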
|
|
||||||
redis:
|
redis:
|
||||||
default:
|
default:
|
||||||
address: 192.168.3.30:6379
|
address: 116.204.74.41:6379
|
||||||
db: 0
|
db: 0
|
||||||
|
|
||||||
consul:
|
consul:
|
||||||
address: 192.168.3.30:8500
|
address: 116.204.74.41:8500
|
||||||
|
|
||||||
jaeger:
|
jaeger:
|
||||||
addr: 192.168.3.30:4318
|
addr: 116.204.74.41:4318
|
||||||
|
|
||||||
|
# OSS file service
|
||||||
|
# Current implementation: called directly via common/http service discovery:
|
||||||
|
# POST oss/file/uploadFile (multipart/form-data)
|
||||||
|
# Auth: pass Authorization / X-User-Info through
|
||||||
|
oss:
|
||||||
|
addr: "116.204.74.41:9000"
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,4 @@ package public
|
|||||||
const (
|
const (
|
||||||
TableNameModel = "asynch_models" // async model table
|
TableNameModel = "asynch_models" // async model table
|
||||||
TableNameTask = "asynch_task" // async task table
|
TableNameTask = "asynch_task" // async task table
|
||||||
TableNameOpLog = "asynch_op_log" // async operation log table
|
|
||||||
TableNameStat = "asynch_model_stat" // daily statistics table (request counts)
|
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -56,11 +56,7 @@ func (c *model) ListModel(ctx context.Context, req *dto.ListModelReq) (res *dto.
|
|||||||
pageSize = int(req.Page.PageSize)
|
pageSize = int(req.Page.PageSize)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
modelName := ""
|
list, total, err := service.Model.List(ctx, pageNum, pageSize)
|
||||||
if req != nil {
|
|
||||||
modelName = req.ModelName
|
|
||||||
}
|
|
||||||
list, total, err := service.Model.List(ctx, pageNum, pageSize, modelName)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,20 +0,0 @@
|
|||||||
package controller
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"model-asynch/model/dto"
|
|
||||||
"model-asynch/service"
|
|
||||||
)
|
|
||||||
|
|
||||||
type stat struct{}
|
|
||||||
|
|
||||||
// Stat is the statistics controller
|
|
||||||
var Stat = new(stat)
|
|
||||||
|
|
||||||
// ListModelStat lists the statistics
|
|
||||||
func (c *stat) ListModelStat(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) {
|
|
||||||
ctx = ensureUser(ctx)
|
|
||||||
return service.Stat.List(ctx, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -29,9 +29,3 @@ func (c *task) GetTaskBatch(ctx context.Context, req *dto.GetTaskBatchReq) (res
|
|||||||
ctx = ensureUser(ctx)
|
ctx = ensureUser(ctx)
|
||||||
return service.Task.GetBatch(ctx, req)
|
return service.Task.GetBatch(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListTask returns a paginated list of tasks
|
|
||||||
func (c *task) ListTask(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) {
|
|
||||||
ctx = ensureUser(ctx)
|
|
||||||
return service.Task.List(ctx, req)
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -73,11 +73,8 @@ func (d *modelDao) GetByID(ctx context.Context, id int64) (m *entity.AsynchModel
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *modelDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) {
|
func (d *modelDao) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) {
|
||||||
model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).Where("deleted_at IS NULL").OrderDesc(entity.AsynchModelCol.CreatedAt)
|
model := gfdb.DB(ctx).Model(ctx, public.TableNameModel).OrderDesc(entity.AsynchModelCol.CreatedAt)
|
||||||
if modelNameLike != "" {
|
|
||||||
model = model.WhereLike(entity.AsynchModelCol.ModelName, "%"+modelNameLike+"%")
|
|
||||||
}
|
|
||||||
if pageNum > 0 && pageSize > 0 {
|
if pageNum > 0 && pageSize > 0 {
|
||||||
model = model.Page(pageNum, pageSize)
|
model = model.Page(pageNum, pageSize)
|
||||||
}
|
}
|
||||||
@@ -89,3 +86,4 @@ func (d *modelDao) List(ctx context.Context, pageNum, pageSize int, modelNameLik
|
|||||||
err = r.Structs(&list)
|
err = r.Structs(&list)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
package dao
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"model-asynch/consts/public"
|
|
||||||
"model-asynch/model/entity"
|
|
||||||
|
|
||||||
"gitea.com/red-future/common/db/gfdb"
|
|
||||||
)
|
|
||||||
|
|
||||||
type opLogDao struct{}
|
|
||||||
|
|
||||||
var OpLog = &opLogDao{}
|
|
||||||
|
|
||||||
func (d *opLogDao) Insert(ctx context.Context, log *entity.AsynchOpLog) (id int64, err error) {
|
|
||||||
r, err := gfdb.DB(ctx).Model(ctx, public.TableNameOpLog).Data(log).Insert()
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
return r.LastInsertId()
|
|
||||||
}
|
|
||||||
@@ -1,61 +0,0 @@
|
|||||||
package dao
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"model-asynch/consts/public"
|
|
||||||
"model-asynch/model/entity"
|
|
||||||
|
|
||||||
"gitea.com/red-future/common/db/gfdb"
|
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
|
||||||
)
|
|
||||||
|
|
||||||
type statDao struct{}
|
|
||||||
|
|
||||||
var Stat = &statDao{}
|
|
||||||
|
|
||||||
// IncRequestCount atomically increments the counter (safe across instances and goroutines): +1 per day + tenant + creator + model
|
|
||||||
func (d *statDao) IncRequestCount(ctx context.Context, day time.Time, tenantId int64, creator, modelName string) error {
|
|
||||||
sql := fmt.Sprintf(`
|
|
||||||
INSERT INTO %s(day, tenant_id, creator, model_name, request_count, created_at, updated_at)
|
|
||||||
VALUES(?, ?, ?, ?, 1, NOW(), NOW())
|
|
||||||
ON CONFLICT (day, tenant_id, creator, model_name)
|
|
||||||
DO UPDATE SET request_count = %s.request_count + 1, updated_at = NOW()`,
|
|
||||||
public.TableNameStat, public.TableNameStat,
|
|
||||||
)
|
|
||||||
_, err := gfdb.DB(ctx).Exec(ctx, sql, gtime.New(day).Format("Y-m-d"), tenantId, creator, modelName)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *statDao) List(ctx context.Context, pageNum, pageSize int, startDay, endDay string, tenantId *int64, creator, modelName string) (list []*entity.AsynchModelStat, total int64, err error) {
|
|
||||||
m := gfdb.DB(ctx).Model(ctx, public.TableNameStat).Where("1=1")
|
|
||||||
if startDay != "" {
|
|
||||||
m = m.Where("day >= ?", startDay)
|
|
||||||
}
|
|
||||||
if endDay != "" {
|
|
||||||
m = m.Where("day <= ?", endDay)
|
|
||||||
}
|
|
||||||
if tenantId != nil {
|
|
||||||
m = m.Where("tenant_id = ?", *tenantId)
|
|
||||||
}
|
|
||||||
if creator != "" {
|
|
||||||
m = m.WhereLike("creator", "%"+creator+"%")
|
|
||||||
}
|
|
||||||
if modelName != "" {
|
|
||||||
m = m.WhereLike("model_name", "%"+modelName+"%")
|
|
||||||
}
|
|
||||||
m = m.OrderDesc("day").OrderDesc("request_count")
|
|
||||||
if pageNum > 0 && pageSize > 0 {
|
|
||||||
m = m.Page(pageNum, pageSize)
|
|
||||||
}
|
|
||||||
r, totalInt, err := m.AllAndCount(false)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
total = int64(totalInt)
|
|
||||||
err = r.Structs(&list)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -11,7 +11,6 @@ import (
|
|||||||
"gitea.com/red-future/common/db/gfdb"
|
"gitea.com/red-future/common/db/gfdb"
|
||||||
"github.com/gogf/gf/v2/database/gdb"
|
"github.com/gogf/gf/v2/database/gdb"
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"github.com/gogf/gf/v2/util/gconv"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var Task = &taskDao{}
|
var Task = &taskDao{}
|
||||||
@@ -137,31 +136,6 @@ func (d *taskDao) CountActiveByModel(ctx context.Context, modelName string) (int
|
|||||||
return int64(n), err
|
return int64(n), err
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a paginated list of tasks (subject to the gfdb tenant hook)
|
|
||||||
func (d *taskDao) List(ctx context.Context, pageNum, pageSize int, modelNameLike, taskIDLike string, state *int) (list []*entity.AsynchTask, total int64, err error) {
|
|
||||||
m := gfdb.DB(ctx).Model(ctx, public.TableNameTask).Where("deleted_at IS NULL")
|
|
||||||
if modelNameLike != "" {
|
|
||||||
m = m.WhereLike(entity.AsynchTaskCol.ModelName, "%"+modelNameLike+"%")
|
|
||||||
}
|
|
||||||
if taskIDLike != "" {
|
|
||||||
m = m.WhereLike(entity.AsynchTaskCol.TaskID, "%"+taskIDLike+"%")
|
|
||||||
}
|
|
||||||
if state != nil {
|
|
||||||
m = m.Where(entity.AsynchTaskCol.State, *state)
|
|
||||||
}
|
|
||||||
m = m.OrderDesc(entity.AsynchTaskCol.CreatedAt)
|
|
||||||
if pageNum > 0 && pageSize > 0 {
|
|
||||||
m = m.Page(pageNum, pageSize)
|
|
||||||
}
|
|
||||||
r, totalInt, err := m.AllAndCount(false)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
total = gconv.Int64(totalInt)
|
|
||||||
err = r.Structs(&list)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
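// ClaimPending claims pending tasks (state=0) and, in the same transaction, marks them running (state=1)
|
// ClaimPending claims pending tasks (state=0) and, in the same transaction, marks them running (state=1)
|
||||||
// Uses PostgreSQL FOR UPDATE SKIP LOCKED so that multiple workers do not consume the same task
|
// Uses PostgreSQL FOR UPDATE SKIP LOCKED so that multiple workers do not consume the same task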
|
||||||
func (d *taskDao) ClaimPending(ctx context.Context, batchSize int) (tasks []*entity.AsynchTask, err error) {
|
func (d *taskDao) ClaimPending(ctx context.Context, batchSize int) (tasks []*entity.AsynchTask, err error) {
|
||||||
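The comment above names `FOR UPDATE SKIP LOCKED` as the mechanism that keeps workers from claiming the same row. A minimal sketch of how such a claim query could be assembled follows; the column list and ordering are copied from the `ClaimPendingGlobal` hunk in this diff, while the `LIMIT` clause and the helper name `claimPendingSQL` are assumptions, since the rest of the real query is cut off by the hunk boundary.

```go
package main

import "fmt"

// claimPendingSQL builds a SELECT that locks up to batchSize pending rows.
// Rows already locked by another transaction are skipped rather than
// waited on, so concurrent workers receive disjoint batches.
// Assumption: the table name comes from a trusted constant, not user input.
func claimPendingSQL(table string, batchSize int) string {
	return fmt.Sprintf(`SELECT id, tenant_id, model_name, task_id, input_ref, request_payload
FROM %s
WHERE deleted_at IS NULL AND state = 0
ORDER BY enqueue_at ASC
LIMIT %d
FOR UPDATE SKIP LOCKED`, table, batchSize)
}
```

The statement only protects the rows while its transaction is open, which is why the comment stresses that the update to `state=1` happens in the same transaction.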
|
|||||||
@@ -20,7 +20,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks
|
|||||||
}
|
}
|
||||||
err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
|
err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
|
||||||
sql := fmt.Sprintf(
|
sql := fmt.Sprintf(
|
||||||
`SELECT id, tenant_id, creator, model_name, task_id, input_ref, request_payload, phase, tmp_file
|
`SELECT id, tenant_id, model_name, task_id, input_ref, request_payload
|
||||||
FROM %s
|
FROM %s
|
||||||
WHERE deleted_at IS NULL AND state = 0
|
WHERE deleted_at IS NULL AND state = 0
|
||||||
ORDER BY enqueue_at ASC
|
ORDER BY enqueue_at ASC
|
||||||
@@ -58,7 +58,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks
|
|||||||
func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fileType string, fileSize int64, expireAt *gtime.Time) error {
|
func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fileType string, fileSize int64, expireAt *gtime.Time) error {
|
||||||
now := gtime.Now()
|
now := gtime.Now()
|
||||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||||
fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask),
|
fmt.Sprintf(`UPDATE %s SET state=2, oss_file=?, file_type=?, file_size=?, error_msg='', finished_at=?, expire_at=NULL, updated_at=? WHERE id=?`, public.TableNameTask),
|
||||||
ossFile, fileType, fileSize, now, now, id,
|
ossFile, fileType, fileSize, now, now, id,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
@@ -67,31 +67,12 @@ func (d *taskDao) UpdateSuccessGlobal(ctx context.Context, id int64, ossFile, fi
|
|||||||
func (d *taskDao) UpdateFailedGlobal(ctx context.Context, id int64, errorMsg string) error {
|
func (d *taskDao) UpdateFailedGlobal(ctx context.Context, id int64, errorMsg string) error {
|
||||||
now := gtime.Now()
|
now := gtime.Now()
|
||||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||||
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=0, tmp_file='', updated_at=? WHERE id=?`, public.TableNameTask),
|
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, updated_at=? WHERE id=?`, public.TableNameTask),
|
||||||
errorMsg, now, now, id,
|
errorMsg, now, now, id,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateFailedKeepTmpGlobal handles an OSS upload failure: it keeps phase/tmp_file so the next round retries only the OSS upload
|
|
||||||
func (d *taskDao) UpdateFailedKeepTmpGlobal(ctx context.Context, id int64, errorMsg string) error {
|
|
||||||
now := gtime.Now()
|
|
||||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
|
||||||
fmt.Sprintf(`UPDATE %s SET state=3, error_msg=?, finished_at=?, phase=1, updated_at=? WHERE id=?`, public.TableNameTask),
|
|
||||||
errorMsg, now, now, id,
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// UpdateTmpAfterModelGlobal records the temp file path and sets phase=1 after a successful model call
|
|
||||||
func (d *taskDao) UpdateTmpAfterModelGlobal(ctx context.Context, id int64, tmpFile string) error {
|
|
||||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
|
||||||
fmt.Sprintf(`UPDATE %s SET phase=1, tmp_file=?, updated_at=NOW() WHERE id=?`, public.TableNameTask),
|
|
||||||
tmpFile, id,
|
|
||||||
)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *taskDao) SoftDeleteByTaskIDGlobal(ctx context.Context, taskID string) error {
|
func (d *taskDao) SoftDeleteByTaskIDGlobal(ctx context.Context, taskID string) error {
|
||||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||||
fmt.Sprintf(`UPDATE %s SET deleted_at=NOW(), updated_at=NOW() WHERE task_id=? AND deleted_at IS NULL`, public.TableNameTask),
|
fmt.Sprintf(`UPDATE %s SET deleted_at=NOW(), updated_at=NOW() WHERE task_id=? AND deleted_at IS NULL`, public.TableNameTask),
|
||||||
@@ -132,8 +113,7 @@ func (d *taskDao) ListFailedRetryableGlobal(ctx context.Context, limit int) (lis
|
|||||||
}
|
}
|
||||||
r, err := gfdb.DB(ctx).GetAll(ctx,
|
r, err := gfdb.DB(ctx).GetAll(ctx,
|
||||||
fmt.Sprintf(`
|
fmt.Sprintf(`
|
||||||
SELECT t.*,
|
SELECT t.*
|
||||||
m.retry_queue_max_seconds AS retry_queue_max_seconds
|
|
||||||
FROM %s t
|
FROM %s t
|
||||||
JOIN %s m
|
JOIN %s m
|
||||||
ON t.tenant_id = m.tenant_id
|
ON t.tenant_id = m.tenant_id
|
||||||
@@ -152,13 +132,11 @@ SELECT t.*,
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequeueForRetryGlobal re-enqueues the task (state=0) and increments retry_count by 1
|
// RequeueForRetryGlobal re-enqueues the task (state=0, enqueue_at=now) and increments retry_count by 1
|
||||||
// enqueueAt controls where the retried task lands in the queue:
|
func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64) error {
|
||||||
// - the earlier enqueueAt is, the closer to the front (ClaimPendingGlobal claims by enqueue_at ASC)
|
|
||||||
func (d *taskDao) RequeueForRetryGlobal(ctx context.Context, id int64, enqueueAt time.Time) error {
|
|
||||||
_, err := gfdb.DB(ctx).Exec(ctx,
|
_, err := gfdb.DB(ctx).Exec(ctx,
|
||||||
fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=?, updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask),
|
fmt.Sprintf(`UPDATE %s SET state=0, retry_count=retry_count+1, enqueue_at=NOW(), updated_at=NOW() WHERE id=? AND state=3 AND deleted_at IS NULL`, public.TableNameTask),
|
||||||
enqueueAt, id,
|
id,
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
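This hunk replaces a caller-supplied `enqueueAt` with a fixed `NOW()`, so after the change every retry goes to the tail of the queue. The removed `retryQueueMaxSeconds` rule from the README can be sketched as follows to show what the old parameter was for. `retryEnqueueAt` is a hypothetical function, and reusing the task's first enqueue time as the "head of the queue" position is an assumption; the removed caller code is not shown in this diff.

```go
package main

// retryEnqueueAt returns the enqueue_at value (Unix seconds) for a failed
// task that is being put back in the queue, following the removed
// retryQueueMaxSeconds rule:
//   - 0: the retry jumps the queue
//   - >0: the retry jumps the queue only if the task has already waited
//     longer than that many seconds; otherwise it goes to the tail
//
// Assumption: "jumping the queue" is modelled by reusing the first enqueue
// time, which sorts ahead of newer tasks under ORDER BY enqueue_at ASC.
func retryEnqueueAt(nowUnix, firstEnqueuedUnix, retryQueueMaxSeconds int64) int64 {
	if retryQueueMaxSeconds <= 0 {
		return firstEnqueuedUnix
	}
	if nowUnix-firstEnqueuedUnix > retryQueueMaxSeconds {
		return firstEnqueuedUnix
	}
	// Tail of the queue: this is the only behaviour left after this commit.
	return nowUnix
}
```

The right-hand side is equivalent to always taking the last branch, which matches the README's new wording that retries go to the tail to preserve first-come, first-served order.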
|
|||||||
main.go (1 change)
@@ -27,7 +27,6 @@ func main() {
|
|||||||
http.RouteRegister([]interface{}{
|
http.RouteRegister([]interface{}{
|
||||||
controller.Model,
|
controller.Model,
|
||||||
controller.Task,
|
controller.Task,
|
||||||
controller.Stat,
|
|
||||||
})
|
})
|
||||||
|
|
||||||
// Start background jobs: worker + cleaner
|
// Start background jobs: worker + cleaner
|
||||||
|
|||||||
@@ -7,20 +7,19 @@ import (
|
|||||||
|
|
||||||
// CreateModelReq 添加模型配置
|
// CreateModelReq 添加模型配置
|
||||||
type CreateModelReq struct {
|
type CreateModelReq struct {
|
||||||
g.Meta `path:"/createModel" method:"post" tags:"模型管理" summary:"创建模型配置" dc:"添加新的模型配置"`
|
g.Meta `path:"/createModel" method:"post" tags:"模型管理" summary:"创建模型配置" dc:"添加新的模型配置"`
|
||||||
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称(唯一标识)"`
|
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称(唯一标识)"`
|
||||||
BaseURL string `p:"baseUrl" json:"baseUrl" v:"required#baseUrl不能为空" dc:"模型服务基础地址(如 http(s)://host:port)"`
|
BaseURL string `p:"baseUrl" json:"baseUrl" v:"required#baseUrl不能为空" dc:"模型服务基础地址(如 http(s)://host:port)"`
|
||||||
Route string `p:"route" json:"route" dc:"路由/路径(拼接到 BaseURL 之后的可选路径)"`
|
Route string `p:"route" json:"route" dc:"路由/路径(拼接到 BaseURL 之后的可选路径)"`
|
||||||
HttpMethod string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(默认POST)"`
|
HttpMethod string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(默认POST)"`
|
||||||
APIKey string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头),示例:TTS_API_KEY:your-key"`
|
APIKey string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头),示例:TTS_API_KEY:your-key"`
|
||||||
Enabled int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用"`
|
Enabled int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用"`
|
||||||
MaxConcurrency int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数"`
|
MaxConcurrency int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数"`
|
||||||
QueueLimit int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(超过则拒绝/限流)"`
|
QueueLimit int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(超过则拒绝/限流)"`
|
||||||
TimeoutSeconds int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)"`
|
TimeoutMs int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(毫秒)"`
|
||||||
RetryTimes int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数"`
|
RetryTimes int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数"`
|
||||||
RetryQueueMaxSeconds int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间(秒);0表示失败重试插队到队首;>0表示排队超过该时间后插队,否则仍到队尾"`
|
AutoCleanSeconds int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(如清理超时任务/队列)"`
|
||||||
AutoCleanSeconds int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(如清理超时任务/队列)"`
|
Remark string `p:"remark" json:"remark" dc:"备注说明"`
|
||||||
Remark string `p:"remark" json:"remark" dc:"备注说明"`
|
|
||||||
}
|
}
|
||||||
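This hunk renames `TimeoutSeconds` to `TimeoutMs`, which changes the unit of the stored value. A minimal sketch of turning the new field into a `time.Duration` follows; `timeoutFromMs` and the 30-second default are assumptions for illustration, not values taken from this repository.

```go
package main

import "time"

// defaultTimeout is an assumed fallback for unset or invalid values.
const defaultTimeout = 30 * time.Second

// timeoutFromMs converts the timeoutMs field of a model configuration into
// a time.Duration suitable for an HTTP client or context deadline.
func timeoutFromMs(timeoutMs int) time.Duration {
	if timeoutMs <= 0 {
		return defaultTimeout
	}
	return time.Duration(timeoutMs) * time.Millisecond
}
```

Existing callers need to rescale their values when they switch fields: the `1800` used for `timeoutSeconds` in the README request example meant 30 minutes, whereas `timeoutMs: 1800` is 1.8 seconds.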
|
|
||||||
type CreateModelRes struct {
|
type CreateModelRes struct {
|
||||||
@@ -29,20 +28,19 @@ type CreateModelRes struct {
|
|||||||
|
|
||||||
// UpdateModelReq 更新模型配置
|
// UpdateModelReq 更新模型配置
|
||||||
type UpdateModelReq struct {
|
type UpdateModelReq struct {
|
||||||
g.Meta `path:"/updateModel" method:"put" tags:"模型管理" summary:"更新模型配置" dc:"更新指定ID的模型配置"`
|
g.Meta `path:"/updateModel" method:"put" tags:"模型管理" summary:"更新模型配置" dc:"更新指定ID的模型配置"`
|
||||||
ID int64 `p:"id" json:"id,string" v:"required#id不能为空" dc:"配置ID"`
|
ID int64 `p:"id" json:"id,string" v:"required#id不能为空" dc:"配置ID"`
|
||||||
BaseURL string `p:"baseUrl" json:"baseUrl" dc:"模型服务基础地址"`
|
BaseURL string `p:"baseUrl" json:"baseUrl" dc:"模型服务基础地址"`
|
||||||
Route string `p:"route" json:"route" dc:"路由/路径"`
|
Route string `p:"route" json:"route" dc:"路由/路径"`
|
||||||
HttpMethod *string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(可选更新)"`
|
HttpMethod *string `p:"httpMethod" json:"httpMethod" dc:"请求方式:GET/POST(可选更新)"`
|
||||||
APIKey *string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头)(可选更新)"`
|
APIKey *string `p:"apiKey" json:"apiKey" dc:"请求密钥绑定(请求头)(可选更新)"`
|
||||||
Enabled *int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用(可选更新)"`
|
Enabled *int `p:"enabled" json:"enabled" dc:"是否启用:0-禁用,1-启用(可选更新)"`
|
||||||
MaxConcurrency *int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数(可选更新)"`
|
MaxConcurrency *int `p:"maxConcurrency" json:"maxConcurrency" dc:"最大并发数(可选更新)"`
|
||||||
QueueLimit *int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(可选更新)"`
|
QueueLimit *int `p:"queueLimit" json:"queueLimit" dc:"排队队列上限(可选更新)"`
|
||||||
TimeoutSeconds *int `p:"timeoutSeconds" json:"timeoutSeconds" dc:"请求超时时间(秒)(可选更新)"`
|
TimeoutMs *int `p:"timeoutMs" json:"timeoutMs" dc:"请求超时时间(毫秒)(可选更新)"`
|
||||||
RetryTimes *int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数(可选更新)"`
|
RetryTimes *int `p:"retryTimes" json:"retryTimes" dc:"失败重试次数(可选更新)"`
|
||||||
RetryQueueMaxSeconds *int `p:"retryQueueMaxSeconds" json:"retryQueueMaxSeconds" dc:"失败重试最大排队时间(秒)(可选更新)"`
|
AutoCleanSeconds *int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(可选更新)"`
|
||||||
AutoCleanSeconds *int `p:"autoCleanSeconds" json:"autoCleanSeconds" dc:"自动清理间隔(秒)(可选更新)"`
|
Remark *string `p:"remark" json:"remark" dc:"备注说明(可选更新)"`
|
||||||
Remark *string `p:"remark" json:"remark" dc:"备注说明(可选更新)"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteModelReq 删除模型配置
|
// DeleteModelReq 删除模型配置
|
||||||
@@ -63,9 +61,8 @@ type GetModelRes struct {
|
|||||||
|
|
||||||
// ListModelReq 配置列表
|
// ListModelReq 配置列表
|
||||||
type ListModelReq struct {
|
type ListModelReq struct {
|
||||||
g.Meta `path:"/listModel" method:"post" tags:"模型管理" summary:"模型配置列表" dc:"分页获取模型配置列表"`
|
g.Meta `path:"/listModel" method:"post" tags:"模型管理" summary:"模型配置列表" dc:"分页获取模型配置列表"`
|
||||||
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
|
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
|
||||||
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊查询,可选)"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ListModelRes struct {
|
type ListModelRes struct {
|
||||||
|
|||||||
@@ -1,23 +0,0 @@
|
|||||||
package dto
|
|
||||||
|
|
||||||
import (
|
|
||||||
"gitea.com/red-future/common/beans"
|
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ListModelStatReq 统计列表
|
|
||||||
type ListModelStatReq struct {
|
|
||||||
g.Meta `path:"/listModelStat" method:"post" tags:"统计" summary:"模型请求统计列表" dc:"按天统计模型请求次数,支持分页与条件筛选"`
|
|
||||||
Page *beans.Page `p:"page" json:"page" dc:"分页参数(默认10条)"`
|
|
||||||
StartDay string `p:"startDay" json:"startDay" dc:"开始日期(YYYY-MM-DD,可选)"`
|
|
||||||
EndDay string `p:"endDay" json:"endDay" dc:"结束日期(YYYY-MM-DD,可选)"`
|
|
||||||
TenantID *int64 `p:"tenantId" json:"tenantId" dc:"租户ID(可选)"`
|
|
||||||
Creator string `p:"creator" json:"creator" dc:"创建人(可选,模糊匹配)"`
|
|
||||||
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(可选,模糊匹配)"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ListModelStatRes struct {
|
|
||||||
List any `json:"list" dc:"列表数据"`
|
|
||||||
Total int64 `json:"total" dc:"总数"`
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -1,15 +1,11 @@
|
|||||||
package dto
|
package dto
|
||||||
|
|
||||||
import (
|
import "github.com/gogf/gf/v2/frame/g"
|
||||||
"gitea.com/red-future/common/beans"
|
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
|
||||||
)
|
|
||||||
|
|
||||||
// CreateTaskReq 创建异步任务
|
// CreateTaskReq 创建异步任务
|
||||||
type CreateTaskReq struct {
|
type CreateTaskReq struct {
|
||||||
g.Meta `path:"/createTask" method:"post" tags:"任务管理" summary:"创建异步任务" dc:"创建异步任务并返回任务ID"`
|
g.Meta `path:"/createTask" method:"post" tags:"任务管理" summary:"创建异步任务" dc:"创建异步任务并返回任务ID"`
|
||||||
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称"`
|
ModelName string `p:"modelName" json:"modelName" v:"required#modelName不能为空" dc:"模型名称"`
|
||||||
BizName string `p:"bizName" json:"bizName" dc:"业务名称(调用方模块/系统,用于统计)"`
|
|
||||||
InputRef string `p:"inputRef" json:"inputRef" dc:"输入引用(如OSS/文件引用等)"`
|
InputRef string `p:"inputRef" json:"inputRef" dc:"输入引用(如OSS/文件引用等)"`
|
||||||
RequestPayload any `p:"requestPayload" json:"requestPayload" dc:"请求负载(透传给模型服务)"`
|
RequestPayload any `p:"requestPayload" json:"requestPayload" dc:"请求负载(透传给模型服务)"`
|
||||||
}
|
}
|
||||||
@@ -44,17 +40,3 @@ type GetTaskBatchItem struct {
|
|||||||
type GetTaskBatchRes struct {
|
type GetTaskBatchRes struct {
|
||||||
List []GetTaskBatchItem `json:"list" dc:"任务列表"`
|
List []GetTaskBatchItem `json:"list" dc:"任务列表"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// ListTaskReq 任务列表分页查询
|
|
||||||
type ListTaskReq struct {
|
|
||||||
g.Meta `path:"/listTask" method:"post" tags:"任务管理" summary:"任务列表" dc:"分页查询任务列表,支持按状态/模型名称/task_id过滤"`
|
|
||||||
Page *beans.Page `p:"page" json:"page" dc:"分页参数"`
|
|
||||||
ModelName string `p:"modelName" json:"modelName" dc:"模型名称(模糊匹配)"`
|
|
||||||
TaskID string `p:"taskId" json:"taskId" dc:"任务ID(模糊匹配)"`
|
|
||||||
State *int `p:"state" json:"state" dc:"任务状态(0/1/2/3/4,可选)"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ListTaskRes struct {
|
|
||||||
List any `json:"list" dc:"列表数据"`
|
|
||||||
Total int64 `json:"total" dc:"总数"`
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -12,9 +12,8 @@ type asynchModelCol struct {
|
|||||||
Enabled string
|
Enabled string
|
||||||
MaxConcurrency string
|
MaxConcurrency string
|
||||||
QueueLimit string
|
QueueLimit string
|
||||||
TimeoutSeconds string
|
TimeoutMs string
|
||||||
RetryTimes string
|
RetryTimes string
|
||||||
RetryQueueMaxSecs string
|
|
||||||
AutoCleanSeconds string
|
AutoCleanSeconds string
|
||||||
Remark string
|
Remark string
|
||||||
}
|
}
|
||||||
@@ -29,27 +28,25 @@ var AsynchModelCol = asynchModelCol{
|
|||||||
Enabled: "enabled",
|
Enabled: "enabled",
|
||||||
MaxConcurrency: "max_concurrency",
|
MaxConcurrency: "max_concurrency",
|
||||||
QueueLimit: "queue_limit",
|
QueueLimit: "queue_limit",
|
||||||
TimeoutSeconds: "timeout_seconds",
|
TimeoutMs: "timeout_ms",
|
||||||
RetryTimes: "retry_times",
|
RetryTimes: "retry_times",
|
||||||
RetryQueueMaxSecs: "retry_queue_max_seconds",
|
|
||||||
AutoCleanSeconds: "auto_clean_seconds",
|
AutoCleanSeconds: "auto_clean_seconds",
|
||||||
Remark: "remark",
|
Remark: "remark",
|
||||||
}
|
}
|
||||||
|
|
||||||
// AsynchModel 异步模型配置
|
// AsynchModel 异步模型配置
|
||||||
type AsynchModel struct {
|
type AsynchModel struct {
|
||||||
beans.SQLBaseDO `orm:",inline"`
|
beans.SQLBaseDO `orm:",inline"`
|
||||||
ModelName string `orm:"model_name" json:"modelName"`
|
ModelName string `orm:"model_name" json:"modelName"`
|
||||||
BaseURL string `orm:"base_url" json:"baseUrl"`
|
BaseURL string `orm:"base_url" json:"baseUrl"`
|
||||||
Route string `orm:"route" json:"route"`
|
Route string `orm:"route" json:"route"`
|
||||||
HttpMethod string `orm:"http_method" json:"httpMethod"`
|
HttpMethod string `orm:"http_method" json:"httpMethod"`
|
||||||
APIKey string `orm:"api_key" json:"apiKey"`
|
APIKey string `orm:"api_key" json:"apiKey"`
|
||||||
Enabled int `orm:"enabled" json:"enabled"`
|
Enabled int `orm:"enabled" json:"enabled"`
|
||||||
MaxConcurrency int `orm:"max_concurrency" json:"maxConcurrency"`
|
MaxConcurrency int `orm:"max_concurrency" json:"maxConcurrency"`
|
||||||
QueueLimit int `orm:"queue_limit" json:"queueLimit"`
|
QueueLimit int `orm:"queue_limit" json:"queueLimit"`
|
||||||
TimeoutSeconds int `orm:"timeout_seconds" json:"timeoutSeconds"`
|
TimeoutMs int `orm:"timeout_ms" json:"timeoutMs"`
|
||||||
RetryTimes int `orm:"retry_times" json:"retryTimes"`
|
RetryTimes int `orm:"retry_times" json:"retryTimes"`
|
||||||
RetryQueueMaxSecs int `orm:"retry_queue_max_seconds" json:"retryQueueMaxSeconds"`
|
AutoCleanSeconds int `orm:"auto_clean_seconds" json:"autoCleanSeconds"`
|
||||||
AutoCleanSeconds int `orm:"auto_clean_seconds" json:"autoCleanSeconds"`
|
Remark string `orm:"remark" json:"remark"`
|
||||||
Remark string `orm:"remark" json:"remark"`
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,16 +0,0 @@
|
|||||||
package entity
|
|
||||||
|
|
||||||
import "github.com/gogf/gf/v2/os/gtime"
|
|
||||||
|
|
||||||
// AsynchModelStat holds per-day statistics: the request count for a given day/tenant/creator/model
|
|
||||||
// Note: this does not use the common SQLBaseDO; it relies on the composite unique key (day,tenant_id,creator,model_name) for an atomic UPSERT increment.
|
|
||||||
type AsynchModelStat struct {
|
|
||||||
Day *gtime.Time `orm:"day" json:"day"` // date (using only the date part is recommended)
|
|
||||||
TenantId int64 `orm:"tenant_id" json:"tenantId,string"`
|
|
||||||
Creator string `orm:"creator" json:"creator"`
|
|
||||||
ModelName string `orm:"model_name" json:"modelName"`
|
|
||||||
RequestCount int64 `orm:"request_count" json:"requestCount"`
|
|
||||||
CreatedAt *gtime.Time `orm:"created_at" json:"createdAt"`
|
|
||||||
UpdatedAt *gtime.Time `orm:"updated_at" json:"updatedAt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -1,57 +0,0 @@
|
|||||||
package entity
|
|
||||||
|
|
||||||
import (
|
|
||||||
"gitea.com/red-future/common/beans"
|
|
||||||
)
|
|
||||||
|
|
||||||
type asynchOpLogCol struct {
|
|
||||||
beans.SQLBaseCol
|
|
||||||
IP string
|
|
||||||
UserAgent string
|
|
||||||
APIPath string
|
|
||||||
HttpMethod string
|
|
||||||
BizName string
|
|
||||||
ModelName string
|
|
||||||
TaskID string
|
|
||||||
OpType string
|
|
||||||
Success string
|
|
||||||
ErrorMsg string
|
|
||||||
CostMs string
|
|
||||||
RequestPayload string
|
|
||||||
ResponsePayload string
|
|
||||||
}
|
|
||||||
|
|
||||||
var AsynchOpLogCol = asynchOpLogCol{
|
|
||||||
SQLBaseCol: beans.DefSQLBaseCol,
|
|
||||||
IP: "ip",
|
|
||||||
UserAgent: "user_agent",
|
|
||||||
APIPath: "api_path",
|
|
||||||
HttpMethod: "http_method",
|
|
||||||
BizName: "biz_name",
|
|
||||||
ModelName: "model_name",
|
|
||||||
TaskID: "task_id",
|
|
||||||
OpType: "op_type",
|
|
||||||
Success: "success",
|
|
||||||
ErrorMsg: "error_msg",
|
|
||||||
CostMs: "cost_ms",
|
|
||||||
RequestPayload: "request_payload",
|
|
||||||
ResponsePayload: "response_payload",
|
|
||||||
}
|
|
||||||
|
|
||||||
// AsynchOpLog is the operation log (task creation, etc.)
|
|
||||||
type AsynchOpLog struct {
|
|
||||||
beans.SQLBaseDO `orm:",inline"`
|
|
||||||
IP string `orm:"ip" json:"ip"`
|
|
||||||
UserAgent string `orm:"user_agent" json:"userAgent"`
|
|
||||||
APIPath string `orm:"api_path" json:"apiPath"`
|
|
||||||
HttpMethod string `orm:"http_method" json:"httpMethod"`
|
|
||||||
BizName string `orm:"biz_name" json:"bizName"`
|
|
||||||
ModelName string `orm:"model_name" json:"modelName"`
|
|
||||||
TaskID string `orm:"task_id" json:"taskId"`
|
|
||||||
OpType string `orm:"op_type" json:"opType"`
|
|
||||||
Success int `orm:"success" json:"success"`
|
|
||||||
ErrorMsg string `orm:"error_msg" json:"errorMsg"`
|
|
||||||
CostMs int64 `orm:"cost_ms" json:"costMs"`
|
|
||||||
RequestPayload any `orm:"request_payload" json:"requestPayload"`
|
|
||||||
ResponsePayload any `orm:"response_payload" json:"responsePayload"`
|
|
||||||
}
|
|
||||||
@@ -19,8 +19,6 @@ type asynchTaskCol struct {
|
|||||||
ExpireAt string
|
ExpireAt string
|
||||||
RetryCount string
|
RetryCount string
|
||||||
EnqueueAt string
|
EnqueueAt string
|
||||||
Phase string
|
|
||||||
TmpFile string
|
|
||||||
InputRef string
|
InputRef string
|
||||||
RequestPayload string
|
RequestPayload string
|
||||||
}
|
}
|
||||||
@@ -39,8 +37,6 @@ var AsynchTaskCol = asynchTaskCol{
|
|||||||
ExpireAt: "expire_at",
|
ExpireAt: "expire_at",
|
||||||
RetryCount: "retry_count",
|
RetryCount: "retry_count",
|
||||||
EnqueueAt: "enqueue_at",
|
EnqueueAt: "enqueue_at",
|
||||||
Phase: "phase",
|
|
||||||
TmpFile: "tmp_file",
|
|
||||||
InputRef: "input_ref",
|
InputRef: "input_ref",
|
||||||
RequestPayload: "request_payload",
|
RequestPayload: "request_payload",
|
||||||
}
|
}
|
||||||
@@ -60,10 +56,6 @@ type AsynchTask struct {
|
|||||||
ExpireAt *gtime.Time `orm:"expire_at" json:"expireAt"` // expiry time after download (state=4)
|
ExpireAt *gtime.Time `orm:"expire_at" json:"expireAt"` // expiry time after download (state=4)
|
||||||
RetryCount int `orm:"retry_count" json:"retryCount"`
|
RetryCount int `orm:"retry_count" json:"retryCount"`
|
||||||
EnqueueAt *gtime.Time `orm:"enqueue_at" json:"enqueueAt"`
|
EnqueueAt *gtime.Time `orm:"enqueue_at" json:"enqueueAt"`
|
||||||
Phase int `orm:"phase" json:"phase"` // 0 = model phase / 1 = OSS phase
|
|
||||||
TmpFile string `orm:"tmp_file" json:"tmpFile"` // path of the temporary result file
|
|
||||||
// RetryQueueMaxSeconds is a join field populated by ListFailedRetryableGlobal (not a task-table column)
|
|
||||||
RetryQueueMaxSeconds int `orm:"retry_queue_max_seconds" json:"-"`
|
|
||||||
InputRef string `orm:"input_ref" json:"inputRef"`
|
InputRef string `orm:"input_ref" json:"inputRef"`
|
||||||
RequestPayload any `orm:"request_payload" json:"requestPayload"`
|
RequestPayload any `orm:"request_payload" json:"requestPayload"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -45,7 +45,6 @@ func (c *cleaner) runOnce(ctx context.Context) {
|
|||||||
g.Log().Errorf(ctx, "[cleaner] list expired(downloaded) error: %v", err)
|
g.Log().Errorf(ctx, "[cleaner] list expired(downloaded) error: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for _, t := range expired {
|
for _, t := range expired {
|
||||||
deleteTmpResult(t.TmpFile)
|
|
||||||
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
|
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
|
||||||
}
|
}
|
||||||
g.Log().Infof(ctx, "[cleaner] expired(downloaded) cleaned, count=%d", len(expired))
|
g.Log().Infof(ctx, "[cleaner] expired(downloaded) cleaned, count=%d", len(expired))
|
||||||
@@ -72,20 +71,7 @@ func (c *cleaner) runOnce(ctx context.Context) {
|
|||||||
g.Log().Errorf(ctx, "[cleaner] list failed retryable error: %v", err)
|
g.Log().Errorf(ctx, "[cleaner] list failed retryable error: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for _, t := range retryable {
|
for _, t := range retryable {
|
||||||
// retry_queue_max_seconds controls the queueing policy for failed retries:
|
_ = dao.Task.RequeueForRetryGlobal(ctx, t.Id)
|
||||||
// - =0: the failed retry jumps to the head of the queue
|
|
||||||
// - >0: once the task has been queued for >= maxSeconds since creation it jumps to the head; otherwise it still goes to the tail
|
|
||||||
now := time.Now()
|
|
||||||
enqueueAt := now
|
|
||||||
maxSeconds := t.RetryQueueMaxSeconds
|
|
||||||
if maxSeconds == 0 {
|
|
||||||
enqueueAt = now.Add(-100 * 365 * 24 * time.Hour)
|
|
||||||
} else if maxSeconds > 0 && t.CreatedAt != nil {
|
|
||||||
if now.Sub(t.CreatedAt.Time) >= time.Duration(maxSeconds)*time.Second {
|
|
||||||
enqueueAt = now.Add(-100 * 365 * 24 * time.Hour)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ = dao.Task.RequeueForRetryGlobal(ctx, t.Id, enqueueAt)
|
|
||||||
}
|
}
|
||||||
g.Log().Infof(ctx, "[cleaner] failed retryable cleaned, count=%d", len(retryable))
|
g.Log().Infof(ctx, "[cleaner] failed retryable cleaned, count=%d", len(retryable))
|
||||||
}
|
}
|
||||||
@@ -96,7 +82,6 @@ func (c *cleaner) runOnce(ctx context.Context) {
|
|||||||
g.Log().Errorf(ctx, "[cleaner] list failed exhausted error: %v", err)
|
g.Log().Errorf(ctx, "[cleaner] list failed exhausted error: %v", err)
|
||||||
} else {
|
} else {
|
||||||
for _, t := range exhausted {
|
for _, t := range exhausted {
|
||||||
deleteTmpResult(t.TmpFile)
|
|
||||||
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
|
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
|
||||||
}
|
}
|
||||||
g.Log().Infof(ctx, "[cleaner] failed exhausted cleaned, count=%d", len(exhausted))
|
g.Log().Infof(ctx, "[cleaner] failed exhausted cleaned, count=%d", len(exhausted))
|
||||||
|
|||||||
@@ -14,52 +14,24 @@ import (
|
|||||||
"model-asynch/model/entity"
|
"model-asynch/model/entity"
|
||||||
)
|
)
|
||||||
|
|
||||||
// parseAPIKeyHeaders supports binding multiple headers, comma-separated:
|
func parseAPIKeyHeader(apiKey string) (k, v string) {
|
||||||
// Examples:
|
|
||||||
// - X-API-Key:qwen3-tts-key,operation:true,count:123
|
|
||||||
// - X-API-Key:"qwen3-tts-key",operation:"true"
|
|
||||||
//
|
|
||||||
// Notes:
|
|
||||||
// - HTTP header values are always strings in the end; what happens here is a "stringified representation of the value".
|
|
||||||
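// - If the value is wrapped in double quotes, the outer quotes are stripped before injection, which makes it easier to distinguish string/boolean/number forms in the config (and avoids ambiguity when the value contains special characters).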
|
|
||||||
func parseAPIKeyHeaders(apiKey string) map[string]string {
|
|
||||||
apiKey = strings.TrimSpace(apiKey)
|
apiKey = strings.TrimSpace(apiKey)
|
||||||
if apiKey == "" {
|
if apiKey == "" {
|
||||||
return nil
|
return "", ""
|
||||||
}
|
}
|
||||||
out := map[string]string{}
|
// Two forms are supported:
|
||||||
parts := strings.Split(apiKey, ",")
|
// 1) HeaderName:HeaderValue (recommended)
|
||||||
for _, p := range parts {
|
// 2) HeaderName=HeaderValue (compatibility)
|
||||||
p = strings.TrimSpace(p)
|
if strings.Contains(apiKey, ":") {
|
||||||
if p == "" {
|
parts := strings.SplitN(apiKey, ":", 2)
|
||||||
continue
|
return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
|
||||||
}
|
|
||||||
// HeaderName:HeaderValue (recommended) / HeaderName=HeaderValue (compatibility)
|
|
||||||
if strings.Contains(p, ":") {
|
|
||||||
kv := strings.SplitN(p, ":", 2)
|
|
||||||
k := strings.TrimSpace(kv[0])
|
|
||||||
v := strings.TrimSpace(kv[1])
|
|
||||||
v = strings.Trim(v, "\"")
|
|
||||||
if k != "" && v != "" {
|
|
||||||
out[k] = v
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if strings.Contains(p, "=") {
|
|
||||||
kv := strings.SplitN(p, "=", 2)
|
|
||||||
k := strings.TrimSpace(kv[0])
|
|
||||||
v := strings.TrimSpace(kv[1])
|
|
||||||
v = strings.Trim(v, "\"")
|
|
||||||
if k != "" && v != "" {
|
|
||||||
out[k] = v
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if len(out) == 0 {
|
if strings.Contains(apiKey, "=") {
|
||||||
return nil
|
parts := strings.SplitN(apiKey, "=", 2)
|
||||||
|
return strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
|
||||||
}
|
}
|
||||||
return out
|
// Only a value was given: inject nothing (avoids injecting an invalid header)
|
||||||
|
return "", ""
|
||||||
}
|
}
|
||||||
|
|
||||||
func payloadToQuery(payload any) (url.Values, error) {
|
func payloadToQuery(payload any) (url.Values, error) {
|
||||||
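To make the behavioural difference in this hunk concrete, the single-header parser on the right-hand side can be exercised in isolation. The sketch below restates it with `strings.Cut` (Go 1.18+) instead of `Contains` plus `SplitN`; that substitution is mine and is intended to be equivalent, not a copy of the repository code. The sample inputs come from the old doc comment and the README example.

```go
package main

import (
	"fmt"
	"strings"
)

// parseAPIKeyHeader splits "Name:Value" (preferred) or "Name=Value" into a
// single header pair. A bare value yields ("", "") so nothing is injected.
func parseAPIKeyHeader(apiKey string) (k, v string) {
	apiKey = strings.TrimSpace(apiKey)
	if apiKey == "" {
		return "", ""
	}
	// ":" is tried first, matching the order of the checks in the diff.
	for _, sep := range []string{":", "="} {
		if name, value, ok := strings.Cut(apiKey, sep); ok {
			return strings.TrimSpace(name), strings.TrimSpace(value)
		}
	}
	return "", ""
}

func main() {
	for _, in := range []string{
		"TTS_API_KEY:your-key",
		"X-API-Key=abc",
		"X-API-Key:qwen3-tts-key,operation:true", // old multi-header syntax
		"just-a-value",
	} {
		k, v := parseAPIKeyHeader(in)
		fmt.Printf("%q -> key=%q value=%q\n", in, k, v)
	}
}
```

One consequence worth noting: a value written in the old comma-separated form is no longer split into several headers. Everything after the first `:` becomes the value of one header, and surrounding double quotes are no longer stripped.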
@@ -104,7 +76,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt
|
|||||||
url = strings.TrimRight(m.BaseURL, "/")
|
url = strings.TrimRight(m.BaseURL, "/")
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := time.Duration(m.TimeoutSeconds) * time.Second
|
timeout := time.Duration(m.TimeoutMs) * time.Millisecond
|
||||||
if timeout <= 0 {
|
if timeout <= 0 {
|
||||||
timeout = 60 * time.Second
|
timeout = 60 * time.Second
|
||||||
}
|
}
|
||||||
@@ -150,7 +122,7 @@ func InvokeModel(ctx context.Context, m *entity.AsynchModel, payload any) ([]byt
|
|||||||
req.Header.Set(k, v)
|
req.Header.Set(k, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for hk, hv := range parseAPIKeyHeaders(m.APIKey) {
|
if hk, hv := parseAPIKeyHeader(m.APIKey); hk != "" && hv != "" {
|
||||||
req.Header.Set(hk, hv)
|
req.Header.Set(hk, hv)
|
||||||
}
|
}
|
||||||
if method != http.MethodGet {
|
if method != http.MethodGet {
|
||||||
|
|||||||
@@ -15,19 +15,18 @@ type modelService struct{}
|
|||||||
|
|
||||||
func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res *dto.CreateModelRes, err error) {
|
func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res *dto.CreateModelRes, err error) {
|
||||||
m := &entity.AsynchModel{
|
m := &entity.AsynchModel{
|
||||||
ModelName: req.ModelName,
|
ModelName: req.ModelName,
|
||||||
BaseURL: req.BaseURL,
|
BaseURL: req.BaseURL,
|
||||||
Route: req.Route,
|
Route: req.Route,
|
||||||
HttpMethod: req.HttpMethod,
|
HttpMethod: req.HttpMethod,
|
||||||
APIKey: req.APIKey,
|
APIKey: req.APIKey,
|
||||||
Enabled: req.Enabled,
|
Enabled: req.Enabled,
|
||||||
MaxConcurrency: req.MaxConcurrency,
|
MaxConcurrency: req.MaxConcurrency,
|
||||||
QueueLimit: req.QueueLimit,
|
QueueLimit: req.QueueLimit,
|
||||||
TimeoutSeconds: req.TimeoutSeconds,
|
TimeoutMs: req.TimeoutMs,
|
||||||
RetryTimes: req.RetryTimes,
|
RetryTimes: req.RetryTimes,
|
||||||
RetryQueueMaxSecs: req.RetryQueueMaxSeconds,
|
AutoCleanSeconds: req.AutoCleanSeconds,
|
||||||
AutoCleanSeconds: req.AutoCleanSeconds,
|
Remark: req.Remark,
|
||||||
Remark: req.Remark,
|
|
||||||
}
|
}
|
||||||
if m.HttpMethod == "" {
|
if m.HttpMethod == "" {
|
||||||
m.HttpMethod = "POST"
|
m.HttpMethod = "POST"
|
||||||
@@ -41,8 +40,8 @@ func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res
|
|||||||
if m.QueueLimit <= 0 {
|
if m.QueueLimit <= 0 {
|
||||||
m.QueueLimit = 1000
|
m.QueueLimit = 1000
|
||||||
}
|
}
|
||||||
if m.TimeoutSeconds <= 0 {
|
if m.TimeoutMs <= 0 {
|
||||||
m.TimeoutSeconds = 60
|
m.TimeoutMs = 60000
|
||||||
}
|
}
|
||||||
if m.AutoCleanSeconds <= 0 {
|
if m.AutoCleanSeconds <= 0 {
|
||||||
m.AutoCleanSeconds = 86400
|
m.AutoCleanSeconds = 86400
|
||||||
@@ -77,15 +76,12 @@ func (s *modelService) Update(ctx context.Context, req *dto.UpdateModelReq) erro
|
|||||||
if req.QueueLimit != nil {
|
if req.QueueLimit != nil {
|
||||||
data[entity.AsynchModelCol.QueueLimit] = *req.QueueLimit
|
data[entity.AsynchModelCol.QueueLimit] = *req.QueueLimit
|
||||||
}
|
}
|
||||||
if req.TimeoutSeconds != nil {
|
if req.TimeoutMs != nil {
|
||||||
data[entity.AsynchModelCol.TimeoutSeconds] = *req.TimeoutSeconds
|
data[entity.AsynchModelCol.TimeoutMs] = *req.TimeoutMs
|
||||||
}
|
}
|
||||||
if req.RetryTimes != nil {
|
if req.RetryTimes != nil {
|
||||||
data[entity.AsynchModelCol.RetryTimes] = *req.RetryTimes
|
data[entity.AsynchModelCol.RetryTimes] = *req.RetryTimes
|
||||||
}
|
}
|
||||||
if req.RetryQueueMaxSeconds != nil {
|
|
||||||
data[entity.AsynchModelCol.RetryQueueMaxSecs] = *req.RetryQueueMaxSeconds
|
|
||||||
}
|
|
||||||
if req.AutoCleanSeconds != nil {
|
if req.AutoCleanSeconds != nil {
|
||||||
data[entity.AsynchModelCol.AutoCleanSeconds] = *req.AutoCleanSeconds
|
data[entity.AsynchModelCol.AutoCleanSeconds] = *req.AutoCleanSeconds
|
||||||
}
|
}
|
||||||
@@ -108,6 +104,6 @@ func (s *modelService) Get(ctx context.Context, id int64) (*entity.AsynchModel,
|
|||||||
return dao.Model.GetByID(ctx, id)
|
return dao.Model.GetByID(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *modelService) List(ctx context.Context, pageNum, pageSize int, modelNameLike string) (list []*entity.AsynchModel, total int64, err error) {
|
func (s *modelService) List(ctx context.Context, pageNum, pageSize int) (list []*entity.AsynchModel, total int64, err error) {
|
||||||
return dao.Model.List(ctx, pageNum, pageSize, modelNameLike)
|
return dao.Model.List(ctx, pageNum, pageSize)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,40 +0,0 @@
|
|||||||
package service
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
|
|
||||||
"model-asynch/dao"
|
|
||||||
"model-asynch/model/dto"
|
|
||||||
)
|
|
||||||
|
|
||||||
type statService struct{}
|
|
||||||
|
|
||||||
var Stat = &statService{}
|
|
||||||
|
|
||||||
func (s *statService) List(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) {
|
|
||||||
pageNum, pageSize := 1, 10
|
|
||||||
if req != nil && req.Page != nil {
|
|
||||||
if req.Page.PageNum > 0 {
|
|
||||||
pageNum = int(req.Page.PageNum)
|
|
||||||
}
|
|
||||||
if req.Page.PageSize > 0 {
|
|
||||||
pageSize = int(req.Page.PageSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
startDay, endDay := "", ""
|
|
||||||
var tenantID *int64
|
|
||||||
creator, modelName := "", ""
|
|
||||||
if req != nil {
|
|
||||||
startDay = req.StartDay
|
|
||||||
endDay = req.EndDay
|
|
||||||
tenantID = req.TenantID
|
|
||||||
creator = req.Creator
|
|
||||||
modelName = req.ModelName
|
|
||||||
}
|
|
||||||
list, total, err := dao.Stat.List(ctx, pageNum, pageSize, startDay, endDay, tenantID, creator, modelName)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &dto.ListModelStatRes{List: list, Total: total}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -5,14 +5,12 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"time"
|
|
||||||
|
|
||||||
"model-asynch/model/entity"
|
"model-asynch/model/entity"
|
||||||
|
|
||||||
commonHttp "gitea.com/red-future/common/http"
|
commonHttp "gitea.com/red-future/common/http"
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
"github.com/gogf/gf/v2/util/gconv"
|
"github.com/gogf/gf/v2/util/gconv"
|
||||||
"github.com/gogf/gf/v2/util/guid"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Integrates with your OSS file service: POST oss/file/uploadFile (multipart/form-data)
|
// Integrates with your OSS file service: POST oss/file/uploadFile (multipart/form-data)
|
||||||
@@ -27,6 +25,8 @@ type uploadFileResponse struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, data []byte, fileExt string, _ string) (ossURL string, err error) {
|
func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, data []byte, fileExt string, _ string) (ossURL string, err error) {
|
||||||
|
// ossUrl := g.Cfg().MustGet(ctx, "oss.addr", "192.168.3.30:9000").String()
|
||||||
|
|
||||||
// multipart
|
// multipart
|
||||||
body := &bytes.Buffer{}
|
body := &bytes.Buffer{}
|
||||||
writer := multipart.NewWriter(body)
|
writer := multipart.NewWriter(body)
|
||||||
@@ -39,8 +39,7 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat
|
|||||||
ext = "." + ext
|
ext = "." + ext
|
||||||
}
|
}
|
||||||
|
|
||||||
filename := fmt.Sprintf("asynch_%d_%s%s", time.Now().Unix(), guid.S(), ext)
|
part, err := writer.CreateFormFile("file", "")
|
||||||
part, err := writer.CreateFormFile("file", filename)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@@ -56,14 +55,16 @@ func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, dat
|
|||||||
headers["Content-Type"] = contentType
|
headers["Content-Type"] = contentType
|
||||||
|
|
||||||
fullURL := "oss/file/uploadFile"
|
fullURL := "oss/file/uploadFile"
|
||||||
g.Log().Infof(ctx, "[OSS] upload start url=%s filename=%s size=%d", fullURL, filename, len(data))
|
g.Log().Infof(ctx, "[OSS] upload start url=%s size=%d", fullURL, len(data))
|
||||||
|
|
||||||
var resp uploadFileResponse
|
var resp uploadFileResponse
|
||||||
if err := commonHttp.Post(ctx, fullURL, headers, &resp, body.Bytes()); err != nil {
|
if err := commonHttp.Post(ctx, fullURL, headers, &resp, body.Bytes()); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
fmt.Println("打印结果 resp:", resp)
|
if resp.FileURL == "" {
|
||||||
g.Log().Infof(ctx, "[OSS] upload success url=%s size=%d format=%s", resp.FileURL, resp.FileSize, resp.FileFormat)
|
return "", fmt.Errorf("OSS服务返回错误: 上传失败")
|
||||||
|
}
|
||||||
|
g.Log().Infof(ctx, "[OSS] upload success url=%s filename=%s size=%d format=%s", resp.FileURL, resp.FileName, resp.FileSize, resp.FileFormat)
|
||||||
return resp.FileURL, nil
|
return resp.FileURL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,8 +9,6 @@ import (
|
|||||||
"model-asynch/model/dto"
|
"model-asynch/model/dto"
|
||||||
"model-asynch/model/entity"
|
"model-asynch/model/entity"
|
||||||
|
|
||||||
"github.com/gogf/gf/v2/database/gdb"
|
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
|
||||||
"github.com/gogf/gf/v2/os/gtime"
|
"github.com/gogf/gf/v2/os/gtime"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
@@ -20,7 +18,6 @@ var Task = &taskService{}
|
|||||||
type taskService struct{}
|
type taskService struct{}
|
||||||
|
|
||||||
func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *dto.CreateTaskRes, err error) {
|
func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *dto.CreateTaskRes, err error) {
|
||||||
startAt := time.Now()
|
|
||||||
// Pin token/user and related info into the context
|
// Pin token/user and related info into the context
|
||||||
ctx = asyncCtx(ctx)
|
ctx = asyncCtx(ctx)
|
||||||
|
|
||||||
@@ -62,35 +59,6 @@ func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3) Write the operation log (best effort so the main flow is unaffected; failures are ignored)
|
|
||||||
ip := ""
|
|
||||||
ua := ""
|
|
||||||
apiPath := "/task/createTask"
|
|
||||||
httpMethod := "POST"
|
|
||||||
if r := g.RequestFromCtx(ctx); r != nil {
|
|
||||||
ip = r.GetClientIp()
|
|
||||||
ua = r.UserAgent()
|
|
||||||
apiPath = r.URL.Path
|
|
||||||
httpMethod = r.Method
|
|
||||||
}
|
|
||||||
_, _ = dao.OpLog.Insert(ctx, &entity.AsynchOpLog{
|
|
||||||
IP: ip,
|
|
||||||
UserAgent: ua,
|
|
||||||
APIPath: apiPath,
|
|
||||||
HttpMethod: httpMethod,
|
|
||||||
BizName: req.BizName,
|
|
||||||
ModelName: req.ModelName,
|
|
||||||
TaskID: taskID,
|
|
||||||
OpType: "createTask",
|
|
||||||
Success: 1,
|
|
||||||
ErrorMsg: "",
|
|
||||||
CostMs: time.Since(startAt).Milliseconds(),
|
|
||||||
RequestPayload: storedPayload,
|
|
||||||
ResponsePayload: gdb.Map{
|
|
||||||
"taskId": taskID,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return &dto.CreateTaskRes{TaskID: taskID}, nil
|
return &dto.CreateTaskRes{TaskID: taskID}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -159,28 +127,3 @@ func (s *taskService) GetBatch(ctx context.Context, req *dto.GetTaskBatchReq) (r
|
|||||||
}
|
}
|
||||||
return &dto.GetTaskBatchRes{List: items}, nil
|
return &dto.GetTaskBatchRes{List: items}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskService) List(ctx context.Context, req *dto.ListTaskReq) (res *dto.ListTaskRes, err error) {
|
|
||||||
pageNum, pageSize := 1, 10
|
|
||||||
if req != nil && req.Page != nil {
|
|
||||||
if req.Page.PageNum > 0 {
|
|
||||||
pageNum = int(req.Page.PageNum)
|
|
||||||
}
|
|
||||||
if req.Page.PageSize > 0 {
|
|
||||||
pageSize = int(req.Page.PageSize)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
modelName := ""
|
|
||||||
taskID := ""
|
|
||||||
var state *int
|
|
||||||
if req != nil {
|
|
||||||
modelName = req.ModelName
|
|
||||||
taskID = req.TaskID
|
|
||||||
state = req.State
|
|
||||||
}
|
|
||||||
list, total, err := dao.Task.List(ctx, pageNum, pageSize, modelName, taskID, state)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &dto.ListTaskRes{List: list, Total: total}, nil
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,38 +0,0 @@
|
|||||||
package service
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
)
|
|
||||||
|
|
||||||
// saveTmpResult writes the model output to a temporary file so that an OSS upload failure can be handled by retrying only the OSS step.
|
|
||||||
func saveTmpResult(taskID string, data []byte, ext string) (string, error) {
|
|
||||||
dir := filepath.Join(os.TempDir(), "model-asynch")
|
|
||||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
if ext == "" {
|
|
||||||
ext = ".bin"
|
|
||||||
}
|
|
||||||
if ext[0] != '.' {
|
|
||||||
ext = "." + ext
|
|
||||||
}
|
|
||||||
path := filepath.Join(dir, fmt.Sprintf("%s%s", taskID, ext))
|
|
||||||
if err := os.WriteFile(path, data, 0o644); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return path, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func loadTmpResult(path string) ([]byte, error) {
|
|
||||||
return os.ReadFile(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
func deleteTmpResult(path string) {
|
|
||||||
if path == "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_ = os.Remove(path)
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -146,46 +146,17 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) {
|
|||||||
"inputRef": t.InputRef,
|
"inputRef": t.InputRef,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var (
|
data, err := InvokeModel(ctx, m, payload)
|
||||||
data []byte
|
if err != nil {
|
||||||
contentType string
|
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||||||
ext string
|
return
|
||||||
)
|
|
||||||
|
|
||||||
// phase=1 means the model succeeded but the OSS upload failed: load from the temporary file first to avoid re-running the model
|
|
||||||
if t.Phase == 1 && strings.TrimSpace(t.TmpFile) != "" {
|
|
||||||
data, err = loadTmpResult(t.TmpFile)
|
|
||||||
if err == nil && len(data) > 0 {
|
|
||||||
contentType, ext = DetectFileType(data)
|
|
||||||
} else {
|
|
||||||
// Temporary file unavailable: fall back to invoking the model again
|
|
||||||
data = nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if data == nil {
|
|
||||||
// Stats: increment only when the model is actually requested (OSS retries are not counted)
|
|
||||||
_ = dao.Stat.IncRequestCount(ctx, time.Now(), int64(t.TenantId), t.Creator, t.ModelName)
|
|
||||||
|
|
||||||
data, err = InvokeModel(ctx, m, payload)
|
|
||||||
if err != nil {
|
|
||||||
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
contentType, ext = DetectFileType(data)
|
|
||||||
// Write the model output to a temporary file so that a later OSS failure can retry only the OSS step
|
|
||||||
tmpPath, err := saveTmpResult(t.TaskID, data, ext)
|
|
||||||
if err == nil && tmpPath != "" {
|
|
||||||
t.TmpFile = tmpPath
|
|
||||||
t.Phase = 1
|
|
||||||
_ = dao.Task.UpdateTmpAfterModelGlobal(ctx, t.Id, tmpPath)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4) Store to OSS
|
// 4) Store to OSS/MinIO
|
||||||
|
contentType, ext := DetectFileType(data)
|
||||||
ossURL, err := Storage.UploadByTask(ctx, t, data, ext, contentType)
|
ossURL, err := Storage.UploadByTask(ctx, t, data, ext, contentType)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// OSS phase failed: keep the temporary file; the next round retries only the OSS step
|
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error())
|
||||||
_ = dao.Task.UpdateFailedKeepTmpGlobal(ctx, t.Id, err.Error())
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -199,8 +170,6 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) {
|
|||||||
g.Log().Errorf(ctx, "[worker] update success failed: %v", err)
|
g.Log().Errorf(ctx, "[worker] update success failed: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Clean up the temporary file after success
|
|
||||||
deleteTmpResult(t.TmpFile)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *asyncWorker) rollbackToPending(ctx context.Context, id int64) error {
|
func (w *asyncWorker) rollbackToPending(ctx context.Context, id int64) error {
|
||||||
|
|||||||
244
update.sql
@@ -1,194 +1,138 @@
|
|||||||
-- model-asynch: three core tables (pgsql)
|
-- -----------------------王立钊2026-04-22 10:00:00-----------------------
|
||||||
-- 1) asynch_models: model configuration
|
|
||||||
-- 2) asynch_task: asynchronous tasks
|
|
||||||
-- 3) asynch_op_log: operation log (for statistics)
|
|
||||||
-- 4) asynch_model_stat: per-day model request statistics (for rate limiting/monitoring)
|
|
||||||
|
|
||||||
-- =========================
|
--------------------pgsql CREATE statements for the asynch_models / asynch_task tables---------------------------
|
||||||
-- 1) asynch_models
|
|
||||||
-- =========================
|
-- Asynchronous model table
|
||||||
CREATE TABLE IF NOT EXISTS asynch_models (
|
CREATE TABLE IF NOT EXISTS asynch_models (
|
||||||
-- Base fields (consistent with existing tables)
|
-- Base fields (consistent with existing tables)
|
||||||
id BIGINT PRIMARY KEY, -- primary key ID (not auto-increment)
|
id BIGINT PRIMARY KEY, -- primary key ID (not auto-increment)
|
||||||
tenant_id BIGINT NOT NULL DEFAULT 0, -- tenant ID
|
tenant_id BIGINT NOT NULL DEFAULT 0, -- tenant ID
|
||||||
creator VARCHAR(64) NOT NULL,
|
creator VARCHAR(64) NOT NULL,
|
||||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
updater VARCHAR(64) NOT NULL,
|
updater VARCHAR(64) NOT NULL,
|
||||||
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
deleted_at TIMESTAMP(6),
|
deleted_at timestamp(6),
|
||||||
|
|
||||||
-- Business fields
|
-- Business fields
|
||||||
model_name VARCHAR(128) NOT NULL, -- model name (routing key)
|
model_name VARCHAR(128) NOT NULL, -- model name (routing key)
|
||||||
base_url VARCHAR(256) NOT NULL, -- model service base URL (e.g. http://1.2.3.4:8080)
|
base_url VARCHAR(256) NOT NULL, -- model service base URL (e.g. http://1.2.3.4:8080)
|
||||||
route VARCHAR(256) NOT NULL DEFAULT '',-- model service route (e.g. /v1/infer)
|
route VARCHAR(256) NOT NULL DEFAULT '', -- model service route (e.g. /v1/infer)
|
||||||
http_method VARCHAR(8) NOT NULL DEFAULT 'POST', -- request method: GET/POST
|
http_method VARCHAR(8) NOT NULL DEFAULT 'POST', -- request method: GET/POST
|
||||||
api_key VARCHAR(1024) DEFAULT '', -- request header binding (multiple supported, comma-separated): X-API-Key:xxx,operation:true
|
api_key VARCHAR(256) DEFAULT '', -- API key binding (request header), e.g. TTS_API_KEY:your-key
|
||||||
|
enabled SMALLINT NOT NULL DEFAULT 1, -- enabled flag: 1 = enabled / 0 = disabled
|
||||||
enabled SMALLINT NOT NULL DEFAULT 1, -- enabled flag: 1 = enabled / 0 = disabled
|
max_concurrency INT NOT NULL DEFAULT 10, -- max concurrency per model (the worker/queue consumer side should honor it)
|
||||||
max_concurrency INT NOT NULL DEFAULT 10, -- max concurrency per model
|
queue_limit INT NOT NULL DEFAULT 1000, -- per-model queue limit (prevents backlog; optional)
|
||||||
queue_limit INT NOT NULL DEFAULT 1000, -- queue limit (approximate control)
|
timeout_ms INT NOT NULL DEFAULT 60000, -- model service call timeout (milliseconds)
|
||||||
timeout_seconds INT NOT NULL DEFAULT 60, -- model service call timeout (seconds)
|
retry_times SMALLINT NOT NULL DEFAULT 0, -- number of retries on failure (0 = no retry)
|
||||||
|
auto_clean_seconds INT NOT NULL DEFAULT 86400, -- retention time (seconds) after download (state=4); cleaned up on expiry
|
||||||
retry_times SMALLINT NOT NULL DEFAULT 0, -- at most N further retries after a failure (first attempt excluded)
|
remark TEXT DEFAULT '' -- remark
|
||||||
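retry_queue_max_seconds INT NOT NULL DEFAULT 0, -- max queueing time (seconds) for failed retries: 0 = jump to the queue head; >0 = jump once queued longer than this, otherwise still go to the tail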
|
|
||||||
|
|
||||||
auto_clean_seconds INT NOT NULL DEFAULT 86400, -- retention time (seconds) after download (state=4)
|
|
||||||
remark TEXT DEFAULT '' -- remark
|
|
||||||
);
|
);
|
||||||
|
|
||||||
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name
|
-- Indexes/constraints
|
||||||
ON asynch_models(tenant_id, model_name);
|
CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name ON asynch_models(tenant_id, model_name);
|
||||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_tenant_id ON asynch_models(tenant_id);
|
CREATE INDEX IF NOT EXISTS idx_asynch_models_tenant_id ON asynch_models(tenant_id);
|
||||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_model_name ON asynch_models(model_name);
|
CREATE INDEX IF NOT EXISTS idx_asynch_models_model_name ON asynch_models(model_name);
|
||||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_enabled ON asynch_models(enabled);
|
CREATE INDEX IF NOT EXISTS idx_asynch_models_enabled ON asynch_models(enabled);
|
||||||
CREATE INDEX IF NOT EXISTS idx_asynch_models_deleted_at ON asynch_models(deleted_at);
|
CREATE INDEX IF NOT EXISTS idx_asynch_models_deleted_at ON asynch_models(deleted_at);
|
||||||
|
|
||||||
|
-- Table and column comments
|
||||||
COMMENT ON TABLE asynch_models IS '异步模型表(模型服务配置)';
|
COMMENT ON TABLE asynch_models IS '异步模型表(模型服务配置)';
|
||||||
|
COMMENT ON COLUMN asynch_models.id IS '主键ID(非自增)';
|
||||||
|
COMMENT ON COLUMN asynch_models.tenant_id IS '租户ID';
|
||||||
|
COMMENT ON COLUMN asynch_models.creator IS '创建人';
|
||||||
|
COMMENT ON COLUMN asynch_models.created_at IS '创建时间';
|
||||||
|
COMMENT ON COLUMN asynch_models.updater IS '更新人';
|
||||||
|
COMMENT ON COLUMN asynch_models.updated_at IS '更新时间';
|
||||||
|
COMMENT ON COLUMN asynch_models.deleted_at IS '删除时间(软删)';
|
||||||
COMMENT ON COLUMN asynch_models.model_name IS '模型名称(路由键)';
|
COMMENT ON COLUMN asynch_models.model_name IS '模型名称(路由键)';
|
||||||
COMMENT ON COLUMN asynch_models.api_key IS '请求头绑定(支持多个,逗号分隔):X-API-Key:xxx,operation:true';
|
COMMENT ON COLUMN asynch_models.base_url IS '模型服务基础地址';
|
||||||
COMMENT ON COLUMN asynch_models.retry_times IS '失败后最多再重试 N 次(不含首次)';
|
COMMENT ON COLUMN asynch_models.route IS '模型服务路由';
|
||||||
COMMENT ON COLUMN asynch_models.retry_queue_max_seconds IS '失败重试最大排队时间(秒):0=插队到队首;>0=排队超过该时间后插队,否则仍到队尾';
|
COMMENT ON COLUMN asynch_models.http_method IS '请求方式:GET/POST';
|
||||||
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期清理';
|
COMMENT ON COLUMN asynch_models.api_key IS '请求密钥绑定(请求头),示例:TTS_API_KEY:your-key';
|
||||||
|
COMMENT ON COLUMN asynch_models.enabled IS '是否启用:1启用/0停用';
|
||||||
|
COMMENT ON COLUMN asynch_models.max_concurrency IS '单模型最大并发';
|
||||||
|
COMMENT ON COLUMN asynch_models.queue_limit IS '单模型排队上限';
|
||||||
|
COMMENT ON COLUMN asynch_models.timeout_ms IS '调用模型服务超时(毫秒)';
|
||||||
|
COMMENT ON COLUMN asynch_models.retry_times IS '失败重试次数';
|
||||||
|
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理';
|
||||||
|
COMMENT ON COLUMN asynch_models.remark IS '备注';
|
||||||
|
|
||||||
|
-- 兼容已有库:更新字段注释
|
||||||
|
COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '已下载(state=4)后的保留时间(秒),到期后清理';
|
||||||
|
|
||||||
|
|
||||||
--- =========================
--- 2) asynch_task
--- =========================
+-- Async task table
 CREATE TABLE IF NOT EXISTS asynch_task (
     -- Base columns (consistent with existing tables)
     id BIGINT PRIMARY KEY, -- Primary key ID (not auto-increment)
     tenant_id BIGINT NOT NULL DEFAULT 0, -- Tenant ID
     creator VARCHAR(64) NOT NULL,
     created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
     updater VARCHAR(64) NOT NULL,
     updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    deleted_at TIMESTAMP(6),
+    deleted_at timestamp(6),

     -- Core task columns
     model_name VARCHAR(128) NOT NULL, -- Model name
     task_id VARCHAR(64) NOT NULL, -- Task ID (returned to callers)
-    state SMALLINT NOT NULL DEFAULT 0, -- 0 queued / 1 running / 2 succeeded / 3 failed / 4 downloaded
+    state SMALLINT NOT NULL DEFAULT 0, -- State: 0 queued / 1 running / 2 succeeded / 3 failed / 4 downloaded
+    oss_file VARCHAR(512) DEFAULT '', -- OSS file URL/key
+    file_type VARCHAR(32) DEFAULT '', -- File type (e.g. mp3/mp4/png/pdf/...)
+    file_size BIGINT NOT NULL DEFAULT 0, -- File size (bytes)
+    error_msg TEXT DEFAULT '', -- Error message

-    oss_file VARCHAR(512) DEFAULT '', -- OSS address of the result file
-    file_type VARCHAR(32) DEFAULT '', -- File type (mp3/mp4/png/...)
-    file_size BIGINT NOT NULL DEFAULT 0, -- File size (bytes)
-    error_msg TEXT DEFAULT '', -- Error message
+    -- Execution and cleanup columns
+    started_at TIMESTAMP, -- Execution start time
+    finished_at TIMESTAMP, -- Execution end time
+    expire_at TIMESTAMP, -- Cleanup time (written after download: updated_at + auto_clean_seconds)
+    retry_count INT NOT NULL DEFAULT 0, -- Retries so far (not counting the first attempt)
+    enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- Enqueue time (used for queue order; a retry resets it so the task goes to the tail)

-    started_at TIMESTAMP, -- Execution start time
-    finished_at TIMESTAMP, -- Execution end time
-    expire_at TIMESTAMP, -- Written once state=4, used for cleanup
+    -- Input info (for troubleshooting/replay; may be left unwritten or empty if not needed)
+    input_ref TEXT DEFAULT '', -- Input reference (e.g. uploaded file URL / OSS key / caller-side resource ID)
+    request_payload JSONB -- Request parameters (optional)
-
-    -- Retry / queueing
-    retry_count INT NOT NULL DEFAULT 0, -- Retries so far (not counting the first attempt)
-    enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, -- Enqueue time (used for queue order)
-
-    -- Task execution phase: distinguishes "retry the model" from "retry only the OSS upload"
-    phase SMALLINT NOT NULL DEFAULT 0, -- 0 model phase / 1 OSS phase
-    tmp_file TEXT DEFAULT '', -- Temporary result file path (when phase=1, only the OSS upload is retried)
-
-    -- Input info (optional)
-    input_ref TEXT DEFAULT '', -- Input reference (e.g. OSS / business resource ID)
-    request_payload JSONB -- Request parameters (optional)
 );

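The column comments above say that `retry_count` excludes the first attempt and that a retry refreshes `enqueue_at` so the task moves to the tail of the queue. As a rough sketch only, requeueing failed tasks could be written as a single statement like the one below. The join on `model_name` and the idea that the cleaner does this in one SQL statement are assumptions; the real cleaner logic is not part of this diff, and the head-of-queue behaviour described for `retry_queue_max_seconds` is not modelled here.

```sql
-- Hypothetical requeue of failed tasks (state=3) that still have retries left.
-- Assumes asynch_models.retry_times is the per-model retry budget.
UPDATE asynch_task AS t
SET state       = 0,                      -- back to "queued"
    retry_count = t.retry_count + 1,      -- one more retry consumed
    enqueue_at  = CURRENT_TIMESTAMP,      -- newest enqueue time = tail of the queue
    updated_at  = CURRENT_TIMESTAMP
FROM asynch_models AS m
WHERE m.model_name = t.model_name
  AND m.deleted_at IS NULL
  AND t.deleted_at IS NULL
  AND t.state = 3
  AND t.retry_count < m.retry_times;
```

Because `enqueue_at` is overwritten with the current time, a requeued task sorts after every task that was already waiting.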
-CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id
-    ON asynch_task(tenant_id, task_id);
+-- Indexes / constraints
+CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_task_tenant_task_id ON asynch_task(tenant_id, task_id);
 CREATE INDEX IF NOT EXISTS idx_asynch_task_tenant_id ON asynch_task(tenant_id);
 CREATE INDEX IF NOT EXISTS idx_asynch_task_model_name ON asynch_task(model_name);
 CREATE INDEX IF NOT EXISTS idx_asynch_task_state ON asynch_task(state);
-CREATE INDEX IF NOT EXISTS idx_asynch_task_enqueue_at ON asynch_task(enqueue_at);
 CREATE INDEX IF NOT EXISTS idx_asynch_task_updated_at ON asynch_task(updated_at);
 CREATE INDEX IF NOT EXISTS idx_asynch_task_expire_at ON asynch_task(expire_at);
 CREATE INDEX IF NOT EXISTS idx_asynch_task_deleted_at ON asynch_task(deleted_at);

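Since `enqueue_at` is described as the queue-order column and `state` is indexed, one way a worker could claim the oldest queued task is sketched below. This assumes workers read the queue directly from PostgreSQL, which the diff does not confirm (the service may well queue through Redis instead), and the model name `'tts'` is only a placeholder.

```sql
-- Hypothetical claim of the oldest queued task for one model.
-- FOR UPDATE SKIP LOCKED lets concurrent workers skip rows another worker holds.
WITH next_task AS (
    SELECT id
    FROM asynch_task
    WHERE state = 0                 -- queued
      AND model_name = 'tts'        -- placeholder model name
      AND deleted_at IS NULL
    ORDER BY enqueue_at, id         -- first come, first served
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE asynch_task AS t
SET state      = 1,                 -- running
    started_at = CURRENT_TIMESTAMP,
    updated_at = CURRENT_TIMESTAMP
FROM next_task
WHERE t.id = next_task.id
RETURNING t.task_id;
```

The statement returns no row when the queue is empty or every queued row is currently locked by another worker.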
+-- Table and column comments
 COMMENT ON TABLE asynch_task IS 'Async task table';
-COMMENT ON COLUMN asynch_task.state IS '0 queued / 1 running / 2 succeeded / 3 failed / 4 downloaded';
+COMMENT ON COLUMN asynch_task.id IS 'Primary key ID (not auto-increment)';
+COMMENT ON COLUMN asynch_task.tenant_id IS 'Tenant ID';
+COMMENT ON COLUMN asynch_task.creator IS 'Creator';
+COMMENT ON COLUMN asynch_task.created_at IS 'Creation time';
+COMMENT ON COLUMN asynch_task.updater IS 'Updater';
+COMMENT ON COLUMN asynch_task.updated_at IS 'Update time';
+COMMENT ON COLUMN asynch_task.deleted_at IS 'Deletion time (soft delete)';
+COMMENT ON COLUMN asynch_task.model_name IS 'Model name';
+COMMENT ON COLUMN asynch_task.task_id IS 'Task ID (returned to callers)';
+COMMENT ON COLUMN asynch_task.state IS 'State: 0 queued / 1 running / 2 succeeded / 3 failed / 4 downloaded';
+COMMENT ON COLUMN asynch_task.oss_file IS 'OSS file URL/key';
+COMMENT ON COLUMN asynch_task.file_type IS 'File type';
+COMMENT ON COLUMN asynch_task.file_size IS 'File size (bytes)';
+COMMENT ON COLUMN asynch_task.error_msg IS 'Error message';
+COMMENT ON COLUMN asynch_task.started_at IS 'Execution start time';
+COMMENT ON COLUMN asynch_task.finished_at IS 'Execution end time';
+COMMENT ON COLUMN asynch_task.expire_at IS 'Cleanup time (retention after download has expired)';
 COMMENT ON COLUMN asynch_task.retry_count IS 'Retries so far (not counting the first attempt)';
-COMMENT ON COLUMN asynch_task.enqueue_at IS 'Enqueue time (used for queue order)';
-COMMENT ON COLUMN asynch_task.phase IS 'Execution phase: 0 model phase / 1 OSS phase (model succeeded, awaiting OSS upload)';
-COMMENT ON COLUMN asynch_task.tmp_file IS 'Temporary result file path (when phase=1, only the OSS upload is retried)';
+COMMENT ON COLUMN asynch_task.enqueue_at IS 'Enqueue time (used for queue order; a retry resets it so the task goes to the tail)';
+COMMENT ON COLUMN asynch_task.input_ref IS 'Input reference (e.g. uploaded file URL / OSS key / caller-side resource ID)';
+COMMENT ON COLUMN asynch_task.request_payload IS 'Request parameters (JSON)';

+-- Compatibility with existing databases: add columns incrementally (PostgreSQL supports IF NOT EXISTS)
+ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS retry_count INT NOT NULL DEFAULT 0;
+ALTER TABLE asynch_task ADD COLUMN IF NOT EXISTS enqueue_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP;
+COMMENT ON COLUMN asynch_task.retry_count IS 'Retries so far (not counting the first attempt)';
+COMMENT ON COLUMN asynch_task.enqueue_at IS 'Enqueue time (used for queue order; a retry resets it so the task goes to the tail)';
+COMMENT ON COLUMN asynch_task.state IS 'State: 0 queued / 1 running / 2 succeeded / 3 failed / 4 downloaded';
+COMMENT ON COLUMN asynch_task.expire_at IS 'Cleanup time (retention after download has expired)';

+-- Backfill the enqueue time for existing rows (only where enqueue_at is NULL)
+UPDATE asynch_task SET enqueue_at = created_at WHERE enqueue_at IS NULL;
--- =========================
--- 3) asynch_op_log
--- =========================
-CREATE TABLE IF NOT EXISTS asynch_op_log (
-    -- Base columns (consistent with existing tables)
-    id BIGINT PRIMARY KEY,
-    tenant_id BIGINT NOT NULL DEFAULT 0,
-    creator VARCHAR(64) NOT NULL,
-    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    updater VARCHAR(64) NOT NULL,
-    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    deleted_at TIMESTAMP(6),
-
-    -- Basic audit info
-    ip VARCHAR(64) DEFAULT '',
-    user_agent VARCHAR(256) DEFAULT '',
-    api_path VARCHAR(256) DEFAULT '',
-    http_method VARCHAR(16) DEFAULT '',
-
-    -- Business info
-    biz_name VARCHAR(128) NOT NULL DEFAULT '', -- Calling business module/system
-    model_name VARCHAR(128) NOT NULL DEFAULT '',
-    task_id VARCHAR(64) NOT NULL DEFAULT '',
-
-    -- Statistics columns
-    op_type VARCHAR(64) NOT NULL DEFAULT 'createTask', -- Operation type (defaults to task creation)
-    success SMALLINT NOT NULL DEFAULT 1, -- 1 success / 0 failure
-    error_msg TEXT DEFAULT '',
-    cost_ms BIGINT NOT NULL DEFAULT 0, -- Elapsed time (milliseconds)
-
-    -- Request/response JSON (for later statistical analysis)
-    request_payload JSONB,
-    response_payload JSONB
-);
-
-CREATE INDEX IF NOT EXISTS idx_asynch_op_log_tenant_time ON asynch_op_log(tenant_id, created_at);
-CREATE INDEX IF NOT EXISTS idx_asynch_op_log_model_name ON asynch_op_log(model_name);
-CREATE INDEX IF NOT EXISTS idx_asynch_op_log_biz_name ON asynch_op_log(biz_name);
-CREATE INDEX IF NOT EXISTS idx_asynch_op_log_task_id ON asynch_op_log(task_id);
-CREATE INDEX IF NOT EXISTS idx_asynch_op_log_op_type ON asynch_op_log(op_type);
-CREATE INDEX IF NOT EXISTS idx_asynch_op_log_deleted_at ON asynch_op_log(deleted_at);
-
-COMMENT ON TABLE asynch_op_log IS 'Operation log table (task creation etc., used for statistics)';
-COMMENT ON COLUMN asynch_op_log.biz_name IS 'Business name (calling module/system)';
-COMMENT ON COLUMN asynch_op_log.model_name IS 'Model name';
-COMMENT ON COLUMN asynch_op_log.task_id IS 'Task ID';
-COMMENT ON COLUMN asynch_op_log.op_type IS 'Operation type (e.g. createTask/getTaskResult/getTaskBatch)';
-COMMENT ON COLUMN asynch_op_log.success IS 'Success flag: 1 success / 0 failure';
-COMMENT ON COLUMN asynch_op_log.error_msg IS 'Error message (on failure)';
-COMMENT ON COLUMN asynch_op_log.cost_ms IS 'Elapsed time (milliseconds)';
-COMMENT ON COLUMN asynch_op_log.request_payload IS 'Request JSON';
-COMMENT ON COLUMN asynch_op_log.response_payload IS 'Response JSON';
-
--- =========================
--- 4) asynch_model_stat
--- =========================
-CREATE TABLE IF NOT EXISTS asynch_model_stat (
-    day DATE NOT NULL, -- Day (YYYY-MM-DD)
-    tenant_id BIGINT NOT NULL DEFAULT 0, -- Tenant ID
-    creator VARCHAR(64) NOT NULL DEFAULT '', -- Creator
-    model_name VARCHAR(128) NOT NULL DEFAULT '', -- Model name
-    request_count BIGINT NOT NULL DEFAULT 0, -- Request count
-    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    PRIMARY KEY(day, tenant_id, creator, model_name)
-);
-
--- Helps filtering by time range / tenant / creator / model
-CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_tenant_day ON asynch_model_stat(tenant_id, day);
-CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_day ON asynch_model_stat(day);
-CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_model_name ON asynch_model_stat(model_name);
-CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_creator ON asynch_model_stat(creator);
-
-COMMENT ON TABLE asynch_model_stat IS 'Daily per-model request statistics (for rate limiting / monitoring)';
-COMMENT ON COLUMN asynch_model_stat.day IS 'Day (YYYY-MM-DD)';
-COMMENT ON COLUMN asynch_model_stat.tenant_id IS 'Tenant ID';
-COMMENT ON COLUMN asynch_model_stat.creator IS 'Creator';
-COMMENT ON COLUMN asynch_model_stat.model_name IS 'Model name';
-COMMENT ON COLUMN asynch_model_stat.request_count IS 'Request count';
-COMMENT ON COLUMN asynch_model_stat.created_at IS 'Creation time';
-COMMENT ON COLUMN asynch_model_stat.updated_at IS 'Update time';
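
The composite primary key `(day, tenant_id, creator, model_name)` on `asynch_model_stat` lends itself to an upsert that bumps the counter once per model call. The statement below is only a sketch of that idea: the literal values are placeholders, and whether the worker actually counts this way is not shown in the diff.

```sql
-- Hypothetical daily counter increment for one model call.
-- Tenant 1, creator 'alice' and model 'tts' are placeholder values.
INSERT INTO asynch_model_stat (day, tenant_id, creator, model_name, request_count)
VALUES (CURRENT_DATE, 1, 'alice', 'tts', 1)
ON CONFLICT (day, tenant_id, creator, model_name)
DO UPDATE SET
    request_count = asynch_model_stat.request_count + 1,  -- existing row: add one
    updated_at    = CURRENT_TIMESTAMP;
```

The first call of the day inserts a row with `request_count = 1`; later calls for the same key hit the conflict branch and increment the existing row.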