# model-asynch(模型异步中间件) 一个独立的异步中间件服务:按模型配置路由调用不同模型服务,统一生成 `task_id`,后台异步执行,结果上传 OSS,并提供查询/批量领取/自动重试/自动清理能力,便于业务方“拿走结果并转移”。 > 分支约定:`dev` 为开发分支;`main`(或 master)为线上主分支。 --- ## 1. 核心功能 ### 1.1 模型配置(asynch_models) - 增删改查模型服务配置(`model_name` 唯一标识) - 支持配置: - 请求地址:`base_url + route` - 请求方式:`http_method`(GET/POST) - 请求密钥:`api_key`(以请求头注入,支持多个 header) - 超时:`timeout_seconds` - 并发:`max_concurrency`(按租户+模型的 Redis 分布式信号量限流) - 重试:`retry_times`(失败后最多再重试 N 次) - 保留:`auto_clean_seconds`(任务被业务领取到 `state=4` 后的保留秒数,到期清理) ### 1.2 异步任务(asynch_task) - 创建任务:生成 `task_id`,入库排队 - 后台 Worker: - PostgreSQL `FOR UPDATE SKIP LOCKED` 抢占任务,支持多实例不重复消费 - 调用模型服务(GET/POST) - 结果上传 OSS(调用你们的 OSS 文件服务 `oss/file/uploadFile`,透传 `Authorization/X-User-Info`) - 批量领取结果:批量查询 `task_id` 列表,返回 `task_id/state/oss_file`,并把成功的任务从 `state=2` 更新为 `state=4` - 自动重试:失败 `state=3` 会由清理器按 `retry_times` 重新入队到队尾 - 自动清理: - `state=4` 且 `expire_at` 到期 → 硬删除任务 - 失败重试耗尽仍失败 → 硬删除任务 - `state=0/1` 超时 → 标记失败(防止卡死) ### 1.3 统计(asynch_model_stat) - 按天统计:`day + tenant_id + creator + model_name -> request_count` - 统计口径:仅在 Worker 真正调用模型服务时计数(OSS 重试不计数) - 用途:给其他服务提供全局限流/监控依据 --- ## 2. 使用流程(业务方如何接入) ### 第一步:创建模型配置 业务方(或运维)先在中间件里创建/更新模型配置(`model_name` 为唯一键),例如: - `POST /model/createModel`(或 `/model/updateModel`) 请求示例(JSON): ```json { "modelName": "model-service", "baseUrl": "http://127.0.0.1:8000", "route": "/api/v1/chat", "httpMethod": "POST", "apiKey": "API_KEY:model-key,API_STATE:true,API_NUM:123", "enabled": 1, "maxConcurrency": 5, "queueLimit": 20, "timeoutSeconds": 1800, "retryTimes": 3, "retryQueueMaxSeconds": 600, "autoCleanSeconds": 3600, "remark": "Model-Service 模型服务" } ``` 参数说明: - `modelName`:模型名称(唯一标识/路由键) - `baseUrl`:模型服务地址(Base URL) - `route`:模型服务路由(拼接到 baseUrl 后) - `httpMethod`:请求方式(GET/POST) - `apiKey`:请求头绑定(支持多个 header,逗号分隔,格式 `Key:Value`;布尔/数字也会以字符串形式注入 header) - `enabled`:是否启用(0禁用/1启用) - `maxConcurrency`:单模型最大并发(按租户+模型维度限流) - `queueLimit`:排队上限(近似控制,超过则拒绝创建) - `timeoutSeconds`:调用模型服务超时(秒) - `retryTimes`:失败后最多再重试 N 次(不含首次) - `retryQueueMaxSeconds`:失败重试最大排队时间(秒);0 表示重试插队到队首;>0 表示排队超过该时间后插队,否则仍到队尾 - `autoCleanSeconds`:任务被领取到 `state=4` 后的保留时间(秒),到期清理 - `remark`:备注说明 ### 第二步:创建任务拿到 task_id 业务方发起推理请求时调用: - `POST /task/createTask`(传 `modelName + requestPayload (+ bizName)`) - 中间件返回 `task_id` - 业务方将 `task_id` 落到自己的业务表,并把业务状态置为「生成中」 ### 第三步:同步任务进度(推荐批量) 业务方通过轮询/定时任务同步进度: - 推荐:`POST /task/getTaskBatch`(批量传 `taskIds`,返回每个任务的 `state + oss_file`) - 或单条:`GET /task/getTaskResult?taskId=...` 业务侧拿到 `oss_file` 后自行做资源处理(直接保存或转存),并把业务状态更新为「成功/失败」。 > 说明:批量接口对 `state=2(成功)` 的任务会自动标记为 `state=4(已下载)` 并写入 `expire_at`,用于后续清理。 --- ## 3. 状态机说明(asynch_task.state) | state | 含义 | 产生方 | |---:|---|---| | 0 | 排队中 | 创建任务/重试入队 | | 1 | 执行中 | Worker 抢占后 | | 2 | 成功(已上传 OSS) | Worker | | 3 | 失败 | Worker / 超时处理 | | 4 | 已下载(已领取) | 批量领取接口(2→4) | 字段补充: - `retry_count`:已重试次数(不含首次) - `enqueue_at`:入队时间(用于排队顺序,重试会更新为 NOW() 放到队尾) - `expire_at`:仅对 `state=4` 生效,表示保留到期时间 --- ## 4. 配置说明(config.yml) 关键配置: - `database.default`: PostgreSQL 连接 - `redis.default`: Redis 连接(并发令牌、可扩展用途) - `asynch.worker.enabled`: 是否启动后台 worker - `asynch.worker.pollInterval`: 轮询间隔 - `asynch.worker.batchSize`: 单次抢占数量 - `asynch.worker.goroutines`: worker 协程池并发数 - `asynch.worker.taskTimeout`: state=0/1 卡死兜底超时 - `asynch.cleaner.enabled`: 是否启动清理器 - `asynch.cleaner.interval`: 清理器扫描间隔 --- ## 5. 数据库初始化 项目根目录提供 `update.sql`:首次部署执行建表 SQL。 --- ## 6. 开发与发布建议(Git) - `dev`:日常开发与联调 - `main`:线上稳定分支 - 推荐流程: 1) 从 `main` 拉出 `dev` 2) 功能完成后提 MR/PR 合并回 `main` 3) 打 tag / 发布镜像