package nats import ( "context" "encoding/json" "fmt" "time" "github.com/gogf/gf/v2/frame/g" "github.com/nats-io/nats.go/jetstream" ) // createTaskStream 创建任务消息队列流(内部使用,兼容旧版本) // 存储策略: 文件存储 // 工作队列模式: 工作队列策略 func CreateTaskStream(ctx context.Context, streamInfo TaskStreamConfig) error { if !IsConnected() { return fmt.Errorf("NATS 未连接") } stream, err := js.Stream(ctx, streamInfo.StreamName) if err == nil { // 流已存在,更新配置 _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ Name: streamInfo.StreamName, Subjects: streamInfo.Subjects, Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, }) if err != nil { return fmt.Errorf("更新任务流失败: %w", err) } g.Log().Infof(ctx, "✅ 任务消息队列流已更新: %s", stream.CachedInfo().Config.Name) return nil } // 创建新流 stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ Name: streamInfo.StreamName, Subjects: streamInfo.Subjects, Storage: jetstream.FileStorage, Retention: jetstream.WorkQueuePolicy, }) if err != nil { return fmt.Errorf("创建任务流失败: %w", err) } g.Log().Infof(ctx, "✅ 任务消息队列流创建成功: %s (文件存储+工作队列策略)", stream.CachedInfo().Config.Name) return nil } // CreateLogStream 创建日志流 // 存储策略: 内存存储 // 副本数: 单副本 (1) // 消息留存: 短时留存 (1小时) func CreateLogStream(ctx context.Context, streamName string, subjects []string) error { if !IsConnected() { return fmt.Errorf("NATS 未连接") } maxAge := 1 * time.Hour stream, err := js.Stream(ctx, streamName) if err == nil { // 流已存在,更新配置 _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.MemoryStorage, Replicas: 1, MaxAge: maxAge, }) if err != nil { return fmt.Errorf("更新日志流失败: %w", err) } g.Log().Infof(ctx, "✅ 日志流已更新: %s", stream.CachedInfo().Config.Name) return nil } // 创建新流 stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.MemoryStorage, Replicas: 1, MaxAge: maxAge, }) if err != nil { return fmt.Errorf("创建日志流失败: %w", err) } g.Log().Infof(ctx, "✅ 日志流创建成功: %s (内存存储+单副本+短时留存1小时)", stream.CachedInfo().Config.Name) return nil } // CreateTradeStream 创建交易业务流 // 存储策略: 文件存储 // 副本数: 3副本 // 同步刷盘: 启用 func CreateTradeStream(ctx context.Context, streamName string, subjects []string) error { if !IsConnected() { return fmt.Errorf("NATS 未连接") } stream, err := js.Stream(ctx, streamName) if err == nil { // 流已存在,更新配置 _, err = js.UpdateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.FileStorage, Replicas: 3, RePublish: nil, Duplicates: 0, }) if err != nil { return fmt.Errorf("更新交易流失败: %w", err) } g.Log().Infof(ctx, "✅ 交易业务流已更新: %s", stream.CachedInfo().Config.Name) return nil } // 创建新流 stream, err = js.CreateStream(ctx, jetstream.StreamConfig{ Name: streamName, Subjects: subjects, Storage: jetstream.FileStorage, Replicas: 3, RePublish: nil, Duplicates: 0, }) if err != nil { return fmt.Errorf("创建交易流失败: %w", err) } g.Log().Infof(ctx, "✅ 交易业务流创建成功: %s (文件存储+3副本+同步刷盘)", stream.CachedInfo().Config.Name) return nil } // JsPublish 发布消息到指定主题 func JsPublish(ctx context.Context, subject string, data any) (err error) { if !IsConnected() { return fmt.Errorf("NATS 未连接") } // 序列化数据 dataBytes, err := json.Marshal(data) if err != nil { return fmt.Errorf("序列化数据失败: %w", err) } // 发布消息 metrics.PublishCount.Add(1) _, err = js.Publish(ctx, subject, dataBytes) if err != nil { metrics.PublishError.Add(1) return fmt.Errorf("发布消息失败: %w", err) } return } // GetStream 获取流信息 func GetStream(ctx context.Context, streamName string) (*jetstream.StreamInfo, error) { if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } stream, err := js.Stream(ctx, streamName) if err != nil { return nil, fmt.Errorf("获取流失败: %w", err) } info, err := stream.Info(ctx) if err != nil { return nil, fmt.Errorf("获取流信息失败: %w", err) } return info, nil } // ListStreams 列出所有流(简化实现) // 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 func ListStreams(ctx context.Context) ([]string, error) { if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } // TODO: 根据实际 NATS 版本实现完整的流列表功能 return []string{}, nil } // DeleteStream 删除流 func DeleteStream(ctx context.Context, streamName string) error { if !IsConnected() { return fmt.Errorf("NATS 未连接") } if err := js.DeleteStream(ctx, streamName); err != nil { return fmt.Errorf("删除流失败: %w", err) } g.Log().Infof(ctx, "✅ 流已删除: %s", streamName) return nil } // GetConsumer 获取消费者信息 func GetConsumer(ctx context.Context, streamName, consumerName string) (*jetstream.ConsumerInfo, error) { if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } consumer, err := js.Consumer(ctx, streamName, consumerName) if err != nil { return nil, fmt.Errorf("获取消费者失败: %w", err) } info, err := consumer.Info(ctx) if err != nil { return nil, fmt.Errorf("获取消费者信息失败: %w", err) } return info, nil } // ListConsumers 列出指定流的所有消费者(简化实现) // 注意:由于 API 限制,此方法可能需要根据实际需求进一步实现 func ListConsumers(ctx context.Context, streamName string) ([]string, error) { if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } // TODO: 根据实际 NATS 版本实现完整的消费者列表功能 return []string{}, nil } // DeleteConsumer 删除消费者 func DeleteConsumer(ctx context.Context, streamName, consumerName string) error { if !IsConnected() { return fmt.Errorf("NATS 未连接") } if err := js.DeleteConsumer(ctx, streamName, consumerName); err != nil { return fmt.Errorf("删除消费者失败: %w", err) } g.Log().Infof(ctx, "✅ 消费者已删除: %s/%s", streamName, consumerName) return nil } // CreateConsumer 创建消费者 func CreateConsumer(ctx context.Context, streamName, consumerName string, config jetstream.ConsumerConfig) (jetstream.Consumer, error) { if !IsConnected() { return nil, fmt.Errorf("NATS 未连接") } // 尝试获取现有消费者 consumer, err := js.Consumer(ctx, streamName, consumerName) if err == nil { return consumer, nil } // 推荐:不存在则创建,存在则更新配置 consumer, err = js.CreateOrUpdateConsumer(ctx, streamName, config) if err != nil { return nil, fmt.Errorf("创建消费者失败: %w", err) } return consumer, nil }