model-asynch(模型异步中间件)
一个独立的异步中间件服务:按模型配置路由调用不同模型服务,统一生成 task_id,后台异步执行,结果上传 OSS,并提供查询/批量领取/自动重试/自动清理能力,便于业务方“拿走结果并转移”。
分支约定:
dev为开发分支;main(或 master)为线上主分支。
1. 核心功能
1.1 模型配置(asynch_models)
- 增删改查模型服务配置(
model_name唯一标识) - 支持配置:
- 请求地址:
base_url + route - 请求方式:
http_method(GET/POST) - 请求密钥:
api_key(以请求头注入,支持多个 header) - 超时:
timeout_seconds - 并发:
max_concurrency(按租户+模型的 Redis 分布式信号量限流) - 重试:
retry_times(失败后最多再重试 N 次) - 保留:
auto_clean_seconds(任务被业务领取到state=4后的保留秒数,到期清理)
- 请求地址:
1.2 异步任务(asynch_task)
- 创建任务:生成
task_id,入库排队 - 后台 Worker:
- PostgreSQL
FOR UPDATE SKIP LOCKED抢占任务,支持多实例不重复消费 - 调用模型服务(GET/POST)
- 结果上传 OSS(调用你们的 OSS 文件服务
oss/file/uploadFile,透传Authorization/X-User-Info)
- PostgreSQL
- 批量领取结果:批量查询
task_id列表,返回task_id/state/oss_file,并把成功的任务从state=2更新为state=4 - 自动重试:失败
state=3会由清理器按retry_times重新入队到队尾 - 自动清理:
state=4且expire_at到期 → 硬删除任务- 失败重试耗尽仍失败 → 硬删除任务
state=0/1超时 → 标记失败(防止卡死)
1.3 统计(asynch_model_stat)
- 按天统计:
day + tenant_id + creator + model_name -> request_count - 统计口径:仅在 Worker 真正调用模型服务时计数(OSS 重试不计数)
- 用途:给其他服务提供全局限流/监控依据
2. 使用流程(业务方如何接入)
第一步:创建模型配置
业务方(或运维)先在中间件里创建/更新模型配置(model_name 为唯一键),例如:
POST /model/createModel(或/model/updateModel)
请求示例(JSON):
{
"modelName": "model-service",
"baseUrl": "http://127.0.0.1:8000",
"route": "/api/v1/chat",
"httpMethod": "POST",
"apiKey": "API_KEY:model-key,API_STATE:true,API_NUM:123",
"enabled": 1,
"maxConcurrency": 5,
"queueLimit": 20,
"timeoutSeconds": 1800,
"retryTimes": 3,
"retryQueueMaxSeconds": 600,
"autoCleanSeconds": 3600,
"remark": "Model-Service 模型服务"
}
参数说明:
modelName:模型名称(唯一标识/路由键)baseUrl:模型服务地址(Base URL)route:模型服务路由(拼接到 baseUrl 后)httpMethod:请求方式(GET/POST)apiKey:请求头绑定(支持多个 header,逗号分隔,格式Key:Value;布尔/数字也会以字符串形式注入 header)enabled:是否启用(0禁用/1启用)maxConcurrency:单模型最大并发(按租户+模型维度限流)queueLimit:排队上限(近似控制,超过则拒绝创建)timeoutSeconds:调用模型服务超时(秒)retryTimes:失败后最多再重试 N 次(不含首次)retryQueueMaxSeconds:失败重试最大排队时间(秒);0 表示重试插队到队首;>0 表示排队超过该时间后插队,否则仍到队尾autoCleanSeconds:任务被领取到state=4后的保留时间(秒),到期清理remark:备注说明
第二步:创建任务拿到 task_id
业务方发起推理请求时调用:
POST /task/createTask(传modelName + requestPayload (+ bizName))- 中间件返回
task_id - 业务方将
task_id落到自己的业务表,并把业务状态置为「生成中」
第三步:同步任务进度(推荐批量)
业务方通过轮询/定时任务同步进度:
- 推荐:
POST /task/getTaskBatch(批量传taskIds,返回每个任务的state + oss_file) - 或单条:
GET /task/getTaskResult?taskId=...
业务侧拿到 oss_file 后自行做资源处理(直接保存或转存),并把业务状态更新为「成功/失败」。
说明:批量接口对
state=2(成功)的任务会自动标记为state=4(已下载)并写入expire_at,用于后续清理。
3. 状态机说明(asynch_task.state)
| state | 含义 | 产生方 |
|---|---|---|
| 0 | 排队中 | 创建任务/重试入队 |
| 1 | 执行中 | Worker 抢占后 |
| 2 | 成功(已上传 OSS) | Worker |
| 3 | 失败 | Worker / 超时处理 |
| 4 | 已下载(已领取) | 批量领取接口(2→4) |
字段补充:
retry_count:已重试次数(不含首次)enqueue_at:入队时间(用于排队顺序,重试会更新为 NOW() 放到队尾)expire_at:仅对state=4生效,表示保留到期时间
4. 配置说明(config.yml)
关键配置:
database.default: PostgreSQL 连接redis.default: Redis 连接(并发令牌、可扩展用途)asynch.worker.enabled: 是否启动后台 workerasynch.worker.pollInterval: 轮询间隔asynch.worker.batchSize: 单次抢占数量asynch.worker.goroutines: worker 协程池并发数asynch.worker.taskTimeout: state=0/1 卡死兜底超时asynch.cleaner.enabled: 是否启动清理器asynch.cleaner.interval: 清理器扫描间隔
5. 数据库初始化
项目根目录提供 update.sql:首次部署执行建表 SQL。
6. 开发与发布建议(Git)
dev:日常开发与联调main:线上稳定分支- 推荐流程:
- 从
main拉出dev - 功能完成后提 MR/PR 合并回
main - 打 tag / 发布镜像
- 从
Description
Languages
Go
99%
Dockerfile
1%