4e6b98b7d365400f6791656c2a5e794294571053
- 新增统计控制器、服务层与数据访问层,提供按天统计接口 - 在 worker 处理任务时原子累加请求计数(仅实际调用模型时计数) - 更新数据库表结构,添加 asynch_model_stat 表及索引 - 更新文档说明统计功能的使用方式与统计口径
model-asynch(模型异步中间件)
一个独立的异步中间件服务:按模型配置路由调用不同模型服务,统一生成 task_id,后台异步执行,结果上传 OSS,并提供查询/批量领取/自动重试/自动清理能力,便于业务方“拿走结果并转移”。
分支约定:
dev为开发分支;main(或 master)为线上主分支。
1. 核心功能
1.1 模型配置(asynch_models)
- 增删改查模型服务配置(
model_name唯一标识) - 支持配置:
- 请求地址:
base_url + route - 请求方式:
http_method(GET/POST) - 请求密钥:
api_key(以请求头注入,示例:TTS_API_KEY:your-key) - 超时:
timeout_seconds - 并发:
max_concurrency(按租户+模型的 Redis 分布式信号量限流) - 重试:
retry_times(失败后最多再重试 N 次) - 保留:
auto_clean_seconds(任务被业务领取到state=4后的保留秒数,到期清理)
- 请求地址:
1.2 异步任务(asynch_task)
- 创建任务:生成
task_id,入库排队 - 后台 Worker:
- PostgreSQL
FOR UPDATE SKIP LOCKED抢占任务,支持多实例不重复消费 - 调用模型服务(GET/POST)
- 结果上传 OSS(调用你们的 OSS 文件服务
oss/file/uploadFile,透传Authorization/X-User-Info)
- PostgreSQL
- 批量领取结果:批量查询
task_id列表,返回task_id/state/oss_file,并把成功的任务从state=2更新为state=4 - 自动重试:失败
state=3会由清理器按retry_times重新入队到队尾(保证先来后到) - 自动清理:
state=4且expire_at到期 → 硬删除任务(并尝试删除 OSS)- 失败重试耗尽仍失败 → 硬删除任务(并尝试删除 OSS)
state=0/1超时 → 标记失败(防止卡死)
1.3 统计(asynch_model_stat)
- 按天统计:
day + tenant_id + creator + model_name -> request_count - 统计口径:仅在 Worker 真正调用模型服务时计数(OSS 重试不计数)
- 用途:给其他服务提供全局限流/监控依据(分布式场景下通过数据库 UPSERT 原子累加保证一致性)
2. 使用流程(业务方如何接入)
2.1 第一步:配置模型
调用“模型管理”接口新增模型配置(例:TTS):
model_name=ttsbase_url=http://xxx:portroute=/ttshttp_method=POSTapi_key=TTS_API_KEY:your-key(可选)
2.2 第二步:创建任务拿 task_id
业务方调用 CreateTask,传 modelName + requestPayload,中间件返回 task_id。
业务方把 task_id 落到自己的业务表(状态=生成中)。
2.3 第三步:业务方领取结果(推荐批量)
业务方在自己的“定时任务服务/轮询器”中,批量把 task_id 列表传给中间件:
- 返回每个任务的
state + oss_file - 对
state=2(成功)的任务,中间件会更新为state=4(已下载)并写入expire_at = now + auto_clean_seconds
业务方拿到 oss_file 后做“业务转移”:
- 方案 A:直接在业务表保存
oss_file作为最终资源地址 - 方案 B:业务侧下载后重新上传到业务自己的资产域,再保存新地址
state=4的数据允许重复获取,避免业务侧偶发中断导致“领取不到结果”。
2.4 获取统计(用于业务侧限流/监控)
业务方可调用统计接口按时间段获取请求次数(默认分页 10 条):
/stat/listModelStat:支持startDay/endDay/tenantId/creator/modelName条件筛选
3. 状态机说明(asynch_task.state)
| state | 含义 | 产生方 |
|---|---|---|
| 0 | 排队中 | 创建任务/重试入队 |
| 1 | 执行中 | Worker 抢占后 |
| 2 | 成功(已上传 OSS) | Worker |
| 3 | 失败 | Worker / 超时处理 |
| 4 | 已下载(已领取) | 批量领取接口(2→4) |
字段补充:
retry_count:已重试次数(不含首次)enqueue_at:入队时间(用于排队顺序,重试会更新为 NOW() 放到队尾)expire_at:仅对state=4生效,表示保留到期时间
4. 配置说明(config.yml)
关键配置:
database.default: PostgreSQL 连接redis.default: Redis 连接(并发令牌、可扩展用途)asynch.worker.enabled: 是否启动后台 workerasynch.worker.pollInterval: 轮询间隔asynch.worker.batchSize: 单次抢占数量asynch.worker.goroutines: worker 协程池并发数asynch.worker.taskTimeout: state=0/1 卡死兜底超时asynch.cleaner.enabled: 是否启动清理器asynch.cleaner.interval: 清理器扫描间隔
5. 数据库初始化/升级
项目根目录提供 update.sql:
- 首次部署:执行建表 SQL
- 升级:执行
ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...的增量语句
6. 接口文档
更详细接口示例见:docs/api.md(包含模型管理、任务接口、批量领取接口、字段说明)。
7. 开发与发布建议(Git)
dev:日常开发与联调main:线上稳定分支- 推荐流程:
- 从
main拉出dev - 功能完成后提 MR/PR 合并回
main - 打 tag / 发布镜像
- 从
Description
Languages
Go
99%
Dockerfile
1%