Files
model-asynch/service/storage_oss.go
WangLiZhao f6c70a451e feat: 新增操作日志、任务分页查询与模型失败重试优化
- 新增操作日志表(asynch_op_log)及对应DAO,记录任务创建等操作的审计信息
- 新增任务分页查询接口(ListTask)及对应DTO、Service和DAO方法
- 优化模型调用失败重试逻辑:支持配置重试排队策略(插队到队首或队尾)
- 新增临时文件存储机制,当模型调用成功但OSS上传失败时,下次仅重试OSS上传
- 模型配置新增retry_queue_max_seconds字段,控制失败重试排队策略
- 更新数据库表结构(asynch_models、asynch_task、新增asynch_op_log)及同步更新SQL
- 配置文件调整:超时单位改为秒,更新服务地址和轮询间隔
- 修复模型列表查询支持按名称模糊搜索
2026-04-25 10:42:21 +08:00

83 lines
2.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"bytes"
"context"
"fmt"
"mime/multipart"
"time"
"model-asynch/model/entity"
commonHttp "gitea.com/red-future/common/http"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/util/gconv"
"github.com/gogf/gf/v2/util/guid"
)
// 对接你们的 oss 文件服务POST oss/file/uploadFile (multipart/form-data)
type ossStorage struct{}
type uploadFileResponse struct {
FileURL string `json:"fileURL"` // 文件 URL
FileSize int `json:"fileSize"` // 文件大小(字节)
FileName string `json:"fileName"` // 文件名
FileFormat string `json:"fileFormat"` // 文件格式
FileAddressPrefix string `json:"fileAddressPrefix"` // 文件地址前缀
}
func (s *ossStorage) UploadByTask(ctx context.Context, _ *entity.AsynchTask, data []byte, fileExt string, _ string) (ossURL string, err error) {
// multipart
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
ext := fileExt
if ext == "" {
ext = ".bin"
}
if ext[0] != '.' {
ext = "." + ext
}
filename := fmt.Sprintf("asynch_%d_%s%s", time.Now().Unix(), guid.S(), ext)
part, err := writer.CreateFormFile("file", filename)
if err != nil {
return "", err
}
if _, err := part.Write(data); err != nil {
return "", err
}
contentType := writer.FormDataContentType()
if err := writer.Close(); err != nil {
return "", err
}
headers := forwardHeaders(ctx)
headers["Content-Type"] = contentType
fullURL := "oss/file/uploadFile"
g.Log().Infof(ctx, "[OSS] upload start url=%s filename=%s size=%d", fullURL, filename, len(data))
var resp uploadFileResponse
if err := commonHttp.Post(ctx, fullURL, headers, &resp, body.Bytes()); err != nil {
return "", err
}
fmt.Println("打印结果 resp:", resp)
g.Log().Infof(ctx, "[OSS] upload success url=%s size=%d format=%s", resp.FileURL, resp.FileSize, resp.FileFormat)
return resp.FileURL, nil
}
// setTaskHeadersToCtx 把任务入库时保存的 header 信息注入 ctx给 worker 调 OSS 用
func setTaskHeadersToCtx(ctx context.Context, headers map[string]string) context.Context {
if headers == nil {
return ctx
}
if v := gconv.String(headers["Authorization"]); v != "" {
ctx = context.WithValue(ctx, "token", v)
}
if v := gconv.String(headers["X-User-Info"]); v != "" {
ctx = context.WithValue(ctx, "xUserInfo", v)
}
return ctx
}