package nats import ( "context" "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/nats-io/nats.go/jetstream" ) // AckPolicy 确认策略 type AckPolicy string const ( AckPolicyExplicit AckPolicy = "explicit" // 显式确认(默认) AckPolicyAll AckPolicy = "all" // 确认所有消息 AckPolicyNone AckPolicy = "none" // 不需要确认 ) // DeliverPolicy 投递策略 type DeliverPolicy string const ( DeliverPolicyAll DeliverPolicy = "all" // 投递所有消息(包括已投递的) DeliverPolicyLast DeliverPolicy = "last" // 从最后一条消息开始 DeliverPolicyNew DeliverPolicy = "new" // 仅投递新消息(默认) DeliverPolicyLastPerSubj DeliverPolicy = "lastpersubj" // 每个主题的最后一条 DeliverPolicyByStartSeq DeliverPolicy = "by_start_sequence" // 按起始序列号 ) // ReplayPolicy 重放策略 type ReplayPolicy string const ( ReplayPolicyInstant ReplayPolicy = "instant" // 立即重放 ReplayPolicyOriginal ReplayPolicy = "original" // 按原始顺序重放 ) // ConsumerConfig 消费者配置 type ConsumerConfig struct { DurableName string // 持久化名称(空表示临时消费者) Description string // 描述信息 AckPolicy AckPolicy // 确认策略 AckWait int // 确认等待时间(秒) MaxDeliver int // 最大投递次数 FilterSubject string // 过滤主题(流内多主题时使用) DeliverPolicy DeliverPolicy // 投递策略 ReplayPolicy ReplayPolicy // 重放策略 MaxWaiting int // 最大等待消息数 MaxAckPending int // 最大待确认消息数 OptStartTime int64 // 起始时间戳 OptStartSeq uint64 // 起始序列号 HeadersOnly bool // 仅消费消息头 Backoff []int // 退避策略(秒数数组) RateLimit uint64 // 消息速率限制(消息/秒) Replica int // 副本数 FlowControl bool // 启用流控 Metadata map[string]string // 元数据 } // parseAckPolicy 解析确认策略 func parseAckPolicy(policy AckPolicy) jetstream.AckPolicy { switch policy { case AckPolicyAll: return jetstream.AckAllPolicy case AckPolicyNone: return jetstream.AckNonePolicy default: return jetstream.AckExplicitPolicy } } // parseDeliverPolicy 解析投递策略 func parseDeliverPolicy(policy DeliverPolicy) jetstream.DeliverPolicy { switch policy { case DeliverPolicyAll: return jetstream.DeliverAllPolicy case DeliverPolicyLast: return jetstream.DeliverLastPolicy case DeliverPolicyLastPerSubj: return jetstream.DeliverLastPerSubjectPolicy case DeliverPolicyByStartSeq: return jetstream.DeliverByStartSequencePolicy default: return jetstream.DeliverNewPolicy } } // parseReplayPolicy 解析重放策略 func parseReplayPolicy(policy ReplayPolicy) jetstream.ReplayPolicy { switch policy { case ReplayPolicyOriginal: return jetstream.ReplayOriginalPolicy default: return jetstream.ReplayInstantPolicy } } // CreateTaskConsumer 创建任务消费者 // 核心设计思路: // 1. 显式确认:确保消息被正确处理后才确认 // 2. 重试机制:通过 MaxDeliver 控制最大重试次数 // 3. 持久化:DurableName 确保消费者状态持久化 // 4. 流控:防止消费者过载 func CreateTaskConsumer(ctx context.Context, streamName string, config ConsumerConfig) (jetstream.Consumer, error) { if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } if streamName == "" { return nil, fmt.Errorf("流名称不能为空") } // 设置默认值 if config.AckPolicy == "" { config.AckPolicy = AckPolicyExplicit // 默认显式确认 } if config.AckWait == 0 { config.AckWait = 30 // 默认30秒确认超时 } if config.MaxDeliver == 0 { config.MaxDeliver = 3 // 默认最多投递3次 } if config.DeliverPolicy == "" { config.DeliverPolicy = DeliverPolicyNew // 默认仅消费新消息 } if config.ReplayPolicy == "" { config.ReplayPolicy = ReplayPolicyInstant // 默认立即重放 } if config.MaxAckPending == 0 { config.MaxAckPending = 1000 // 默认最多1000条待确认消息 } // 构建消费者配置 jsConfig := jetstream.ConsumerConfig{ Name: config.DurableName, Description: config.Description, AckPolicy: parseAckPolicy(config.AckPolicy), AckWait: 0, MaxDeliver: config.MaxDeliver, FilterSubjects: []string{config.FilterSubject}, DeliverPolicy: parseDeliverPolicy(config.DeliverPolicy), ReplayPolicy: parseReplayPolicy(config.ReplayPolicy), MaxWaiting: config.MaxWaiting, MaxAckPending: config.MaxAckPending, HeadersOnly: config.HeadersOnly, RateLimit: config.RateLimit, Replicas: config.Replica, Metadata: config.Metadata, } // 配置流控和心跳 if config.FlowControl { jsConfig.FlowControl = true } // 配置起始位置 if config.OptStartSeq > 0 { jsConfig.OptStartSeq = config.OptStartSeq } // 创建新消费者 consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig) if err != nil { return nil, fmt.Errorf("创建消费者失败: %w", err) } // 记录配置信息 configInfo := fmt.Sprintf("确认策略=%s, 最大投递=%d, 投递策略=%s", config.AckPolicy, config.MaxDeliver, config.DeliverPolicy) if config.FilterSubject != "" { configInfo += fmt.Sprintf(", 过滤主题=%s", config.FilterSubject) } g.Log().Infof(ctx, "✅ 任务消费者创建成功: %s/%s (%s)", streamName, config.DurableName, configInfo) return consumer, nil } // CreateConsumerSimple 简化版创建消费者(适用于大多数场景) // 只需提供流名称和消费者名称,其他使用默认配置 func CreateConsumerSimple(ctx context.Context, streamName, durableName string) (err error) { _, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{ DurableName: durableName, }) return } // CreateConsumerWithFilter 创建带主题过滤的消费者 //func CreateConsumerWithFilter(ctx context.Context, streamName, durableName, filterSubject string) (jetstream.Consumer, error) { // return CreateTaskConsumer(ctx, streamName, ConsumerConfig{ // DurableName: durableName, // FilterSubject: filterSubject, // }) //} // CreateConsumerEphemeral 创建临时消费者 // 临时消费者没有持久化名称,连接断开后自动删除 //func CreateConsumerEphemeral(ctx context.Context, streamName string) (jetstream.Consumer, error) { // if !IsConnected() { // return nil, fmt.Errorf("NATS 未连接") // } // // jsConfig := jetstream.ConsumerConfig{ // AckPolicy: jetstream.AckNonePolicy, // AckWait: 0, // MaxDeliver: 3, // DeliverPolicy: jetstream.DeliverNewPolicy, // ReplayPolicy: jetstream.ReplayInstantPolicy, // MaxAckPending: 1000, // } // // consumer, err := js.CreateConsumer(ctx, streamName, jsConfig) // if err != nil { // return nil, fmt.Errorf("创建临时消费者失败: %w", err) // } // // g.Log().Infof(ctx, "✅ 临时消费者创建成功: %s", streamName) // return consumer, nil //} // CreateConsumerPushMode 创建推送模式消费者 // 推送模式下,NATS 服务器主动将消息推送给消费者 func CreateConsumerPushMode(ctx context.Context, streamName, durableName, subject string, msgCount int) (err error) { _, err = CreateTaskConsumer(ctx, streamName, ConsumerConfig{ DurableName: durableName, FilterSubject: subject, MaxAckPending: msgCount, }) return } // CreateConsumerPullMode 创建拉取模式消费者 // 拉取模式下,消费者主动从服务器拉取消息 //func CreateConsumerPullMode(ctx context.Context, streamName, durableName string) (jetstream.Consumer, error) { // return CreateTaskConsumer(ctx, streamName, ConsumerConfig{ // DurableName: durableName, // DeliverPolicy: DeliverPolicyAll, // MaxAckPending: 500, // 拉取模式下待确认消息数可以设置小一些 // }) //} // ConsumeMessages 消费消息(推送模式) func ConsumeMessages(ctx context.Context, streamName, consumerName string, handler jetstream.MessageHandler) error { if !IsConnected() { return fmt.Errorf("NATS 未连接") } // 获取消费者 consumer, err := js.Consumer(ctx, streamName, consumerName) if err != nil { return fmt.Errorf("获取消费者失败: %w", err) } // 业务处理 //if err := handler(ctx, streamMsg.Values); err != nil { // glog.Infof(ctx, "业务处理失败-> err:%v\n", err) // continue //} //// 确认消息 //if msg.AutoAck { // err := ackMessage(ctx, msg.StreamKey, msg.GroupName, streamMsg.ID) // if err != nil { // glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", msg.ConsumerName, streamMsg.ID, err) // } //} //// 创建消息处理函数 //handler = func(msg jetstream.Msg) { // // 解析消息 // var task TaskMessage // if err := json.Unmarshal(msg.Data(), &task); err != nil { // g.Log().Errorf(ctx, "解析消息失败: %v", err) // msg.Nak() // 拒绝消息,触发重试 // return // } // // // 处理业务逻辑 // g.Log().Infof(ctx, "处理任务: %s", task.TaskID) // // // 处理成功,确认消息 // msg.Ack() //} // 开始消费 _, err = consumer.Consume(handler) if err != nil { return fmt.Errorf("开始消费失败: %w", err) } g.Log().Infof(ctx, "✅ 开始消费消息: %s/%s", streamName, consumerName) return nil } // 定义消息结构 type TaskMessage struct { TaskID string `json:"task_id"` TaskType string `json:"task_type"` Data string `json:"data"` }