e79afc8fa37304342ad89a8fc3f98d35188774a5
Reviewed-on: #1
model-asynch(模型异步中间件)
一个独立的异步中间件服务:按模型配置路由调用不同模型服务,统一生成 task_id,后台异步执行,结果上传 OSS,并提供查询/批量领取/自动重试/自动清理能力,便于业务方“拿走结果并转移”。
分支约定:
dev为开发分支;main(或 master)为线上主分支。
1. 核心功能
1.1 模型配置(asynch_models)
- 增删改查模型服务配置(
model_name唯一标识) - 支持配置:
- 请求地址:
base_url + route - 请求方式:
http_method(GET/POST) - 请求密钥:
api_key(以请求头注入,示例:TTS_API_KEY:your-key) - 超时:
timeout_ms - 并发:
max_concurrency(按租户+模型的 Redis 分布式信号量限流) - 重试:
retry_times(失败后最多再重试 N 次) - 保留:
auto_clean_seconds(任务被业务领取到state=4后的保留秒数,到期清理)
- 请求地址:
1.2 异步任务(asynch_task)
- 创建任务:生成
task_id,入库排队 - 后台 Worker:
- PostgreSQL
FOR UPDATE SKIP LOCKED抢占任务,支持多实例不重复消费 - 调用模型服务(GET/POST)
- 结果上传 OSS(调用你们的 OSS 文件服务
oss/file/uploadFile,透传Authorization/X-User-Info)
- PostgreSQL
- 批量领取结果:批量查询
task_id列表,返回task_id/state/oss_file,并把成功的任务从state=2更新为state=4 - 自动重试:失败
state=3会由清理器按retry_times重新入队到队尾(保证先来后到) - 自动清理:
state=4且expire_at到期 → 硬删除任务(并尝试删除 OSS)- 失败重试耗尽仍失败 → 硬删除任务(并尝试删除 OSS)
state=0/1超时 → 标记失败(防止卡死)
2. 使用流程(业务方如何接入)
2.1 第一步:配置模型
调用“模型管理”接口新增模型配置(例:TTS):
model_name=ttsbase_url=http://xxx:portroute=/ttshttp_method=POSTapi_key=TTS_API_KEY:your-key(可选)
2.2 第二步:创建任务拿 task_id
业务方调用 CreateTask,传 modelName + requestPayload,中间件返回 task_id。
业务方把 task_id 落到自己的业务表(状态=生成中)。
2.3 第三步:业务方领取结果(推荐批量)
业务方在自己的“定时任务服务/轮询器”中,批量把 task_id 列表传给中间件:
- 返回每个任务的
state + oss_file - 对
state=2(成功)的任务,中间件会更新为state=4(已下载)并写入expire_at = now + auto_clean_seconds
业务方拿到 oss_file 后做“业务转移”:
- 方案 A:直接在业务表保存
oss_file作为最终资源地址 - 方案 B:业务侧下载后重新上传到业务自己的资产域,再保存新地址
state=4的数据允许重复获取,避免业务侧偶发中断导致“领取不到结果”。
3. 状态机说明(asynch_task.state)
| state | 含义 | 产生方 |
|---|---|---|
| 0 | 排队中 | 创建任务/重试入队 |
| 1 | 执行中 | Worker 抢占后 |
| 2 | 成功(已上传 OSS) | Worker |
| 3 | 失败 | Worker / 超时处理 |
| 4 | 已下载(已领取) | 批量领取接口(2→4) |
字段补充:
retry_count:已重试次数(不含首次)enqueue_at:入队时间(用于排队顺序,重试会更新为 NOW() 放到队尾)expire_at:仅对state=4生效,表示保留到期时间
4. 配置说明(config.yml)
关键配置:
database.default: PostgreSQL 连接redis.default: Redis 连接(并发令牌、可扩展用途)asynch.worker.enabled: 是否启动后台 workerasynch.worker.pollInterval: 轮询间隔asynch.worker.batchSize: 单次抢占数量asynch.worker.goroutines: worker 协程池并发数asynch.worker.taskTimeout: state=0/1 卡死兜底超时asynch.cleaner.enabled: 是否启动清理器asynch.cleaner.interval: 清理器扫描间隔
5. 数据库初始化/升级
项目根目录提供 update.sql:
- 首次部署:执行建表 SQL
- 升级:执行
ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...的增量语句
6. 接口文档
更详细接口示例见:docs/api.md(包含模型管理、任务接口、批量领取接口、字段说明)。
7. 开发与发布建议(Git)
dev:日常开发与联调main:线上稳定分支- 推荐流程:
- 从
main拉出dev - 功能完成后提 MR/PR 合并回
main - 打 tag / 发布镜像
- 从
Description
Languages
Go
99%
Dockerfile
1%