update: 更新配置文件中的服务地址,修改模型管理相关代码,调整数据结构和逻辑,优化模型列表查询和会话模型设置,更新数据库表结构和索引,修改模块名称和依赖版本

This commit is contained in:
2026-05-15 14:56:26 +08:00
parent adf1d0ae6e
commit bac9d7713f
27 changed files with 286 additions and 292 deletions

View File

@@ -5,8 +5,8 @@ import (
"fmt"
"math"
"model-asynch/consts/public"
"model-asynch/model/entity"
"model-gateway/consts/public"
"model-gateway/model/entity"
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/frame/g"
@@ -14,9 +14,9 @@ import (
// AutoTuneResult 单次调参结果(按 model_name
type AutoTuneResult struct {
ModelName string `json:"modelName"` // 模型名称asynch_models.model_name
Samples int `json:"samples"` // 统计样本数(窗口内 state=2/3 且 started_at/finished_at 非空的任务数量)
P90Exec float64 `json:"p90ExecSeconds"` // 执行耗时 P90口径finished_at - started_at
ModelName string `json:"modelName"` // 模型名称asynch_models.model_name
Samples int `json:"samples"` // 统计样本数(窗口内 state=2/3 且 started_at/finished_at 非空的任务数量)
P90Exec float64 `json:"p90ExecSeconds"` // 执行耗时 P90口径finished_at - started_at
CapMaxConcurrency int `json:"capMaxConcurrency"` // 配置上限asynch_models.max_concurrencycap不会被动态调参覆盖
OldMaxConcurrency int `json:"oldMaxConcurrency"` // 调参前运行时值Redis若无则等于 cap

View File

@@ -4,7 +4,7 @@ import (
"context"
"encoding/json"
"model-asynch/model/entity"
"model-gateway/model/entity"
"gitea.com/red-future/common/http"
"github.com/gogf/gf/v2/frame/g"
@@ -65,23 +65,3 @@ func triggerPromptsCallback(ctx context.Context, t *entity.AsynchTask, epicycleI
}
g.Log().Infof(ctx, "[提示词回调] 发送成功 epicycleId=%d 回调地址=%s 消息体大小=%d字节", t.EpicycleId, callbackURL, len(jsonData))
}
// IsSuperAdmin 调用admin-go服务检查是否是超级管理员
func IsSuperAdmin(ctx context.Context) (res bool, err error) {
headers := forwardHeaders(ctx)
var r = make(map[string]bool)
if err = http.Get(ctx, "admin-go/api/v1/system/user/checkIsSuperAdmin", headers, &r); err != nil {
return false, err
}
return r["isSuperAdmin"], err
}
// IsAdmin 调用admin-go服务检查是否是管理员
func IsAdmin(ctx context.Context) (res bool, err error) {
headers := forwardHeaders(ctx)
var r = make(map[string]bool)
if err = http.Get(ctx, "admin-go/api/v1/system/user/checkIsSuperAdmin", headers, &r); err != nil {
return false, err
}
return r["isSuperAdmin"], err
}

View File

@@ -4,7 +4,7 @@ import (
"context"
"time"
"model-asynch/dao"
"model-gateway/dao"
"github.com/gogf/gf/v2/frame/g"
)

View File

@@ -11,7 +11,7 @@ import (
"strings"
"time"
"model-asynch/model/entity"
"model-gateway/model/entity"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/frame/g"

View File

@@ -3,13 +3,15 @@ package service
import (
"context"
"errors"
"sort"
"model-asynch/dao"
"model-asynch/model/dto"
"model-asynch/model/entity"
"model-gateway/dao"
"model-gateway/model/dto"
"model-gateway/model/entity"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/db/gfdb"
"gitea.com/red-future/common/http"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
)
@@ -18,32 +20,45 @@ var Model = &modelService{}
type modelService struct{}
func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res *dto.CreateModelRes, err error) {
m := &entity.AsynchModel{
ModelName: req.ModelName,
ModelsType: req.ModelsType,
BaseURL: req.BaseURL,
HttpMethod: req.HttpMethod,
HeadMsg: req.HeadMsg,
IsPrivate: req.IsPrivate,
Enabled: req.Enabled,
IsChatModel: req.IsChatModel,
ApiKey: req.ApiKey,
Form: req.Form,
RequestMapping: req.RequestMapping,
ResponseMapping: req.ResponseMapping,
ResponseBody: req.ResponseBody,
TokenMapping: req.TokenMapping,
MaxConcurrency: req.MaxConcurrency,
QueueLimit: req.QueueLimit,
TimeoutSeconds: req.TimeoutSeconds,
ExpectedSeconds: req.ExpectedSeconds,
RetryTimes: req.RetryTimes,
RetryQueueMaxSeconds: req.RetryQueueMaxSeconds,
AutoCleanSeconds: req.AutoCleanSeconds,
Remark: req.Remark,
// IsSuperAdmin 调用admin-go服务检查是否是超级管理员
func (s *modelService) IsSuperAdmin(ctx context.Context) (res bool, err error) {
headers := forwardHeaders(ctx)
var r = make(map[string]bool)
if err = http.Get(ctx, "admin-go/api/v1/system/user/checkIsSuperAdmin", headers, &r); err != nil {
return false, err
}
id, err := dao.Model.Insert(ctx, m)
return r["isSuperAdmin"], err
}
func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res *dto.CreateModelRes, err error) {
// 获取当前会话模型
if !g.IsEmpty(req.IsChatModel) && *req.IsChatModel == 1 {
var model *entity.AsynchModel
model, err = dao.Model.GetByIsChatModel(ctx)
if err != nil {
return nil, err
}
// 如果有会话模型,那就改变为 0
if model != nil {
_, err = dao.Model.Update(ctx, &dto.UpdateModelReq{
ID: model.Id,
IsChatModel: gconv.PtrInt(0),
})
if err != nil {
return nil, err
}
}
}
req.IsOwner = gconv.PtrInt(1)
admin, err := s.IsSuperAdmin(ctx)
if err != nil {
return
}
if admin {
req.IsOwner = gconv.PtrInt(0)
}
id, err := dao.Model.Insert(ctx, req)
if err != nil {
return nil, err
}
@@ -52,23 +67,55 @@ func (s *modelService) Create(ctx context.Context, req *dto.CreateModelReq) (res
func (s *modelService) Update(ctx context.Context, req *dto.UpdateModelReq) error {
//根据当前 isChatModel 来判断是否更新模型
if req.IsChatModel == 1 {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return err
}
if req.IsChatModel == gconv.PtrInt(1) {
//判断当前用户是否有会话模型
model, err := dao.Model.GetByIsChatModel(ctx, user.UserName)
model, err := dao.Model.GetByIsChatModel(ctx)
if err != nil {
return err
}
if model != nil {
return errors.New("用户已存在会话模型,不能创建新的会话模型")
return errors.New("用户已存在会话模型,不能创建")
}
_, err = dao.Model.Update(ctx, req)
}
req.IsOwner = gconv.PtrInt(1)
admin, err := s.IsSuperAdmin(ctx)
if err != nil {
return err
}
_, err := dao.Model.Update(ctx, req)
if admin {
req.IsOwner = gconv.PtrInt(0)
_, err = dao.Model.Update(ctx, req)
if err != nil {
return err
}
return nil
}
var user *beans.User
user, err = utils.GetUserInfo(ctx)
if err != nil {
return err
}
// 判断当前传过来的模型id的模型是否是超级管理员的。如果是超管的进行创建否则更新
var count int
count, err = dao.Model.Count(ctx, &dto.GetModelReq{
ID: req.ID,
Creator: user.UserName,
})
if err != nil {
return err
}
if count == 0 {
insertDto := new(dto.CreateModelReq)
err = gconv.Struct(req, insertDto)
if err != nil {
return err
}
_, err = dao.Model.Insert(ctx, insertDto)
return err
}
_, err = dao.Model.Update(ctx, req)
return err
}
@@ -89,27 +136,29 @@ func (s *modelService) Get(ctx context.Context, id int64) (*entity.AsynchModel,
return model, nil
}
func (s *modelService) List(ctx context.Context, pageNum, pageSize int, req *dto.ListModelReq) (list []*entity.AsynchModel, total int64, err error) {
isSuperAdmin, err := IsSuperAdmin(ctx)
if err != nil {
return nil, 0, err
}
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, 0, err
}
func (s *modelService) List(ctx context.Context, req *dto.ListModelReq) (list []*entity.AsynchModel, total int, err error) {
var models []*entity.AsynchModel
var count int64
if isSuperAdmin {
models, count, err = dao.Model.List(ctx, pageNum, pageSize, req.ModelName, req.ModelType, req.IsPrivate)
} else {
models, count, err = s.getModelsWithDedup(ctx, user.UserName, pageNum, pageSize, req.ModelName, req.ModelType, req.IsPrivate)
req.IsOwner = gconv.PtrInt(1)
admin, err := s.IsSuperAdmin(ctx)
if err != nil {
return
}
if admin {
req.IsOwner = gconv.PtrInt(0)
}
var user *beans.User
user, err = utils.GetUserInfo(ctx)
if err != nil {
return nil, 0, err
}
req.Creator = user.UserName
models, total, err = dao.Model.GetByCreatorAndPlatform(ctx, req)
if err != nil {
return
}
// 处理列表中每条记录的 JSONB 字段
for _, m := range models {
@@ -118,61 +167,7 @@ func (s *modelService) List(ctx context.Context, pageNum, pageSize int, req *dto
m.ResponseMapping = ParseJSONField(m.ResponseMapping)
m.ResponseBody = ParseJSONField(m.ResponseBody)
}
return models, count, nil
}
// getModelsWithDedup 获取普通用户的模型列表并去重
func (s *modelService) getModelsWithDedup(ctx context.Context, creator string, pageNum, pageSize int, modelNameLike string, modelType int, isPrivate int) (list []*entity.AsynchModel, total int64, err error) {
// 1. 查全量数据(不分页,便于去重)
allModels, err := dao.Model.GetByCreatorAndPlatform(ctx, creator, modelNameLike, modelType, isPrivate)
if err != nil {
return nil, 0, err
}
// 2. 按 modelName 去重,保留当前用户的
modelMap := make(map[string]*entity.AsynchModel)
for _, m := range allModels {
if m == nil {
continue
}
name := m.ModelName
_, ok := modelMap[name]
if !ok {
// 没有冲突,直接放进去
modelMap[name] = m
} else {
// 有冲突,保留当前用户创建的
if m.Creator == creator {
modelMap[name] = m
}
// 如果现有的就是当前用户的,不做任何替换
}
}
// 3. 转回切片并排序
deduped := make([]*entity.AsynchModel, 0, len(modelMap))
for _, m := range modelMap {
deduped = append(deduped, m)
}
sort.Slice(deduped, func(i, j int) bool {
return deduped[i].CreatedAt.After(deduped[j].CreatedAt)
})
// 4. 手动分页
total = int64(len(deduped))
if pageNum > 0 && pageSize > 0 {
start := (pageNum - 1) * pageSize
if start >= len(deduped) {
return []*entity.AsynchModel{}, total, nil
}
end := start + pageSize
if end > len(deduped) {
end = len(deduped)
}
deduped = deduped[start:end]
}
return deduped, total, nil
return models, total, nil
}
// GetModelTypesFromConfig 从配置文件读取模型类型
@@ -202,11 +197,6 @@ func GetModelTypesFromConfig(ctx context.Context) map[int]string {
}
func (s *modelService) UpdateChatModel(ctx context.Context, req *dto.UpdateChatModelReq) error {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return err
}
// 校验新会话模型是否存在
newModel, err := dao.Model.Get(ctx, req.Id)
if err != nil {
@@ -217,48 +207,40 @@ func (s *modelService) UpdateChatModel(ctx context.Context, req *dto.UpdateChatM
}
// 获取当前用户会话模型
currentModel, err := dao.Model.GetByIsChatModel(ctx, user.UserName)
currentModel, err := dao.Model.GetByIsChatModel(ctx)
if err != nil {
return err
}
if currentModel.ModelsType != 1 {
return errors.New("当前模型为非推理模型,不能设置为会话模型")
}
err = gfdb.DB(ctx).Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
if !g.IsEmpty(currentModel) {
if currentModel.ModelType != 1 {
return errors.New("当前模型为非推理模型,不能设置为会话模型")
}
// 如果点击的就是当前会话模型已经是1取消它设为0
if currentModel != nil && currentModel.Id == req.Id {
_, err = dao.Model.UpdateByID(ctx, &dto.UpdateModelReq{
// 如果点击的就是当前会话模型已经是1取消它设为0
if currentModel.Id != req.Id {
_, err = dao.Model.Update(ctx, &dto.UpdateModelReq{
ID: currentModel.Id,
IsChatModel: gconv.PtrInt(0),
})
if err != nil {
return err
}
}
}
// 设置当前为会话模型设为1
_, err = dao.Model.Update(ctx, &dto.UpdateModelReq{
ID: req.Id,
IsChatModel: 0,
IsChatModel: gconv.PtrInt(1),
})
return err
}
// 如果之前有会话模型取消它设为0
if currentModel != nil {
_, err = dao.Model.UpdateByID(ctx, &dto.UpdateModelReq{
ID: currentModel.Id,
IsChatModel: 0,
})
if err != nil {
return err
}
}
// 设置当前为会话模型设为1
_, err = dao.Model.UpdateByID(ctx, &dto.UpdateModelReq{
ID: req.Id,
IsChatModel: 1,
})
return err
}
func (s *modelService) GetIsChatModel(ctx context.Context) (*entity.AsynchModel, error) {
user, err := utils.GetUserInfo(ctx)
if err != nil {
return nil, err
}
model, err := dao.Model.GetByIsChatModel(ctx, user.UserName)
model, err := dao.Model.GetByIsChatModel(ctx)
if err != nil {
return nil, err
}

View File

@@ -3,8 +3,8 @@ package service
import (
"context"
"model-asynch/dao"
"model-asynch/model/dto"
"model-gateway/dao"
"model-gateway/model/dto"
)
type statService struct{}

View File

@@ -4,7 +4,7 @@ import (
"context"
"errors"
"model-asynch/model/entity"
"model-gateway/model/entity"
)
// StorageService 结果存储OSS/MinIO抽象

View File

@@ -7,7 +7,7 @@ import (
"mime/multipart"
"time"
"model-asynch/model/entity"
"model-gateway/model/entity"
commonHttp "gitea.com/red-future/common/http"
"github.com/gogf/gf/v2/frame/g"

View File

@@ -3,11 +3,12 @@ package service
import (
"context"
"errors"
"fmt"
"time"
"model-asynch/dao"
"model-asynch/model/dto"
"model-asynch/model/entity"
"model-gateway/dao"
"model-gateway/model/dto"
"model-gateway/model/entity"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
@@ -20,6 +21,7 @@ var Task = &taskService{}
type taskService struct{}
func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *dto.CreateTaskRes, err error) {
fmt.Printf("打印请求:%+v", req)
startAt := time.Now()
// 固化 token/user 等信息
ctx = asyncCtx(ctx)
@@ -29,7 +31,7 @@ func (s *taskService) Create(ctx context.Context, req *dto.CreateTaskReq) (res *
if err != nil {
return nil, err
}
if m == nil || m.Enabled != 1 {
if m == nil || (m.Enabled != nil && *m.Enabled != 1) {
return nil, errors.New("模型不存在或未启用")
}

View File

@@ -7,8 +7,8 @@ import (
"time"
"unicode/utf8"
"model-asynch/dao"
"model-asynch/model/entity"
"model-gateway/dao"
"model-gateway/model/entity"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/grpool"
@@ -95,7 +95,7 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask, epicy
// ================================
return
}
if m == nil || m.Enabled != 1 {
if m == nil || (m.Enabled != nil && *m.Enabled != 1) {
errMsg := "模型不存在或未启用"
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, errMsg)
ReleaseQueueSlot(ctx, t.ModelName, t.TaskID)
@@ -172,9 +172,6 @@ func (w *asyncWorker) handleOne(ctx context.Context, t *entity.AsynchTask, epicy
contentType, ext = DetectFileType(data)
if utf8.Valid(data) && (strings.HasPrefix(contentType, "text/") || contentType == "application/json") {
textResult = string(data)
if len(textResult) > 20000 {
textResult = textResult[:20000]
}
}
tmpPath, err := saveTmpResult(t.TaskID, data, ext)
if err == nil && tmpPath != "" {