Files
model-asynch/README.md

5.8 KiB
Raw Permalink Blame History

model-asynch模型异步中间件

一个独立的异步中间件服务:按模型配置路由调用不同模型服务,统一生成 task_id,后台异步执行,结果上传 OSS并提供查询/批量领取/自动重试/自动清理能力,便于业务方“拿走结果并转移”。

分支约定:dev 为开发分支;main(或 master为线上主分支。


1. 核心功能

1.1 模型配置asynch_models

  • 增删改查模型服务配置(model_name 唯一标识)
  • 支持配置:
    • 请求地址:base_url + route
    • 请求方式:http_methodGET/POST
    • 请求密钥:api_key(以请求头注入,支持多个 header
    • 超时:timeout_seconds
    • 并发:max_concurrency(按租户+模型的 Redis 分布式信号量限流)
    • 重试:retry_times(失败后最多再重试 N 次)
    • 保留:auto_clean_seconds(任务被业务领取到 state=4 后的保留秒数,到期清理)

1.2 异步任务asynch_task

  • 创建任务:生成 task_id,入库排队
  • 后台 Worker
    • PostgreSQL FOR UPDATE SKIP LOCKED 抢占任务,支持多实例不重复消费
    • 调用模型服务GET/POST
    • 结果上传 OSS调用你们的 OSS 文件服务 oss/file/uploadFile,透传 Authorization/X-User-Info
  • 批量领取结果:批量查询 task_id 列表,返回 task_id/state/oss_file,并把成功的任务从 state=2 更新为 state=4
  • 自动重试:失败 state=3 会由清理器按 retry_times 重新入队到队尾
  • 自动清理:
    • state=4expire_at 到期 → 硬删除任务
    • 失败重试耗尽仍失败 → 硬删除任务
    • state=0/1 超时 → 标记失败(防止卡死)

1.3 统计asynch_model_stat

  • 按天统计:day + tenant_id + creator + model_name -> request_count
  • 统计口径:仅在 Worker 真正调用模型服务时计数OSS 重试不计数)
  • 用途:给其他服务提供全局限流/监控依据

2. 使用流程(业务方如何接入)

第一步:创建模型配置

业务方(或运维)先在中间件里创建/更新模型配置(model_name 为唯一键),例如:

  • POST /model/createModel(或 /model/updateModel

请求示例JSON

{
  "modelName": "model-service",
  "baseUrl": "http://127.0.0.1:8000",
  "route": "/api/v1/chat",
  "httpMethod": "POST",
  "apiKey": "API_KEY:model-key,API_STATE:true,API_NUM:123",
  "enabled": 1,
  "maxConcurrency": 5,
  "queueLimit": 20,
  "timeoutSeconds": 1800,
  "retryTimes": 3,
  "retryQueueMaxSeconds": 600,
  "autoCleanSeconds": 3600,
  "remark": "Model-Service 模型服务"
}

参数说明:

  • modelName:模型名称(唯一标识/路由键)
  • baseUrl模型服务地址Base URL
  • route:模型服务路由(拼接到 baseUrl 后)
  • httpMethod请求方式GET/POST
  • apiKey:请求头绑定(支持多个 header逗号分隔格式 Key:Value;布尔/数字也会以字符串形式注入 header
  • enabled是否启用0禁用/1启用
  • maxConcurrency:单模型最大并发(按租户+模型维度限流)
  • queueLimit:排队上限(近似控制,超过则拒绝创建)
  • timeoutSeconds:调用模型服务超时(秒)
  • retryTimes:失败后最多再重试 N 次(不含首次)
  • retryQueueMaxSeconds失败重试最大排队时间0 表示重试插队到队首;>0 表示排队超过该时间后插队,否则仍到队尾
  • autoCleanSeconds:任务被领取到 state=4 后的保留时间(秒),到期清理
  • remark:备注说明

第二步:创建任务拿到 task_id

业务方发起推理请求时调用:

  • POST /task/createTask(传 modelName + requestPayload (+ bizName)
  • 中间件返回 task_id
  • 业务方将 task_id 落到自己的业务表,并把业务状态置为「生成中」

第三步:同步任务进度(推荐批量)

业务方通过轮询/定时任务同步进度:

  • 推荐:POST /task/getTaskBatch(批量传 taskIds,返回每个任务的 state + oss_file
  • 或单条:GET /task/getTaskResult?taskId=...

业务侧拿到 oss_file 后自行做资源处理(直接保存或转存),并把业务状态更新为「成功/失败」。

说明:批量接口对 state=2(成功) 的任务会自动标记为 state=4(已下载) 并写入 expire_at,用于后续清理。


3. 状态机说明asynch_task.state

state 含义 产生方
0 排队中 创建任务/重试入队
1 执行中 Worker 抢占后
2 成功(已上传 OSS Worker
3 失败 Worker / 超时处理
4 已下载(已领取) 批量领取接口2→4

字段补充:

  • retry_count:已重试次数(不含首次)
  • enqueue_at:入队时间(用于排队顺序,重试会更新为 NOW() 放到队尾)
  • expire_at:仅对 state=4 生效,表示保留到期时间

4. 配置说明config.yml

关键配置:

  • database.default: PostgreSQL 连接
  • redis.default: Redis 连接(并发令牌、可扩展用途)
  • asynch.worker.enabled: 是否启动后台 worker
  • asynch.worker.pollInterval: 轮询间隔
  • asynch.worker.batchSize: 单次抢占数量
  • asynch.worker.goroutines: worker 协程池并发数
  • asynch.worker.taskTimeout: state=0/1 卡死兜底超时
  • asynch.cleaner.enabled: 是否启动清理器
  • asynch.cleaner.interval: 清理器扫描间隔

5. 数据库初始化

项目根目录提供 update.sql:首次部署执行建表 SQL。


6. 开发与发布建议Git

  • dev:日常开发与联调
  • main:线上稳定分支
  • 推荐流程:
    1. main 拉出 dev
    2. 功能完成后提 MR/PR 合并回 main
    3. 打 tag / 发布镜像