diff --git a/common/util/mapping.go b/common/util/mapping.go index b071b2e..d229b4c 100644 --- a/common/util/mapping.go +++ b/common/util/mapping.go @@ -13,7 +13,6 @@ import ( "strings" "time" - "gitea.redpowerfuture.com/red-future/common/utils" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" @@ -302,12 +301,3 @@ func replaceURLParams(url string, params map[string]any) string { return s }) } - -// InjectCallbackURL 将回调地址注入到请求体中 -func InjectCallbackURL(ctx context.Context, payload map[string]any, callbackURL string) map[string]any { - if callbackURL == "" { - return payload - } - payload[callbackURL] = utils.GetCallbackURL(ctx, "/task/modelCallback") - return payload -} diff --git a/model/dto/model_gateway_task_dto.go b/model/dto/model_gateway_task_dto.go index 9e827c3..8d7b0a1 100644 --- a/model/dto/model_gateway_task_dto.go +++ b/model/dto/model_gateway_task_dto.go @@ -68,6 +68,10 @@ type GetTaskBatchReq struct { TaskIDs []string `p:"taskIds" json:"taskIds" v:"required#taskIds不能为空" dc:"任务ID列表"` } +type GetTaskBatchRes struct { + List []GetTaskBatchItem `json:"list" dc:"任务列表"` +} + type GetTaskBatchItem struct { TaskID string `json:"taskId" dc:"任务ID"` State int `json:"state" dc:"任务状态"` @@ -75,10 +79,6 @@ type GetTaskBatchItem struct { TextResult map[string]any `json:"textResult" dc:"文本结果"` } -type GetTaskBatchRes struct { - List []GetTaskBatchItem `json:"list" dc:"任务列表"` -} - // ListTaskReq 任务列表分页查询 type ListTaskReq struct { g.Meta `path:"/listTask" method:"get" tags:"任务管理" summary:"任务列表" dc:"分页查询任务列表,支持按状态/模型名称/task_id过滤"` diff --git a/model/entity/model_gateway_model.go b/model/entity/model_gateway_model.go index c912dd9..c7f6ec1 100644 --- a/model/entity/model_gateway_model.go +++ b/model/entity/model_gateway_model.go @@ -32,7 +32,6 @@ type modelGatewayModelCol struct { StreamConfig string FirstFrame string LastFrame string - CallbackUrl string MaxTokens string } @@ -66,7 +65,6 @@ var ModelGatewayModelCol = modelGatewayModelCol{ StreamConfig: "stream_config", FirstFrame: "first_frame", LastFrame: "last_frame", - CallbackUrl: "callback_url", MaxTokens: "max_tokens", } @@ -100,6 +98,5 @@ type ModelGatewayModel struct { StreamConfig map[string]any `orm:"stream_config" json:"streamConfig"` FirstFrame string `orm:"first_frame" json:"firstFrame"` LastFrame string `orm:"last_frame" json:"lastFrame"` - CallbackUrl string `orm:"callback_url" json:"callbackUrl"` MaxTokens int `orm:"max_tokens" json:"maxTokens"` } diff --git a/service/gateway/gateway_http_service.go b/service/gateway/gateway_http_service.go index ac5af58..efc3c35 100644 --- a/service/gateway/gateway_http_service.go +++ b/service/gateway/gateway_http_service.go @@ -24,6 +24,7 @@ type UploadFileResponse struct { FileAddressPrefix string `json:"fileAddressPrefix"` // 文件地址前缀 } +// UploadByTask 通过任务上传文件 func UploadByTask(ctx context.Context, data []byte, fileExt string) (oss *UploadFileResponse, err error) { // multipart body := &bytes.Buffer{} @@ -68,12 +69,11 @@ func UploadByTask(ctx context.Context, data []byte, fileExt string) (oss *Upload // CallbackPayload 回调请求体 type CallbackPayload struct { - TaskId string `json:"task_id"` - State int `json:"state"` - OssFile string `json:"oss_file"` - FileType string `json:"file_type"` - Messages map[string]any `json:"messages"` - ErrorMsg string `json:"error_msg"` + TaskId string `json:"task_id"` + State int `json:"state"` + OssFile string `json:"oss_file"` + FileType string `json:"file_type"` + ErrorMsg string `json:"error_msg"` } // TriggerCallback 任务的回调 @@ -85,7 +85,6 @@ func TriggerCallback(ctx context.Context, t *entity.ModelGatewayTask) { State: t.State, OssFile: t.ResultFile.OssFile, FileType: t.ResultFile.FileType, - Messages: t.TextResult, ErrorMsg: t.ErrorMsg, } jsonData, err := json.Marshal(payload) diff --git a/service/task/task_service.go b/service/task/task_service.go index 70d5a01..fe1eba7 100644 --- a/service/task/task_service.go +++ b/service/task/task_service.go @@ -27,10 +27,7 @@ type taskService struct{} // Create 创建任务 func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *dto.CreateTaskRes, err error) { - var ( - startAt = time.Now() - taskID = uuid.NewString() - ) + taskID := uuid.NewString() // 1) 检查模型配置,并且获取模型 userInfo, err := utils.GetUserInfo(ctx) @@ -64,10 +61,6 @@ func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res * } // 3) 插入任务记录 - if model.CallMode != nil && *model.CallMode == public.CallModeAsync { - // 异步调用:注入回调地址后提交,拿到 task_id 轮询 - req.RequestPayload = util.InjectCallbackURL(ctx, req.RequestPayload, model.CallbackUrl) - } requestPayload := entity.RequestPayload{ Body: req.RequestPayload, Headers: util.ParseHeadMsgHeaders(model.HeadMsg), @@ -107,8 +100,7 @@ func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res * TaskID: taskID, OpType: "createTask", Success: 1, - ErrorMsg: "", - CostMs: time.Since(startAt).Milliseconds(), + CostMs: time.Since(time.Now()).Milliseconds(), RequestPayload: &requestPayload, ResponsePayload: gdb.Map{ "taskId": taskID, diff --git a/service/task/worker.go b/service/task/worker.go index d8d0cab..de31f83 100644 --- a/service/task/worker.go +++ b/service/task/worker.go @@ -35,14 +35,17 @@ type asyncWorker struct { // handleOne 执行一次完整的任务 func (w *asyncWorker) handleOne(ctx context.Context, task *entity.ModelGatewayTask, model *entity.ModelGatewayModel, req *dto.CreateTaskReq) { var ( - body = task.RequestPayload.Body // 核心请求参数 - maxRetry = model.RetryTimes // 重试次数 - startTime = time.Now() - modelMessages = map[string]any{} + body = task.RequestPayload.Body + maxRetry = model.RetryTimes + startTime = time.Now() + result map[string]any + err error ) g.Log().Infof(ctx, "[执行任务][开始] taskId=%s model=%s", task.TaskID, task.ModelName) + // ============================================ // 1) 分布式并发控制 + // ============================================ semKey := fmt.Sprintf("asynch:sem:%s", task.ModelName) maxC := queue.GetRuntimeMaxConcurrency(ctx, task.ModelName, model.MaxConcurrency) acquired, err := queue.AcquireSemaphore(ctx, semKey, maxC, 3600) @@ -53,101 +56,91 @@ func (w *asyncWorker) handleOne(ctx context.Context, task *entity.ModelGatewayTa } if !acquired { _, _ = dao.ModelGatewayTask.Update(ctx, &entity.ModelGatewayTask{ - SQLBaseDO: beans.SQLBaseDO{ - Id: task.Id, - }, - State: public.TaskStatusPending, + SQLBaseDO: beans.SQLBaseDO{Id: task.Id}, + State: public.TaskStatusPending, }) g.Log().Infof(ctx, "[执行任务][排队] 并发已满,放回队列 taskId=%s", task.TaskID) return } defer func() { _ = queue.ReleaseSemaphore(ctx, semKey) }() + // ============================================ // 2) 调用模型 + // ============================================ switch { case model.CallMode != nil && *model.CallMode == public.CallModeStream: - rawBytes, err := w.callModelStream(ctx, task, model, body) - if err != nil { - w.failTask(ctx, task, startTime, err.Error()) - return - } - modelMessages, err = util.ParseStreamResponse(rawBytes, model.StreamConfig) - if err != nil { - w.failTask(ctx, task, startTime, err.Error()) + rawBytes, streamErr := w.callModelStream(ctx, task, model, body) + if streamErr != nil { + w.failTask(ctx, task, startTime, streamErr.Error()) return } + result, err = util.ParseStreamResponse(rawBytes, model.StreamConfig) case model.CallMode != nil && *model.CallMode == public.CallModeAsync: - modelMessages, err = w.callModel(ctx, task, model, body) - if err != nil { - w.failTask(ctx, task, startTime, err.Error()) - return - } - modelMessages, err = util.PullTaskResult(ctx, modelMessages, model.QueryConfig, model.HeadMsg) - if err != nil { - w.failTask(ctx, task, startTime, err.Error()) - return + result, err = w.callModel(ctx, task, model, body) + if err == nil { + result, err = util.PullTaskResult(ctx, result, model.QueryConfig, model.HeadMsg) } default: - modelMessages, err = w.callModel(ctx, task, model, body) - if err != nil { - w.failTask(ctx, task, startTime, err.Error()) - return - } + result, err = w.callModel(ctx, task, model, body) } - - // 3) 保存临时文件 - tmpPath, err := util.SaveTempFileByType(task.TaskID, modelMessages, task.TmpFile) - if err == nil && tmpPath != "" { - task.TmpFile = tmpPath - task.Phase = 1 - _, err = dao.ModelGatewayTask.Update(ctx, task) - if err != nil { - g.Log().Errorf(ctx, "[执行任务][失败] 更新数据库失败 taskId=%s err=%v", task.TaskID, err) - } - } - - // 4) 解析校验 + 响应映射(可重试,失败重新调模型) - modelMessages, err = w.parseAndRetry(ctx, modelMessages, task, model, req, maxRetry, startTime) if err != nil { - task.TextResult = modelMessages w.failTask(ctx, task, startTime, err.Error()) return } + // ============================================ + // 3) 缓存临时文件 + // ============================================ + if tmpPath, tmpErr := util.SaveTempFileByType(task.TaskID, result, task.TmpFile); tmpErr == nil && tmpPath != "" { + task.TmpFile = tmpPath + task.Phase = 1 + _, _ = dao.ModelGatewayTask.Update(ctx, task) + } + + // ============================================ + // 4) 解析校验 + 响应映射(可重试) + // ============================================ + result, err = w.parseAndRetry(ctx, result, task, model, req, maxRetry, startTime) + if err != nil { + task.TextResult = result + w.failTask(ctx, task, startTime, err.Error()) + return + } + + // ============================================ // 5) 上传 OSS(可重试) + // ============================================ var oss *gateway.UploadFileResponse for attempt := 0; attempt <= maxRetry; attempt++ { if attempt > 0 { g.Log().Infof(ctx, "[执行任务][重试] OSS上传 第%d/%d次 taskId=%s", attempt, maxRetry, task.TaskID) } - oss, err = w.uploadOSS(ctx, task) + oss, err = gateway.UploadByTask(ctx, gjson.New(result).MustToJson(), "json") if err == nil { break } - g.Log().Errorf(ctx, "[执行任务][失败] OSS上传失败 taskId=%s attempt=%d/%d err=%v", - task.TaskID, attempt, maxRetry, err) + g.Log().Errorf(ctx, "[执行任务][失败] OSS上传失败 taskId=%s attempt=%d/%d err=%v", task.TaskID, attempt, maxRetry, err) if attempt == maxRetry { - task.State = 3 + task.State = public.TaskStatusFailed task.ErrorMsg = err.Error() task.Phase = 1 - _, err = dao.ModelGatewayTask.Update(ctx, task) - if err != nil { - g.Log().Errorf(ctx, "[执行任务][失败] 更新数据库失败 taskId=%s err=%v", task.TaskID, err) - } + _, _ = dao.ModelGatewayTask.Update(ctx, task) w.failTask(ctx, task, startTime, fmt.Sprintf("OSS上传重试耗尽: %v", err)) return } } - // 6) 成功回调 - task.State = 2 + // ============================================ + // 6) 成功收尾 + // ============================================ + task.State = public.TaskStatusSuccess task.DurationSeconds = int64(time.Since(startTime).Seconds()) task.ResultFile = &entity.ResultFile{ OssFile: oss.FileAddressPrefix + oss.FileURL, FileType: oss.FileFormat, FileSize: int64(oss.FileSize), } - task.TextResult = modelMessages + task.TextResult = result if _, err = dao.ModelGatewayTask.Update(ctx, task); err != nil { g.Log().Errorf(ctx, "[执行任务][失败] 更新数据库失败 taskId=%s err=%v", task.TaskID, err) return @@ -159,10 +152,9 @@ func (w *asyncWorker) handleOne(ctx context.Context, task *entity.ModelGatewayTa go gateway.TriggerPromptsCallback(context.WithoutCancel(ctx), task, req.EpicycleId) } - g.Log().Infof(ctx, "[执行任务][成功] taskId=%s duration=%ds fileType=%s textLen=%d callbackUrl=%s", - task.TaskID, task.DurationSeconds, oss.FileFormat, len(body), task.CallbackURL) + g.Log().Infof(ctx, "[执行任务][成功] taskId=%s duration=%ds fileType=%s", + task.TaskID, task.DurationSeconds, oss.FileFormat) - // 7) 删除临时文件 _ = os.Remove(task.TmpFile) } @@ -495,16 +487,6 @@ func InvokeModel(ctx context.Context, model *entity.ModelGatewayModel, body map[ // return mappedResponse, nil // } -// uploadOSS 从临时文件上传 OSS -func (w *asyncWorker) uploadOSS(ctx context.Context, t *entity.ModelGatewayTask) (*gateway.UploadFileResponse, error) { - data, err := os.ReadFile(t.TmpFile) - if err != nil { - return nil, fmt.Errorf("读取临时文件失败: %w", err) - } - _, ext := util.DetectFileType(data) - return gateway.UploadByTask(ctx, data, ext) -} - // failTask 任务失败统一处理:更新数据库 + 释放排队 + 回调 func (w *asyncWorker) failTask(ctx context.Context, t *entity.ModelGatewayTask, startTime time.Time, errMsg string) { t.State = 3 diff --git a/update.sql b/update.sql index f60414c..248f060 100644 --- a/update.sql +++ b/update.sql @@ -1,120 +1,101 @@ --- model-asynch 核心表(pgsql) --- 1) asynch_models:模型配置 --- 2) asynch_task:异步任务 --- 3) logs_model_op:操作日志(统计用) --- 4) logs_model_stat:按天模型请求统计(限流/监控用) - -- ========================= --- 1) asynch_models +-- model_gateway_models -- ========================= -CREATE TABLE IF NOT EXISTS asynch_models ( - -- ========== 基础字段 ========== - id BIGINT PRIMARY KEY, - tenant_id BIGINT NOT NULL DEFAULT 0, - creator VARCHAR(64) NOT NULL, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updater VARCHAR(64) NOT NULL, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - deleted_at TIMESTAMP(6), - - -- ========== 模型标识 ========== - model_name VARCHAR(128) NOT NULL, - model_type SMALLINT NOT NULL DEFAULT 0, - operator_name VARCHAR(64) NOT NULL DEFAULT '', - - -- ========== 请求配置 ========== - base_url VARCHAR(256) NOT NULL, - http_method VARCHAR(8) NOT NULL DEFAULT 'POST', - head_msg JSONB NOT NULL DEFAULT '{}'::jsonb, - api_key VARCHAR(256) NOT NULL DEFAULT '', - - -- ========== 状态开关 ========== - is_private SMALLINT NOT NULL DEFAULT 0, - enabled SMALLINT NOT NULL DEFAULT 1, - is_chat_model SMALLINT NOT NULL DEFAULT 0, - is_async SMALLINT NOT NULL DEFAULT 0, - is_stream SMALLINT NOT NULL DEFAULT 0, - is_owner SMALLINT NOT NULL DEFAULT 99, - - -- ========== 配置相关 ========== - form_json JSONB NOT NULL DEFAULT '{}'::jsonb, - request_mapping JSONB NOT NULL DEFAULT '{}'::jsonb, - response_mapping JSONB NOT NULL DEFAULT '{}'::jsonb, - response_body JSONB NOT NULL DEFAULT '{}'::jsonb, - token_config JSONB NOT NULL DEFAULT '{}'::jsonb, - extend_mapping JSONB NOT NULL DEFAULT '{}'::jsonb, - query_config JSONB NOT NULL DEFAULT '{}'::jsonb, - stream_config JSONB NOT NULL DEFAULT '{}'::jsonb, - first_frame VARCHAR(128) NOT NULL DEFAULT '', - last_frame VARCHAR(128) NOT NULL DEFAULT '', - - -- ========== 限制与重试 ========== - max_concurrency INT NOT NULL DEFAULT 10, - timeout_seconds INT NOT NULL DEFAULT 600, - retry_times SMALLINT NOT NULL DEFAULT 3, - auto_clean_seconds INT NOT NULL DEFAULT 86400, - - -- ========== 其他 ========== - response_token_field VARCHAR(128) NOT NULL DEFAULT '', +CREATE TABLE IF NOT EXISTS model_gateway_models ( + id int8 PRIMARY KEY, + tenant_id int8 NOT NULL DEFAULT 0, + creator varchar(64) NOT NULL, + created_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + updater varchar(64) NOT NULL, + updated_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at timestamp(6), + model_name varchar(128) NOT NULL, + model_type int2 NOT NULL DEFAULT 0, + operator_name varchar(64) NOT NULL DEFAULT '', + base_url varchar(256) NOT NULL, + http_method varchar(8) NOT NULL DEFAULT 'POST', + head_msg jsonb NOT NULL DEFAULT '{}', + api_key varchar(256) NOT NULL DEFAULT '', + is_private int2 NOT NULL DEFAULT 0, + enabled int2 NOT NULL DEFAULT 1, + is_chat_model int2 NOT NULL DEFAULT 0, + is_owner int2 NOT NULL DEFAULT 99, + form_json jsonb NOT NULL DEFAULT '{}', + request_mapping jsonb NOT NULL DEFAULT '{}', + response_mapping jsonb NOT NULL DEFAULT '{}', + response_body varchar(128) NOT NULL DEFAULT '', + token_config jsonb NOT NULL DEFAULT '{}', + extend_mapping jsonb NOT NULL DEFAULT '{}', + query_config jsonb NOT NULL DEFAULT '{}', + stream_config jsonb NOT NULL DEFAULT '{}', + first_frame varchar(128) NOT NULL DEFAULT '', + last_frame varchar(128) NOT NULL DEFAULT '', + max_concurrency int4 NOT NULL DEFAULT 10, + timeout_seconds int4 NOT NULL DEFAULT 600, + retry_times int2 NOT NULL DEFAULT 3, + auto_clean_seconds int4 NOT NULL DEFAULT 86400, + response_token_field varchar(128) NOT NULL DEFAULT '', + call_mode int2 NOT NULL DEFAULT 0, + required_fields jsonb NOT NULL DEFAULT '[]', + max_tokens int4 DEFAULT 0 ); --- ========== 索引 ========== -CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_creator_chat ON asynch_models(tenant_id, creator) WHERE is_chat_model = 1 AND deleted_at IS NULL; -CREATE UNIQUE INDEX IF NOT EXISTS uk_asynch_models_tenant_model_name ON asynch_models(tenant_id, creator, model_name); -CREATE INDEX IF NOT EXISTS idx_asynch_models_tenant_id ON asynch_models(tenant_id); -CREATE INDEX IF NOT EXISTS idx_asynch_models_model_name ON asynch_models(model_name); -CREATE INDEX IF NOT EXISTS idx_asynch_models_model_type ON asynch_models(model_type); -CREATE INDEX IF NOT EXISTS idx_asynch_models_enabled ON asynch_models(enabled); -CREATE INDEX IF NOT EXISTS idx_asynch_models_deleted_at ON asynch_models(deleted_at); +CREATE UNIQUE INDEX IF NOT EXISTS uk_model_gateway_models_tenant_creator_model ON model_gateway_models (tenant_id, creator, model_name); +CREATE INDEX IF NOT EXISTS idx_model_gateway_models_model_name ON model_gateway_models (model_name); +CREATE INDEX IF NOT EXISTS idx_model_gateway_models_model_type ON model_gateway_models (model_type); +CREATE INDEX IF NOT EXISTS idx_model_gateway_models_tenant_id ON model_gateway_models (tenant_id); +CREATE INDEX IF NOT EXISTS idx_model_gateway_models_deleted_at ON model_gateway_models (deleted_at); +CREATE INDEX IF NOT EXISTS idx_model_gateway_models_enabled ON model_gateway_models (enabled); --- ========== 注释 ========== -COMMENT ON TABLE asynch_models IS '模型配置表'; +COMMENT ON TABLE model_gateway_models IS '模型配置表'; +COMMENT ON COLUMN model_gateway_models.id IS '主键ID(非自增)'; +COMMENT ON COLUMN model_gateway_models.tenant_id IS '租户ID'; +COMMENT ON COLUMN model_gateway_models.creator IS '创建人'; +COMMENT ON COLUMN model_gateway_models.created_at IS '创建时间'; +COMMENT ON COLUMN model_gateway_models.updater IS '更新人'; +COMMENT ON COLUMN model_gateway_models.updated_at IS '更新时间'; +COMMENT ON COLUMN model_gateway_models.deleted_at IS '删除时间(软删)'; +COMMENT ON COLUMN model_gateway_models.model_name IS '模型名称'; +COMMENT ON COLUMN model_gateway_models.model_type IS '模型类型'; +COMMENT ON COLUMN model_gateway_models.operator_name IS '运营商名称'; +COMMENT ON COLUMN model_gateway_models.base_url IS '模型地址'; +COMMENT ON COLUMN model_gateway_models.http_method IS '请求方式 GET/POST'; +COMMENT ON COLUMN model_gateway_models.head_msg IS '请求头信息'; +COMMENT ON COLUMN model_gateway_models.api_key IS '调用凭证/密钥'; -COMMENT ON COLUMN asynch_models.id IS '主键ID(非自增)'; -COMMENT ON COLUMN asynch_models.tenant_id IS '租户ID'; -COMMENT ON COLUMN asynch_models.creator IS '创建人'; -COMMENT ON COLUMN asynch_models.created_at IS '创建时间'; -COMMENT ON COLUMN asynch_models.updater IS '更新人'; -COMMENT ON COLUMN asynch_models.updated_at IS '更新时间'; -COMMENT ON COLUMN asynch_models.deleted_at IS '删除时间(软删)'; +COMMENT ON COLUMN model_gateway_models.is_private IS '是否私有化:0-私有 1-公共'; +COMMENT ON COLUMN model_gateway_models.enabled IS '是否启用:0-停用 1-启用'; +COMMENT ON COLUMN model_gateway_models.is_chat_model IS '是否为对话模型:0-否 1-是'; +COMMENT ON COLUMN model_gateway_models.is_owner IS '1=当前用户创建 0=超级管理员'; + +COMMENT ON COLUMN model_gateway_models.form_json IS '动态表单结构'; +COMMENT ON COLUMN model_gateway_models.request_mapping IS '请求映射'; +COMMENT ON COLUMN model_gateway_models.response_mapping IS '返回映射'; +COMMENT ON COLUMN model_gateway_models.response_body IS '返回主体'; +COMMENT ON COLUMN model_gateway_models.token_config IS 'Token计算配置'; +COMMENT ON COLUMN model_gateway_models.extend_mapping IS '附加映射'; +COMMENT ON COLUMN model_gateway_models.query_config IS '查询/回调配置'; +COMMENT ON COLUMN model_gateway_models.stream_config IS '流式输出配置'; +COMMENT ON COLUMN model_gateway_models.first_frame IS '首帧图片参数'; +COMMENT ON COLUMN model_gateway_models.last_frame IS '尾帧图片参数'; +COMMENT ON COLUMN model_gateway_models.max_concurrency IS '最大并发数'; +COMMENT ON COLUMN model_gateway_models.timeout_seconds IS '调用模型超时(秒)'; +COMMENT ON COLUMN model_gateway_models.retry_times IS '失败重试次数'; +COMMENT ON COLUMN model_gateway_models.auto_clean_seconds IS '任务完成后自动清理时间(秒)'; +COMMENT ON COLUMN model_gateway_models.response_token_field IS '响应中消耗token的字段映射'; +COMMENT ON COLUMN model_gateway_models.call_mode IS '调用模式:0-同步 1-异步 2-流式'; +COMMENT ON COLUMN model_gateway_models.required_fields IS '必选字段列表'; +COMMENT ON COLUMN model_gateway_models.max_tokens IS '最大 token 数,0 表示不传'; -COMMENT ON COLUMN asynch_models.model_name IS '模型名称'; -COMMENT ON COLUMN asynch_models.model_type IS '模型类型'; -COMMENT ON COLUMN asynch_models.operator_name IS '运营商名称'; -COMMENT ON COLUMN asynch_models.base_url IS '模型地址'; -COMMENT ON COLUMN asynch_models.http_method IS '请求方式 GET/POST'; -COMMENT ON COLUMN asynch_models.head_msg IS '请求头信息'; -COMMENT ON COLUMN asynch_models.api_key IS '调用凭证/密钥'; -COMMENT ON COLUMN asynch_models.is_private IS '是否私有化:0-私有 1-公共'; -COMMENT ON COLUMN asynch_models.enabled IS '是否启用:0-停用 1-启用'; -COMMENT ON COLUMN asynch_models.is_chat_model IS '是否为对话模型:0-否 1-是'; -COMMENT ON COLUMN asynch_models.is_async IS '是否异步:0-同步 1-异步'; -COMMENT ON COLUMN asynch_models.is_stream IS '是否流式:0-非流式 1-流式'; -COMMENT ON COLUMN asynch_models.is_owner IS '1=当前用户创建 0=超级管理员'; -COMMENT ON COLUMN asynch_models.form_json IS '动态表单结构'; -COMMENT ON COLUMN asynch_models.request_mapping IS '请求映射'; -COMMENT ON COLUMN asynch_models.response_mapping IS '返回映射'; -COMMENT ON COLUMN asynch_models.response_body IS '返回主体'; -COMMENT ON COLUMN asynch_models.token_config IS 'Token计算配置'; -COMMENT ON COLUMN asynch_models.extend_mapping IS '附加映射'; -COMMENT ON COLUMN asynch_models.query_config IS '查询/回调配置'; -COMMENT ON COLUMN asynch_models.stream_config IS '流式输出配置'; -COMMENT ON COLUMN asynch_models.first_frame IS '首帧图片参数'; -COMMENT ON COLUMN asynch_models.last_frame IS '尾帧图片参数'; -COMMENT ON COLUMN asynch_models.max_concurrency IS '最大并发数'; -COMMENT ON COLUMN asynch_models.timeout_seconds IS '调用模型超时(秒)'; -COMMENT ON COLUMN asynch_models.retry_times IS '失败重试次数'; -COMMENT ON COLUMN asynch_models.auto_clean_seconds IS '任务完成后自动清理时间(秒)'; -COMMENT ON COLUMN asynch_models.response_token_field IS '响应中消耗token的字段映射'; -- ========================= -- model_gateway_task -- ========================= -CREATE TABLE model_gateway_task ( - id int8 PRIMARY KEY, - tenant_id int8 NOT NULL DEFAULT 0, - creator varchar(64) NOT NULL, +CREATE TABLE IF NOT EXISTS model_gateway_task ( + id int8 PRIMARY KEY, + tenant_id int8 NOT NULL DEFAULT 0, + creator varchar(64) NOT NULL, created_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, updater varchar(64) NOT NULL, updated_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -134,12 +115,12 @@ CREATE TABLE model_gateway_task ( expend_tokens int8 NOT NULL DEFAULT 0, duration_seconds int8 NOT NULL DEFAULT 0, epicycle_id varchar(64) NOT NULL DEFAULT '' -); + ); -CREATE UNIQUE INDEX uk_model_gateway_task_tenant_creator_task_id ON model_gateway_task (tenant_id, creator, task_id); -CREATE INDEX idx_model_gateway_task_task_id ON model_gateway_task (task_id); -CREATE INDEX idx_model_gateway_task_state ON model_gateway_task (state); -CREATE INDEX idx_model_gateway_task_deleted_at ON model_gateway_task (deleted_at); +CREATE UNIQUE INDEX IF NOT EXISTS uk_model_gateway_task_tenant_creator_task_id ON model_gateway_task (tenant_id, creator, task_id); +CREATE INDEX IF NOT EXISTS idx_model_gateway_task_task_id ON model_gateway_task (task_id); +CREATE INDEX IF NOT EXISTS idx_model_gateway_task_state ON model_gateway_task (state); +CREATE INDEX IF NOT EXISTS idx_model_gateway_task_deleted_at ON model_gateway_task (deleted_at); COMMENT ON TABLE model_gateway_task IS '模型网关任务表'; COMMENT ON COLUMN model_gateway_task.id IS '主键ID'; @@ -168,91 +149,85 @@ COMMENT ON COLUMN model_gateway_task.epicycle_id IS '轮次ID'; -- ========================= --- 3) logs_model_op +-- model_gateway_log_stat -- ========================= -CREATE TABLE IF NOT EXISTS logs_model_op ( - -- 基础字段 - id BIGINT PRIMARY KEY, - tenant_id BIGINT NOT NULL DEFAULT 0, - creator VARCHAR(64) NOT NULL, - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updater VARCHAR(64) NOT NULL, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - deleted_at TIMESTAMP(6), - -- 基础审计信息 - ip VARCHAR(64) DEFAULT '', - user_agent VARCHAR(256) DEFAULT '', - api_path VARCHAR(256) DEFAULT '', - http_method VARCHAR(16) DEFAULT '', - -- 业务信息 - biz_name VARCHAR(128) NOT NULL DEFAULT '', -- 调用方业务模块/系统 - model_name VARCHAR(128) NOT NULL DEFAULT '', - task_id VARCHAR(64) NOT NULL DEFAULT '', - -- 统计字段 - op_type VARCHAR(64) NOT NULL DEFAULT 'createTask', -- 操作类型(默认创建任务) - success SMALLINT NOT NULL DEFAULT 1, -- 1成功/0失败 - error_msg TEXT DEFAULT '', - cost_ms BIGINT NOT NULL DEFAULT 0, -- 耗时(毫秒) - -- 请求/响应 JSON(用于后期统计分析) - request_payload JSONB, - response_payload JSONB -); +CREATE TABLE IF NOT EXISTS model_gateway_log_stat ( + day date NOT NULL, + tenant_id int8 NOT NULL DEFAULT 0, + creator varchar(64) NOT NULL DEFAULT '', + model_name varchar(128) NOT NULL DEFAULT '', + request_count int8 NOT NULL DEFAULT 0, + created_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (day, tenant_id, creator, model_name) + ); -CREATE INDEX IF NOT EXISTS idx_logs_model_op_tenant_time ON logs_model_op(tenant_id, created_at); -CREATE INDEX IF NOT EXISTS idx_logs_model_op_model_name ON logs_model_op(model_name); -CREATE INDEX IF NOT EXISTS idx_logs_model_op_biz_name ON logs_model_op(biz_name); -CREATE INDEX IF NOT EXISTS idx_logs_model_op_task_id ON logs_model_op(task_id); -CREATE INDEX IF NOT EXISTS idx_logs_model_op_op_type ON logs_model_op(op_type); -CREATE INDEX IF NOT EXISTS idx_logs_model_op_deleted_at ON logs_model_op(deleted_at); +CREATE INDEX IF NOT EXISTS idx_model_gateway_log_stat_day ON model_gateway_log_stat (day); +CREATE INDEX IF NOT EXISTS idx_model_gateway_log_stat_creator ON model_gateway_log_stat (creator); +CREATE INDEX IF NOT EXISTS idx_model_gateway_log_stat_model_name ON model_gateway_log_stat (model_name); +CREATE INDEX IF NOT EXISTS idx_model_gateway_log_stat_tenant_day ON model_gateway_log_stat (tenant_id, day); -COMMENT ON TABLE logs_model_op IS '操作记录日志表(创建任务等,用于统计)'; -COMMENT ON COLUMN logs_model_op.id IS '主键ID(非自增)'; -COMMENT ON COLUMN logs_model_op.tenant_id IS '租户ID'; -COMMENT ON COLUMN logs_model_op.creator IS '创建人'; -COMMENT ON COLUMN logs_model_op.created_at IS '创建时间'; -COMMENT ON COLUMN logs_model_op.updater IS '更新人'; -COMMENT ON COLUMN logs_model_op.updated_at IS '更新时间'; -COMMENT ON COLUMN logs_model_op.deleted_at IS '删除时间(软删)'; -COMMENT ON COLUMN logs_model_op.ip IS '客户端IP'; -COMMENT ON COLUMN logs_model_op.user_agent IS 'User-Agent'; -COMMENT ON COLUMN logs_model_op.api_path IS '接口路径'; -COMMENT ON COLUMN logs_model_op.http_method IS 'HTTP方法'; -COMMENT ON COLUMN logs_model_op.biz_name IS '业务名称(调用方模块/系统)'; -COMMENT ON COLUMN logs_model_op.model_name IS '模型名称'; -COMMENT ON COLUMN logs_model_op.task_id IS '任务ID'; -COMMENT ON COLUMN logs_model_op.op_type IS '操作类型(如 createTask/getTaskResult/getTaskBatch 等)'; -COMMENT ON COLUMN logs_model_op.success IS '是否成功:1成功/0失败'; -COMMENT ON COLUMN logs_model_op.error_msg IS '错误信息(失败时)'; -COMMENT ON COLUMN logs_model_op.cost_ms IS '耗时(毫秒)'; -COMMENT ON COLUMN logs_model_op.request_payload IS '请求 JSON'; -COMMENT ON COLUMN logs_model_op.response_payload IS '响应 JSON'; +COMMENT ON TABLE model_gateway_log_stat IS '按天统计表'; +COMMENT ON COLUMN model_gateway_log_stat.day IS '天(YYYY-MM-DD)'; +COMMENT ON COLUMN model_gateway_log_stat.tenant_id IS '租户ID'; +COMMENT ON COLUMN model_gateway_log_stat.creator IS '创建人'; +COMMENT ON COLUMN model_gateway_log_stat.model_name IS '模型名称'; +COMMENT ON COLUMN model_gateway_log_stat.request_count IS '请求次数'; +COMMENT ON COLUMN model_gateway_log_stat.created_at IS '创建时间'; +COMMENT ON COLUMN model_gateway_log_stat.updated_at IS '更新时间'; -- ========================= --- 4) logs_model_stat +-- model_gateway_logs_op -- ========================= -CREATE TABLE IF NOT EXISTS logs_model_stat ( - day DATE NOT NULL, -- 天(YYYY-MM-DD) - tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID - creator VARCHAR(64) NOT NULL DEFAULT '', -- 创建人 - model_name VARCHAR(128) NOT NULL DEFAULT '', -- 模型名称 - request_count BIGINT NOT NULL DEFAULT 0, -- 请求次数 - created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY(day, tenant_id, creator, model_name) -); +CREATE TABLE IF NOT EXISTS model_gateway_logs_op ( + id int8 PRIMARY KEY, + tenant_id int8 NOT NULL DEFAULT 0, + creator varchar(64) NOT NULL, + created_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + updater varchar(64) NOT NULL, + updated_at timestamp(6) NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at timestamp(6), + ip varchar(64) DEFAULT '', + user_agent varchar(256) DEFAULT '', + api_path varchar(256) DEFAULT '', + http_method varchar(16) DEFAULT '', + biz_name varchar(128) NOT NULL DEFAULT '', + model_name varchar(128) NOT NULL DEFAULT '', + task_id varchar(64) NOT NULL DEFAULT '', + op_type varchar(64) NOT NULL DEFAULT 'createTask', + success int2 NOT NULL DEFAULT 1, + error_msg text DEFAULT '', + cost_ms int8 NOT NULL DEFAULT 0, + request_payload jsonb, + response_payload jsonb + ); --- 便于时间段/租户/人/模型过滤 -CREATE INDEX IF NOT EXISTS idx_logs_model_stat_tenant_day ON logs_model_stat(tenant_id, day); -CREATE INDEX IF NOT EXISTS idx_logs_model_stat_day ON logs_model_stat(day); -CREATE INDEX IF NOT EXISTS idx_logs_model_stat_model_name ON logs_model_stat(model_name); -CREATE INDEX IF NOT EXISTS idx_logs_model_stat_creator ON logs_model_stat(creator); +CREATE INDEX IF NOT EXISTS idx_model_gateway_logs_op_task_id ON model_gateway_logs_op (task_id); +CREATE INDEX IF NOT EXISTS idx_model_gateway_logs_op_biz_name ON model_gateway_logs_op (biz_name); +CREATE INDEX IF NOT EXISTS idx_model_gateway_logs_op_model_name ON model_gateway_logs_op (model_name); +CREATE INDEX IF NOT EXISTS idx_model_gateway_logs_op_op_type ON model_gateway_logs_op (op_type); +CREATE INDEX IF NOT EXISTS idx_model_gateway_logs_op_deleted_at ON model_gateway_logs_op (deleted_at); +CREATE INDEX IF NOT EXISTS idx_model_gateway_logs_op_tenant_time ON model_gateway_logs_op (tenant_id, created_at); -COMMENT ON TABLE logs_model_stat IS '按天模型请求统计(用于限流/监控)'; -COMMENT ON COLUMN logs_model_stat.day IS '天(YYYY-MM-DD)'; -COMMENT ON COLUMN logs_model_stat.tenant_id IS '租户ID'; -COMMENT ON COLUMN logs_model_stat.creator IS '创建人'; -COMMENT ON COLUMN logs_model_stat.model_name IS '模型名称'; -COMMENT ON COLUMN logs_model_stat.request_count IS '请求次数'; -COMMENT ON COLUMN logs_model_stat.created_at IS '创建时间'; -COMMENT ON COLUMN logs_model_stat.updated_at IS '更新时间'; +COMMENT ON TABLE model_gateway_logs_op IS '操作日志表'; +COMMENT ON COLUMN model_gateway_logs_op.id IS '主键ID(非自增)'; +COMMENT ON COLUMN model_gateway_logs_op.tenant_id IS '租户ID'; +COMMENT ON COLUMN model_gateway_logs_op.creator IS '创建人'; +COMMENT ON COLUMN model_gateway_logs_op.created_at IS '创建时间'; +COMMENT ON COLUMN model_gateway_logs_op.updater IS '更新人'; +COMMENT ON COLUMN model_gateway_logs_op.updated_at IS '更新时间'; +COMMENT ON COLUMN model_gateway_logs_op.deleted_at IS '删除时间(软删)'; +COMMENT ON COLUMN model_gateway_logs_op.ip IS '客户端IP'; +COMMENT ON COLUMN model_gateway_logs_op.user_agent IS 'User-Agent'; +COMMENT ON COLUMN model_gateway_logs_op.api_path IS '接口路径'; +COMMENT ON COLUMN model_gateway_logs_op.http_method IS 'HTTP方法'; +COMMENT ON COLUMN model_gateway_logs_op.biz_name IS '业务名称(调用方模块/系统)'; +COMMENT ON COLUMN model_gateway_logs_op.model_name IS '模型名称'; +COMMENT ON COLUMN model_gateway_logs_op.task_id IS '任务ID'; +COMMENT ON COLUMN model_gateway_logs_op.op_type IS '操作类型'; +COMMENT ON COLUMN model_gateway_logs_op.success IS '是否成功:1成功/0失败'; +COMMENT ON COLUMN model_gateway_logs_op.error_msg IS '错误信息(失败时)'; +COMMENT ON COLUMN model_gateway_logs_op.cost_ms IS '耗时(毫秒)'; +COMMENT ON COLUMN model_gateway_logs_op.request_payload IS '请求 JSON'; +COMMENT ON COLUMN model_gateway_logs_op.response_payload IS '响应 JSON'; \ No newline at end of file