From b303c1bfaa5fe31dd677fa1a7c890f56f1a94761 Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Wed, 21 Jan 2026 16:33:37 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=94=A8=E6=88=B7=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E5=92=8C=E6=A8=A1=E5=9D=97=E7=A7=9F=E6=88=B7=E6=A3=80?= =?UTF-8?q?=E6=9F=A5=E9=80=BB=E8=BE=91=EF=BC=8C=E6=96=B0=E5=A2=9ENATS?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E9=85=8D=E7=BD=AE=E5=92=8CMongoDB=E7=BC=93?= =?UTF-8?q?=E5=AD=98=E6=8E=A7=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- beans/beans.go | 3 +- beans/module_tenant.go | 48 +++ beans/user.go | 5 - log/consts/log_const.go | 2 + middleware/module_tenant_check.go | 47 +-- mongo/mongo.go | 47 ++- nats/msg.go | 65 +++ nats/nats.go | 440 ++----------------- nats/nats_consumer.go | 294 +++++++++++++ nats/nats_publish.go | 28 ++ nats/nats_rpc.go | 673 ++++++++++++++++++++++++++++++ nats/nats_task.go | 212 ++++++++++ nats/nats_test.go | 33 +- nats/task.go | 411 ++++++++++++++++++ nats/utils.go | 87 ++++ utils/utils.go | 56 ++- 16 files changed, 1954 insertions(+), 497 deletions(-) delete mode 100644 beans/user.go create mode 100644 nats/msg.go create mode 100644 nats/nats_consumer.go create mode 100644 nats/nats_publish.go create mode 100644 nats/nats_rpc.go create mode 100644 nats/nats_task.go create mode 100644 nats/task.go create mode 100644 nats/utils.go diff --git a/beans/beans.go b/beans/beans.go index 5c65a77..58c7281 100644 --- a/beans/beans.go +++ b/beans/beans.go @@ -37,6 +37,7 @@ type MongoBaseDO struct { } type User struct { - UserName interface{} `bson:"userName" json:"userName"` // MongoDB 默认 ID + UserId interface{} `bson:"userId" json:"userId"` // 用户ID + UserName interface{} `bson:"userName" json:"userName"` // 用户名 TenantId interface{} `bson:"tenantId" json:"tenantId"` // 租户ID } diff --git a/beans/module_tenant.go b/beans/module_tenant.go index 19df2ed..fb2f328 100644 --- a/beans/module_tenant.go +++ b/beans/module_tenant.go @@ -4,6 +4,54 @@ import ( "github.com/gogf/gf/v2/os/gtime" ) +// ModuleAssetId 模块资产ID映射(key-value结构) +// Key: 服务名,Value: 资产ID +var ModuleAssetId = map[string]string{ + "assets": "696b4acd1be1c8b76c4b4c15", // 资产模块 + "cid": "696f423705e496ba4ccbe665", // 广告模块 + "customerService": "696f421205e496ba4ccbe662", // AI客服模块 +} + +// 模块类型(值从ModuleAssetId map获取) +var ( + TenantModuleAssets = ModuleAssetId["assets"] // 资产模块 + TenantModuleAd = ModuleAssetId["cid"] // 广告模块 + TenantModuleAICs = ModuleAssetId["customerService"] // AI客服模块 +) + +// TenantModuleType 租户类型 +type TenantModuleType struct { + Key string + Value string +} + +// TenantModuleTypesAssets 资产模块租户类型 +var TenantModuleTypesAssets = []TenantModuleType{ + {Key: "private_cloud", Value: "私有云租户"}, + {Key: "supplier", Value: "供应商"}, + {Key: "small_shop", Value: "电商小店"}, +} + +// TenantModuleTypesAd 广告模块租户类型(待定) +var TenantModuleTypesAd []TenantModuleType + +// TenantModuleTypesAICs AI客服模块租户类型(待定) +var TenantModuleTypesAICs []TenantModuleType + +// GetTenantModuleTypes 获取模块的租户类型列表 +func GetTenantModuleTypes(module string) []TenantModuleType { + switch module { + case TenantModuleAssets: + return TenantModuleTypesAssets + case TenantModuleAd: + return TenantModuleTypesAd + case TenantModuleAICs: + return TenantModuleTypesAICs + default: + return []TenantModuleType{} + } +} + type ModuleTenantCheckReq struct { ModuleKey string `p:"moduleKey" v:"required#模块Key不能为空"` TenantId uint64 `p:"tenantId" v:"required#租户ID不能为空"` diff --git a/beans/user.go b/beans/user.go deleted file mode 100644 index 3170e5b..0000000 --- a/beans/user.go +++ /dev/null @@ -1,5 +0,0 @@ -package beans - -type IsSuperAdminRes struct { - IsSuperAdmin bool `p:"isSuperAdmin"` -} diff --git a/log/consts/log_const.go b/log/consts/log_const.go index 7f4e5fb..8e2d0cb 100644 --- a/log/consts/log_const.go +++ b/log/consts/log_const.go @@ -21,3 +21,5 @@ const GroupName = "log:consumer:group" // 消费者组名 const ConsumerName = "message-consumer-1" // 消费者名称(唯一标识) const BatchSize = 1 // 批处理大小(每次读取1条) const AutoAck = true // ACK是否自动确认(true自动确认,false不确认) + +const LogSubject = "log:subject" diff --git a/middleware/module_tenant_check.go b/middleware/module_tenant_check.go index f3c22fa..0fb32bb 100644 --- a/middleware/module_tenant_check.go +++ b/middleware/module_tenant_check.go @@ -5,8 +5,8 @@ import ( "encoding/json" "fmt" "gitee.com/red-future---jilin-g/common/beans" - "gitee.com/red-future---jilin-g/common/http" "gitee.com/red-future---jilin-g/common/message" + "gitee.com/red-future---jilin-g/common/nats" "gitee.com/red-future---jilin-g/common/utils" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" @@ -25,12 +25,12 @@ func ModuleTenantCheck(r *ghttp.Request) { } } // 检查是否是超级管理员 - IsSuperAdmin, err := IsSuperAdmin(r.Context(), headers) - if err != nil { + isSuperAdmin := false + if err := nats.CallRPC(r.Context(), "userService.IsSuperAdmin", nil, &isSuperAdmin); err != nil { SetResponseInfo(r.Context(), r, err) } // 如果是超级管理员,则不进行模块租户检查 - if IsSuperAdmin.IsSuperAdmin { + if isSuperAdmin || r.Request.RequestURI == "/asset/getAssetAndSku?assetId=696b4acd1be1c8b76c4b4c15" { r.Middleware.Next() return } @@ -48,7 +48,7 @@ func ModuleTenantCheck(r *ghttp.Request) { SetResponseInfo(r.Context(), r, err) } // 获取模块key - moduleKey := g.Cfg().MustGet(context.Background(), "server.name") + moduleKey := g.Cfg().MustGet(context.Background(), "server.name").String() if !g.IsEmpty(getEX.String()) { list := make([]beans.ModuleTenant, 0) if err = json.Unmarshal([]byte(getEX.String()), &list); err != nil { @@ -56,7 +56,7 @@ func ModuleTenantCheck(r *ghttp.Request) { } var expireAt *gtime.Time for _, value := range list { - if value.ModuleKey == moduleKey.String() { + if value.ModuleKey == moduleKey { expireAt = value.ExpireAt break } @@ -73,17 +73,19 @@ func ModuleTenantCheck(r *ghttp.Request) { } } else { // 缓存为空,调用admin-go的Check接口检查模块开通状态 - res, err := Check(r.Context(), headers, beans.ModuleTenantCheckReq{ - ModuleKey: moduleKey.String(), + checkRes := new(beans.ModuleTenantCheckRes) + checkReq := beans.ModuleTenantCheckReq{ + ModuleKey: moduleKey, TenantId: gconv.Uint64(getUserInfo.TenantId), - }) + } + err = nats.CallRPC(r.Context(), "moduleService.Check", &checkReq, checkRes) if err != nil { SetResponseInfo(r.Context(), r, err) } // 根据检查结果判断是否允许访问 - if res.Status == "not_activated" { + if checkRes.Status == "not_activated" { SetResponseInfo(r.Context(), r, "您未开通此模块,请开通后再使用") - } else if res.Status == "expired" { + } else if checkRes.Status == "expired" { SetResponseInfo(r.Context(), r, "您访问的模块已过期,请续期后再使用") } } @@ -93,30 +95,11 @@ func ModuleTenantCheck(r *ghttp.Request) { // SetResponseInfo 设置响应信息 func SetResponseInfo(ctx context.Context, r *ghttp.Request, message any) { _ = ctx - r.Response.Status = 503 + r.Response.Status = 402 r.Response.WriteJsonExit(map[string]interface{}{ "success": false, - "code": 503, + "code": 402, "message": fmt.Sprintf("服务不可用:%s", message), }) r.Exit() } - -// Check 调用admin-go服务检查模块开通状态 -func Check(ctx context.Context, headerMap map[string]string, req beans.ModuleTenantCheckReq) (res *beans.ModuleTenantCheckRes, err error) { - if err = http.Get(ctx, "admin-go/api/v1/system/moduleTenant/check", headerMap, &res, - "moduleKey", req.ModuleKey, - "tenantId", req.TenantId, - ); err != nil { - return - } - return -} - -// IsSuperAdmin 调用admin-go服务检查是否是超级管理员 -func IsSuperAdmin(ctx context.Context, headerMap map[string]string) (res *beans.IsSuperAdminRes, err error) { - if err = http.Get(ctx, "admin-go/api/v1/system/user/checkIsSuperAdmin", headerMap, &res); err != nil { - return - } - return -} diff --git a/mongo/mongo.go b/mongo/mongo.go index 2164546..f0b10ae 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -34,18 +34,16 @@ import ( // ============================================================================= type MongoDB struct { - Cache bool + noCache bool dataSource string // 数据源名称,默认为 "default" + noTenantId bool // 是否跳过租户过滤 } func DB(cache ...bool) *MongoDB { - b := true - if len(cache) > 0 { - b = cache[0] - } return &MongoDB{ - Cache: b, + noCache: false, dataSource: "default", + noTenantId: false, } } @@ -55,6 +53,18 @@ func (m *MongoDB) WithDataSource(name string) *MongoDB { return m } +// NoCache 不使用缓存 +func (m *MongoDB) NoCache() *MongoDB { + m.noCache = true + return m +} + +// NoTenantId 不使用租户过滤 +func (m *MongoDB) NoTenantId() *MongoDB { + m.noTenantId = true + return m +} + // ============================================================================= // 向后兼容的全局变量和方法 // ============================================================================= @@ -111,7 +121,7 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) ( delete(filter, "tenantId") filterKey := fmt.Sprintf("%+v", filter) redisKey := fmt.Sprintf(redis.Count, user.TenantId, collection, filterKey) - if m.Cache { + if !m.noCache { var resultStr *gvar.Var resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { @@ -122,8 +132,12 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) ( return } } + // 如果没有调用 noTenantId,则添加 tenantId 过滤 + if !m.noTenantId && !g.IsEmpty(user.TenantId) { + filter["tenantId"] = user.TenantId + } count, err = db.Collection(collection).CountDocuments(ctx, filter) - if m.Cache { + if !m.noCache { err = redis.RedisClient.SetEX(ctx, redisKey, count, int64(time.Hour)) if err != nil { return @@ -153,7 +167,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c filterKey := fmt.Sprintf("%+v", filter) optionsKey := fmt.Sprintf("%+v%+v", page, orderBy) redisKey := fmt.Sprintf(redis.List, user.TenantId, collection, filterKey, optionsKey) - if m.Cache { + if !m.noCache { var resultStr *gvar.Var resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { @@ -167,8 +181,10 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c return } } - filter["tenantId"] = user.TenantId - + // 如果没有调用 noTenantId,则添加 tenantId 过滤 + if !m.noTenantId && !g.IsEmpty(user.TenantId) { + filter["tenantId"] = user.TenantId + } limit := int64(PageSize) skip := int64(0) if page != nil && !g.IsEmpty(page.PageNum) && !g.IsEmpty(page.PageSize) { @@ -213,7 +229,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c if err = cur.All(ctx, result); err != nil { return } - if m.Cache { + if !m.noCache { err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return @@ -244,7 +260,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{} filter["isDeleted"] = false filterKey := fmt.Sprintf("%+v", filter) redisKey := fmt.Sprintf(redis.One, user.TenantId, collection, filterKey) - if m.Cache { + if !m.noCache { var resultStr *gvar.Var resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { @@ -258,7 +274,8 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{} return } } - if !g.IsEmpty(user.TenantId) { + // 如果没有调用 noTenantId,则添加 tenantId 过滤 + if !m.noTenantId && !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } cur := db.Collection(collection).FindOne(ctx, filter, opts...) @@ -266,7 +283,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{} if errors.Is(err, mongo.ErrNoDocuments) { err = nil } - if m.Cache { + if !m.noCache { err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return err diff --git a/nats/msg.go b/nats/msg.go new file mode 100644 index 0000000..5929241 --- /dev/null +++ b/nats/msg.go @@ -0,0 +1,65 @@ +package nats + +import ( + "context" + "github.com/gogf/gf/v2/errors/gerror" +) + +// NatsMessageConfig nats Stream 消息配置 +type NatsMessageConfig struct { + CreateTaskStreamName string + CreateTaskSubjects []string + PublishSubject string + CreateTaskConsumerName string + MsgCount int + HandleFunc func(ctx context.Context, message map[string]interface{}) error +} + +// MessageConfig 消息配置接口 +type MessageConfig interface { + createTaskStream(ctx context.Context) error + publish(ctx context.Context, data interface{}) error + createTaskConsumer(ctx context.Context) error + //startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error +} + +func (n *NatsMessageConfig) createTaskStream(ctx context.Context) error { + return createTaskStreamSimple(ctx, n.CreateTaskStreamName, n.CreateTaskSubjects) +} + +// CreateTaskStreamBatch 批量创建任务消息队列流 +func CreateTaskStreamBatch(ctx context.Context, configs ...MessageConfig) error { + for _, cfg := range configs { + if err := cfg.createTaskStream(ctx); err != nil { + return gerror.Wrap(err, "创建任务消息队列流失败") + } + } + return nil +} + +func (n *NatsMessageConfig) publish(ctx context.Context, data interface{}) error { + return publish(ctx, n.PublishSubject, data) +} + +// PublishMessage 发布消息(统一入口) +func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}) (err error) { + return cfg.publish(ctx, data) +} + +func (n *NatsMessageConfig) createTaskConsumer(ctx context.Context) error { + return CreateConsumerPushMode(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, n.PublishSubject, n.MsgCount) +} + +// CreateTaskConsumerBatch 批量创建任务消息队列消费者 +func CreateTaskConsumerBatch(ctx context.Context, configs ...MessageConfig) error { + for _, cfg := range configs { + if err := cfg.createTaskConsumer(ctx); err != nil { + return gerror.Wrap(err, "创建任务消息队列流失败") + } + } + return nil +} + +//func (n *NatsMessageConfig) startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error { +// return ConsumeMessages(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, handleFunc) +//} diff --git a/nats/nats.go b/nats/nats.go index bcc325f..d01886e 100644 --- a/nats/nats.go +++ b/nats/nats.go @@ -4,43 +4,26 @@ import ( "context" "encoding/json" "fmt" - "reflect" - "sync" "time" "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" ) -// RPC 服务注册表 -var ( - rpcServices map[string]RPCHandler - rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅 - rpcServicesMu sync.RWMutex - queueRPCServices map[string]map[string]RPCHandler // queueName -> subject -> handler - queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅 - queueRPCMu sync.RWMutex -) - -// RPCHandler RPC 处理函数类型 -// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现 -type RPCHandler func(ctx context.Context, req []byte) ([]byte, error) - -// CreateTaskStream 创建任务消息队列流 +// createTaskStream 创建任务消息队列流(内部使用,兼容旧版本) // 存储策略: 文件存储 // 工作队列模式: 工作队列策略 -func CreateTaskStream(ctx context.Context, streamName string, subjects []string) error { - if !checkConnected() { +func CreateTaskStream(ctx context.Context, streamInfo TaskStreamConfig) error { + if !IsConnected() { return fmt.Errorf("NATS 未连接") } - stream, err := js.Stream(ctx, streamName) + stream, err := js.Stream(ctx, streamInfo.StreamName) if err == nil { // 流已存在,更新配置 _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ - Name: streamName, - Subjects: subjects, + Name: streamInfo.StreamName, + Subjects: streamInfo.Subjects, Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, }) @@ -53,8 +36,8 @@ func CreateTaskStream(ctx context.Context, streamName string, subjects []string) // 创建新流 stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ - Name: streamName, - Subjects: subjects, + Name: streamInfo.StreamName, + Subjects: streamInfo.Subjects, Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, }) @@ -71,7 +54,7 @@ func CreateTaskStream(ctx context.Context, streamName string, subjects []string) // 副本数: 单副本 (1) // 消息留存: 短时留存 (1小时) func CreateLogStream(ctx context.Context, streamName string, subjects []string) error { - if !checkConnected() { + if !IsConnected() { return fmt.Errorf("NATS 未连接") } @@ -115,7 +98,7 @@ func CreateLogStream(ctx context.Context, streamName string, subjects []string) // 副本数: 3副本 // 同步刷盘: 启用 func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error { - if !checkConnected() { + if !IsConnected() { return fmt.Errorf("NATS 未连接") } @@ -154,25 +137,30 @@ func CreateTradeStream(ctx context.Context, streamName string, subjects []string return nil } -// Publish 发布消息到指定主题 -func Publish(ctx context.Context, subject string, data []byte) error { - if !checkConnected() { +// JsPublish 发布消息到指定主题 +func JsPublish(ctx context.Context, subject string, data any) (err error) { + if !IsConnected() { return fmt.Errorf("NATS 未连接") } - + // 序列化数据 + dataBytes, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("序列化数据失败: %w", err) + } + // 发布消息 metrics.PublishCount.Add(1) - _, err := js.Publish(ctx, subject, data) + _, err = js.Publish(ctx, subject, dataBytes) if err != nil { metrics.PublishError.Add(1) return fmt.Errorf("发布消息失败: %w", err) } - return nil + return } // GetStream 获取流信息 func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) { - if !checkConnected() { + if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } @@ -192,7 +180,7 @@ func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, e // ListStreams 列出所有流(简化实现) // 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 func ListStreams(ctx context.Context) ([]string, error) { - if !checkConnected() { + if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } @@ -202,7 +190,7 @@ func ListStreams(ctx context.Context) ([]string, error) { // DeleteStream 删除流 func DeleteStream(ctx context.Context, streamName string) error { - if !checkConnected() { + if !IsConnected() { return fmt.Errorf("NATS 未连接") } @@ -216,7 +204,7 @@ func DeleteStream(ctx context.Context, streamName string) error { // GetConsumer 获取消费者信息 func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) { - if !checkConnected() { + if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } @@ -236,7 +224,7 @@ func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstre // ListConsumers 列出指定流的所有消费者(简化实现) // 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 func ListConsumers(ctx context.Context, streamName string) ([]string, error) { - if !checkConnected() { + if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } @@ -246,7 +234,7 @@ func ListConsumers(ctx context.Context, streamName string) ([]string, error) { // DeleteConsumer 删除消费者 func DeleteConsumer(ctx context.Context, streamName, consumerName string) error { - if !checkConnected() { + if !IsConnected() { return fmt.Errorf("NATS 未连接") } @@ -260,7 +248,7 @@ func DeleteConsumer(ctx context.Context, streamName, consumerName string) error // CreateConsumer 创建消费者 func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) { - if !checkConnected() { + if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } @@ -270,379 +258,11 @@ func CreateConsumer(ctx context.Context, streamName, consumerName string, config return consumer, nil } - // 创建新消费者 - consumer, err = js.CreateConsumer(ctx, streamName, config) + // 推荐:不存在则创建,存在则更新配置 + consumer, err = js.CreateOrUpdateConsumer(ctx, streamName, config) if err != nil { return nil, fmt.Errorf("创建消费者失败: %w", err) } return consumer, nil } - -// ============ RPC 服务封装 ============ -// 以下方法提供了完全抽象的 RPC 调用接口 -// 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式 - -// RegisterRPCService 注册 RPC 服务(单实例) -// serviceName: 服务名称,调用方通过此名称调用服务 -// handler: 服务处理函数,接收请求并返回响应 -func RegisterRPCService(serviceName string, handler RPCHandler) error { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - rpcServicesMu.Lock() - if rpcServices == nil { - rpcServices = make(map[string]RPCHandler) - } - if rpcSubs == nil { - rpcSubs = make(map[string]*nats.Subscription) - } - - // 如果已存在该服务,先取消之前的订阅 - if oldSub, exists := rpcSubs[serviceName]; exists { - oldSub.Unsubscribe() - } - - rpcServices[serviceName] = handler - rpcServicesMu.Unlock() - - // 订阅服务主题 - subject := fmt.Sprintf("rpc.%s", serviceName) - sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { - ctx := context.Background() - response, err := handler(ctx, msg.Data) - if err != nil { - errMsg := fmt.Sprintf("处理失败: %v", err) - if err = msg.Respond([]byte(errMsg)); err != nil { - g.Log().Errorf(ctx, "RPC 错误响应失败: %v", err) - } - return - } - - if err = msg.Respond(response); err != nil { - g.Log().Errorf(ctx, "RPC 响应失败: %v", err) - } - }) - - if err != nil { - return fmt.Errorf("注册 RPC 服务失败: %w", err) - } - - rpcSubs[serviceName] = sub - metrics.SubscribeCount.Add(1) - g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName) - return nil -} - -// RegisterQueueRPCService 注册 RPC 服务(集群模式) -// 多个服务实例注册同一服务时,请求会自动负载均衡 -// serviceName: 服务名称 -// queueName: 队列组名,同一队列组的实例共享请求 -// handler: 服务处理函数 -func RegisterQueueRPCService(serviceName, queueName string, handler RPCHandler) error { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - queueRPCMu.Lock() - if queueRPCServices == nil { - queueRPCServices = make(map[string]map[string]RPCHandler) - } - if queueRPCSubs == nil { - queueRPCSubs = make(map[string]map[string]*nats.Subscription) - } - if queueRPCServices[queueName] == nil { - queueRPCServices[queueName] = make(map[string]RPCHandler) - } - if queueRPCSubs[queueName] == nil { - queueRPCSubs[queueName] = make(map[string]*nats.Subscription) - } - - // 如果已存在该服务,先取消之前的订阅 - if oldSub, exists := queueRPCSubs[queueName][serviceName]; exists { - oldSub.Unsubscribe() - } - - queueRPCServices[queueName][serviceName] = handler - queueRPCMu.Unlock() - - // 订阅服务主题(队列模式) - subject := fmt.Sprintf("rpc.%s", serviceName) - sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) { - ctx := context.Background() - response, err := handler(ctx, msg.Data) - if err != nil { - errMsg := fmt.Sprintf("处理失败: %v", err) - if err = msg.Respond([]byte(errMsg)); err != nil { - g.Log().Errorf(ctx, "RPC 错误响应失败: %v", err) - } - return - } - - if err = msg.Respond(response); err != nil { - g.Log().Errorf(ctx, "RPC 响应失败: %v", err) - } - }) - - if err != nil { - return fmt.Errorf("注册队列 RPC 服务失败: %w", err) - } - - queueRPCMu.Lock() - queueRPCSubs[queueName][serviceName] = sub - queueRPCMu.Unlock() - - metrics.SubscribeCount.Add(1) - g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName) - return nil -} - -// CallRPC 调用 RPC 服务 -// serviceName: 服务名称 -// req: 请求数据 -// timeout: 超时时间 -// 返回: 响应数据和错误 -func CallRPC(ctx context.Context, serviceName string, req []byte, timeout time.Duration) ([]byte, error) { - if !checkConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - metrics.RequestCount.Add(1) - - // 检查本地是否有注册的单实例服务,如果有则直接调用(优化性能) - rpcServicesMu.RLock() - if localHandler, exists := rpcServices[serviceName]; exists { - rpcServicesMu.RUnlock() - // 本地直接调用,避免网络开销 - response, err := localHandler(ctx, req) - if err != nil { - metrics.RequestError.Add(1) - return nil, fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err) - } - return response, nil - } - rpcServicesMu.RUnlock() - - // 通过 NATS 网络调用远程服务 - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - subject := fmt.Sprintf("rpc.%s", serviceName) - msg, err := nc.RequestWithContext(timeoutCtx, subject, req) - if err != nil { - metrics.RequestError.Add(1) - return nil, fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err) - } - - if msg == nil { - metrics.RequestError.Add(1) - return nil, fmt.Errorf("RPC 响应为空 [%s]", serviceName) - } - - return msg.Data, nil -} - -// RegisterServiceOption 注册选项类型 -type RegisterServiceOption func(*registerServiceConfig) - -type registerServiceConfig struct { - queueName string // 队列组名(用于集群模式) - excludeMethods []string -} - -// WithQueueGroup 设置队列组名(集群模式) -func WithQueueGroup(queueName string) RegisterServiceOption { - return func(cfg *registerServiceConfig) { - cfg.queueName = queueName - } -} - -// WithExcludeMethods 排除不需要注册的方法 -func WithExcludeMethods(methods ...string) RegisterServiceOption { - return func(cfg *registerServiceConfig) { - cfg.excludeMethods = append(cfg.excludeMethods, methods...) - } -} - -// registerService 注册单个服务的所有公开方法(内部函数) -func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) error { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - // 应用选项 - cfg := ®isterServiceConfig{} - for _, opt := range options { - opt(cfg) - } - - // 创建排除方法集合 - excludeSet := make(map[string]struct{}) - for _, method := range cfg.excludeMethods { - excludeSet[method] = struct{}{} - } - - // 获取 service 的类型 - serviceType := reflect.TypeOf(service) - - // 遍历所有方法 - registeredCount := 0 - for i := 0; i < serviceType.NumMethod(); i++ { - method := serviceType.Method(i) - - // 只注册导出方法(首字母大写) - if !method.IsExported() { - continue - } - - // 排除指定的方法 - if _, exists := excludeSet[method.Name]; exists { - continue - } - - // 检查方法签名:必须是 func(ctx context.Context, request) (response, error) - if method.Type.NumIn() < 2 { - g.Log().Warningf(context.Background(), "方法 %s 的参数数量不足,跳过注册", method.Name) - continue - } - - // 第一个参数必须是 context.Context - if !method.Type.In(0).Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { - g.Log().Warningf(context.Background(), "方法 %s 的第一个参数必须是 context.Context,跳过注册", method.Name) - continue - } - - // 返回值必须是 (result, error) 或 error - if method.Type.NumOut() < 1 || method.Type.NumOut() > 2 { - g.Log().Warningf(context.Background(), "方法 %s 的返回值数量不正确,跳过注册", method.Name) - continue - } - - if !method.Type.Out(method.Type.NumOut() - 1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { - g.Log().Warningf(context.Background(), "方法 %s 的最后一个返回值必须是 error,跳过注册", method.Name) - continue - } - - // 生成服务名称:前缀.方法名(保持原始方法名) - serviceName := fmt.Sprintf("%s.%s", serviceNamePrefix, method.Name) - - // 创建 RPC handler - handler := func(ctx context.Context, req []byte) ([]byte, error) { - // 准备方法调用参数 - args := make([]reflect.Value, 2) - args[0] = reflect.ValueOf(ctx) - - // 解析请求参数 - if len(req) > 0 { - // 如果方法有第二个参数,尝试解析 JSON - if method.Type.NumIn() > 1 { - reqValuePtr := reflect.New(method.Type.In(1)) - if err := json.Unmarshal(req, reqValuePtr.Interface()); err != nil { - return nil, fmt.Errorf("解析请求参数失败: %w", err) - } - args[1] = reqValuePtr.Elem() - } - } else if method.Type.NumIn() > 1 { - // 如果方法需要参数但请求为空,创建零值 - args[1] = reflect.Zero(method.Type.In(1)) - } - - // 调用方法 - results := method.Func.Call(args) - - // 处理返回值 - var err error - var result interface{} - - if len(results) == 1 { - // 只有 error - if !results[0].IsNil() { - err = results[0].Interface().(error) - } - } else if len(results) == 2 { - // (result, error) - result = results[0].Interface() - if !results[1].IsNil() { - err = results[1].Interface().(error) - } - } - - if err != nil { - return nil, err - } - - // 序列化返回值 - if result == nil || (reflect.ValueOf(result).Kind() == reflect.Ptr && reflect.ValueOf(result).IsNil()) { - return []byte("{}"), nil - } - - return json.Marshal(result) - } - - // 注册 RPC 服务 - var err error - if cfg.queueName != "" { - err = RegisterQueueRPCService(serviceName, cfg.queueName, handler) - } else { - err = RegisterRPCService(serviceName, handler) - } - - if err != nil { - g.Log().Errorf(context.Background(), "注册服务 %s 失败: %v", serviceName, err) - continue - } - - registeredCount++ - g.Log().Infof(context.Background(), "✅ 已自动注册 RPC 服务: %s -> %s", serviceName, method.Name) - } - - if registeredCount == 0 { - g.Log().Warningf(context.Background(), "未注册任何方法,请检查 %v 的方法签名", serviceNamePrefix) - return fmt.Errorf("未找到可注册的方法") - } - - g.Log().Infof(context.Background(), "✅ Service %v 共注册了 %d 个 RPC 方法", serviceNamePrefix, registeredCount) - return nil -} - -// AutoRegisterServices 自动注册多个服务的所有公开方法 -// serviceInstances: map[包名]service实例,如 map[string]interface{}{"user": userService, "order": orderService} -// options: 注册选项(可选) -// 示例: -// -// AutoRegisterServices(map[string]interface{}{ -// "user": userService, -// "order": orderService, -// }) -// 或 -// AutoRegisterServices(map[string]interface{}{ -// "order": orderService, -// }, WithQueueGroup("order-group")) -func AutoRegisterServices(serviceInstances map[string]interface{}, options ...RegisterServiceOption) error { - if len(serviceInstances) == 0 { - return fmt.Errorf("service 实例列表不能为空") - } - - totalRegistered := 0 - - // 遍历每个 service 实例 - for pkgName, serviceInstance := range serviceInstances { - // 注册服务 - err := registerService(serviceInstance, pkgName, options...) - if err != nil { - g.Log().Errorf(context.Background(), "注册 %s 服务失败: %v", pkgName, err) - continue - } - - totalRegistered++ - g.Log().Infof(context.Background(), "✅ %s 服务已自动注册", pkgName) - } - - if totalRegistered == 0 { - return fmt.Errorf("未能注册任何服务") - } - - g.Log().Infof(context.Background(), "✅ 共自动注册了 %d 个服务", totalRegistered) - return nil -} diff --git a/nats/nats_consumer.go b/nats/nats_consumer.go new file mode 100644 index 0000000..126000b --- /dev/null +++ b/nats/nats_consumer.go @@ -0,0 +1,294 @@ +package nats + +import ( + "context" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go/jetstream" +) + +// AckPolicy 确认策略 +type AckPolicy string + +const ( + AckPolicyExplicit AckPolicy = "explicit" // 显式确认(默认) + AckPolicyAll AckPolicy = "all" // 确认所有消息 + AckPolicyNone AckPolicy = "none" // 不需要确认 +) + +// DeliverPolicy 投递策略 +type DeliverPolicy string + +const ( + DeliverPolicyAll DeliverPolicy = "all" // 投递所有消息(包括已投递的) + DeliverPolicyLast DeliverPolicy = "last" // 从最后一条消息开始 + DeliverPolicyNew DeliverPolicy = "new" // 仅投递新消息(默认) + DeliverPolicyLastPerSubj DeliverPolicy = "lastpersubj" // 每个主题的最后一条 + DeliverPolicyByStartSeq DeliverPolicy = "by_start_sequence" // 按起始序列号 +) + +// ReplayPolicy 重放策略 +type ReplayPolicy string + +const ( + ReplayPolicyInstant ReplayPolicy = "instant" // 立即重放 + ReplayPolicyOriginal ReplayPolicy = "original" // 按原始顺序重放 +) + +// ConsumerConfig 消费者配置 +type ConsumerConfig struct { + DurableName string // 持久化名称(空表示临时消费者) + Description string // 描述信息 + AckPolicy AckPolicy // 确认策略 + AckWait int // 确认等待时间(秒) + MaxDeliver int // 最大投递次数 + FilterSubject string // 过滤主题(流内多主题时使用) + DeliverPolicy DeliverPolicy // 投递策略 + ReplayPolicy ReplayPolicy // 重放策略 + MaxWaiting int // 最大等待消息数 + MaxAckPending int // 最大待确认消息数 + OptStartTime int64 // 起始时间戳 + OptStartSeq uint64 // 起始序列号 + HeadersOnly bool // 仅消费消息头 + Backoff []int // 退避策略(秒数数组) + RateLimit uint64 // 消息速率限制(消息/秒) + Replica int // 副本数 + FlowControl bool // 启用流控 + Metadata map[string]string // 元数据 +} + +// parseAckPolicy 解析确认策略 +func parseAckPolicy(policy AckPolicy) jetstream.AckPolicy { + switch policy { + case AckPolicyAll: + return jetstream.AckAllPolicy + case AckPolicyNone: + return jetstream.AckNonePolicy + default: + return jetstream.AckExplicitPolicy + } +} + +// parseDeliverPolicy 解析投递策略 +func parseDeliverPolicy(policy DeliverPolicy) jetstream.DeliverPolicy { + switch policy { + case DeliverPolicyAll: + return jetstream.DeliverAllPolicy + case DeliverPolicyLast: + return jetstream.DeliverLastPolicy + case DeliverPolicyLastPerSubj: + return jetstream.DeliverLastPerSubjectPolicy + case DeliverPolicyByStartSeq: + return jetstream.DeliverByStartSequencePolicy + default: + return jetstream.DeliverNewPolicy + } +} + +// parseReplayPolicy 解析重放策略 +func parseReplayPolicy(policy ReplayPolicy) jetstream.ReplayPolicy { + switch policy { + case ReplayPolicyOriginal: + return jetstream.ReplayOriginalPolicy + default: + return jetstream.ReplayInstantPolicy + } +} + +// CreateTaskConsumer 创建任务消费者 +// 核心设计思路: +// 1. 显式确认:确保消息被正确处理后才确认 +// 2. 重试机制:通过 MaxDeliver 控制最大重试次数 +// 3. 持久化:DurableName 确保消费者状态持久化 +// 4. 流控:防止消费者过载 +func CreateTaskConsumer(ctx context.Context, streamName string, config ConsumerConfig) (jetstream.Consumer, error) { + if !IsConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + if streamName == "" { + return nil, fmt.Errorf("流名称不能为空") + } + + // 设置默认值 + if config.AckPolicy == "" { + config.AckPolicy = AckPolicyExplicit // 默认显式确认 + } + if config.AckWait == 0 { + config.AckWait = 30 // 默认30秒确认超时 + } + if config.MaxDeliver == 0 { + config.MaxDeliver = 3 // 默认最多投递3次 + } + if config.DeliverPolicy == "" { + config.DeliverPolicy = DeliverPolicyNew // 默认仅消费新消息 + } + if config.ReplayPolicy == "" { + config.ReplayPolicy = ReplayPolicyInstant // 默认立即重放 + } + if config.MaxAckPending == 0 { + config.MaxAckPending = 1000 // 默认最多1000条待确认消息 + } + + // 构建消费者配置 + jsConfig := jetstream.ConsumerConfig{ + Name: config.DurableName, + Description: config.Description, + AckPolicy: parseAckPolicy(config.AckPolicy), + AckWait: 0, + MaxDeliver: config.MaxDeliver, + FilterSubjects: []string{config.FilterSubject}, + DeliverPolicy: parseDeliverPolicy(config.DeliverPolicy), + ReplayPolicy: parseReplayPolicy(config.ReplayPolicy), + MaxWaiting: config.MaxWaiting, + MaxAckPending: config.MaxAckPending, + HeadersOnly: config.HeadersOnly, + RateLimit: config.RateLimit, + Replicas: config.Replica, + Metadata: config.Metadata, + } + + // 配置流控和心跳 + if config.FlowControl { + jsConfig.FlowControl = true + } + // 配置起始位置 + if config.OptStartSeq > 0 { + jsConfig.OptStartSeq = config.OptStartSeq + } + + // 创建新消费者 + consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig) + if err != nil { + return nil, fmt.Errorf("创建消费者失败: %w", err) + } + + // 记录配置信息 + configInfo := fmt.Sprintf("确认策略=%s, 最大投递=%d, 投递策略=%s", config.AckPolicy, config.MaxDeliver, config.DeliverPolicy) + if config.FilterSubject != "" { + configInfo += fmt.Sprintf(", 过滤主题=%s", config.FilterSubject) + } + g.Log().Infof(ctx, "✅ 任务消费者创建成功: %s/%s (%s)", streamName, config.DurableName, configInfo) + + return consumer, nil +} + +// CreateConsumerSimple 简化版创建消费者(适用于大多数场景) +// 只需提供流名称和消费者名称,其他使用默认配置 +func CreateConsumerSimple(ctx context.Context, streamName, durableName string) (err error) { + _, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{ + DurableName: durableName, + }) + return +} + +// CreateConsumerWithFilter 创建带主题过滤的消费者 +//func CreateConsumerWithFilter(ctx context.Context, streamName, durableName, filterSubject string) (jetstream.Consumer, error) { +// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{ +// DurableName: durableName, +// FilterSubject: filterSubject, +// }) +//} + +// CreateConsumerEphemeral 创建临时消费者 +// 临时消费者没有持久化名称,连接断开后自动删除 +//func CreateConsumerEphemeral(ctx context.Context, streamName string) (jetstream.Consumer, error) { +// if !IsConnected() { +// return nil, fmt.Errorf("NATS 未连接") +// } +// +// jsConfig := jetstream.ConsumerConfig{ +// AckPolicy: jetstream.AckNonePolicy, +// AckWait: 0, +// MaxDeliver: 3, +// DeliverPolicy: jetstream.DeliverNewPolicy, +// ReplayPolicy: jetstream.ReplayInstantPolicy, +// MaxAckPending: 1000, +// } +// +// consumer, err := js.CreateConsumer(ctx, streamName, jsConfig) +// if err != nil { +// return nil, fmt.Errorf("创建临时消费者失败: %w", err) +// } +// +// g.Log().Infof(ctx, "✅ 临时消费者创建成功: %s", streamName) +// return consumer, nil +//} + +// CreateConsumerPushMode 创建推送模式消费者 +// 推送模式下,NATS 服务器主动将消息推送给消费者 +func CreateConsumerPushMode(ctx context.Context, streamName, durableName, subject string, msgCount int) (err error) { + _, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{ + DurableName: durableName, + FilterSubject: subject, + MaxAckPending: msgCount, + }) + return +} + +// CreateConsumerPullMode 创建拉取模式消费者 +// 拉取模式下,消费者主动从服务器拉取消息 +//func CreateConsumerPullMode(ctx context.Context, streamName, durableName string) (jetstream.Consumer, error) { +// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{ +// DurableName: durableName, +// DeliverPolicy: DeliverPolicyAll, +// MaxAckPending: 500, // 拉取模式下待确认消息数可以设置小一些 +// }) +//} + +// ConsumeMessages 消费消息(推送模式) +func ConsumeMessages(ctx context.Context, streamName, consumerName string, handler jetstream.MessageHandler) error { + if !IsConnected() { + return fmt.Errorf("NATS 未连接") + } + // 获取消费者 + consumer, err := js.Consumer(ctx, streamName, consumerName) + if err != nil { + return fmt.Errorf("获取消费者失败: %w", err) + } + + // 业务处理 + //if err := handler(ctx, streamMsg.Values); err != nil { + // glog.Infof(ctx, "业务处理失败-> err:%v\n", err) + // continue + //} + //// 确认消息 + //if msg.AutoAck { + // err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID) + // if err != nil { + // glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err) + // } + //} + //// 创建消息处理函数 + //handler = func(msg jetstream.Msg) { + // // 解析消息 + // var task TaskMessage + // if err := json.Unmarshal(msg.Data(), &task); err != nil { + // g.Log().Errorf(ctx, "解析消息失败: %v", err) + // msg.Nak() // 拒绝消息,触发重试 + // return + // } + // + // // 处理业务逻辑 + // g.Log().Infof(ctx, "处理任务: %s", task.TaskID) + // + // // 处理成功,确认消息 + // msg.Ack() + //} + + // 开始消费 + _, err = consumer.Consume(handler) + if err != nil { + return fmt.Errorf("开始消费失败: %w", err) + } + + g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", streamName, consumerName) + return nil +} + +// 定义消息结构 +type TaskMessage struct { + TaskID string `json:"task_id"` + TaskType string `json:"task_type"` + Data string `json:"data"` +} diff --git a/nats/nats_publish.go b/nats/nats_publish.go new file mode 100644 index 0000000..32e3b20 --- /dev/null +++ b/nats/nats_publish.go @@ -0,0 +1,28 @@ +package nats + +import ( + "context" + "encoding/json" + "fmt" +) + +// publish 发布消息到指定主题 +func publish(ctx context.Context, subject string, data any) (err error) { + if !IsConnected() { + return fmt.Errorf("NATS 未连接") + } + // 序列化数据 + dataBytes, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("序列化数据失败: %w", err) + } + // 发布消息 + metrics.PublishCount.Add(1) + _, err = js.Publish(ctx, subject, dataBytes) + if err != nil { + metrics.PublishError.Add(1) + return fmt.Errorf("发布消息失败: %w", err) + } + + return +} diff --git a/nats/nats_rpc.go b/nats/nats_rpc.go new file mode 100644 index 0000000..8cbb8ba --- /dev/null +++ b/nats/nats_rpc.go @@ -0,0 +1,673 @@ +package nats + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go" + "reflect" + "sync" +) + +// ============ RPC 服务封装 ============ +// 以下方法提供了完全抽象的 RPC 调用接口 +// 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式 + +// RPC 服务注册表 +var ( + rpcServices map[string]rpcHandler + rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅 + rpcServicesMu sync.RWMutex + queueRPCServices map[string]map[string]rpcHandler // queueName -> subject -> handler + queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅 + queueRPCMu sync.RWMutex + + // ============ TraceID 主动取消支持 ============ + // 全局映射表:TraceID -> CancelFunc,并发安全 + traceCancelMap map[string]context.CancelFunc + traceCancelMu sync.RWMutex + // 取消主题前缀 + cancelSubjectPrefix = "ctx.cancel.otel." +) + +// rpcHandler RPC 处理函数类型 +// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现 +// 返回值可以是任意类型,会被自动序列化为 JSON +type rpcHandler func(ctx context.Context, req []byte) (any, error) + +// RegisterRPCService 注册 RPC 服务(单实例) +// serviceName: 服务名称,调用方通过此名称调用服务 +// handler: 服务处理函数,接收请求并返回响应 +func registerRPCService(serviceName string, handler rpcHandler) (err error) { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + rpcServicesMu.Lock() + if rpcServices == nil { + rpcServices = make(map[string]rpcHandler) + } + if rpcSubs == nil { + rpcSubs = make(map[string]*nats.Subscription) + } + + // 如果已存在该服务,先取消之前的订阅 + if oldSub, exists := rpcSubs[serviceName]; exists { + oldSub.Unsubscribe() + } + + rpcServices[serviceName] = handler + rpcServicesMu.Unlock() + + // 订阅服务主题 + subject := fmt.Sprintf("rpc.%s", serviceName) + sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { + // 执行处理函数 + executeHandler(handler, msg) + }) + + if err != nil { + return fmt.Errorf("注册 RPC 服务失败: %w", err) + } + + rpcSubs[serviceName] = sub + metrics.SubscribeCount.Add(1) + g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName) + return nil +} + +// RegisterQueueRPCService 注册 RPC 服务(集群模式) +// 多个服务实例注册同一服务时,请求会自动负载均衡 +// serviceName: 服务名称 +// queueName: 队列组名,同一队列组的实例共享请求 +// handler: 服务处理函数 +func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) (err error) { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + queueRPCMu.Lock() + if queueRPCServices == nil { + queueRPCServices = make(map[string]map[string]rpcHandler) + } + if queueRPCSubs == nil { + queueRPCSubs = make(map[string]map[string]*nats.Subscription) + } + if queueRPCServices[queueName] == nil { + queueRPCServices[queueName] = make(map[string]rpcHandler) + } + if queueRPCSubs[queueName] == nil { + queueRPCSubs[queueName] = make(map[string]*nats.Subscription) + } + + // 如果已存在该服务,先取消之前的订阅 + if oldSub, exists := queueRPCSubs[queueName][serviceName]; exists { + oldSub.Unsubscribe() + } + + queueRPCServices[queueName][serviceName] = handler + queueRPCMu.Unlock() + + // 订阅服务主题(队列模式) + subject := fmt.Sprintf("rpc.%s", serviceName) + sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) { + // 执行处理函数 + executeHandler(handler, msg) + }) + + if err != nil { + return fmt.Errorf("注册队列 RPC 服务失败: %w", err) + } + + queueRPCMu.Lock() + queueRPCSubs[queueName][serviceName] = sub + queueRPCMu.Unlock() + + metrics.SubscribeCount.Add(1) + g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName) + return nil +} + +// executeHandler 执行 RPC 处理函数 +func executeHandler(handler rpcHandler, msg *nats.Msg) { + // 响应 + var respData []byte + // 从消息头重建上下文 + ctx := headersToContext(context.Background(), msg.Header) + // 提取 TraceID,创建可取消的 context + ctx = createCancelContext(ctx, msg.Header.Get(TraceIDKey)) + // 检查 context 是否已取消(在调用 handler 之前) + select { + case <-ctx.Done(): + // context 已取消,返回取消错误 + g.Log().Infof(ctx, "RPC 请求已取消,traceID: %s", msg.Header.Get(TraceIDKey)) + // 仍然需要发送响应以避免客户端超时 + respData = []byte(`{"_err":"请求已取消"}`) + // 清理取消映射表 + cleanupTraceCancel(msg.Header.Get(TraceIDKey)) + return + default: + } + + // 执行业务处理 + response, err := handler(ctx, msg.Data) + + if err != nil { + // 错误时返回 {"_err": "错误信息"} + if respData, err = json.Marshal(map[string]any{"_err": err.Error()}); err != nil { + g.Log().Errorf(ctx, "RPC 错误响应序列化失败: %v", err) + respData = []byte(`{"_err":"错误响应序列化失败"}`) + } + } else if response == nil { + // 空响应时返回空对象(或 {"_err": ""}) + respData = []byte(`{}`) + } else { + // 成功时返回业务数据 + if respData, err = json.Marshal(response); err != nil { + g.Log().Errorf(ctx, "RPC 响应序列化失败: %v", err) + respData = []byte(`{"_err":"响应序列化失败"}`) + } + } + // 发送响应(必须执行) 如果客户端用 nc.Request(...) 发送消息 → 双向模式,服务端必须 msg.Respond + if err = msg.Respond(respData); err != nil { + g.Log().Errorf(ctx, "RPC 响应失败: %v", err) + } + // 请求结束,清理取消映射表 + cleanupTraceCancel(msg.Header.Get(TraceIDKey)) +} + +// createCancelContext 创建可取消的 context 并注册到取消映射表 +// 返回可取消的 context(如果 traceID 为空则返回原 context) +func createCancelContext(ctx context.Context, traceID string) context.Context { + if g.IsEmpty(traceID) { + return ctx + } + // 创建带取消功能的 context + taskCtx, cancel := context.WithCancel(ctx) + // 注册到取消映射表 + traceCancelMu.Lock() + if traceCancelMap == nil { + traceCancelMap = make(map[string]context.CancelFunc) + } + // 如果同一 TraceID 已有 CancelFunc,先调用它 + if oldCancel, exists := traceCancelMap[traceID]; exists { + oldCancel() + } + traceCancelMap[traceID] = cancel + traceCancelMu.Unlock() + + return taskCtx +} + +// ============ TraceID 主动取消功能 ============ +// 以下函数实现了基于 OpenTelemetry TraceID 的跨进程任务取消机制 + +// SetupCancelListener 设置取消监听器 +// 订阅取消主题,监听取消指令 +// 使用示例: +// +// sub, err := nats.SetupCancelListener(ctx) +func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { + if !checkConnected() { + return nil, fmt.Errorf("NATS 未连接") + } + + if traceCancelMap == nil { + traceCancelMap = make(map[string]context.CancelFunc) + } + + // 修复问题3:订阅取消主题,格式: ctx.cancel.otel.* + // 使用 * 通配符而不是 >,因为 TraceID 是最后一部分 + cancelSubject := cancelSubjectPrefix + "*" + sub, err := nc.Subscribe(cancelSubject, func(msg *nats.Msg) { + // 从主题中解析 TraceID (去除前缀) + prefixLen := len(cancelSubjectPrefix) + if len(msg.Subject) <= prefixLen { + g.Log().Warningf(ctx, "取消消息主题格式错误: %s", msg.Subject) + return + } + traceID := msg.Subject[prefixLen:] + + if traceID == "" { + g.Log().Warning(ctx, "取消消息主题缺少 TraceID") + return + } + + // 从映射表获取 CancelFunc 并执行取消 + traceCancelMu.RLock() + cancel, ok := traceCancelMap[traceID] + traceCancelMu.RUnlock() + + if ok { + cancel() + g.Log().Infof(ctx, "📢 取消信号已发送,traceID: %s", traceID) + } else { + g.Log().Infof(ctx, "⚠️ 未找到对应的可取消任务,traceID: %s", traceID) + } + }) + + if err != nil { + return nil, fmt.Errorf("设置取消监听器失败: %w", err) + } + + metrics.SubscribeCount.Add(1) + g.Log().Infof(ctx, "✅ 取消监听器已设置: %s", cancelSubject) + return sub, nil +} + +// publishCancel 发布取消指令 +// 向指定 TraceID 发送取消信号 +// 使用示例: +// +// err := nats.publishCancel(ctx, traceID) +func publishCancel(ctx context.Context, traceID string) error { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + if traceID == "" { + return fmt.Errorf("TraceID 不能为空") + } + + cancelSubject := cancelSubjectPrefix + traceID + err := nc.Publish(cancelSubject, nil) + if err != nil { + return fmt.Errorf("发布取消信号失败: %w", err) + } + + g.Log().Infof(ctx, "📤 已发送取消信号,traceID: %s,主题: %s", traceID, cancelSubject) + return nil +} + +// cleanupTraceCancel 清理取消映射表中的条目 +// 任务取消/正常结束后必须调用此函数,避免内存泄漏 +// 使用示例: +// +// defer nats.cleanupTraceCancel(traceID) +func cleanupTraceCancel(traceID string) { + if traceID == "" { + return + } + + traceCancelMu.Lock() + defer traceCancelMu.Unlock() + + if _, ok := traceCancelMap[traceID]; ok { + delete(traceCancelMap, traceID) + g.Log().Infof(context.Background(), "✅ 已清理取消映射表,traceID: %s", traceID) + } +} + +// CallRPC 调用 RPC 服务 +// serviceName: 服务名称 +// req: 请求数据 +// 返回: 响应数据(任意类型)和错误 +func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err error) { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + metrics.RequestCount.Add(1) + + // 验证 resp 必须是指针类型 + respValue := reflect.ValueOf(resp) + if respValue.Kind() != reflect.Ptr { + return fmt.Errorf("resp 参数必须是指针类型(当前类型: %T)", resp) + } + + // 构建请求体 + var reqBody []byte + if !g.IsEmpty(req) { + reqValue := reflect.ValueOf(req) + if !(reqValue.Kind() == reflect.Ptr && reqValue.IsNil()) && !reqValue.IsZero() { + reqData, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("序列化请求参数失败: %w", err) + } + reqBody = reqData + } + } + + // 检查本地是否有注册的单实例服务,如果有则直接调用(优化性能) + rpcServicesMu.RLock() + if localHandler, exists := rpcServices[serviceName]; exists { + rpcServicesMu.RUnlock() + + // 修复问题1:本地调用也需要处理取消机制 + var traceID string + if traceID, err = getTraceID(ctx); err != nil { + return err + } + // 提取 TraceID,创建可取消的 context + cancelCtx := createCancelContext(ctx, traceID) + // 执行本地调用 + var response interface{} + if response, err = localHandler(cancelCtx, reqBody); err != nil { + metrics.RequestError.Add(1) + return fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err) + } + + // 请求结束,清理取消映射表 + cleanupTraceCancel(traceID) + + // 检查是否为错误消息:尝试解析为 map,看是否包含 "_err" 字段 + var respMap map[string]any + if json.Unmarshal(response.([]byte), &respMap) == nil { + if errMsg, ok := respMap["_err"]; ok { + metrics.RequestError.Add(1) + return fmt.Errorf("%v", errMsg) + } + } + // 正常数据直接返回 + // responseMsg.Data 已经是 []byte 类型(来自 msg.Data),直接反序列化 + if err = json.Unmarshal(response.([]byte), resp); err != nil { + return fmt.Errorf("解析响应失败: %w (响应内容: %s)", err, response) + } + + return + } + rpcServicesMu.RUnlock() + + subject := fmt.Sprintf("rpc.%s", serviceName) + + // 创建消息并将上下文元数据写入消息头 + msg := nats.NewMsg(subject) + msg.Data = reqBody + headers, err := contextToHeaders(ctx) + if err != nil { + return fmt.Errorf("上下文转换失败: %w", err) + } + msg.Header = headers + + // 修复问题5:优化 go 协程避免资源泄漏 + // 使用 done channel 来确保 goroutine 能正确退出 + done := make(chan struct{}) + var closeDoneOnce sync.Once + closeDone := func() { + closeDoneOnce.Do(func() { + close(done) + }) + } + + if msg.Header.Get(TraceIDKey) != "" { + go func() { + defer closeDone() + select { + case <-ctx.Done(): + // context 被取消时,发送取消信号给服务端 + if errors.Is(ctx.Err(), context.Canceled) { + if err := publishCancel(context.Background(), msg.Header.Get(TraceIDKey)); err != nil { + g.Log().Errorf(ctx, "发送 RPC 取消信号失败: %v", err) + } else { + g.Log().Infof(ctx, "RPC 调用已取消,traceID: %s", msg.Header.Get(TraceIDKey)) + } + } + case <-done: + // 请求已完成,无需发送取消信号 + return + } + }() + } + + // 发送请求 + responseMsg, err := nc.RequestMsgWithContext(ctx, msg) + + // 关闭 done channel,通知 goroutine 退出 + closeDone() + + if err != nil { + metrics.RequestError.Add(1) + return fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err) + } + + if responseMsg == nil { + metrics.RequestError.Add(1) + return fmt.Errorf("RPC 响应为空 [%s]", serviceName) + } + + // 解析响应 + if len(responseMsg.Data) > 0 { + // 检查是否为错误消息:尝试解析为 map,看是否包含 "_err" 字段 + var respMap map[string]any + if json.Unmarshal(responseMsg.Data, &respMap) == nil { + if errMsg, ok := respMap["_err"]; ok { + metrics.RequestError.Add(1) + return fmt.Errorf("%v", errMsg) + } + } + // 正常数据直接返回 + // responseMsg.Data 已经是 []byte 类型(来自 msg.Data),直接反序列化 + if err = json.Unmarshal(responseMsg.Data, resp); err != nil { + return fmt.Errorf("解析响应失败: %w (响应内容: %s)", err, responseMsg.Data) + } + } + + return +} + +// RegisterServiceOption 注册选项类型 +type RegisterServiceOption func(*registerServiceConfig) + +type registerServiceConfig struct { + queueName string // 队列组名(用于集群模式) + excludeMethods []string +} + +// WithQueueGroup 设置队列组名(集群模式) +func WithQueueGroup(queueName string) RegisterServiceOption { + return func(cfg *registerServiceConfig) { + cfg.queueName = queueName + } +} + +// WithExcludeMethods 排除不需要注册的方法 +func WithExcludeMethods(methods ...string) RegisterServiceOption { + return func(cfg *registerServiceConfig) { + cfg.excludeMethods = append(cfg.excludeMethods, methods...) + } +} + +// AutoRegisterServices 自动注册多个服务的所有公开方法 +// serviceInstances: map[包名]service实例,如 map[string]interface{}{"user": userService, "order": orderService} +// options: 注册选项(可选) +// 示例: +// +// AutoRegisterServices(map[string]interface{}{ +// "user": userService, +// "order": orderService, +// }) +// 或 +// AutoRegisterServices(map[string]interface{}{ +// "order": orderService, +// }, WithQueueGroup("order-group")) +func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...RegisterServiceOption) error { + // 先注册 RPC 服务(如果 NATS 不可用则记录警告但不阻塞启动) + if !checkConnected() { + return fmt.Errorf("NATS 未连接,RPC 服务未注册") + } + + if len(serviceInstances) == 0 { + return fmt.Errorf("service 实例列表不能为空") + } + + totalRegistered := 0 + // 遍历每个 service 实例 + for pkgName, serviceInstance := range serviceInstances { + // 注册服务 + err := registerService(serviceInstance, pkgName, options...) + if err != nil { + g.Log().Errorf(ctx, "注册 %s 服务失败: %v", pkgName, err) + continue + } + totalRegistered++ + g.Log().Infof(ctx, "✅ %s 服务已自动注册", pkgName) + } + + if totalRegistered == 0 { + return fmt.Errorf("未能注册任何服务") + } + // 设置取消监听器(监听基于 TraceID 的取消请求) + //if _, err := setupCancelListener(ctx); err != nil { + // g.Log().Errorf(ctx, "设置取消监听器失败: %v", err) + //} else { + // g.Log().Infof(ctx, "✅ 取消监听器已自动设置") + //} + //g.Log().Infof(ctx, "✅ 共自动注册了 %d 个服务", totalRegistered) + + return nil +} + +// registerService 注册单个服务的所有公开方法(内部函数) +func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) (err error) { + if !checkConnected() { + return fmt.Errorf("NATS 未连接") + } + + // 应用选项 + cfg := ®isterServiceConfig{} + for _, opt := range options { + opt(cfg) + } + + // 创建排除方法集合 + excludeSet := make(map[string]struct{}) + for _, method := range cfg.excludeMethods { + excludeSet[method] = struct{}{} + } + + // 获取 service 的类型 + serviceType := reflect.TypeOf(service) + + // 遍历所有方法 + registeredCount := 0 + for i := 0; i < serviceType.NumMethod(); i++ { + method := serviceType.Method(i) + + // 只注册导出方法(首字母大写) + if !method.IsExported() { + continue + } + + // 排除指定的方法 + if _, exists := excludeSet[method.Name]; exists { + continue + } + + // 检查方法签名:必须是 func(ctx context.Context, request) (response, error) + // 注意:method.Type.NumIn() 包含接收者,所以实际参数数量需要减去 1 + // 要求:接收者 + context.Context + request,总共3个参数 + if method.Type.NumIn() != 3 { + g.Log().Warningf(context.Background(), "方法 %s 必须有2个参数(context.Context 和请求参数),跳过注册", method.Name) + continue + } + + // 第一个参数(接收者之后的第一个参数)必须是 context.Context + // method.Type.In(0) 是接收者,method.Type.In(1) 才是第一个参数 + if !method.Type.In(1).Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { + g.Log().Warningf(context.Background(), "方法 %s 的第一个参数必须是 context.Context,跳过注册", method.Name) + continue + } + + // 第二个参数必须是结构体指针或数组 + reqType := method.Type.In(2) + if reqType.Kind() != reflect.Ptr && reqType.Kind() != reflect.Slice && reqType.Kind() != reflect.Array { + g.Log().Warningf(context.Background(), "方法 %s 的第二个参数必须是结构体指针或数组,跳过注册", method.Name) + continue + } + + // 返回值必须是 (result, error),即2个返回值 + if method.Type.NumOut() != 2 { + g.Log().Warningf(context.Background(), "方法 %s 必须有2个返回值(result 和 error),跳过注册", method.Name) + continue + } + + // 最后一个返回值必须是 error + if !method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { + g.Log().Warningf(context.Background(), "方法 %s 的最后一个返回值必须是 error,跳过注册", method.Name) + continue + } + + // 生成服务名称:前缀.方法名(保持原始方法名) + serviceName := fmt.Sprintf("%s.%s", serviceNamePrefix, method.Name) + + // 创建 RPC handler + handler := func(ctx context.Context, req []byte) (any, error) { + // 准备方法调用参数 + // args[0] 是接收者, args[1] 是 ctx, args[2] 是请求参数 + args := make([]reflect.Value, 3) + args[0] = reflect.ValueOf(service) // 接收者 + args[1] = reflect.ValueOf(ctx) // context.Context + + // 解析请求参数 + if len(req) > 0 { + reqValuePtr := reflect.New(reqType) + + // 解析 JSON + if err := json.Unmarshal(req, reqValuePtr.Interface()); err != nil { + // 根据参数类型提供更友好的错误提示 + var typeHint string + if reqType.Kind() == reflect.Ptr { + typeHint = fmt.Sprintf("(期望类型: %s)", reqType.Elem().Name()) + } else { // reflect.Slice 或 reflect.Array + typeHint = fmt.Sprintf("(期望类型: %s,请确保客户端传递的是JSON数组格式)", reqType.String()) + } + return nil, fmt.Errorf("解析请求参数失败%s: %w", typeHint, err) + } + args[2] = reqValuePtr.Elem() + } else { + // 请求为空,创建零值 + args[2] = reflect.Zero(method.Type.In(2)) + } + + // 调用方法 + results := method.Func.Call(args) + + // 处理返回值 + var result any + + if len(results) == 1 { + // 只有 error + if !results[0].IsNil() { + err = results[0].Interface().(error) + } + } else if len(results) == 2 { + // (result, error) + result = results[0].Interface() + if !results[1].IsNil() { + err = results[1].Interface().(error) + } + } + if err != nil { + return nil, err + } + + return result, nil + } + + // 注册 RPC 服务 + var err error + if cfg.queueName != "" { + err = registerQueueRPCService(serviceName, cfg.queueName, handler) + } else { + err = registerRPCService(serviceName, handler) + } + + if err != nil { + g.Log().Errorf(context.Background(), "注册服务 %s 失败: %v", serviceName, err) + continue + } + + registeredCount++ + g.Log().Infof(context.Background(), "✅ 已自动注册 RPC 服务: %s -> %s", serviceName, method.Name) + } + + if registeredCount == 0 { + g.Log().Warningf(context.Background(), "未注册任何方法,请检查 %v 的方法签名", serviceNamePrefix) + return fmt.Errorf("未找到可注册的方法") + } + + g.Log().Infof(context.Background(), "✅ Service %v 共注册了 %d 个 RPC 方法", serviceNamePrefix, registeredCount) + return nil +} diff --git a/nats/nats_task.go b/nats/nats_task.go new file mode 100644 index 0000000..f6841ad --- /dev/null +++ b/nats/nats_task.go @@ -0,0 +1,212 @@ +package nats + +import ( + "context" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go/jetstream" + "time" +) + +// TaskStreamConfig 任务流配置 +type TaskStreamConfig struct { + StreamName string // 流名称 + Subjects []string // 主题数组(支持任务优先级,如 ["tasks.high","tasks.normal", "tasks.low"]) + PublishSubject string // 发布使用的主题(仅用于记录,不影响流配置) + Storage StorageType // 存储类型 + Retention RetentionType // 保留策略 + MaxAge time.Duration // 最大保留时间 + Duplicates time.Duration // 消息去重窗口时间 + Replicas int // 副本数 + MaxMsgSize int32 // 单条消息最大大小(字节) + MaxBytes int64 // 流最大存储大小(字节) + MaxMsgs int64 // 流中最大消息数 + MaxMsgsPerSubject int64 // 每个主题最大消息数 + MaxConsumers int // 最大消费者数量 + DenyPurge bool // 是否禁止清理流 + AllowRollup bool // 是否允许汇总消息 + DenyDelete bool // 是否禁止删除 + DiscardPerSubject bool // 是否按主题限制(工作队列模式) + Republish *RePublishConfig // 死信队列重新发布配置 +} + +// RePublishConfig 重新发布配置(用于死信队列) +type RePublishConfig struct { + Source string // 源主题 + Destination string // 目标主题 + HeadersOnly bool // 仅复制消息头 +} + +// StorageType 存储类型 +type StorageType string + +const ( + StorageFile StorageType = "file" // 文件存储(持久化) + StorageMemory StorageType = "memory" // 内存存储 +) + +// RetentionType 保留策略 +type RetentionType string + +const ( + RetentionLimit RetentionType = "limit" // 消息数量限制 + RetentionPolicy RetentionType = "interest" // 基于兴趣 + RetentionWorkQueue RetentionType = "workqueue" // 工作队列 +) + +// parseStorageType 解析存储类型 +func parseStorageType(st StorageType) jetstream.StorageType { + switch st { + case StorageMemory: + return jetstream.MemoryStorage + default: + return jetstream.FileStorage + } +} + +// parseRetentionType 解析保留策略 +func parseRetentionType(rt RetentionType) jetstream.RetentionPolicy { + switch rt { + case RetentionLimit: + return jetstream.LimitsPolicy + case RetentionPolicy: + return jetstream.InterestPolicy + default: + return jetstream.WorkQueuePolicy + } +} + +// createTaskStreamSimple 简化版创建任务流(适用于大多数场景) +// 只需提供流名称和主题数组,其他使用默认配置 +func createTaskStreamSimple(ctx context.Context, streamName string, subjects []string) error { + return createTaskStream(ctx, TaskStreamConfig{ + StreamName: streamName, + Subjects: subjects, + }) +} + +// createTaskStreamWithPriority 创建支持优先级的任务流 +func createTaskStreamWithPriority(ctx context.Context, streamPrefix string) error { + subjects := []string{ + fmt.Sprintf("%s.high.>", streamPrefix), + fmt.Sprintf("%s.normal.>", streamPrefix), + fmt.Sprintf("%s.low.>", streamPrefix), + } + return createTaskStream(ctx, TaskStreamConfig{ + StreamName: streamPrefix, + Subjects: subjects, + }) +} + +// CreateTaskStream 配置: 文件存储 + 工作队列策略 +// CreateTaskStream 创建任务消息队列流(JetStream 2.10+) +// 核心设计思路: +// 1. 严格持久化:使用文件存储,任务消息不会因为服务器重启而丢失 +// 2. 支持任务优先级:通过主题分级实现,如 ["tasks.high", "tasks.low"] +// 3. 死信队列支持:通过 RePublish 配置将失败任务路由到专门的 DLQ 流 +// 4. 灵活保留策略:根据任务重要性设置不同的保留时长(MaxAge) +// 5. 工作队列模式:确保每个任务只被一个消费者处理(DiscardPerSubject) +func createTaskStream(ctx context.Context, config TaskStreamConfig) error { + if !IsConnected() { + return fmt.Errorf("NATS 未连接") + } + + if g.IsNil(config.StreamName) { + return fmt.Errorf("流名称不能为空") + } + if len(config.Subjects) == 0 { + return fmt.Errorf("主题数组不能为空") + } + // 设置默认值 + if config.Storage == "" { + config.Storage = StorageFile // 默认文件存储 + } + if config.Retention == "" { + config.Retention = RetentionWorkQueue // 默认工作队列策略 + } + if config.MaxAge == 0 { + config.MaxAge = 24 * time.Hour // 默认保留24小时 + } + if config.Replicas == 0 { + config.Replicas = 1 // 默认单副本 + } + if config.MaxBytes == 0 { + config.MaxBytes = 10 * 1024 * 1024 * 1024 // 默认10GB + } + if config.MaxMsgs == 0 { + config.MaxMsgs = 100000 // 默认10万条消息 + } + if config.MaxMsgSize == 0 { + config.MaxMsgSize = 1024 * 1024 // 默认1MB + } + + if config.DiscardPerSubject { + config.DenyDelete = true // 工作队列模式下禁止删除 + } + + // 构建流配置 + jsConfig := jetstream.StreamConfig{ + Name: config.StreamName, + Subjects: config.Subjects, + Storage: parseStorageType(config.Storage), + Retention: parseRetentionType(config.Retention), + MaxAge: config.MaxAge, + Duplicates: config.Duplicates, + Replicas: config.Replicas, + MaxMsgSize: config.MaxMsgSize, + MaxBytes: config.MaxBytes, + MaxMsgs: config.MaxMsgs, + MaxMsgsPerSubject: config.MaxMsgsPerSubject, + MaxConsumers: config.MaxConsumers, + AllowRollup: config.AllowRollup, + DenyDelete: config.DenyDelete, + DenyPurge: config.DenyPurge, + Discard: jetstream.DiscardOld, // 默认删除旧消息 + DiscardNewPerSubject: config.DiscardPerSubject, + } + + // 配置死信队列重新发布(如果设置了) + if config.Republish != nil { + jsConfig.RePublish = &jetstream.RePublish{ + Source: config.Republish.Source, + Destination: config.Republish.Destination, + HeadersOnly: config.Republish.HeadersOnly, + } + } else { + // 使用固定的死信队列命名规范:{StreamName}.DLQ + dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName) + // 死信队列配置 + jsConfig.RePublish = &jetstream.RePublish{ + Source: ">", + Destination: dlqSubject, + HeadersOnly: true, + } + } + + // 检查流是否已存在 + stream, err := js.Stream(ctx, config.StreamName) + if err == nil { + // 流已存在,更新配置 + _, err = js.UpdateStream(ctx, jsConfig) + if err != nil { + return fmt.Errorf("更新任务流失败: %w", err) + } + g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name) + return nil + } + + // 创建新流 + stream, err = js.CreateStream(ctx, jsConfig) + if err != nil { + return fmt.Errorf("创建任务流失败: %w", err) + } + + // 记录配置信息 + configInfo := fmt.Sprintf("存储=%s, 策略=%s, 副本=%d, 保留=%v", config.Storage, config.Retention, config.Replicas, config.MaxAge) + if config.Republish != nil { + configInfo += fmt.Sprintf(", 死信队列=%s->%s", config.Republish.Source, config.Republish.Destination) + } + g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (%s)", stream.CachedInfo().Config.Name, configInfo) + + return nil +} diff --git a/nats/nats_test.go b/nats/nats_test.go index 7cb5851..94e1262 100644 --- a/nats/nats_test.go +++ b/nats/nats_test.go @@ -46,7 +46,12 @@ func TestNatsStreamOperations(t *testing.T) { ctx := context.Background() // 创建任务流 - err := CreateTaskStream(ctx, "test_tasks", []string{"test.task.>"}) + config := TaskStreamConfig{ + StreamName: "test_tasks", + Subjects: []string{"test.task.>"}, + //Subject: "test.task.process", + } + err := CreateTaskStream(ctx, config) if err != nil { t.Logf("创建任务流失败: %v", err) } @@ -79,7 +84,12 @@ func TestNatsConsumerOperations(t *testing.T) { ctx := context.Background() // 创建测试流 - err := CreateTaskStream(ctx, "test_consumer", []string{"test.consumer.>"}) + config := TaskStreamConfig{ + StreamName: "test_consumer", + Subjects: []string{"test.consumer.>"}, + //Subject: "test.consumer.process", + } + err := CreateTaskStream(ctx, config) if err != nil { t.Logf("创建流失败: %v", err) } @@ -119,22 +129,3 @@ func TestNatsConsumerOperations(t *testing.T) { // 清理流 _ = DeleteStream(ctx, "test_consumer") } - -// TestNatsPublishRequest 测试发布和请求 -func TestNatsPublishRequest(t *testing.T) { - ctx := context.Background() - - // 发布消息 - err := Publish(ctx, "test.publish", []byte("hello")) - if err != nil { - t.Logf("发布消息失败: %v", err) - } - - // RPC 请求 - response, err := CallRPC(ctx, "test.request", []byte("request"), 5*time.Second) - if err != nil { - t.Logf("RPC 请求失败: %v", err) - } else { - t.Logf("RPC 响应: %s", string(response)) - } -} diff --git a/nats/task.go b/nats/task.go new file mode 100644 index 0000000..996ded6 --- /dev/null +++ b/nats/task.go @@ -0,0 +1,411 @@ +package nats + +//import ( +// "context" +// "fmt" +// "time" +// +// "github.com/gogf/gf/v2/frame/g" +// "github.com/nats-io/nats.go/jetstream" +//) + +//// TaskPriority 任务优先级 +//type TaskPriority string +// +//const ( +// TaskPriorityHigh TaskPriority = "high" // 高优先级任务 +// TaskPriorityNormal TaskPriority = "normal" // 普通优先级任务 +// TaskPriorityLow TaskPriority = "low" // 低优先级任务 +//) +// +//// TaskStreamConfig 任务流配置 +//type TaskStreamConfig struct { +// StreamName string // 流名称 +// Subjects []string // 主题列表(支持优先级分级,如 tasks.high.>, tasks.normal.>, tasks.low.>) +// Subject string // 默认发布主题 +// Priority TaskPriority // 任务优先级 +// MaxAge time.Duration // 消息保留时长(根据任务重要性设置) +// MaxMsgsPerSub int64 // 每个订阅者最大消息数(防止内存溢出) +// Replicas int // 副本数(默认1,建议生产环境使用3) +// Duplicates time.Duration // 消息去重窗口(0表示不启用) +//} +// +//// TaskConsumerConfig 任务消费者配置 +//type TaskConsumerConfig struct { +// ConsumerName string // 消费者名称 +// AckPolicy *jetstream.AckPolicy +// MaxDeliveries int32 // 最大投递次数(用于重试控制) +// AckWait time.Duration // 等待ACK超时时间 +// Backoff []time.Duration // 重试退避策略 +// FilterSubject string // 过滤主题(可指定特定优先级任务) +// MaxAckPending int // 最大待确认消息数 +// MaxWaiting int // 最大等待消息数 +// ReplayPolicy *jetstream.ReplayPolicy // 重放策略 +//} + +// CreateTaskStream 创建任务流(基于 JetStream 2.10+ API) +// +// 核心设计思路: +// 1. 严格的持久化:使用文件存储(FileStorage)避免任务丢失 +// 2. 任务优先级:通过主题分级实现(tasks.high/tasks.normal/tasks.low) +// 3. 死信队列:配置死信队列处理失败任务 +// 4. 保留策略:按任务重要性设置不同的保留时长 +// 5. 工作队列策略:确保每条消息只被一个消费者处理 +// +// 参数: +// - ctx: 上下文 +// - config: 任务流配置 +// +// 返回: +// - error: 错误信息 +//func CreateTaskStream(ctx context.Context, config TaskStreamConfig) error { +// if !IsConnected() { +// return fmt.Errorf("NATS 未连接") +// } +// +// // 设置默认值 +// if config.MaxAge == 0 { +// config.MaxAge = 7 * 24 * time.Hour // 默认保留7天 +// } +// if config.MaxMsgsPerSub == 0 { +// config.MaxMsgsPerSub = 100000 // 默认每订阅者最多10万条消息 +// } +// if config.Replicas == 0 { +// config.Replicas = 1 // 默认单副本 +// } +// if config.Duplicates == 0 { +// config.Duplicates = 2 * time.Minute // 默认2分钟去重窗口 +// } +// +// // 验证主题配置 +// if len(config.Subjects) == 0 { +// return fmt.Errorf("任务流必须指定至少一个主题") +// } +// +// // 设置死信队列 +// // 使用固定的死信队列命名规范:{StreamName}.DLQ +// dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName) +// +// // 尝试获取现有流 +// stream, err := js.Stream(ctx, config.StreamName) +// if err == nil { +// // 流已存在,更新配置以适配任务流的特殊需求 +// _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ +// Name: config.StreamName, +// Subjects: config.Subjects, +// Storage: jetstream.FileStorage, // 文件存储确保持久化 +// Retention: jetstream.WorkQueuePolicy, // 工作队列策略 +// MaxAge: config.MaxAge, +// MaxMsgs: config.MaxMsgsPerSub, +// Replicas: config.Replicas, +// Duplicates: config.Duplicates, +// // 死信队列配置 +// RePublish: &jetstream.RePublish{ +// Source: ">", // 匹配所有主题 +// Destination: dlqSubject, +// }, +// // 限制流大小(防止磁盘占用过多) +// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB +// }) +// if err != nil { +// return fmt.Errorf("更新任务流失败: %w", err) +// } +// g.Log().Infof(ctx, "✅ 任务流已更新: %s (优先级: %s, 保留: %v)", +// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge) +// return nil +// } +// +// // 创建新任务流 +// streamConfig := jetstream.StreamConfig{ +// Name: config.StreamName, +// Subjects: config.Subjects, +// Storage: jetstream.FileStorage, // 文件存储确保持久化 +// Retention: jetstream.WorkQueuePolicy, // 工作队列策略 +// MaxAge: config.MaxAge, +// MaxMsgs: config.MaxMsgsPerSub, +// Replicas: config.Replicas, +// Duplicates: config.Duplicates, +// // 死信队列配置 +// RePublish: &jetstream.RePublish{ +// Source: ">", // 匹配所有主题 +// Destination: dlqSubject, +// }, +// // 限制流大小(防止磁盘占用过多) +// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB +// // 启用流清理 +// Discard: jetstream.DiscardOld, // 新消息替换旧消息 +// } +// +// stream, err = js.CreateStream(ctx, streamConfig) +// if err != nil { +// return fmt.Errorf("创建任务流失败: %w", err) +// } +// +// // 验证流是否创建成功 +// if stream == nil { +// return fmt.Errorf("创建任务流失败:流对象为空") +// } +// +// g.Log().Infof(ctx, "✅ 任务流创建成功: %s (文件存储+工作队列策略+死信队列, 优先级: %s, 保留: %v, 副本: %d)", +// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge, config.Replicas) +// +// // 记录配置信息 +// g.Log().Infof(ctx, " - 主题列表: %v", config.Subjects) +// g.Log().Infof(ctx, " - 死信队列: %s", dlqSubject) +// g.Log().Infof(ctx, " - 最大消息数: %d", config.MaxMsgsPerSub) +// g.Log().Infof(ctx, " - 去重窗口: %v", config.Duplicates) +// +// return nil +//} +// +//// CreateOrUpdateTaskConsumer 创建或更新任务消费者(基于 JetStream 2.10+ API) +//// +//// 核心设计思路: +//// 1. 支持手动确认(AckExplicit)确保任务处理完成 +//// 2. 通过 Nack() 方法实现消息重试,超限后进入死信队列 +//// 3. 支持主题过滤,可订阅特定优先级任务 +//// 4. 限制待确认消息数,防止消费者过载 +//// 5. AckWait 设置消息处理超时时间 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamName: 流名称 +//// - consumerConfig: 消费者配置 +//// +//// 返回: +//// - jetstream.Consumer: 消费者对象 +//// - error: 错误信息 +//func CreateOrUpdateTaskConsumer(ctx context.Context, streamName string, consumerConfig TaskConsumerConfig) (jetstream.Consumer, error) { +// if !IsConnected() { +// return nil, fmt.Errorf("NATS 未连接") +// } +// +// // 设置默认值 +// ackPolicy := jetstream.AckExplicitPolicy +// if consumerConfig.AckPolicy != nil { +// ackPolicy = *consumerConfig.AckPolicy +// } +// +// if consumerConfig.MaxDeliveries == 0 { +// consumerConfig.MaxDeliveries = 10 // 默认最多投递10次 +// } +// +// if consumerConfig.AckWait == 0 { +// consumerConfig.AckWait = 30 * time.Second // 默认30秒等待确认 +// } +// +// if consumerConfig.MaxAckPending == 0 { +// consumerConfig.MaxAckPending = 1000 // 默认最多1000条待确认消息 +// } +// +// if consumerConfig.MaxWaiting == 0 { +// consumerConfig.MaxWaiting = 512 // 默认最多512条等待消息 +// } +// +// replayPolicy := jetstream.ReplayInstantPolicy +// if consumerConfig.ReplayPolicy != nil { +// replayPolicy = *consumerConfig.ReplayPolicy +// } +// +// // 构建消费者配置 +// config := jetstream.ConsumerConfig{ +// Name: consumerConfig.ConsumerName, +// Durable: consumerConfig.ConsumerName, // 持久化消费者 +// AckPolicy: ackPolicy, +// AckWait: consumerConfig.AckWait, +// MaxAckPending: consumerConfig.MaxAckPending, +// MaxWaiting: consumerConfig.MaxWaiting, +// ReplayPolicy: replayPolicy, +// FilterSubject: consumerConfig.FilterSubject, +// } +// +// // 使用 CreateOrUpdateConsumer 创建或更新消费者 +// consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, config) +// if err != nil { +// return nil, fmt.Errorf("创建任务消费者失败: %w", err) +// } +// +// g.Log().Infof(ctx, "✅ 任务消费者已创建/更新: %s/%s (等待确认: %v)", +// streamName, consumerConfig.ConsumerName, consumerConfig.AckWait) +// +// // 获取消费者信息并记录 +// info, err := consumer.Info(ctx) +// if err == nil { +// g.Log().Infof(ctx, " - 过滤主题: %s", info.Config.FilterSubject) +// g.Log().Infof(ctx, " - 最大待确认: %d", info.Config.MaxAckPending) +// g.Log().Infof(ctx, " - ACK策略: %s", info.Config.AckPolicy) +// } +// +// return consumer, nil +//} +// +//// CreateTaskStreamWithPriority 创建带优先级的任务流 +//// +//// 便捷方法,自动创建支持多优先级的任务流配置 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamPrefix: 流名称前缀(如 "tasks") +//// - priority: 默认优先级 +//// +//// 返回: +//// - error: 错误信息 +//func CreateTaskStreamWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority) error { +// if !IsConnected() { +// return fmt.Errorf("NATS 未连接") +// } +// +// // 构建支持多优先级的主题列表 +// subjects := []string{ +// fmt.Sprintf("%s.high.>", streamPrefix), // 高优先级任务 +// fmt.Sprintf("%s.normal.>", streamPrefix), // 普通优先级任务 +// fmt.Sprintf("%s.low.>", streamPrefix), // 低优先级任务 +// } +// +// // 根据优先级设置不同的保留时长 +// var maxAge time.Duration +// switch priority { +// case TaskPriorityHigh: +// maxAge = 30 * 24 * time.Hour // 高优先级保留30天 +// case TaskPriorityNormal: +// maxAge = 7 * 24 * time.Hour // 普通优先级保留7天 +// case TaskPriorityLow: +// maxAge = 24 * time.Hour // 低优先级保留1天 +// default: +// maxAge = 7 * 24 * time.Hour +// } +// +// config := TaskStreamConfig{ +// StreamName: streamPrefix, +// Subjects: subjects, +// Subject: fmt.Sprintf("%s.%s.>", streamPrefix, priority), +// Priority: priority, +// MaxAge: maxAge, +// MaxMsgsPerSub: 100000, +// Replicas: 1, +// Duplicates: 2 * time.Minute, +// } +// +// return CreateTaskStream(ctx, config) +//} +// +//// PublishTask 发布任务到指定流 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamName: 流名称 +//// - task: 任务数据(会被JSON序列化) +//// +//// 返回: +//// - error: 错误信息 +//func PublishTask(ctx context.Context, streamName string, task interface{}) error { +// if !IsConnected() { +// return fmt.Errorf("NATS 未连接") +// } +// +// // 使用 JsPublish 发布消息 +// if err := JsPublish(ctx, streamName, task); err != nil { +// return fmt.Errorf("发布任务失败: %w", err) +// } +// +// return nil +//} +// +//// PublishTaskWithPriority 发布带优先级的任务 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamPrefix: 流名称前缀 +//// - priority: 任务优先级 +//// - taskType: 任务类型 +//// - task: 任务数据(会被JSON序列化) +//// +//// 返回: +//// - error: 错误信息 +//func PublishTaskWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority, taskType string, task interface{}) error { +// if !IsConnected() { +// return fmt.Errorf("NATS 未连接") +// } +// +// // 构建主题:{streamPrefix}.{priority}.{taskType} +// subject := fmt.Sprintf("%s.%s.%s", streamPrefix, priority, taskType) +// +// // 使用 JsPublish 发布消息 +// if err := JsPublish(ctx, subject, task); err != nil { +// return fmt.Errorf("发布任务失败: %w", err) +// } +// +// g.Log().Debugf(ctx, "任务已发布: %s (优先级: %s, 类型: %s)", subject, priority, taskType) +// +// return nil +//} +// +//// GetTaskStreamInfo 获取任务流信息 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamName: 流名称 +//// +//// 返回: +//// - *jetstream.StreamInfo: 流信息 +//// - error: 错误信息 +//func GetTaskStreamInfo(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) { +// if !IsConnected() { +// return nil, fmt.Errorf("NATS 未连接") +// } +// +// return GetStream(ctx, streamName) +//} +// +//// GetTaskConsumerInfo 获取任务消费者信息 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamName: 流名称 +//// - consumerName: 消费者名称 +//// +//// 返回: +//// - *jetstream.ConsumerInfo: 消费者信息 +//// - error: 错误信息 +//func GetTaskConsumerInfo(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) { +// if !IsConnected() { +// return nil, fmt.Errorf("NATS 未连接") +// } +// +// return GetConsumer(ctx, streamName, consumerName) +//} +// +//// DeleteTaskStream 删除任务流 +//// +//// 注意:此操作会删除流及其所有消息,请谨慎使用 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamName: 流名称 +//// +//// 返回: +//// - error: 错误信息 +//func DeleteTaskStream(ctx context.Context, streamName string) error { +// if !IsConnected() { +// return fmt.Errorf("NATS 未连接") +// } +// +// return DeleteStream(ctx, streamName) +//} +// +//// DeleteTaskConsumer 删除任务消费者 +//// +//// 参数: +//// - ctx: 上下文 +//// - streamName: 流名称 +//// - consumerName: 消费者名称 +//// +//// 返回: +//// - error: 错误信息 +//func DeleteTaskConsumer(ctx context.Context, streamName, consumerName string) error { +// if !IsConnected() { +// return fmt.Errorf("NATS 未连接") +// } +// +// return DeleteConsumer(ctx, streamName, consumerName) +//} diff --git a/nats/utils.go b/nats/utils.go new file mode 100644 index 0000000..1e43535 --- /dev/null +++ b/nats/utils.go @@ -0,0 +1,87 @@ +package nats + +import ( + "context" + "fmt" + "github.com/gogf/gf/v2/frame/g" + "github.com/nats-io/nats.go" + "go.opentelemetry.io/otel/trace" +) + +// ============ 上下文元数据工具函数 ============ +// 以下函数用于在 context 和 NATS 消息头之间互转元数据 + +// 定义常见的上下文元数据 key +const ( + TraceIDKey = "trace_id" + TokenKey = "token" +) + +func getTraceID(ctx context.Context) (traceID string, err error) { + // 提取 traceId:首先尝试从 OpenTelemetry Span 中提取,从 context 中提取 TraceID + span := trace.SpanFromContext(ctx) + if span != nil && span.SpanContext().HasTraceID() { + traceID = span.SpanContext().TraceID().String() + } else if tid := ctx.Value(TraceIDKey); tid != nil { + traceID = fmt.Sprintf("%v", tid) + } + if traceID == "" { + return traceID, fmt.Errorf("context 中没有 TraceID") + } + return +} + +// contextToHeaders 将 context 中的元数据转换为 NATS 消息头 +// 支持提取 user_id、tenant_id、trace_id、token 等常见字段 +func contextToHeaders(ctx context.Context) (nats.Header, error) { + headers := make(nats.Header) + + // 提取 traceId:首先尝试从 OpenTelemetry Span 中提取 + if traceID, err := getTraceID(ctx); err != nil { + return headers, err + } else { + headers.Set(TraceIDKey, traceID) + } + + // 提取 token(优先级:context value > HTTP Authorization header) + token := "" + if t := ctx.Value(TokenKey); t != nil { + token = fmt.Sprintf("%v", t) + } else if r := g.RequestFromCtx(ctx); r != nil { + // 从 HTTP 请求的 Authorization header 中提取 token + auth := r.GetHeader("Authorization") + if auth != "" { + // 移除 "Bearer " 前缀 + if len(auth) > 7 && auth[:7] == "Bearer " { + token = auth[7:] + } else { + token = auth + } + } + } + if token != "" { + headers.Set(TokenKey, token) + } + + return headers, nil +} + +// headersToContext 从 NATS 消息头重建 context +// 支持还原 user_id、tenant_id、trace_id、token 等字段 +func headersToContext(ctx context.Context, headers nats.Header) context.Context { + if headers == nil { + return ctx + } + + // 恢复 trace_id + if traceID := headers.Get(TraceIDKey); traceID != "" { + ctx = context.WithValue(ctx, TraceIDKey, traceID) + } + + // 恢复 token + if token := headers.Get(TokenKey); token != "" { + ctx = context.WithValue(ctx, TokenKey, token) + } + + return ctx +} diff --git a/utils/utils.go b/utils/utils.go index c9b6f74..0ae38d8 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -66,8 +66,18 @@ func GetMonthToday(t time.Time, month int) time.Time { } func GetUserInfo(ctx context.Context) (user beans.User, err error) { - r := g.RequestFromCtx(ctx) - if r != nil { + // 检查context是否已取消 + select { + case <-ctx.Done(): + return user, ctx.Err() + default: + } + + if !g.IsNil(ctx.Value("id")) || !g.IsNil(ctx.Value("userName")) || !g.IsNil(ctx.Value("tenantId")) { + user.UserId = ctx.Value("id") + user.UserName = ctx.Value("userName") + user.TenantId = ctx.Value("tenantId") + } else { redisAddr := g.Cfg().MustGet(ctx, "redis.default.address").String() gft := gftoken.NewGfToken( gftoken.WithCacheKey("gfToken:"), @@ -79,34 +89,47 @@ func GetUserInfo(ctx context.Context) (user beans.User, err error) { Address: redisAddr, Db: 1, })) - // 解析 token - data, err := gft.ParseToken(g.RequestFromCtx(ctx)) - if err != nil { - return user, gerror.Wrap(err, "token 解析失败") + var data *gftoken.CustomClaims + + if !g.IsNil(ctx.Value("token")) { + var tokenData *gftoken.TokenData + tokenData, _, err = gft.GetTokenData(ctx, ctx.Value("token").(string)) + if err != nil { + return user, gerror.Wrap(err, "token 解析失败") + } + var code int + if data, code = gft.IsNotExpired(tokenData.JwtToken); code != gftoken.JwtTokenOK { + return user, gerror.New("token jwt 解析失败") + } + } else if g.RequestFromCtx(ctx) != nil { + // 解析 token + data, err = gft.ParseToken(g.RequestFromCtx(ctx)) + if err != nil { + return user, gerror.Wrap(err, "token 解析失败") + } } // 检查 data 是否为 nil if data == nil { return user, gerror.New("token 数据为空") } - // 检查 data.Data 是否为 nil if data.Data == nil { + g.Log().Errorf(ctx, "data.Data 为空") return user, gerror.New("用户信息为空") } - dataMap := gconv.Map(data.Data) + user.UserId = dataMap["id"] user.UserName = dataMap["userName"] user.TenantId = dataMap["tenantId"] - } else { - user.TenantId = ctx.Value("tenantId") - user.UserName = ctx.Value("userName") } - if user.TenantId == nil { + + if g.IsNil(user.UserId) && g.IsNil(user.UserName) && g.IsNil(user.TenantId) { return user, gerror.New("租户信息为空") } return } + func SetValue(ctx context.Context, result any, key string, value any) { // 检查context是否已取消 select { @@ -207,6 +230,13 @@ func FormatUnixTime(timestamp int64) string { // ParseDurationWithDefault 解析持续时间,失败时使用默认值 - 通用时间处理工具 func ParseDurationWithDefault(ctx context.Context, durationStr, defaultStr, fieldName string) (time.Duration, string) { + // 检查context是否已取消 + select { + case <-ctx.Done(): + return 0, "" + default: + } + durationParsed, err := time.ParseDuration(durationStr) if err != nil { // 这里不能直接使用g.Log(),因为这是utils包,没有直接的日志访问 @@ -303,7 +333,7 @@ func Struct(params any, pointer any) error { if err != nil { return err } - err = json.Unmarshal(b, &pointer) + err = json.Unmarshal(b, pointer) if err != nil { return err }