diff --git a/nats/msg.go b/nats/msg.go deleted file mode 100644 index 5929241..0000000 --- a/nats/msg.go +++ /dev/null @@ -1,65 +0,0 @@ -package nats - -import ( - "context" - "github.com/gogf/gf/v2/errors/gerror" -) - -// NatsMessageConfig nats Stream 消息配置 -type NatsMessageConfig struct { - CreateTaskStreamName string - CreateTaskSubjects []string - PublishSubject string - CreateTaskConsumerName string - MsgCount int - HandleFunc func(ctx context.Context, message map[string]interface{}) error -} - -// MessageConfig 消息配置接口 -type MessageConfig interface { - createTaskStream(ctx context.Context) error - publish(ctx context.Context, data interface{}) error - createTaskConsumer(ctx context.Context) error - //startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error -} - -func (n *NatsMessageConfig) createTaskStream(ctx context.Context) error { - return createTaskStreamSimple(ctx, n.CreateTaskStreamName, n.CreateTaskSubjects) -} - -// CreateTaskStreamBatch 批量创建任务消息队列流 -func CreateTaskStreamBatch(ctx context.Context, configs ...MessageConfig) error { - for _, cfg := range configs { - if err := cfg.createTaskStream(ctx); err != nil { - return gerror.Wrap(err, "创建任务消息队列流失败") - } - } - return nil -} - -func (n *NatsMessageConfig) publish(ctx context.Context, data interface{}) error { - return publish(ctx, n.PublishSubject, data) -} - -// PublishMessage 发布消息(统一入口) -func PublishMessage(ctx context.Context, cfg MessageConfig, data interface{}) (err error) { - return cfg.publish(ctx, data) -} - -func (n *NatsMessageConfig) createTaskConsumer(ctx context.Context) error { - return CreateConsumerPushMode(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, n.PublishSubject, n.MsgCount) -} - -// CreateTaskConsumerBatch 批量创建任务消息队列消费者 -func CreateTaskConsumerBatch(ctx context.Context, configs ...MessageConfig) error { - for _, cfg := range configs { - if err := cfg.createTaskConsumer(ctx); err != nil { - return gerror.Wrap(err, "创建任务消息队列流失败") - } - } - return nil -} - -//func (n *NatsMessageConfig) startConsumer(ctx context.Context, handleFunc func(ctx context.Context, msg *nats.Msg) error) error { -// return ConsumeMessages(ctx, n.CreateTaskStreamName, n.CreateTaskConsumerName, handleFunc) -//} diff --git a/nats/nats.go b/nats/nats.go deleted file mode 100644 index d01886e..0000000 --- a/nats/nats.go +++ /dev/null @@ -1,268 +0,0 @@ -package nats - -import ( - "context" - "encoding/json" - "fmt" - "time" - - "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go/jetstream" -) - -// createTaskStream 创建任务消息队列流(内部使用,兼容旧版本) -// 存储策略: 文件存储 -// 工作队列模式: 工作队列策略 -func CreateTaskStream(ctx context.Context, streamInfo TaskStreamConfig) error { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - - stream, err := js.Stream(ctx, streamInfo.StreamName) - if err == nil { - // 流已存在,更新配置 - _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ - Name: streamInfo.StreamName, - Subjects: streamInfo.Subjects, - Storage: jetstream.FileStorage, - Retention: jetstream.WorkQueuePolicy, - }) - if err != nil { - return fmt.Errorf("更新任务流失败: %w", err) - } - g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name) - return nil - } - - // 创建新流 - stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ - Name: streamInfo.StreamName, - Subjects: streamInfo.Subjects, - Storage: jetstream.FileStorage, - Retention: jetstream.WorkQueuePolicy, - }) - if err != nil { - return fmt.Errorf("创建任务流失败: %w", err) - } - - g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (文件存储+工作队列策略)", stream.CachedInfo().Config.Name) - return nil -} - -// CreateLogStream 创建日志流 -// 存储策略: 内存存储 -// 副本数: 单副本 (1) -// 消息留存: 短时留存 (1小时) -func CreateLogStream(ctx context.Context, streamName string, subjects []string) error { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - - maxAge := 1 * time.Hour - - stream, err := js.Stream(ctx, streamName) - if err == nil { - // 流已存在,更新配置 - _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ - Name: streamName, - Subjects: subjects, - Storage: jetstream.MemoryStorage, - Replicas: 1, - MaxAge: maxAge, - }) - if err != nil { - return fmt.Errorf("更新日志流失败: %w", err) - } - g.Log().Infof(ctx, "✅ 日志流已更新: %s", stream.CachedInfo().Config.Name) - return nil - } - - // 创建新流 - stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ - Name: streamName, - Subjects: subjects, - Storage: jetstream.MemoryStorage, - Replicas: 1, - MaxAge: maxAge, - }) - if err != nil { - return fmt.Errorf("创建日志流失败: %w", err) - } - - g.Log().Infof(ctx, "✅ 日志流创建成功: %s (内存存储+单副本+短时留存1小时)", stream.CachedInfo().Config.Name) - return nil -} - -// CreateTradeStream 创建交易业务流 -// 存储策略: 文件存储 -// 副本数: 3副本 -// 同步刷盘: 启用 -func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - - stream, err := js.Stream(ctx, streamName) - if err == nil { - // 流已存在,更新配置 - _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ - Name: streamName, - Subjects: subjects, - Storage: jetstream.FileStorage, - Replicas: 3, - RePublish: nil, - Duplicates: 0, - }) - if err != nil { - return fmt.Errorf("更新交易流失败: %w", err) - } - g.Log().Infof(ctx, "✅ 交易业务流已更新: %s", stream.CachedInfo().Config.Name) - return nil - } - - // 创建新流 - stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ - Name: streamName, - Subjects: subjects, - Storage: jetstream.FileStorage, - Replicas: 3, - RePublish: nil, - Duplicates: 0, - }) - if err != nil { - return fmt.Errorf("创建交易流失败: %w", err) - } - - g.Log().Infof(ctx, "✅ 交易业务流创建成功: %s (文件存储+3副本+同步刷盘)", stream.CachedInfo().Config.Name) - return nil -} - -// JsPublish 发布消息到指定主题 -func JsPublish(ctx context.Context, subject string, data any) (err error) { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - // 序列化数据 - dataBytes, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("序列化数据失败: %w", err) - } - // 发布消息 - metrics.PublishCount.Add(1) - _, err = js.Publish(ctx, subject, dataBytes) - if err != nil { - metrics.PublishError.Add(1) - return fmt.Errorf("发布消息失败: %w", err) - } - - return -} - -// GetStream 获取流信息 -func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) { - if !IsConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - stream, err := js.Stream(ctx, streamName) - if err != nil { - return nil, fmt.Errorf("获取流失败: %w", err) - } - - info, err := stream.Info(ctx) - if err != nil { - return nil, fmt.Errorf("获取流信息失败: %w", err) - } - - return info, nil -} - -// ListStreams 列出所有流(简化实现) -// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 -func ListStreams(ctx context.Context) ([]string, error) { - if !IsConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - // TODO: 根据实际 NATS 版本实现完整的流列表功能 - return []string{}, nil -} - -// DeleteStream 删除流 -func DeleteStream(ctx context.Context, streamName string) error { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - - if err := js.DeleteStream(ctx, streamName); err != nil { - return fmt.Errorf("删除流失败: %w", err) - } - - g.Log().Infof(ctx, "✅ 流已删除: %s", streamName) - return nil -} - -// GetConsumer 获取消费者信息 -func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) { - if !IsConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - consumer, err := js.Consumer(ctx, streamName, consumerName) - if err != nil { - return nil, fmt.Errorf("获取消费者失败: %w", err) - } - - info, err := consumer.Info(ctx) - if err != nil { - return nil, fmt.Errorf("获取消费者信息失败: %w", err) - } - - return info, nil -} - -// ListConsumers 列出指定流的所有消费者(简化实现) -// 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 -func ListConsumers(ctx context.Context, streamName string) ([]string, error) { - if !IsConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - // TODO: 根据实际 NATS 版本实现完整的消费者列表功能 - return []string{}, nil -} - -// DeleteConsumer 删除消费者 -func DeleteConsumer(ctx context.Context, streamName, consumerName string) error { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - - if err := js.DeleteConsumer(ctx, streamName, consumerName); err != nil { - return fmt.Errorf("删除消费者失败: %w", err) - } - - g.Log().Infof(ctx, "✅ 消费者已删除: %s/%s", streamName, consumerName) - return nil -} - -// CreateConsumer 创建消费者 -func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) { - if !IsConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - // 尝试获取现有消费者 - consumer, err := js.Consumer(ctx, streamName, consumerName) - if err == nil { - return consumer, nil - } - - // 推荐:不存在则创建,存在则更新配置 - consumer, err = js.CreateOrUpdateConsumer(ctx, streamName, config) - if err != nil { - return nil, fmt.Errorf("创建消费者失败: %w", err) - } - - return consumer, nil -} diff --git a/nats/nats_client.go b/nats/nats_client.go deleted file mode 100644 index a4afebb..0000000 --- a/nats/nats_client.go +++ /dev/null @@ -1,313 +0,0 @@ -package nats - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" -) - -var ( - nc *nats.Conn - js jetstream.JetStream - inited bool - natsMu sync.RWMutex - natsURL string - healthCtx context.Context - healthCancel context.CancelFunc - connected bool - reconnectChan chan struct{} - - // 连接状态变化监听器 - connStateListeners []connStateListener - connListenersMu sync.RWMutex - - // 监控指标 - metrics metricsCounter -) - -// Metrics 监控指标 -type metricsCounter struct { - PublishCount atomic.Int64 - PublishError atomic.Int64 - SubscribeCount atomic.Int64 - RequestCount atomic.Int64 - RequestError atomic.Int64 - ConsumeCount atomic.Int64 - ConsumeError atomic.Int64 -} - -// ConnState 连接状态 -type connState int - -const ( - connStateDisconnected connState = iota - connStateConnecting - connStateConnected - connStateReconnecting - connStateClosed -) - -// ConnStateListener 连接状态监听器 -type connStateListener func(state connState, err error) - -// GetMetrics 获取监控指标 -func getMetrics() metricsCounter { - return metrics -} - -// registerConnStateListener 注册连接状态监听器 -func registerConnStateListener(listener connStateListener) { - connListenersMu.Lock() - defer connListenersMu.Unlock() - connStateListeners = append(connStateListeners, listener) -} - -// unregisterConnStateListener 取消注册连接状态监听器 -func unregisterConnStateListener(listener connStateListener) { - connListenersMu.Lock() - defer connListenersMu.Unlock() - for i, l := range connStateListeners { - if l != nil && &l == &listener { - connStateListeners = append(connStateListeners[:i], connStateListeners[i+1:]...) - break - } - } -} - -// notifyConnState 通知所有监听器连接状态变化 -func notifyConnState(state connState, err error) { - connListenersMu.RLock() - listeners := make([]connStateListener, len(connStateListeners)) - copy(listeners, connStateListeners) - connListenersMu.RUnlock() - - for _, listener := range listeners { - if listener != nil { - listener(state, err) - } - } -} - -// init 初始化 NATS 连接 -func init() { - // 从配置文件读取 NATS 地址 - natsURL = g.Cfg().MustGet(context.Background(), "nats.url").String() - if natsURL == "" { - // 默认使用本地地址 - natsURL = nats.DefaultURL - } - - // 创建健康检查上下文 - healthCtx, healthCancel = context.WithCancel(context.Background()) - - // 创建重连通知通道(增大缓冲区避免丢失通知) - reconnectChan = make(chan struct{}, 10) - - // 启动连接 - go initConnection() - - // 启动健康检查协程 - go healthCheck() -} - -// initConnection 初始化连接 -func initConnection() { - ctx := context.Background() - notifyConnState(connStateConnecting, nil) - if err := connect(ctx); err != nil { - g.Log().Errorf(ctx, "NATS 初始连接失败: %v", err) - notifyConnState(connStateDisconnected, err) - } -} - -// connect 建立 NATS 连接 -func connect(ctx context.Context) error { - natsMu.Lock() - defer natsMu.Unlock() - - if nc != nil && !nc.IsClosed() { - nc.Close() - } - - // 连接选项配置 - opts := []nats.Option{ - nats.Name("goframe-nats-client"), - nats.ReconnectWait(2 * time.Second), - nats.MaxReconnects(-1), // 无限重连 - nats.PingInterval(10 * time.Second), - nats.MaxPingsOutstanding(5), - nats.ReconnectHandler(func(nc *nats.Conn) { - g.Log().Infof(ctx, "✅ NATS 重连成功: %s", nc.ConnectedUrl()) - connected = true - - // 重新创建 JetStream 实例 - if newJS, err := jetstream.New(nc); err == nil { - js = newJS - } - - // 通知重连成功 - notifyConnState(connStateConnected, nil) - - // 使用非阻塞发送避免阻塞 - select { - case reconnectChan <- struct{}{}: - default: - // 通道已满,丢弃通知 - } - }), - nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { - g.Log().Warningf(ctx, "⚠️ NATS 连接断开: %v, 准备重连...", err) - connected = false - notifyConnState(connStateReconnecting, err) - }), - nats.ClosedHandler(func(nc *nats.Conn) { - g.Log().Infof(ctx, "NATS 连接已关闭: %s", nc.ConnectedUrl()) - connected = false - notifyConnState(connStateClosed, nil) - }), - nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { - g.Log().Errorf(ctx, "NATS 错误: %v", err) - }), - } - - var err error - nc, err = nats.Connect(natsURL, opts...) - if err != nil { - return fmt.Errorf("NATS 连接失败: %w", err) - } - - // 等待连接就绪 - if nc.Status() != nats.CONNECTED { - select { - case <-time.After(5 * time.Second): - notifyConnState(connStateDisconnected, fmt.Errorf("连接超时")) - return fmt.Errorf("NATS 连接超时") - case <-nc.StatusChanged(nats.CONNECTED): - } - } - - // 创建 JetStream 实例 - js, err = jetstream.New(nc) - if err != nil { - return fmt.Errorf("创建 JetStream 失败: %w", err) - } - - connected = true - inited = true - g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl()) - notifyConnState(connStateConnected, nil) - return nil -} - -// healthCheck 健康检查协程(仅作为备用检查) -func healthCheck() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-healthCtx.Done(): - return - case <-ticker.C: - natsMu.RLock() - currentConnected := connected - currentConn := nc - natsMu.RUnlock() - - if !currentConnected || currentConn == nil || currentConn.IsClosed() { - // 仅记录日志,不尝试重连(NATS 已有自动重连机制) - g.Log().Warning(context.Background(), "NATS 连接断开,等待 NATS 自动重连...") - } - case <-reconnectChan: - // 重连成功的通知(仅记录日志) - g.Log().Info(context.Background(), "收到重连成功通知") - } - } -} - -// checkConnected 检查连接状态 -func checkConnected() bool { - natsMu.RLock() - defer natsMu.RUnlock() - return connected && nc != nil && !nc.IsClosed() -} - -// getConnState 获取当前连接状态 -func getConnState() connState { - natsMu.RLock() - defer natsMu.RUnlock() - - if nc == nil { - return connStateDisconnected - } - - if nc.IsClosed() { - return connStateClosed - } - - if connected { - return connStateConnected - } - - return connStateDisconnected -} - -// shutdown 优雅关闭:自动注销所有已注册的服务并关闭 NATS 连接 -func shutdown() error { - ctx := context.Background() - g.Log().Info(ctx, "开始优雅关闭 NATS RPC 服务...") - - // 注销所有单实例服务 - rpcServicesMu.Lock() - singleServiceCount := len(rpcServices) - for serviceName := range rpcServices { - if sub, exists := rpcSubs[serviceName]; exists { - if err := sub.Unsubscribe(); err != nil { - g.Log().Errorf(ctx, "注销服务 %s 失败: %v", serviceName, err) - } - } - delete(rpcSubs, serviceName) - delete(rpcServices, serviceName) - } - rpcServicesMu.Unlock() - - // 注销所有队列服务 - queueRPCMu.Lock() - queueServiceCount := 0 - for queueName, servicesMap := range queueRPCServices { - queueServiceCount += len(servicesMap) - for serviceName, sub := range queueRPCSubs[queueName] { - if err := sub.Unsubscribe(); err != nil { - g.Log().Errorf(ctx, "注销队列服务 %s (队列: %s) 失败: %v", serviceName, queueName, err) - } - } - delete(queueRPCSubs, queueName) - delete(queueRPCServices, queueName) - } - queueRPCMu.Unlock() - - g.Log().Infof(ctx, "已注销 %d 个单实例服务和 %d 个队列服务", singleServiceCount, queueServiceCount) - - natsMu.Lock() - defer natsMu.Unlock() - - // 停止健康检查协程 - if healthCancel != nil { - healthCancel() - } - - // 关闭连接 - if nc != nil && !nc.IsClosed() { - nc.Close() - connected = false - inited = false - } - g.Log().Info(ctx, "NATS RPC 服务已优雅关闭") - return nil -} diff --git a/nats/nats_consumer.go b/nats/nats_consumer.go deleted file mode 100644 index 126000b..0000000 --- a/nats/nats_consumer.go +++ /dev/null @@ -1,294 +0,0 @@ -package nats - -import ( - "context" - "fmt" - "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go/jetstream" -) - -// AckPolicy 确认策略 -type AckPolicy string - -const ( - AckPolicyExplicit AckPolicy = "explicit" // 显式确认(默认) - AckPolicyAll AckPolicy = "all" // 确认所有消息 - AckPolicyNone AckPolicy = "none" // 不需要确认 -) - -// DeliverPolicy 投递策略 -type DeliverPolicy string - -const ( - DeliverPolicyAll DeliverPolicy = "all" // 投递所有消息(包括已投递的) - DeliverPolicyLast DeliverPolicy = "last" // 从最后一条消息开始 - DeliverPolicyNew DeliverPolicy = "new" // 仅投递新消息(默认) - DeliverPolicyLastPerSubj DeliverPolicy = "lastpersubj" // 每个主题的最后一条 - DeliverPolicyByStartSeq DeliverPolicy = "by_start_sequence" // 按起始序列号 -) - -// ReplayPolicy 重放策略 -type ReplayPolicy string - -const ( - ReplayPolicyInstant ReplayPolicy = "instant" // 立即重放 - ReplayPolicyOriginal ReplayPolicy = "original" // 按原始顺序重放 -) - -// ConsumerConfig 消费者配置 -type ConsumerConfig struct { - DurableName string // 持久化名称(空表示临时消费者) - Description string // 描述信息 - AckPolicy AckPolicy // 确认策略 - AckWait int // 确认等待时间(秒) - MaxDeliver int // 最大投递次数 - FilterSubject string // 过滤主题(流内多主题时使用) - DeliverPolicy DeliverPolicy // 投递策略 - ReplayPolicy ReplayPolicy // 重放策略 - MaxWaiting int // 最大等待消息数 - MaxAckPending int // 最大待确认消息数 - OptStartTime int64 // 起始时间戳 - OptStartSeq uint64 // 起始序列号 - HeadersOnly bool // 仅消费消息头 - Backoff []int // 退避策略(秒数数组) - RateLimit uint64 // 消息速率限制(消息/秒) - Replica int // 副本数 - FlowControl bool // 启用流控 - Metadata map[string]string // 元数据 -} - -// parseAckPolicy 解析确认策略 -func parseAckPolicy(policy AckPolicy) jetstream.AckPolicy { - switch policy { - case AckPolicyAll: - return jetstream.AckAllPolicy - case AckPolicyNone: - return jetstream.AckNonePolicy - default: - return jetstream.AckExplicitPolicy - } -} - -// parseDeliverPolicy 解析投递策略 -func parseDeliverPolicy(policy DeliverPolicy) jetstream.DeliverPolicy { - switch policy { - case DeliverPolicyAll: - return jetstream.DeliverAllPolicy - case DeliverPolicyLast: - return jetstream.DeliverLastPolicy - case DeliverPolicyLastPerSubj: - return jetstream.DeliverLastPerSubjectPolicy - case DeliverPolicyByStartSeq: - return jetstream.DeliverByStartSequencePolicy - default: - return jetstream.DeliverNewPolicy - } -} - -// parseReplayPolicy 解析重放策略 -func parseReplayPolicy(policy ReplayPolicy) jetstream.ReplayPolicy { - switch policy { - case ReplayPolicyOriginal: - return jetstream.ReplayOriginalPolicy - default: - return jetstream.ReplayInstantPolicy - } -} - -// CreateTaskConsumer 创建任务消费者 -// 核心设计思路: -// 1. 显式确认:确保消息被正确处理后才确认 -// 2. 重试机制:通过 MaxDeliver 控制最大重试次数 -// 3. 持久化:DurableName 确保消费者状态持久化 -// 4. 流控:防止消费者过载 -func CreateTaskConsumer(ctx context.Context, streamName string, config ConsumerConfig) (jetstream.Consumer, error) { - if !IsConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - if streamName == "" { - return nil, fmt.Errorf("流名称不能为空") - } - - // 设置默认值 - if config.AckPolicy == "" { - config.AckPolicy = AckPolicyExplicit // 默认显式确认 - } - if config.AckWait == 0 { - config.AckWait = 30 // 默认30秒确认超时 - } - if config.MaxDeliver == 0 { - config.MaxDeliver = 3 // 默认最多投递3次 - } - if config.DeliverPolicy == "" { - config.DeliverPolicy = DeliverPolicyNew // 默认仅消费新消息 - } - if config.ReplayPolicy == "" { - config.ReplayPolicy = ReplayPolicyInstant // 默认立即重放 - } - if config.MaxAckPending == 0 { - config.MaxAckPending = 1000 // 默认最多1000条待确认消息 - } - - // 构建消费者配置 - jsConfig := jetstream.ConsumerConfig{ - Name: config.DurableName, - Description: config.Description, - AckPolicy: parseAckPolicy(config.AckPolicy), - AckWait: 0, - MaxDeliver: config.MaxDeliver, - FilterSubjects: []string{config.FilterSubject}, - DeliverPolicy: parseDeliverPolicy(config.DeliverPolicy), - ReplayPolicy: parseReplayPolicy(config.ReplayPolicy), - MaxWaiting: config.MaxWaiting, - MaxAckPending: config.MaxAckPending, - HeadersOnly: config.HeadersOnly, - RateLimit: config.RateLimit, - Replicas: config.Replica, - Metadata: config.Metadata, - } - - // 配置流控和心跳 - if config.FlowControl { - jsConfig.FlowControl = true - } - // 配置起始位置 - if config.OptStartSeq > 0 { - jsConfig.OptStartSeq = config.OptStartSeq - } - - // 创建新消费者 - consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig) - if err != nil { - return nil, fmt.Errorf("创建消费者失败: %w", err) - } - - // 记录配置信息 - configInfo := fmt.Sprintf("确认策略=%s, 最大投递=%d, 投递策略=%s", config.AckPolicy, config.MaxDeliver, config.DeliverPolicy) - if config.FilterSubject != "" { - configInfo += fmt.Sprintf(", 过滤主题=%s", config.FilterSubject) - } - g.Log().Infof(ctx, "✅ 任务消费者创建成功: %s/%s (%s)", streamName, config.DurableName, configInfo) - - return consumer, nil -} - -// CreateConsumerSimple 简化版创建消费者(适用于大多数场景) -// 只需提供流名称和消费者名称,其他使用默认配置 -func CreateConsumerSimple(ctx context.Context, streamName, durableName string) (err error) { - _, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{ - DurableName: durableName, - }) - return -} - -// CreateConsumerWithFilter 创建带主题过滤的消费者 -//func CreateConsumerWithFilter(ctx context.Context, streamName, durableName, filterSubject string) (jetstream.Consumer, error) { -// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{ -// DurableName: durableName, -// FilterSubject: filterSubject, -// }) -//} - -// CreateConsumerEphemeral 创建临时消费者 -// 临时消费者没有持久化名称,连接断开后自动删除 -//func CreateConsumerEphemeral(ctx context.Context, streamName string) (jetstream.Consumer, error) { -// if !IsConnected() { -// return nil, fmt.Errorf("NATS 未连接") -// } -// -// jsConfig := jetstream.ConsumerConfig{ -// AckPolicy: jetstream.AckNonePolicy, -// AckWait: 0, -// MaxDeliver: 3, -// DeliverPolicy: jetstream.DeliverNewPolicy, -// ReplayPolicy: jetstream.ReplayInstantPolicy, -// MaxAckPending: 1000, -// } -// -// consumer, err := js.CreateConsumer(ctx, streamName, jsConfig) -// if err != nil { -// return nil, fmt.Errorf("创建临时消费者失败: %w", err) -// } -// -// g.Log().Infof(ctx, "✅ 临时消费者创建成功: %s", streamName) -// return consumer, nil -//} - -// CreateConsumerPushMode 创建推送模式消费者 -// 推送模式下,NATS 服务器主动将消息推送给消费者 -func CreateConsumerPushMode(ctx context.Context, streamName, durableName, subject string, msgCount int) (err error) { - _, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{ - DurableName: durableName, - FilterSubject: subject, - MaxAckPending: msgCount, - }) - return -} - -// CreateConsumerPullMode 创建拉取模式消费者 -// 拉取模式下,消费者主动从服务器拉取消息 -//func CreateConsumerPullMode(ctx context.Context, streamName, durableName string) (jetstream.Consumer, error) { -// return CreateTaskConsumer(ctx, streamName, ConsumerConfig{ -// DurableName: durableName, -// DeliverPolicy: DeliverPolicyAll, -// MaxAckPending: 500, // 拉取模式下待确认消息数可以设置小一些 -// }) -//} - -// ConsumeMessages 消费消息(推送模式) -func ConsumeMessages(ctx context.Context, streamName, consumerName string, handler jetstream.MessageHandler) error { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - // 获取消费者 - consumer, err := js.Consumer(ctx, streamName, consumerName) - if err != nil { - return fmt.Errorf("获取消费者失败: %w", err) - } - - // 业务处理 - //if err := handler(ctx, streamMsg.Values); err != nil { - // glog.Infof(ctx, "业务处理失败-> err:%v\n", err) - // continue - //} - //// 确认消息 - //if msg.AutoAck { - // err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID) - // if err != nil { - // glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err) - // } - //} - //// 创建消息处理函数 - //handler = func(msg jetstream.Msg) { - // // 解析消息 - // var task TaskMessage - // if err := json.Unmarshal(msg.Data(), &task); err != nil { - // g.Log().Errorf(ctx, "解析消息失败: %v", err) - // msg.Nak() // 拒绝消息,触发重试 - // return - // } - // - // // 处理业务逻辑 - // g.Log().Infof(ctx, "处理任务: %s", task.TaskID) - // - // // 处理成功,确认消息 - // msg.Ack() - //} - - // 开始消费 - _, err = consumer.Consume(handler) - if err != nil { - return fmt.Errorf("开始消费失败: %w", err) - } - - g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", streamName, consumerName) - return nil -} - -// 定义消息结构 -type TaskMessage struct { - TaskID string `json:"task_id"` - TaskType string `json:"task_type"` - Data string `json:"data"` -} diff --git a/nats/nats_publish.go b/nats/nats_publish.go deleted file mode 100644 index 32e3b20..0000000 --- a/nats/nats_publish.go +++ /dev/null @@ -1,28 +0,0 @@ -package nats - -import ( - "context" - "encoding/json" - "fmt" -) - -// publish 发布消息到指定主题 -func publish(ctx context.Context, subject string, data any) (err error) { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - // 序列化数据 - dataBytes, err := json.Marshal(data) - if err != nil { - return fmt.Errorf("序列化数据失败: %w", err) - } - // 发布消息 - metrics.PublishCount.Add(1) - _, err = js.Publish(ctx, subject, dataBytes) - if err != nil { - metrics.PublishError.Add(1) - return fmt.Errorf("发布消息失败: %w", err) - } - - return -} diff --git a/nats/nats_rpc.go b/nats/nats_rpc.go deleted file mode 100644 index e95514b..0000000 --- a/nats/nats_rpc.go +++ /dev/null @@ -1,752 +0,0 @@ -package nats - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go" - "go.opentelemetry.io/otel/trace" - "reflect" - "sync" -) - -// ============ RPC 服务封装 ============ -// 以下方法提供了完全抽象的 RPC 调用接口 -// 调用方和响应方完全不需要知道底层使用的是 NATS 的发布订阅模式 - -// RPC 服务注册表 -var ( - rpcServices map[string]rpcHandler - rpcSubs map[string]*nats.Subscription // 服务名 -> 订阅 - rpcServicesMu sync.RWMutex - queueRPCServices map[string]map[string]rpcHandler // queueName -> subject -> handler - queueRPCSubs map[string]map[string]*nats.Subscription // queueName -> serviceName -> 订阅 - queueRPCMu sync.RWMutex - - // ============ TraceID 主动取消支持 ============ - // 全局映射表:TraceID -> CancelFunc,并发安全 - traceCancelMap map[string]context.CancelFunc - traceCancelMu sync.RWMutex - // 取消主题前缀 - cancelSubjectPrefix = "ctx.cancel.otel." -) - -// rpcHandler RPC 处理函数类型 -// 实现方只需要关注请求参数和返回值,无需了解底层 NATS 实现 -// 返回值可以是任意类型,会被自动序列化为 JSON -type rpcHandler func(ctx context.Context, req []byte) (any, error) - -// RegisterRPCService 注册 RPC 服务(单实例) -// serviceName: 服务名称,调用方通过此名称调用服务 -// handler: 服务处理函数,接收请求并返回响应 -func registerRPCService(serviceName string, handler rpcHandler) (err error) { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - rpcServicesMu.Lock() - if rpcServices == nil { - rpcServices = make(map[string]rpcHandler) - } - if rpcSubs == nil { - rpcSubs = make(map[string]*nats.Subscription) - } - - // 如果已存在该服务,先取消之前的订阅 - if oldSub, exists := rpcSubs[serviceName]; exists { - oldSub.Unsubscribe() - } - - rpcServices[serviceName] = handler - rpcServicesMu.Unlock() - - // 订阅服务主题 - subject := fmt.Sprintf("rpc.%s", serviceName) - sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { - // 执行处理函数 - executeHandler(handler, msg) - }) - - if err != nil { - return fmt.Errorf("注册 RPC 服务失败: %w", err) - } - - rpcSubs[serviceName] = sub - metrics.SubscribeCount.Add(1) - g.Log().Infof(context.Background(), "✅ RPC 服务已注册: %s", serviceName) - return nil -} - -// RegisterQueueRPCService 注册 RPC 服务(集群模式) -// 多个服务实例注册同一服务时,请求会自动负载均衡 -// serviceName: 服务名称 -// queueName: 队列组名,同一队列组的实例共享请求 -// handler: 服务处理函数 -func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) (err error) { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - queueRPCMu.Lock() - if queueRPCServices == nil { - queueRPCServices = make(map[string]map[string]rpcHandler) - } - if queueRPCSubs == nil { - queueRPCSubs = make(map[string]map[string]*nats.Subscription) - } - if queueRPCServices[queueName] == nil { - queueRPCServices[queueName] = make(map[string]rpcHandler) - } - if queueRPCSubs[queueName] == nil { - queueRPCSubs[queueName] = make(map[string]*nats.Subscription) - } - - // 如果已存在该服务,先取消之前的订阅 - if oldSub, exists := queueRPCSubs[queueName][serviceName]; exists { - oldSub.Unsubscribe() - } - - queueRPCServices[queueName][serviceName] = handler - queueRPCMu.Unlock() - - // 订阅服务主题(队列模式) - subject := fmt.Sprintf("rpc.%s", serviceName) - sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) { - // 执行处理函数 - executeHandler(handler, msg) - }) - - if err != nil { - return fmt.Errorf("注册队列 RPC 服务失败: %w", err) - } - - queueRPCMu.Lock() - queueRPCSubs[queueName][serviceName] = sub - queueRPCMu.Unlock() - - metrics.SubscribeCount.Add(1) - g.Log().Infof(context.Background(), "✅ 队列 RPC 服务已注册: %s (队列组: %s)", serviceName, queueName) - return nil -} - -// executeHandler 执行 RPC 处理函数 -func executeHandler(handler rpcHandler, msg *nats.Msg) { - // 响应 - var respData []byte - // 从消息头重建上下文 - ctx := headersToContext(context.Background(), msg.Header) - // 提取 TraceID,创建可取消的 context - ctx = createCancelContext(ctx, msg.Header.Get(TraceIDKey)) - // 检查 context 是否已取消(在调用 handler 之前) - select { - case <-ctx.Done(): - // context 已取消,返回取消错误 - g.Log().Infof(ctx, "RPC 请求已取消,traceID: %s", msg.Header.Get(TraceIDKey)) - // 仍然需要发送响应以避免客户端超时 - respData = []byte(`{"_err":"请求已取消"}`) - // 清理取消映射表 - cleanupTraceCancel(msg.Header.Get(TraceIDKey)) - return - default: - } - - // 执行业务处理 - response, err := handler(ctx, msg.Data) - - if err != nil { - // 错误时返回 {"_err": "错误信息"} - if respData, err = json.Marshal(map[string]any{"_err": err.Error()}); err != nil { - g.Log().Errorf(ctx, "RPC 错误响应序列化失败: %v", err) - respData = []byte(`{"_err":"错误响应序列化失败"}`) - } - } else if response == nil { - // 空响应时返回空对象(或 {"_err": ""}) - respData = []byte(`{}`) - } else { - // 成功时返回业务数据 - if respData, err = json.Marshal(response); err != nil { - g.Log().Errorf(ctx, "RPC 响应序列化失败: %v", err) - respData = []byte(`{"_err":"响应序列化失败"}`) - } - } - // 发送响应(必须执行) 如果客户端用 nc.Request(...) 发送消息 → 双向模式,服务端必须 msg.Respond - if err = msg.Respond(respData); err != nil { - g.Log().Errorf(ctx, "RPC 响应失败: %v", err) - } - // 请求结束,清理取消映射表 - cleanupTraceCancel(msg.Header.Get(TraceIDKey)) -} - -// createCancelContext 创建可取消的 context 并注册到取消映射表 -// 返回可取消的 context(如果 traceID 为空则返回原 context) -func createCancelContext(ctx context.Context, traceID string) context.Context { - if g.IsEmpty(traceID) { - return ctx - } - // 创建带取消功能的 context - taskCtx, cancel := context.WithCancel(ctx) - // 注册到取消映射表 - traceCancelMu.Lock() - if traceCancelMap == nil { - traceCancelMap = make(map[string]context.CancelFunc) - } - // 如果同一 TraceID 已有 CancelFunc,先调用它 - if oldCancel, exists := traceCancelMap[traceID]; exists { - oldCancel() - } - traceCancelMap[traceID] = cancel - traceCancelMu.Unlock() - - return taskCtx -} - -// ============ TraceID 主动取消功能 ============ -// 以下函数实现了基于 OpenTelemetry TraceID 的跨进程任务取消机制 - -// SetupCancelListener 设置取消监听器 -// 订阅取消主题,监听取消指令 -// 使用示例: -// -// sub, err := nats.SetupCancelListener(ctx) -func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { - if !checkConnected() { - return nil, fmt.Errorf("NATS 未连接") - } - - if traceCancelMap == nil { - traceCancelMap = make(map[string]context.CancelFunc) - } - - // 修复问题3:订阅取消主题,格式: ctx.cancel.otel.* - // 使用 * 通配符而不是 >,因为 TraceID 是最后一部分 - cancelSubject := cancelSubjectPrefix + "*" - sub, err := nc.Subscribe(cancelSubject, func(msg *nats.Msg) { - // 从主题中解析 TraceID (去除前缀) - prefixLen := len(cancelSubjectPrefix) - if len(msg.Subject) <= prefixLen { - g.Log().Warningf(ctx, "取消消息主题格式错误: %s", msg.Subject) - return - } - traceID := msg.Subject[prefixLen:] - - if traceID == "" { - g.Log().Warning(ctx, "取消消息主题缺少 TraceID") - return - } - - // 从映射表获取 CancelFunc 并执行取消 - traceCancelMu.RLock() - cancel, ok := traceCancelMap[traceID] - traceCancelMu.RUnlock() - - if ok { - cancel() - g.Log().Infof(ctx, "📢 取消信号已发送,traceID: %s", traceID) - } else { - g.Log().Infof(ctx, "⚠️ 未找到对应的可取消任务,traceID: %s", traceID) - } - }) - - if err != nil { - return nil, fmt.Errorf("设置取消监听器失败: %w", err) - } - - metrics.SubscribeCount.Add(1) - g.Log().Infof(ctx, "✅ 取消监听器已设置: %s", cancelSubject) - return sub, nil -} - -// publishCancel 发布取消指令 -// 向指定 TraceID 发送取消信号 -// 使用示例: -// -// err := nats.publishCancel(ctx, traceID) -func publishCancel(ctx context.Context, traceID string) error { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - if traceID == "" { - return fmt.Errorf("TraceID 不能为空") - } - - cancelSubject := cancelSubjectPrefix + traceID - err := nc.Publish(cancelSubject, nil) - if err != nil { - return fmt.Errorf("发布取消信号失败: %w", err) - } - - g.Log().Infof(ctx, "📤 已发送取消信号,traceID: %s,主题: %s", traceID, cancelSubject) - return nil -} - -// cleanupTraceCancel 清理取消映射表中的条目 -// 任务取消/正常结束后必须调用此函数,避免内存泄漏 -// 使用示例: -// -// defer nats.cleanupTraceCancel(traceID) -func cleanupTraceCancel(traceID string) { - if traceID == "" { - return - } - - traceCancelMu.Lock() - defer traceCancelMu.Unlock() - - if _, ok := traceCancelMap[traceID]; ok { - delete(traceCancelMap, traceID) - g.Log().Infof(context.Background(), "✅ 已清理取消映射表,traceID: %s", traceID) - } -} - -// CallRPC 调用 RPC 服务 -// serviceName: 服务名称 -// req: 请求数据 -// 返回: 响应数据(任意类型)和错误 -func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err error) { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - metrics.RequestCount.Add(1) - - // 验证 resp 必须是指针类型 - respValue := reflect.ValueOf(resp) - if respValue.Kind() != reflect.Ptr { - return fmt.Errorf("resp 参数必须是指针类型(当前类型: %T)", resp) - } - - // 构建请求体 - var reqBody []byte - if !g.IsEmpty(req) { - reqValue := reflect.ValueOf(req) - if !(reqValue.Kind() == reflect.Ptr && reqValue.IsNil()) && !reqValue.IsZero() { - reqData, err := json.Marshal(req) - if err != nil { - return fmt.Errorf("序列化请求参数失败: %w", err) - } - reqBody = reqData - } - } - - // 检查本地是否有注册的单实例服务,如果有则直接调用(优化性能) - rpcServicesMu.RLock() - if localHandler, exists := rpcServices[serviceName]; exists { - rpcServicesMu.RUnlock() - - // 修复问题1:本地调用也需要处理取消机制 - var traceID string - if traceID, err = getTraceID(ctx); err != nil { - return err - } - // 提取 TraceID,创建可取消的 context - cancelCtx := createCancelContext(ctx, traceID) - // 执行本地调用 - var response interface{} - if response, err = localHandler(cancelCtx, reqBody); err != nil { - metrics.RequestError.Add(1) - return fmt.Errorf("本地调用 RPC 服务失败 [%s]: %w", serviceName, err) - } - - // 请求结束,清理取消映射表 - cleanupTraceCancel(traceID) - - // 检查是否为错误消息:尝试解析为 map,看是否包含 "_err" 字段 - var respMap map[string]any - if json.Unmarshal(response.([]byte), &respMap) == nil { - if errMsg, ok := respMap["_err"]; ok { - metrics.RequestError.Add(1) - return fmt.Errorf("%v", errMsg) - } - } - // 正常数据直接返回 - // responseMsg.Data 已经是 []byte 类型(来自 msg.Data),直接反序列化 - if err = json.Unmarshal(response.([]byte), resp); err != nil { - return fmt.Errorf("解析响应失败: %w (响应内容: %s)", err, response) - } - - return - } - rpcServicesMu.RUnlock() - - subject := fmt.Sprintf("rpc.%s", serviceName) - - // 创建消息并将上下文元数据写入消息头 - msg := nats.NewMsg(subject) - msg.Data = reqBody - headers, err := contextToHeaders(ctx) - if err != nil { - return fmt.Errorf("上下文转换失败: %w", err) - } - msg.Header = headers - - // 修复问题5:优化 go 协程避免资源泄漏 - // 使用 done channel 来确保 goroutine 能正确退出 - done := make(chan struct{}) - var closeDoneOnce sync.Once - closeDone := func() { - closeDoneOnce.Do(func() { - close(done) - }) - } - - if msg.Header.Get(TraceIDKey) != "" { - go func() { - defer closeDone() - select { - case <-ctx.Done(): - // context 被取消时,发送取消信号给服务端 - if errors.Is(ctx.Err(), context.Canceled) { - if err := publishCancel(context.Background(), msg.Header.Get(TraceIDKey)); err != nil { - g.Log().Errorf(ctx, "发送 RPC 取消信号失败: %v", err) - } else { - g.Log().Infof(ctx, "RPC 调用已取消,traceID: %s", msg.Header.Get(TraceIDKey)) - } - } - case <-done: - // 请求已完成,无需发送取消信号 - return - } - }() - } - - // 发送请求 - responseMsg, err := nc.RequestMsgWithContext(ctx, msg) - - // 关闭 done channel,通知 goroutine 退出 - closeDone() - - if err != nil { - metrics.RequestError.Add(1) - return fmt.Errorf("调用 RPC 服务失败 [%s]: %w", serviceName, err) - } - - if responseMsg == nil { - metrics.RequestError.Add(1) - return fmt.Errorf("RPC 响应为空 [%s]", serviceName) - } - - // 解析响应 - if len(responseMsg.Data) > 0 { - // 检查是否为错误消息:尝试解析为 map,看是否包含 "_err" 字段 - var respMap map[string]any - if json.Unmarshal(responseMsg.Data, &respMap) == nil { - if errMsg, ok := respMap["_err"]; ok { - metrics.RequestError.Add(1) - return fmt.Errorf("%v", errMsg) - } - } - // 正常数据直接返回 - // responseMsg.Data 已经是 []byte 类型(来自 msg.Data),直接反序列化 - if err = json.Unmarshal(responseMsg.Data, resp); err != nil { - return fmt.Errorf("解析响应失败: %w (响应内容: %s)", err, responseMsg.Data) - } - } - - return -} - -// RegisterServiceOption 注册选项类型 -type RegisterServiceOption func(*registerServiceConfig) - -type registerServiceConfig struct { - queueName string // 队列组名(用于集群模式) - excludeMethods []string -} - -// WithQueueGroup 设置队列组名(集群模式) -func WithQueueGroup(queueName string) RegisterServiceOption { - return func(cfg *registerServiceConfig) { - cfg.queueName = queueName - } -} - -// WithExcludeMethods 排除不需要注册的方法 -func WithExcludeMethods(methods ...string) RegisterServiceOption { - return func(cfg *registerServiceConfig) { - cfg.excludeMethods = append(cfg.excludeMethods, methods...) - } -} - -// AutoRegisterServices 自动注册多个服务的所有公开方法 -// serviceInstances: map[包名]service实例,如 map[string]interface{}{"user": userService, "order": orderService} -// options: 注册选项(可选) -// 示例: -// -// AutoRegisterServices(map[string]interface{}{ -// "user": userService, -// "order": orderService, -// }) -// 或 -// AutoRegisterServices(map[string]interface{}{ -// "order": orderService, -// }, WithQueueGroup("order-group")) -func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...RegisterServiceOption) error { - // 先注册 RPC 服务(如果 NATS 不可用则记录警告但不阻塞启动) - if !checkConnected() { - return fmt.Errorf("NATS 未连接,RPC 服务未注册") - } - - if len(serviceInstances) == 0 { - return fmt.Errorf("service 实例列表不能为空") - } - - totalRegistered := 0 - // 遍历每个 service 实例 - for pkgName, serviceInstance := range serviceInstances { - // 注册服务 - err := registerService(serviceInstance, pkgName, options...) - if err != nil { - g.Log().Errorf(ctx, "注册 %s 服务失败: %v", pkgName, err) - continue - } - totalRegistered++ - g.Log().Infof(ctx, "✅ %s 服务已自动注册", pkgName) - } - - if totalRegistered == 0 { - return fmt.Errorf("未能注册任何服务") - } - // 设置取消监听器(监听基于 TraceID 的取消请求) - if _, err := setupCancelListener(ctx); err != nil { - g.Log().Errorf(ctx, "设置取消监听器失败: %v", err) - } else { - g.Log().Infof(ctx, "✅ 取消监听器已自动设置") - } - g.Log().Infof(ctx, "✅ 共自动注册了 %d 个服务", totalRegistered) - - return nil -} - -// registerService 注册单个服务的所有公开方法(内部函数) -func registerService(service interface{}, serviceNamePrefix string, options ...RegisterServiceOption) (err error) { - if !checkConnected() { - return fmt.Errorf("NATS 未连接") - } - - // 应用选项 - cfg := ®isterServiceConfig{} - for _, opt := range options { - opt(cfg) - } - - // 创建排除方法集合 - excludeSet := make(map[string]struct{}) - for _, method := range cfg.excludeMethods { - excludeSet[method] = struct{}{} - } - - // 获取 service 的类型 - serviceType := reflect.TypeOf(service) - - // 遍历所有方法 - registeredCount := 0 - for i := 0; i < serviceType.NumMethod(); i++ { - method := serviceType.Method(i) - - // 只注册导出方法(首字母大写) - if !method.IsExported() { - continue - } - - // 排除指定的方法 - if _, exists := excludeSet[method.Name]; exists { - continue - } - - // 检查方法签名:必须是 func(ctx context.Context, request) (response, error) - // 注意:method.Type.NumIn() 包含接收者,所以实际参数数量需要减去 1 - // 要求:接收者 + context.Context + request,总共3个参数 - if method.Type.NumIn() != 3 { - g.Log().Warningf(context.Background(), "方法 %s 必须有2个参数(context.Context 和请求参数),跳过注册", method.Name) - continue - } - - // 第一个参数(接收者之后的第一个参数)必须是 context.Context - // method.Type.In(0) 是接收者,method.Type.In(1) 才是第一个参数 - if !method.Type.In(1).Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { - g.Log().Warningf(context.Background(), "方法 %s 的第一个参数必须是 context.Context,跳过注册", method.Name) - continue - } - - // 第二个参数必须是结构体指针或数组 - reqType := method.Type.In(2) - if reqType.Kind() != reflect.Ptr && reqType.Kind() != reflect.Slice && reqType.Kind() != reflect.Array { - g.Log().Warningf(context.Background(), "方法 %s 的第二个参数必须是结构体指针或数组,跳过注册", method.Name) - continue - } - - // 返回值必须是 (result, error),即2个返回值 - if method.Type.NumOut() != 2 { - g.Log().Warningf(context.Background(), "方法 %s 必须有2个返回值(result 和 error),跳过注册", method.Name) - continue - } - - // 最后一个返回值必须是 error - if !method.Type.Out(1).Implements(reflect.TypeOf((*error)(nil)).Elem()) { - g.Log().Warningf(context.Background(), "方法 %s 的最后一个返回值必须是 error,跳过注册", method.Name) - continue - } - - // 生成服务名称:前缀.方法名(保持原始方法名) - serviceName := fmt.Sprintf("%s.%s", serviceNamePrefix, method.Name) - - // 创建 RPC handler - handler := func(ctx context.Context, req []byte) (any, error) { - // 准备方法调用参数 - // args[0] 是接收者, args[1] 是 ctx, args[2] 是请求参数 - args := make([]reflect.Value, 3) - args[0] = reflect.ValueOf(service) // 接收者 - args[1] = reflect.ValueOf(ctx) // context.Context - - // 解析请求参数 - if len(req) > 0 { - reqValuePtr := reflect.New(reqType) - - // 解析 JSON - if err := json.Unmarshal(req, reqValuePtr.Interface()); err != nil { - // 根据参数类型提供更友好的错误提示 - var typeHint string - if reqType.Kind() == reflect.Ptr { - typeHint = fmt.Sprintf("(期望类型: %s)", reqType.Elem().Name()) - } else { // reflect.Slice 或 reflect.Array - typeHint = fmt.Sprintf("(期望类型: %s,请确保客户端传递的是JSON数组格式)", reqType.String()) - } - return nil, fmt.Errorf("解析请求参数失败%s: %w", typeHint, err) - } - args[2] = reqValuePtr.Elem() - } else { - // 请求为空,创建零值 - args[2] = reflect.Zero(method.Type.In(2)) - } - - // 调用方法 - results := method.Func.Call(args) - - // 处理返回值 - var result any - - if len(results) == 1 { - // 只有 error - if !results[0].IsNil() { - err = results[0].Interface().(error) - } - } else if len(results) == 2 { - // (result, error) - result = results[0].Interface() - if !results[1].IsNil() { - err = results[1].Interface().(error) - } - } - if err != nil { - return nil, err - } - - return result, nil - } - - // 注册 RPC 服务 - var err error - if cfg.queueName != "" { - err = registerQueueRPCService(serviceName, cfg.queueName, handler) - } else { - err = registerRPCService(serviceName, handler) - } - - if err != nil { - g.Log().Errorf(context.Background(), "注册服务 %s 失败: %v", serviceName, err) - continue - } - - registeredCount++ - g.Log().Infof(context.Background(), "✅ 已自动注册 RPC 服务: %s -> %s", serviceName, method.Name) - } - - if registeredCount == 0 { - g.Log().Warningf(context.Background(), "未注册任何方法,请检查 %v 的方法签名", serviceNamePrefix) - return fmt.Errorf("未找到可注册的方法") - } - - g.Log().Infof(context.Background(), "✅ Service %v 共注册了 %d 个 RPC 方法", serviceNamePrefix, registeredCount) - return nil -} - -// ============ 上下文元数据工具函数 ============ -// 以下函数用于在 context 和 NATS 消息头之间互转元数据 - -// 定义常见的上下文元数据 key -const ( - TraceIDKey = "trace_id" - TokenKey = "token" -) - -func getTraceID(ctx context.Context) (traceID string, err error) { - // 提取 traceId:首先尝试从 OpenTelemetry Span 中提取,从 context 中提取 TraceID - span := trace.SpanFromContext(ctx) - if span != nil && span.SpanContext().HasTraceID() { - traceID = span.SpanContext().TraceID().String() - } else if tid := ctx.Value(TraceIDKey); tid != nil { - traceID = fmt.Sprintf("%v", tid) - } - if traceID == "" { - return traceID, fmt.Errorf("context 中没有 TraceID") - } - return -} - -// contextToHeaders 将 context 中的元数据转换为 NATS 消息头 -// 支持提取 user_id、tenant_id、trace_id、token 等常见字段 -func contextToHeaders(ctx context.Context) (nats.Header, error) { - headers := make(nats.Header) - - // 提取 traceId:首先尝试从 OpenTelemetry Span 中提取 - if traceID, err := getTraceID(ctx); err != nil { - return headers, err - } else { - headers.Set(TraceIDKey, traceID) - } - - // 提取 token(优先级:context value > HTTP Authorization header) - token := "" - if t := ctx.Value(TokenKey); t != nil { - token = fmt.Sprintf("%v", t) - } else if r := g.RequestFromCtx(ctx); r != nil { - // 从 HTTP 请求的 Authorization header 中提取 token - auth := r.GetHeader("Authorization") - if auth != "" { - // 移除 "Bearer " 前缀 - if len(auth) > 7 && auth[:7] == "Bearer " { - token = auth[7:] - } else { - token = auth - } - } - } - if token != "" { - headers.Set(TokenKey, token) - } - - return headers, nil -} - -// headersToContext 从 NATS 消息头重建 context -// 支持还原 user_id、tenant_id、trace_id、token 等字段 -func headersToContext(ctx context.Context, headers nats.Header) context.Context { - if headers == nil { - return ctx - } - - // 恢复 trace_id - if traceID := headers.Get(TraceIDKey); traceID != "" { - ctx = context.WithValue(ctx, TraceIDKey, traceID) - } - - // 恢复 token - if token := headers.Get(TokenKey); token != "" { - ctx = context.WithValue(ctx, TokenKey, token) - } - - return ctx -} diff --git a/nats/nats_task.go b/nats/nats_task.go deleted file mode 100644 index f6841ad..0000000 --- a/nats/nats_task.go +++ /dev/null @@ -1,212 +0,0 @@ -package nats - -import ( - "context" - "fmt" - "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go/jetstream" - "time" -) - -// TaskStreamConfig 任务流配置 -type TaskStreamConfig struct { - StreamName string // 流名称 - Subjects []string // 主题数组(支持任务优先级,如 ["tasks.high","tasks.normal", "tasks.low"]) - PublishSubject string // 发布使用的主题(仅用于记录,不影响流配置) - Storage StorageType // 存储类型 - Retention RetentionType // 保留策略 - MaxAge time.Duration // 最大保留时间 - Duplicates time.Duration // 消息去重窗口时间 - Replicas int // 副本数 - MaxMsgSize int32 // 单条消息最大大小(字节) - MaxBytes int64 // 流最大存储大小(字节) - MaxMsgs int64 // 流中最大消息数 - MaxMsgsPerSubject int64 // 每个主题最大消息数 - MaxConsumers int // 最大消费者数量 - DenyPurge bool // 是否禁止清理流 - AllowRollup bool // 是否允许汇总消息 - DenyDelete bool // 是否禁止删除 - DiscardPerSubject bool // 是否按主题限制(工作队列模式) - Republish *RePublishConfig // 死信队列重新发布配置 -} - -// RePublishConfig 重新发布配置(用于死信队列) -type RePublishConfig struct { - Source string // 源主题 - Destination string // 目标主题 - HeadersOnly bool // 仅复制消息头 -} - -// StorageType 存储类型 -type StorageType string - -const ( - StorageFile StorageType = "file" // 文件存储(持久化) - StorageMemory StorageType = "memory" // 内存存储 -) - -// RetentionType 保留策略 -type RetentionType string - -const ( - RetentionLimit RetentionType = "limit" // 消息数量限制 - RetentionPolicy RetentionType = "interest" // 基于兴趣 - RetentionWorkQueue RetentionType = "workqueue" // 工作队列 -) - -// parseStorageType 解析存储类型 -func parseStorageType(st StorageType) jetstream.StorageType { - switch st { - case StorageMemory: - return jetstream.MemoryStorage - default: - return jetstream.FileStorage - } -} - -// parseRetentionType 解析保留策略 -func parseRetentionType(rt RetentionType) jetstream.RetentionPolicy { - switch rt { - case RetentionLimit: - return jetstream.LimitsPolicy - case RetentionPolicy: - return jetstream.InterestPolicy - default: - return jetstream.WorkQueuePolicy - } -} - -// createTaskStreamSimple 简化版创建任务流(适用于大多数场景) -// 只需提供流名称和主题数组,其他使用默认配置 -func createTaskStreamSimple(ctx context.Context, streamName string, subjects []string) error { - return createTaskStream(ctx, TaskStreamConfig{ - StreamName: streamName, - Subjects: subjects, - }) -} - -// createTaskStreamWithPriority 创建支持优先级的任务流 -func createTaskStreamWithPriority(ctx context.Context, streamPrefix string) error { - subjects := []string{ - fmt.Sprintf("%s.high.>", streamPrefix), - fmt.Sprintf("%s.normal.>", streamPrefix), - fmt.Sprintf("%s.low.>", streamPrefix), - } - return createTaskStream(ctx, TaskStreamConfig{ - StreamName: streamPrefix, - Subjects: subjects, - }) -} - -// CreateTaskStream 配置: 文件存储 + 工作队列策略 -// CreateTaskStream 创建任务消息队列流(JetStream 2.10+) -// 核心设计思路: -// 1. 严格持久化:使用文件存储,任务消息不会因为服务器重启而丢失 -// 2. 支持任务优先级:通过主题分级实现,如 ["tasks.high", "tasks.low"] -// 3. 死信队列支持:通过 RePublish 配置将失败任务路由到专门的 DLQ 流 -// 4. 灵活保留策略:根据任务重要性设置不同的保留时长(MaxAge) -// 5. 工作队列模式:确保每个任务只被一个消费者处理(DiscardPerSubject) -func createTaskStream(ctx context.Context, config TaskStreamConfig) error { - if !IsConnected() { - return fmt.Errorf("NATS 未连接") - } - - if g.IsNil(config.StreamName) { - return fmt.Errorf("流名称不能为空") - } - if len(config.Subjects) == 0 { - return fmt.Errorf("主题数组不能为空") - } - // 设置默认值 - if config.Storage == "" { - config.Storage = StorageFile // 默认文件存储 - } - if config.Retention == "" { - config.Retention = RetentionWorkQueue // 默认工作队列策略 - } - if config.MaxAge == 0 { - config.MaxAge = 24 * time.Hour // 默认保留24小时 - } - if config.Replicas == 0 { - config.Replicas = 1 // 默认单副本 - } - if config.MaxBytes == 0 { - config.MaxBytes = 10 * 1024 * 1024 * 1024 // 默认10GB - } - if config.MaxMsgs == 0 { - config.MaxMsgs = 100000 // 默认10万条消息 - } - if config.MaxMsgSize == 0 { - config.MaxMsgSize = 1024 * 1024 // 默认1MB - } - - if config.DiscardPerSubject { - config.DenyDelete = true // 工作队列模式下禁止删除 - } - - // 构建流配置 - jsConfig := jetstream.StreamConfig{ - Name: config.StreamName, - Subjects: config.Subjects, - Storage: parseStorageType(config.Storage), - Retention: parseRetentionType(config.Retention), - MaxAge: config.MaxAge, - Duplicates: config.Duplicates, - Replicas: config.Replicas, - MaxMsgSize: config.MaxMsgSize, - MaxBytes: config.MaxBytes, - MaxMsgs: config.MaxMsgs, - MaxMsgsPerSubject: config.MaxMsgsPerSubject, - MaxConsumers: config.MaxConsumers, - AllowRollup: config.AllowRollup, - DenyDelete: config.DenyDelete, - DenyPurge: config.DenyPurge, - Discard: jetstream.DiscardOld, // 默认删除旧消息 - DiscardNewPerSubject: config.DiscardPerSubject, - } - - // 配置死信队列重新发布(如果设置了) - if config.Republish != nil { - jsConfig.RePublish = &jetstream.RePublish{ - Source: config.Republish.Source, - Destination: config.Republish.Destination, - HeadersOnly: config.Republish.HeadersOnly, - } - } else { - // 使用固定的死信队列命名规范:{StreamName}.DLQ - dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName) - // 死信队列配置 - jsConfig.RePublish = &jetstream.RePublish{ - Source: ">", - Destination: dlqSubject, - HeadersOnly: true, - } - } - - // 检查流是否已存在 - stream, err := js.Stream(ctx, config.StreamName) - if err == nil { - // 流已存在,更新配置 - _, err = js.UpdateStream(ctx, jsConfig) - if err != nil { - return fmt.Errorf("更新任务流失败: %w", err) - } - g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name) - return nil - } - - // 创建新流 - stream, err = js.CreateStream(ctx, jsConfig) - if err != nil { - return fmt.Errorf("创建任务流失败: %w", err) - } - - // 记录配置信息 - configInfo := fmt.Sprintf("存储=%s, 策略=%s, 副本=%d, 保留=%v", config.Storage, config.Retention, config.Replicas, config.MaxAge) - if config.Republish != nil { - configInfo += fmt.Sprintf(", 死信队列=%s->%s", config.Republish.Source, config.Republish.Destination) - } - g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (%s)", stream.CachedInfo().Config.Name, configInfo) - - return nil -} diff --git a/nats/nats_test.go b/nats/nats_test.go deleted file mode 100644 index 94e1262..0000000 --- a/nats/nats_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package nats - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/nats-io/nats.go/jetstream" -) - -// TestNatsBasicOperations 测试基础操作 -func TestNatsBasicOperations(t *testing.T) { - // 测试连接状态 - if !IsConnected() { - t.Log("NATS 未连接") - } - - // 测试连接状态获取 - state := GetConnState() - t.Logf("当前连接状态: %d", state) -} - -// TestNatsMetrics 测试监控指标 -func TestNatsMetrics(t *testing.T) { - metrics := GetMetrics() - t.Logf("发布计数: %d", metrics.PublishCount.Load()) - t.Logf("发布错误: %d", metrics.PublishError.Load()) - t.Logf("请求计数: %d", metrics.RequestCount.Load()) -} - -// TestNatsConnStateListener 测试连接状态监听 -func TestNatsConnStateListener(t *testing.T) { - listener := func(state ConnState, err error) { - fmt.Printf("连接状态变化: %d, 错误: %v\n", state, err) - } - - RegisterConnStateListener(listener) - defer UnregisterConnStateListener(listener) - - time.Sleep(1 * time.Second) -} - -// TestNatsStreamOperations 测试流操作 -func TestNatsStreamOperations(t *testing.T) { - ctx := context.Background() - - // 创建任务流 - config := TaskStreamConfig{ - StreamName: "test_tasks", - Subjects: []string{"test.task.>"}, - //Subject: "test.task.process", - } - err := CreateTaskStream(ctx, config) - if err != nil { - t.Logf("创建任务流失败: %v", err) - } - - // 获取流信息 - info, err := GetStream(ctx, "test_tasks") - if err != nil { - t.Logf("获取流信息失败: %v", err) - } else { - t.Logf("流信息: %s", info.Config.Name) - } - - // 列出所有流 - streams, err := ListStreams(ctx) - if err != nil { - t.Logf("列出流失败: %v", err) - } else { - t.Logf("流列表: %v", streams) - } - - // 删除流 - err = DeleteStream(ctx, "test_tasks") - if err != nil { - t.Logf("删除流失败: %v", err) - } -} - -// TestNatsConsumerOperations 测试消费者操作 -func TestNatsConsumerOperations(t *testing.T) { - ctx := context.Background() - - // 创建测试流 - config := TaskStreamConfig{ - StreamName: "test_consumer", - Subjects: []string{"test.consumer.>"}, - //Subject: "test.consumer.process", - } - err := CreateTaskStream(ctx, config) - if err != nil { - t.Logf("创建流失败: %v", err) - } - - // 创建消费者 - consumerConfig := jetstream.ConsumerConfig{ - Name: "test_consumer", - Durable: "test_consumer", - } - _, err = CreateConsumer(ctx, "test_consumer", "test_consumer", consumerConfig) - if err != nil { - t.Logf("创建消费者失败: %v", err) - } - - // 获取消费者信息 - info, err := GetConsumer(ctx, "test_consumer", "test_consumer") - if err != nil { - t.Logf("获取消费者信息失败: %v", err) - } else { - t.Logf("消费者信息: %s", info.Name) - } - - // 列出消费者 - consumers, err := ListConsumers(ctx, "test_consumer") - if err != nil { - t.Logf("列出消费者失败: %v", err) - } else { - t.Logf("消费者列表: %v", consumers) - } - - // 删除消费者 - err = DeleteConsumer(ctx, "test_consumer", "test_consumer") - if err != nil { - t.Logf("删除消费者失败: %v", err) - } - - // 清理流 - _ = DeleteStream(ctx, "test_consumer") -} diff --git a/nats/task.go b/nats/task.go deleted file mode 100644 index 996ded6..0000000 --- a/nats/task.go +++ /dev/null @@ -1,411 +0,0 @@ -package nats - -//import ( -// "context" -// "fmt" -// "time" -// -// "github.com/gogf/gf/v2/frame/g" -// "github.com/nats-io/nats.go/jetstream" -//) - -//// TaskPriority 任务优先级 -//type TaskPriority string -// -//const ( -// TaskPriorityHigh TaskPriority = "high" // 高优先级任务 -// TaskPriorityNormal TaskPriority = "normal" // 普通优先级任务 -// TaskPriorityLow TaskPriority = "low" // 低优先级任务 -//) -// -//// TaskStreamConfig 任务流配置 -//type TaskStreamConfig struct { -// StreamName string // 流名称 -// Subjects []string // 主题列表(支持优先级分级,如 tasks.high.>, tasks.normal.>, tasks.low.>) -// Subject string // 默认发布主题 -// Priority TaskPriority // 任务优先级 -// MaxAge time.Duration // 消息保留时长(根据任务重要性设置) -// MaxMsgsPerSub int64 // 每个订阅者最大消息数(防止内存溢出) -// Replicas int // 副本数(默认1,建议生产环境使用3) -// Duplicates time.Duration // 消息去重窗口(0表示不启用) -//} -// -//// TaskConsumerConfig 任务消费者配置 -//type TaskConsumerConfig struct { -// ConsumerName string // 消费者名称 -// AckPolicy *jetstream.AckPolicy -// MaxDeliveries int32 // 最大投递次数(用于重试控制) -// AckWait time.Duration // 等待ACK超时时间 -// Backoff []time.Duration // 重试退避策略 -// FilterSubject string // 过滤主题(可指定特定优先级任务) -// MaxAckPending int // 最大待确认消息数 -// MaxWaiting int // 最大等待消息数 -// ReplayPolicy *jetstream.ReplayPolicy // 重放策略 -//} - -// CreateTaskStream 创建任务流(基于 JetStream 2.10+ API) -// -// 核心设计思路: -// 1. 严格的持久化:使用文件存储(FileStorage)避免任务丢失 -// 2. 任务优先级:通过主题分级实现(tasks.high/tasks.normal/tasks.low) -// 3. 死信队列:配置死信队列处理失败任务 -// 4. 保留策略:按任务重要性设置不同的保留时长 -// 5. 工作队列策略:确保每条消息只被一个消费者处理 -// -// 参数: -// - ctx: 上下文 -// - config: 任务流配置 -// -// 返回: -// - error: 错误信息 -//func CreateTaskStream(ctx context.Context, config TaskStreamConfig) error { -// if !IsConnected() { -// return fmt.Errorf("NATS 未连接") -// } -// -// // 设置默认值 -// if config.MaxAge == 0 { -// config.MaxAge = 7 * 24 * time.Hour // 默认保留7天 -// } -// if config.MaxMsgsPerSub == 0 { -// config.MaxMsgsPerSub = 100000 // 默认每订阅者最多10万条消息 -// } -// if config.Replicas == 0 { -// config.Replicas = 1 // 默认单副本 -// } -// if config.Duplicates == 0 { -// config.Duplicates = 2 * time.Minute // 默认2分钟去重窗口 -// } -// -// // 验证主题配置 -// if len(config.Subjects) == 0 { -// return fmt.Errorf("任务流必须指定至少一个主题") -// } -// -// // 设置死信队列 -// // 使用固定的死信队列命名规范:{StreamName}.DLQ -// dlqSubject := fmt.Sprintf("%s.DLQ", config.StreamName) -// -// // 尝试获取现有流 -// stream, err := js.Stream(ctx, config.StreamName) -// if err == nil { -// // 流已存在,更新配置以适配任务流的特殊需求 -// _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ -// Name: config.StreamName, -// Subjects: config.Subjects, -// Storage: jetstream.FileStorage, // 文件存储确保持久化 -// Retention: jetstream.WorkQueuePolicy, // 工作队列策略 -// MaxAge: config.MaxAge, -// MaxMsgs: config.MaxMsgsPerSub, -// Replicas: config.Replicas, -// Duplicates: config.Duplicates, -// // 死信队列配置 -// RePublish: &jetstream.RePublish{ -// Source: ">", // 匹配所有主题 -// Destination: dlqSubject, -// }, -// // 限制流大小(防止磁盘占用过多) -// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB -// }) -// if err != nil { -// return fmt.Errorf("更新任务流失败: %w", err) -// } -// g.Log().Infof(ctx, "✅ 任务流已更新: %s (优先级: %s, 保留: %v)", -// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge) -// return nil -// } -// -// // 创建新任务流 -// streamConfig := jetstream.StreamConfig{ -// Name: config.StreamName, -// Subjects: config.Subjects, -// Storage: jetstream.FileStorage, // 文件存储确保持久化 -// Retention: jetstream.WorkQueuePolicy, // 工作队列策略 -// MaxAge: config.MaxAge, -// MaxMsgs: config.MaxMsgsPerSub, -// Replicas: config.Replicas, -// Duplicates: config.Duplicates, -// // 死信队列配置 -// RePublish: &jetstream.RePublish{ -// Source: ">", // 匹配所有主题 -// Destination: dlqSubject, -// }, -// // 限制流大小(防止磁盘占用过多) -// MaxBytes: 10 * 1024 * 1024 * 1024, // 10GB -// // 启用流清理 -// Discard: jetstream.DiscardOld, // 新消息替换旧消息 -// } -// -// stream, err = js.CreateStream(ctx, streamConfig) -// if err != nil { -// return fmt.Errorf("创建任务流失败: %w", err) -// } -// -// // 验证流是否创建成功 -// if stream == nil { -// return fmt.Errorf("创建任务流失败:流对象为空") -// } -// -// g.Log().Infof(ctx, "✅ 任务流创建成功: %s (文件存储+工作队列策略+死信队列, 优先级: %s, 保留: %v, 副本: %d)", -// stream.CachedInfo().Config.Name, config.Priority, config.MaxAge, config.Replicas) -// -// // 记录配置信息 -// g.Log().Infof(ctx, " - 主题列表: %v", config.Subjects) -// g.Log().Infof(ctx, " - 死信队列: %s", dlqSubject) -// g.Log().Infof(ctx, " - 最大消息数: %d", config.MaxMsgsPerSub) -// g.Log().Infof(ctx, " - 去重窗口: %v", config.Duplicates) -// -// return nil -//} -// -//// CreateOrUpdateTaskConsumer 创建或更新任务消费者(基于 JetStream 2.10+ API) -//// -//// 核心设计思路: -//// 1. 支持手动确认(AckExplicit)确保任务处理完成 -//// 2. 通过 Nack() 方法实现消息重试,超限后进入死信队列 -//// 3. 支持主题过滤,可订阅特定优先级任务 -//// 4. 限制待确认消息数,防止消费者过载 -//// 5. AckWait 设置消息处理超时时间 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamName: 流名称 -//// - consumerConfig: 消费者配置 -//// -//// 返回: -//// - jetstream.Consumer: 消费者对象 -//// - error: 错误信息 -//func CreateOrUpdateTaskConsumer(ctx context.Context, streamName string, consumerConfig TaskConsumerConfig) (jetstream.Consumer, error) { -// if !IsConnected() { -// return nil, fmt.Errorf("NATS 未连接") -// } -// -// // 设置默认值 -// ackPolicy := jetstream.AckExplicitPolicy -// if consumerConfig.AckPolicy != nil { -// ackPolicy = *consumerConfig.AckPolicy -// } -// -// if consumerConfig.MaxDeliveries == 0 { -// consumerConfig.MaxDeliveries = 10 // 默认最多投递10次 -// } -// -// if consumerConfig.AckWait == 0 { -// consumerConfig.AckWait = 30 * time.Second // 默认30秒等待确认 -// } -// -// if consumerConfig.MaxAckPending == 0 { -// consumerConfig.MaxAckPending = 1000 // 默认最多1000条待确认消息 -// } -// -// if consumerConfig.MaxWaiting == 0 { -// consumerConfig.MaxWaiting = 512 // 默认最多512条等待消息 -// } -// -// replayPolicy := jetstream.ReplayInstantPolicy -// if consumerConfig.ReplayPolicy != nil { -// replayPolicy = *consumerConfig.ReplayPolicy -// } -// -// // 构建消费者配置 -// config := jetstream.ConsumerConfig{ -// Name: consumerConfig.ConsumerName, -// Durable: consumerConfig.ConsumerName, // 持久化消费者 -// AckPolicy: ackPolicy, -// AckWait: consumerConfig.AckWait, -// MaxAckPending: consumerConfig.MaxAckPending, -// MaxWaiting: consumerConfig.MaxWaiting, -// ReplayPolicy: replayPolicy, -// FilterSubject: consumerConfig.FilterSubject, -// } -// -// // 使用 CreateOrUpdateConsumer 创建或更新消费者 -// consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, config) -// if err != nil { -// return nil, fmt.Errorf("创建任务消费者失败: %w", err) -// } -// -// g.Log().Infof(ctx, "✅ 任务消费者已创建/更新: %s/%s (等待确认: %v)", -// streamName, consumerConfig.ConsumerName, consumerConfig.AckWait) -// -// // 获取消费者信息并记录 -// info, err := consumer.Info(ctx) -// if err == nil { -// g.Log().Infof(ctx, " - 过滤主题: %s", info.Config.FilterSubject) -// g.Log().Infof(ctx, " - 最大待确认: %d", info.Config.MaxAckPending) -// g.Log().Infof(ctx, " - ACK策略: %s", info.Config.AckPolicy) -// } -// -// return consumer, nil -//} -// -//// CreateTaskStreamWithPriority 创建带优先级的任务流 -//// -//// 便捷方法,自动创建支持多优先级的任务流配置 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamPrefix: 流名称前缀(如 "tasks") -//// - priority: 默认优先级 -//// -//// 返回: -//// - error: 错误信息 -//func CreateTaskStreamWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority) error { -// if !IsConnected() { -// return fmt.Errorf("NATS 未连接") -// } -// -// // 构建支持多优先级的主题列表 -// subjects := []string{ -// fmt.Sprintf("%s.high.>", streamPrefix), // 高优先级任务 -// fmt.Sprintf("%s.normal.>", streamPrefix), // 普通优先级任务 -// fmt.Sprintf("%s.low.>", streamPrefix), // 低优先级任务 -// } -// -// // 根据优先级设置不同的保留时长 -// var maxAge time.Duration -// switch priority { -// case TaskPriorityHigh: -// maxAge = 30 * 24 * time.Hour // 高优先级保留30天 -// case TaskPriorityNormal: -// maxAge = 7 * 24 * time.Hour // 普通优先级保留7天 -// case TaskPriorityLow: -// maxAge = 24 * time.Hour // 低优先级保留1天 -// default: -// maxAge = 7 * 24 * time.Hour -// } -// -// config := TaskStreamConfig{ -// StreamName: streamPrefix, -// Subjects: subjects, -// Subject: fmt.Sprintf("%s.%s.>", streamPrefix, priority), -// Priority: priority, -// MaxAge: maxAge, -// MaxMsgsPerSub: 100000, -// Replicas: 1, -// Duplicates: 2 * time.Minute, -// } -// -// return CreateTaskStream(ctx, config) -//} -// -//// PublishTask 发布任务到指定流 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamName: 流名称 -//// - task: 任务数据(会被JSON序列化) -//// -//// 返回: -//// - error: 错误信息 -//func PublishTask(ctx context.Context, streamName string, task interface{}) error { -// if !IsConnected() { -// return fmt.Errorf("NATS 未连接") -// } -// -// // 使用 JsPublish 发布消息 -// if err := JsPublish(ctx, streamName, task); err != nil { -// return fmt.Errorf("发布任务失败: %w", err) -// } -// -// return nil -//} -// -//// PublishTaskWithPriority 发布带优先级的任务 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamPrefix: 流名称前缀 -//// - priority: 任务优先级 -//// - taskType: 任务类型 -//// - task: 任务数据(会被JSON序列化) -//// -//// 返回: -//// - error: 错误信息 -//func PublishTaskWithPriority(ctx context.Context, streamPrefix string, priority TaskPriority, taskType string, task interface{}) error { -// if !IsConnected() { -// return fmt.Errorf("NATS 未连接") -// } -// -// // 构建主题:{streamPrefix}.{priority}.{taskType} -// subject := fmt.Sprintf("%s.%s.%s", streamPrefix, priority, taskType) -// -// // 使用 JsPublish 发布消息 -// if err := JsPublish(ctx, subject, task); err != nil { -// return fmt.Errorf("发布任务失败: %w", err) -// } -// -// g.Log().Debugf(ctx, "任务已发布: %s (优先级: %s, 类型: %s)", subject, priority, taskType) -// -// return nil -//} -// -//// GetTaskStreamInfo 获取任务流信息 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamName: 流名称 -//// -//// 返回: -//// - *jetstream.StreamInfo: 流信息 -//// - error: 错误信息 -//func GetTaskStreamInfo(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) { -// if !IsConnected() { -// return nil, fmt.Errorf("NATS 未连接") -// } -// -// return GetStream(ctx, streamName) -//} -// -//// GetTaskConsumerInfo 获取任务消费者信息 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamName: 流名称 -//// - consumerName: 消费者名称 -//// -//// 返回: -//// - *jetstream.ConsumerInfo: 消费者信息 -//// - error: 错误信息 -//func GetTaskConsumerInfo(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) { -// if !IsConnected() { -// return nil, fmt.Errorf("NATS 未连接") -// } -// -// return GetConsumer(ctx, streamName, consumerName) -//} -// -//// DeleteTaskStream 删除任务流 -//// -//// 注意:此操作会删除流及其所有消息,请谨慎使用 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamName: 流名称 -//// -//// 返回: -//// - error: 错误信息 -//func DeleteTaskStream(ctx context.Context, streamName string) error { -// if !IsConnected() { -// return fmt.Errorf("NATS 未连接") -// } -// -// return DeleteStream(ctx, streamName) -//} -// -//// DeleteTaskConsumer 删除任务消费者 -//// -//// 参数: -//// - ctx: 上下文 -//// - streamName: 流名称 -//// - consumerName: 消费者名称 -//// -//// 返回: -//// - error: 错误信息 -//func DeleteTaskConsumer(ctx context.Context, streamName, consumerName string) error { -// if !IsConnected() { -// return fmt.Errorf("NATS 未连接") -// } -// -// return DeleteConsumer(ctx, streamName, consumerName) -//} diff --git a/nats/utils.go b/nats/utils.go deleted file mode 100644 index 1e43535..0000000 --- a/nats/utils.go +++ /dev/null @@ -1,87 +0,0 @@ -package nats - -import ( - "context" - "fmt" - "github.com/gogf/gf/v2/frame/g" - "github.com/nats-io/nats.go" - "go.opentelemetry.io/otel/trace" -) - -// ============ 上下文元数据工具函数 ============ -// 以下函数用于在 context 和 NATS 消息头之间互转元数据 - -// 定义常见的上下文元数据 key -const ( - TraceIDKey = "trace_id" - TokenKey = "token" -) - -func getTraceID(ctx context.Context) (traceID string, err error) { - // 提取 traceId:首先尝试从 OpenTelemetry Span 中提取,从 context 中提取 TraceID - span := trace.SpanFromContext(ctx) - if span != nil && span.SpanContext().HasTraceID() { - traceID = span.SpanContext().TraceID().String() - } else if tid := ctx.Value(TraceIDKey); tid != nil { - traceID = fmt.Sprintf("%v", tid) - } - if traceID == "" { - return traceID, fmt.Errorf("context 中没有 TraceID") - } - return -} - -// contextToHeaders 将 context 中的元数据转换为 NATS 消息头 -// 支持提取 user_id、tenant_id、trace_id、token 等常见字段 -func contextToHeaders(ctx context.Context) (nats.Header, error) { - headers := make(nats.Header) - - // 提取 traceId:首先尝试从 OpenTelemetry Span 中提取 - if traceID, err := getTraceID(ctx); err != nil { - return headers, err - } else { - headers.Set(TraceIDKey, traceID) - } - - // 提取 token(优先级:context value > HTTP Authorization header) - token := "" - if t := ctx.Value(TokenKey); t != nil { - token = fmt.Sprintf("%v", t) - } else if r := g.RequestFromCtx(ctx); r != nil { - // 从 HTTP 请求的 Authorization header 中提取 token - auth := r.GetHeader("Authorization") - if auth != "" { - // 移除 "Bearer " 前缀 - if len(auth) > 7 && auth[:7] == "Bearer " { - token = auth[7:] - } else { - token = auth - } - } - } - if token != "" { - headers.Set(TokenKey, token) - } - - return headers, nil -} - -// headersToContext 从 NATS 消息头重建 context -// 支持还原 user_id、tenant_id、trace_id、token 等字段 -func headersToContext(ctx context.Context, headers nats.Header) context.Context { - if headers == nil { - return ctx - } - - // 恢复 trace_id - if traceID := headers.Get(TraceIDKey); traceID != "" { - ctx = context.WithValue(ctx, TraceIDKey, traceID) - } - - // 恢复 token - if token := headers.Get(TokenKey); token != "" { - ctx = context.WithValue(ctx, TokenKey, token) - } - - return ctx -}