From 4e6b98b7d365400f6791656c2a5e794294571053 Mon Sep 17 00:00:00 2001 From: WangLiZhao <1838393649@qq.com> Date: Mon, 27 Apr 2026 10:42:42 +0800 Subject: [PATCH] =?UTF-8?q?feat(stat):=20=E6=B7=BB=E5=8A=A0=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E8=AF=B7=E6=B1=82=E6=8C=89=E5=A4=A9=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增统计控制器、服务层与数据访问层,提供按天统计接口 - 在 worker 处理任务时原子累加请求计数(仅实际调用模型时计数) - 更新数据库表结构,添加 asynch_model_stat 表及索引 - 更新文档说明统计功能的使用方式与统计口径 --- README.md | 10 ++++- consts/public/table_name.go | 1 + controller/stat_controller.go | 20 ++++++++++ dao/stat_dao.go | 61 +++++++++++++++++++++++++++++++ dao/task_dao_bg.go | 2 +- main.go | 1 + model/dto/stat_dto.go | 23 ++++++++++++ model/entity/asynch_model_stat.go | 16 ++++++++ service/stat_service.go | 40 ++++++++++++++++++++ service/worker.go | 3 ++ update.sql | 31 ++++++++++++++++ 11 files changed, 206 insertions(+), 2 deletions(-) create mode 100644 controller/stat_controller.go create mode 100644 dao/stat_dao.go create mode 100644 model/dto/stat_dto.go create mode 100644 model/entity/asynch_model_stat.go create mode 100644 service/stat_service.go diff --git a/README.md b/README.md index 881494f..925caff 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,11 @@ - 失败重试耗尽仍失败 → 硬删除任务(并尝试删除 OSS) - `state=0/1` 超时 → 标记失败(防止卡死) +### 1.3 统计(asynch_model_stat) +- 按天统计:`day + tenant_id + creator + model_name -> request_count` +- 统计口径:仅在 Worker 真正调用模型服务时计数(OSS 重试不计数) +- 用途:给其他服务提供全局限流/监控依据(分布式场景下通过数据库 UPSERT 原子累加保证一致性) + --- ## 2. 使用流程(业务方如何接入) @@ -59,6 +64,10 @@ > `state=4` 的数据允许重复获取,避免业务侧偶发中断导致“领取不到结果”。 +### 2.4 获取统计(用于业务侧限流/监控) +业务方可调用统计接口按时间段获取请求次数(默认分页 10 条): +- `/stat/listModelStat`:支持 `startDay/endDay/tenantId/creator/modelName` 条件筛选 + --- ## 3. 状态机说明(asynch_task.state) @@ -115,4 +124,3 @@ 1) 从 `main` 拉出 `dev` 2) 功能完成后提 MR/PR 合并回 `main` 3) 打 tag / 发布镜像 - diff --git a/consts/public/table_name.go b/consts/public/table_name.go index 24e2570..4fa0e8c 100644 --- a/consts/public/table_name.go +++ b/consts/public/table_name.go @@ -4,4 +4,5 @@ const ( TableNameModel = "asynch_models" // 异步模型表 TableNameTask = "asynch_task" // 异步任务表 TableNameOpLog = "asynch_op_log" // 异步操作日志表 + TableNameStat = "asynch_model_stat" // 按天统计表(请求次数) ) diff --git a/controller/stat_controller.go b/controller/stat_controller.go new file mode 100644 index 0000000..e0938d9 --- /dev/null +++ b/controller/stat_controller.go @@ -0,0 +1,20 @@ +package controller + +import ( + "context" + + "model-asynch/model/dto" + "model-asynch/service" +) + +type stat struct{} + +// Stat 统计控制器 +var Stat = new(stat) + +// ListModelStat 统计列表 +func (c *stat) ListModelStat(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) { + ctx = ensureUser(ctx) + return service.Stat.List(ctx, req) +} + diff --git a/dao/stat_dao.go b/dao/stat_dao.go new file mode 100644 index 0000000..edce22f --- /dev/null +++ b/dao/stat_dao.go @@ -0,0 +1,61 @@ +package dao + +import ( + "context" + "fmt" + "time" + + "model-asynch/consts/public" + "model-asynch/model/entity" + + "gitea.com/red-future/common/db/gfdb" + "github.com/gogf/gf/v2/os/gtime" +) + +type statDao struct{} + +var Stat = &statDao{} + +// IncRequestCount 原子累加(支持分布式/多协程):按天+租户+创建人+模型 +1 +func (d *statDao) IncRequestCount(ctx context.Context, day time.Time, tenantId int64, creator, modelName string) error { + sql := fmt.Sprintf(` +INSERT INTO %s(day, tenant_id, creator, model_name, request_count, created_at, updated_at) +VALUES(?, ?, ?, ?, 1, NOW(), NOW()) +ON CONFLICT (day, tenant_id, creator, model_name) +DO UPDATE SET request_count = %s.request_count + 1, updated_at = NOW()`, + public.TableNameStat, public.TableNameStat, + ) + _, err := gfdb.DB(ctx).Exec(ctx, sql, gtime.New(day).Format("Y-m-d"), tenantId, creator, modelName) + return err +} + +func (d *statDao) List(ctx context.Context, pageNum, pageSize int, startDay, endDay string, tenantId *int64, creator, modelName string) (list []*entity.AsynchModelStat, total int64, err error) { + m := gfdb.DB(ctx).Model(ctx, public.TableNameStat).Where("1=1") + if startDay != "" { + m = m.Where("day >= ?", startDay) + } + if endDay != "" { + m = m.Where("day <= ?", endDay) + } + if tenantId != nil { + m = m.Where("tenant_id = ?", *tenantId) + } + if creator != "" { + m = m.WhereLike("creator", "%"+creator+"%") + } + if modelName != "" { + m = m.WhereLike("model_name", "%"+modelName+"%") + } + m = m.OrderDesc("day").OrderDesc("request_count") + if pageNum > 0 && pageSize > 0 { + m = m.Page(pageNum, pageSize) + } + r, totalInt, err := m.AllAndCount(false) + if err != nil { + return nil, 0, err + } + total = int64(totalInt) + err = r.Structs(&list) + return +} + diff --git a/dao/task_dao_bg.go b/dao/task_dao_bg.go index 5159139..7f92b2f 100644 --- a/dao/task_dao_bg.go +++ b/dao/task_dao_bg.go @@ -20,7 +20,7 @@ func (d *taskDao) ClaimPendingGlobal(ctx context.Context, batchSize int) (tasks } err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { sql := fmt.Sprintf( - `SELECT id, tenant_id, model_name, task_id, input_ref, request_payload, phase, tmp_file + `SELECT id, tenant_id, creator, model_name, task_id, input_ref, request_payload, phase, tmp_file FROM %s WHERE deleted_at IS NULL AND state = 0 ORDER BY enqueue_at ASC diff --git a/main.go b/main.go index e8eec22..60c2e15 100644 --- a/main.go +++ b/main.go @@ -27,6 +27,7 @@ func main() { http.RouteRegister([]interface{}{ controller.Model, controller.Task, + controller.Stat, }) // 启动后台任务:worker + 清理器 diff --git a/model/dto/stat_dto.go b/model/dto/stat_dto.go new file mode 100644 index 0000000..2341d66 --- /dev/null +++ b/model/dto/stat_dto.go @@ -0,0 +1,23 @@ +package dto + +import ( + "gitea.com/red-future/common/beans" + "github.com/gogf/gf/v2/frame/g" +) + +// ListModelStatReq 统计列表 +type ListModelStatReq struct { + g.Meta `path:"/listModelStat" method:"post" tags:"统计" summary:"模型请求统计列表" dc:"按天统计模型请求次数,支持分页与条件筛选"` + Page *beans.Page `p:"page" json:"page" dc:"分页参数(默认10条)"` + StartDay string `p:"startDay" json:"startDay" dc:"开始日期(YYYY-MM-DD,可选)"` + EndDay string `p:"endDay" json:"endDay" dc:"结束日期(YYYY-MM-DD,可选)"` + TenantID *int64 `p:"tenantId" json:"tenantId" dc:"租户ID(可选)"` + Creator string `p:"creator" json:"creator" dc:"创建人(可选,模糊匹配)"` + ModelName string `p:"modelName" json:"modelName" dc:"模型名称(可选,模糊匹配)"` +} + +type ListModelStatRes struct { + List any `json:"list" dc:"列表数据"` + Total int64 `json:"total" dc:"总数"` +} + diff --git a/model/entity/asynch_model_stat.go b/model/entity/asynch_model_stat.go new file mode 100644 index 0000000..7ba8455 --- /dev/null +++ b/model/entity/asynch_model_stat.go @@ -0,0 +1,16 @@ +package entity + +import "github.com/gogf/gf/v2/os/gtime" + +// AsynchModelStat 按天统计:某天/租户/创建人/模型的请求次数 +// 注:这里不走通用 SQLBaseDO,采用联合唯一键(day,tenant_id,creator,model_name)做 UPSERT 原子累加。 +type AsynchModelStat struct { + Day *gtime.Time `orm:"day" json:"day"` // 日期(建议仅使用日期部分) + TenantId int64 `orm:"tenant_id" json:"tenantId,string"` + Creator string `orm:"creator" json:"creator"` + ModelName string `orm:"model_name" json:"modelName"` + RequestCount int64 `orm:"request_count" json:"requestCount"` + CreatedAt *gtime.Time `orm:"created_at" json:"createdAt"` + UpdatedAt *gtime.Time `orm:"updated_at" json:"updatedAt"` +} + diff --git a/service/stat_service.go b/service/stat_service.go new file mode 100644 index 0000000..8509e96 --- /dev/null +++ b/service/stat_service.go @@ -0,0 +1,40 @@ +package service + +import ( + "context" + + "model-asynch/dao" + "model-asynch/model/dto" +) + +type statService struct{} + +var Stat = &statService{} + +func (s *statService) List(ctx context.Context, req *dto.ListModelStatReq) (res *dto.ListModelStatRes, err error) { + pageNum, pageSize := 1, 10 + if req != nil && req.Page != nil { + if req.Page.PageNum > 0 { + pageNum = int(req.Page.PageNum) + } + if req.Page.PageSize > 0 { + pageSize = int(req.Page.PageSize) + } + } + startDay, endDay := "", "" + var tenantID *int64 + creator, modelName := "", "" + if req != nil { + startDay = req.StartDay + endDay = req.EndDay + tenantID = req.TenantID + creator = req.Creator + modelName = req.ModelName + } + list, total, err := dao.Stat.List(ctx, pageNum, pageSize, startDay, endDay, tenantID, creator, modelName) + if err != nil { + return nil, err + } + return &dto.ListModelStatRes{List: list, Total: total}, nil +} + diff --git a/service/worker.go b/service/worker.go index 8107829..faa59e7 100644 --- a/service/worker.go +++ b/service/worker.go @@ -163,6 +163,9 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask) { } } if data == nil { + // 统计:仅在真正请求模型时 +1(OSS 重试不计入) + _ = dao.Stat.IncRequestCount(ctx, time.Now(), int64(t.TenantId), t.Creator, t.ModelName) + data, err = InvokeModel(ctx, m, payload) if err != nil { _ = dao.Task.UpdateFailedGlobal(ctx, t.Id, err.Error()) diff --git a/update.sql b/update.sql index 18c7e09..04ae003 100644 --- a/update.sql +++ b/update.sql @@ -2,6 +2,7 @@ -- 1) asynch_models:模型配置 -- 2) asynch_task:异步任务 -- 3) asynch_op_log:操作日志(统计用) +-- 4) asynch_model_stat:按天模型请求统计(限流/监控用) -- ========================= -- 1) asynch_models @@ -161,3 +162,33 @@ COMMENT ON COLUMN asynch_op_log.error_msg IS '错误信息(失败时)'; COMMENT ON COLUMN asynch_op_log.cost_ms IS '耗时(毫秒)'; COMMENT ON COLUMN asynch_op_log.request_payload IS '请求 JSON'; COMMENT ON COLUMN asynch_op_log.response_payload IS '响应 JSON'; + + +-- ========================= +-- 4) asynch_model_stat +-- ========================= +CREATE TABLE IF NOT EXISTS asynch_model_stat ( + day DATE NOT NULL, -- 天(YYYY-MM-DD) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID + creator VARCHAR(64) NOT NULL DEFAULT '', -- 创建人 + model_name VARCHAR(128) NOT NULL DEFAULT '', -- 模型名称 + request_count BIGINT NOT NULL DEFAULT 0, -- 请求次数 + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY(day, tenant_id, creator, model_name) +); + +-- 便于时间段/租户/人/模型过滤 +CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_tenant_day ON asynch_model_stat(tenant_id, day); +CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_day ON asynch_model_stat(day); +CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_model_name ON asynch_model_stat(model_name); +CREATE INDEX IF NOT EXISTS idx_asynch_model_stat_creator ON asynch_model_stat(creator); + +COMMENT ON TABLE asynch_model_stat IS '按天模型请求统计(用于限流/监控)'; +COMMENT ON COLUMN asynch_model_stat.day IS '天(YYYY-MM-DD)'; +COMMENT ON COLUMN asynch_model_stat.tenant_id IS '租户ID'; +COMMENT ON COLUMN asynch_model_stat.creator IS '创建人'; +COMMENT ON COLUMN asynch_model_stat.model_name IS '模型名称'; +COMMENT ON COLUMN asynch_model_stat.request_count IS '请求次数'; +COMMENT ON COLUMN asynch_model_stat.created_at IS '创建时间'; +COMMENT ON COLUMN asynch_model_stat.updated_at IS '更新时间';