diff --git a/message/connection_nats.go b/message/connection_nats.go index e00f3f6..5b33021 100644 --- a/message/connection_nats.go +++ b/message/connection_nats.go @@ -8,137 +8,160 @@ import ( "github.com/gogf/gf/v2/frame/g" "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" ) var ( - nc *nats.Conn - js jetstream.JetStream - natsMu sync.RWMutex + muNats sync.RWMutex + natsConns map[string]*nats.Conn // key: 数据源名称, value: NATS 连接 + natsJS map[string]nats.JetStreamContext // key: 数据源名称, value: JetStream 上下文 ) -// natsConnect 建立 NATS 连接 -func natsConnect(ctx context.Context) error { - natsMu.Lock() - defer natsMu.Unlock() +func init() { + natsConns = make(map[string]*nats.Conn) + natsJS = make(map[string]nats.JetStreamContext) +} - // 安全地关闭旧连接 - if oldConn := nc; oldConn != nil && !oldConn.IsClosed() { +// natsConnect 建立 NATS 连接 +func natsConnect(ctx context.Context, name string) error { + + if g.Cfg().MustGet(ctx, "nats").IsEmpty() { + g.Log().Errorf(ctx, "❌ NATS 配置不存在") + return fmt.Errorf("NATS Configuration does not exist") + } + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name + } + + g.Log().Infof(ctx, "🔔 NATS [%s] 开始创建连接", dsName) + muNats.Lock() + defer muNats.Unlock() + + // 安全地关闭旧连接(仅针对该数据源) + if oldConn, exists := natsConns[dsName]; exists && oldConn != nil && !oldConn.IsClosed() { oldConn.Close() + delete(natsConns, dsName) + delete(natsJS, dsName) } // 从配置文件读取 NATS 地址 - natsURL := g.Cfg().MustGet(ctx, "nats.url").String() + natsURL := g.Cfg().MustGet(ctx, fmt.Sprintf("nats.%s.url", dsName)).String() if natsURL == "" { // 默认使用本地地址 natsURL = nats.DefaultURL } - // 使用独立的日志上下文,避免使用外部可能被取消的上下文 - logCtx := context.Background() - // 连接选项配置 opts := []nats.Option{ - nats.Name("goframe-nats-client"), - nats.ReconnectWait(2 * time.Second), - nats.MaxReconnects(-1), // 无限重连 + nats.Name(fmt.Sprintf("goframe-nats-client-%s", dsName)), + nats.NoReconnect(), nats.PingInterval(10 * time.Second), nats.MaxPingsOutstanding(5), - nats.ReconnectHandler(func(nc *nats.Conn) { - g.Log().Infof(logCtx, "✅ NATS 重连成功: %s", nc.ConnectedUrl()) - - natsMu.Lock() - defer natsMu.Unlock() - // 重新创建 JetStream 实例 - if newJS, err := jetstream.New(nc); err == nil { - js = newJS - } - }), - nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { - g.Log().Warningf(logCtx, "⚠️ NATS 连接断开: %v, 准备重连...", err) - }), nats.ClosedHandler(func(nc *nats.Conn) { - g.Log().Infof(logCtx, "NATS 连接已关闭: %s", nc.ConnectedUrl()) + g.Log().Infof(ctx, "NATS [%s] 连接已关闭: %s", dsName, nc.ConnectedUrl()) }), nats.ErrorHandler(func(nc *nats.Conn, sub *nats.Subscription, err error) { - g.Log().Errorf(logCtx, "NATS 错误: %v", err) + g.Log().Errorf(ctx, "❌ NATS [%s] 错误: %v", dsName, err) }), } - var err error - nc, err = nats.Connect(natsURL, opts...) + newConn, err := nats.Connect(natsURL, opts...) if err != nil { - return fmt.Errorf("NATS 连接失败: %w", err) + g.Log().Errorf(ctx, "❌ NATS [%s] 连接失败: %v", dsName, err) + return err } // 等待连接就绪 - if nc.Status() != nats.CONNECTED { + if newConn.Status() != nats.CONNECTED { select { case <-time.After(5 * time.Second): // 连接超时,清理资源 - if nc != nil { - nc.Close() - } + newConn.Close() + g.Log().Errorf(ctx, "❌ NATS [%s] 连接超时", dsName) return fmt.Errorf("NATS 连接超时") - case <-nc.StatusChanged(nats.CONNECTED): + case <-newConn.StatusChanged(nats.CONNECTED): // 连接成功 + g.Log().Infof(ctx, "✅ NATS [%s] 连接成功: %s", dsName, newConn.ConnectedUrl()) case <-ctx.Done(): // 外部上下文被取消,清理资源 - if nc != nil { - nc.Close() - } + newConn.Close() + g.Log().Errorf(ctx, "NATS [%s] 连接被取消: %v", dsName, ctx.Err()) return fmt.Errorf("NATS 连接被取消: %w", ctx.Err()) } } // 创建 JetStream 实例 - js, err = jetstream.New(nc) + newJS, err := newConn.JetStream(nats.MaxWait(10 * time.Second)) if err != nil { // 创建 JetStream 失败,清理连接 - if nc != nil { - nc.Close() - } - return fmt.Errorf("创建 JetStream 失败: %w", err) + newConn.Close() + g.Log().Errorf(ctx, "❌ NATS [%s] 创建 JetStream 失败: %v", dsName, err) + return err } - g.Log().Infof(ctx, "✅ NATS 连接成功: %s", nc.ConnectedUrl()) + // 保存连接和 JetStream 上下文 + natsConns[dsName] = newConn + natsJS[dsName] = newJS + return nil } // natsPing 检测 NATS 连接状态 -func natsPing() bool { - natsMu.RLock() - defer natsMu.RUnlock() - - if nc == nil || nc.IsClosed() { - return false +func natsPing(ctx context.Context, name string) bool { + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name } - // 使用 NATS 的状态检查 - if nc.Status() != nats.CONNECTED { + muNats.RLock() + defer muNats.RUnlock() + + nc, exists := natsConns[dsName] + if !exists || nc == nil || nc.IsClosed() || nc.Status() != nats.CONNECTED { + g.Log().Errorf(ctx, "❌ NATS [%s] 连接已关闭或不可用", dsName) return false } - + g.Log().Infof(ctx, "📊 NATS [%s] 连接正常: %s", dsName, nc.ConnectedUrl()) return true } -// natsReconnect 重连 NATS -func natsReconnect(ctx context.Context) error { - if err := natsConnect(ctx); err != nil { - return fmt.Errorf("nats重连失败: %w", err) - } - return nil -} - // natsClose 关闭 NATS 连接 -func natsClose(ctx context.Context) error { - natsMu.Lock() - defer natsMu.Unlock() - - if nc == nil || nc.IsClosed() { - return nil // 连接已经关闭或不存在 +func natsClose(ctx context.Context, name string) error { + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name } - nc.Close() - g.Log().Infof(ctx, "✅ NATS 连接已关闭") + + muNats.Lock() + defer muNats.Unlock() + + if nc, exists := natsConns[dsName]; exists && nc != nil && !nc.IsClosed() { + nc.Close() + } + delete(natsConns, dsName) + delete(natsJS, dsName) + + g.Log().Infof(ctx, "✅ NATS [%s] 连接已关闭", dsName) return nil } + +// getNatsConn 获取 NATS 连接(内部使用) +func getNatsConn(name string) *nats.Conn { + dsName := "default" + if !g.IsEmpty(name) { + dsName = name + } + return natsConns[dsName] +} + +// getNatsJS 获取 JetStream 上下文(内部使用) +func getNatsJS(name string) nats.JetStreamContext { + dsName := "default" + if !g.IsEmpty(name) { + dsName = name + } + return natsJS[dsName] +} diff --git a/message/connection_rabbitmq.go b/message/connection_rabbitmq.go index c98d2fe..8b8d47a 100644 --- a/message/connection_rabbitmq.go +++ b/message/connection_rabbitmq.go @@ -7,103 +7,158 @@ import ( "github.com/gogf/gf/v2/util/gconv" amqp "github.com/rabbitmq/amqp091-go" "sync" - "time" ) var ( - conn *amqp.Connection - channel *amqp.Channel - rabbitmqMu sync.RWMutex + muRabbitMQ sync.RWMutex + rabbitmqConns map[string]*amqp.Connection + rabbitmqChannels map[string]*amqp.Channel ) -// config RabbitMQ 配置 -type config struct { - Host string - Port int - Username string - Password string - VHost string +func init() { + rabbitmqConns = make(map[string]*amqp.Connection) + rabbitmqChannels = make(map[string]*amqp.Channel) } -func rabbitmqConnect(ctx context.Context) error { - rabbitmqMu.Lock() - defer rabbitmqMu.Unlock() - -LOOP: - cfg := &config{ - Host: g.Cfg().MustGet(ctx, "rabbitmq.host").String(), - Port: g.Cfg().MustGet(ctx, "rabbitmq.port").Int(), - Username: g.Cfg().MustGet(ctx, "rabbitmq.username").String(), - Password: g.Cfg().MustGet(ctx, "rabbitmq.password").String(), - VHost: g.Cfg().MustGet(ctx, "rabbitmq.vhost", "/").String(), +// rabbitmqConnect 建立 RabbitMQ 连接 +func rabbitmqConnect(ctx context.Context, name string) error { + if g.Cfg().MustGet(ctx, "rabbitmq").IsEmpty() { + g.Log().Errorf(ctx, "❌ RabbitMQ 配置不存在") + return fmt.Errorf("RabbitMQ Configuration does not exist") + } + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name } - url := "amqp://" + cfg.Username + ":" + cfg.Password + "@" + cfg.Host + ":" + gconv.String(cfg.Port) + "/" + cfg.VHost + g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始创建连接", dsName) + muRabbitMQ.Lock() + defer muRabbitMQ.Unlock() - var err error - conn, err = amqp.Dial(url) + // 安全地关闭旧连接(仅针对该数据源) + if oldConn, exists := rabbitmqConns[dsName]; exists && oldConn != nil && !oldConn.IsClosed() { + oldConn.Close() + } + if oldChannel, exists := rabbitmqChannels[dsName]; exists && oldChannel != nil && !oldChannel.IsClosed() { + oldChannel.Close() + } + delete(rabbitmqConns, dsName) + delete(rabbitmqChannels, dsName) + + // 从配置文件读取 RabbitMQ 配置 + host := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.host", dsName)).String() + port := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.port", dsName)).Int() + username := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.username", dsName)).String() + password := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.password", dsName)).String() + vHost := g.Cfg().MustGet(ctx, fmt.Sprintf("rabbitmq.%s.vhost", dsName), "/").String() + if g.IsEmpty(host) { + return fmt.Errorf("❌ RabbitMQ 配置错误: host 不能为空 (数据源: %s)", dsName) + } + if g.IsEmpty(port) { + return fmt.Errorf("❌ RabbitMQ 配置错误: port 不能为空 (数据源: %s)", dsName) + } + if g.IsEmpty(username) { + return fmt.Errorf("❌ RabbitMQ 配置错误: username 不能为空 (数据源: %s)", dsName) + } + if g.IsEmpty(password) { + return fmt.Errorf("❌ RabbitMQ 配置错误: password 不能为空 (数据源: %s)", dsName) + } + // 构建连接 URL + url := "amqp://" + username + ":" + password + "@" + host + ":" + gconv.String(port) + "/" + vHost + + // 创建连接 + newConn, err := amqp.Dial(url) if err != nil { - g.Log().Errorf(ctx, "重连失败: %v", err) - - time.Sleep(2 * time.Second) - goto LOOP + g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接失败: %v", dsName, err) + return err } - channel, err = conn.Channel() + // 创建 Channel + newChannel, err := newConn.Channel() if err != nil { - g.Log().Errorf(ctx, "创建 Channel 失败: %v", err) - - time.Sleep(2 * time.Second) - goto LOOP + g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 创建 Channel 失败: %v", dsName, err) + newConn.Close() + return err } - g.Log().Info(ctx, "RabbitMQ 重连成功") - return nil -} + // 保存连接和 Channel + rabbitmqConns[dsName] = newConn + rabbitmqChannels[dsName] = newChannel -// rabbitmqReconnect 重新连接 -func rabbitmqReconnect(ctx context.Context) error { - if err := rabbitmqConnect(ctx); err != nil { - return fmt.Errorf("nats重连失败: %w", err) - } + g.Log().Infof(ctx, "✅ RabbitMQ [%s] 连接成功", dsName) return nil } // rabbitmqPing 检测 RabbitMQ 连接状态 -func rabbitmqPing() bool { - rabbitmqMu.RLock() - defer rabbitmqMu.RUnlock() +func rabbitmqPing(ctx context.Context, name string) bool { + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name + } - if conn == nil || conn.IsClosed() { + muRabbitMQ.RLock() + defer muRabbitMQ.RUnlock() + + conn, exists := rabbitmqConns[dsName] + channel, channelExists := rabbitmqChannels[dsName] + if !exists || conn == nil || conn.IsClosed() || !channelExists || channel == nil || channel.IsClosed() { + g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 连接已关闭或不可用", dsName) return false } + g.Log().Infof(ctx, "📊 RabbitMQ [%s] 连接正常", dsName) return true } -// rabbitmqClose 关闭连接 -func rabbitmqClose(ctx context.Context) error { - rabbitmqMu.Lock() - defer rabbitmqMu.Unlock() +// rabbitmqClose 关闭 RabbitMQ 连接 +func rabbitmqClose(ctx context.Context, name string) error { + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name + } + + muRabbitMQ.Lock() + defer muRabbitMQ.Unlock() var lastErr error - if channel != nil { + if channel, exists := rabbitmqChannels[dsName]; exists && channel != nil && !channel.IsClosed() { if err := channel.Close(); err != nil { - g.Log().Errorf(ctx, "关闭 RabbitMQ Channel 失败: %v", err) + g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭 Channel 失败: %v", dsName, err) lastErr = err } - channel = nil } + delete(rabbitmqChannels, dsName) - if conn != nil { + if conn, exists := rabbitmqConns[dsName]; exists && conn != nil && !conn.IsClosed() { if err := conn.Close(); err != nil { - g.Log().Errorf(ctx, "关闭 RabbitMQ 连接失败: %v", err) + g.Log().Errorf(ctx, "❌ RabbitMQ [%s] 关闭连接失败: %v", dsName, err) lastErr = err } - conn = nil } + delete(rabbitmqConns, dsName) - g.Log().Info(ctx, "RabbitMQ 连接已关闭") + g.Log().Infof(ctx, "✅ RabbitMQ [%s] 连接已关闭", dsName) return lastErr } + +// getRabbitMQConn 获取 RabbitMQ 连接(内部使用) +func getRabbitMQConn(name string) *amqp.Connection { + dsName := "default" + if !g.IsEmpty(name) { + dsName = name + } + return rabbitmqConns[dsName] +} + +// getRabbitMQChannel 获取 RabbitMQ Channel(内部使用) +func getRabbitMQChannel(name string) *amqp.Channel { + dsName := "default" + if !g.IsEmpty(name) { + dsName = name + } + return rabbitmqChannels[dsName] +} diff --git a/message/connection_redis.go b/message/connection_redis.go index 465238f..528eca0 100644 --- a/message/connection_redis.go +++ b/message/connection_redis.go @@ -1,7 +1,6 @@ // ============================================================================= -// Redis 数据源连接管理 -// 负责 Redis 数据源的连接、重连、健康检查和优雅关闭 -// 支持多数据源和无限重连 +// Redis 连接管理 +// 负责 Redis 的连接、重连、健康检查和优雅关闭 // ============================================================================= package message @@ -9,477 +8,191 @@ package message import ( "context" "fmt" - "os" - "os/signal" "sync" - "syscall" "time" "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/os/glog" - "github.com/gogf/gf/v2/util/gconv" ) -// ============================================================================= -// Redis 数据源配置结构 -// ============================================================================= +var ( + muRedis sync.RWMutex + redisConns map[string]*gredis.Redis + redisConfigs map[string]*gredis.Config +) -type redisDataSourceConfig struct { - name string // 数据源名称 - address string // Redis 地址,如: 127.0.0.1:6379 - db int // 数据库编号 - pass string // 密码 - maxRetries int // 最大重试次数,-1 表示无限重试 - retryInterval time.Duration // 重试间隔 +func init() { + redisConns = make(map[string]*gredis.Redis) + redisConfigs = make(map[string]*gredis.Config) } -// ============================================================================= -// Redis 数据源接口 -// ============================================================================= - -type redisDataSource interface { - name() string - getClient() *gredis.Redis - getIsConnected() bool - redisConnect(ctx context.Context) error - redisReconnect(ctx context.Context) error - redisClose(ctx context.Context) error - redisPing(ctx context.Context) bool -} - -// ============================================================================= -// Redis 数据源实现 -// ============================================================================= - -type baseRedisDataSource struct { - config *redisDataSourceConfig - client *gredis.Redis - isConnected bool - mu sync.RWMutex - lastError error - lastErrorTime time.Time - reconnectMu sync.Mutex -} - -func newBaseRedisDataSource(config *redisDataSourceConfig) *baseRedisDataSource { - return &baseRedisDataSource{ - config: config, - isConnected: false, +// redisConnect 建立 Redis 连接 +// name: 数据源名称,如果为空则使用默认数据源 +func redisConnect(ctx context.Context, name string) error { + if g.Cfg().MustGet(ctx, "redis").IsEmpty() { + g.Log().Errorf(ctx, "❌ Redis 配置不存在") + return fmt.Errorf("redis Configuration does not exist") } -} - -func (d *baseRedisDataSource) name() string { - return d.config.name -} - -func (d *baseRedisDataSource) getClient() *gredis.Redis { - d.mu.RLock() - defer d.mu.RUnlock() - return d.client -} - -func (d *baseRedisDataSource) getIsConnected() bool { - d.mu.RLock() - defer d.mu.RUnlock() - return d.isConnected && d.client != nil -} - -func (d *baseRedisDataSource) redisConnect(ctx context.Context) error { - // 使用互斥锁防止并发重连 - d.reconnectMu.Lock() - defer d.reconnectMu.Unlock() - - d.mu.Lock() - if d.client != nil { - d.client.Close(ctx) + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name } - d.mu.Unlock() + g.Log().Infof(ctx, "🔔 Redis [%s] 开始创建连接", dsName) + muRedis.Lock() + defer muRedis.Unlock() + + // 安全地关闭旧连接(仅针对该数据源) + if oldRedis, exists := redisConns[dsName]; exists && oldRedis != nil { + oldRedis.Close(ctx) + delete(redisConns, dsName) + } + + // 从配置文件读取 Redis 配置 + redisAddr := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.address", dsName)).String() + if g.IsEmpty(redisAddr) { + g.Log().Errorf(ctx, "❌ Redis 配置错误: address 不能为空 (数据源: %s)", dsName) + return fmt.Errorf("❌ Redis 配置错误: address 不能为空 (数据源: %s)", dsName) + } + redisDB := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.db", dsName)).Int() + if redisDB < 0 || redisDB > 15 { + g.Log().Errorf(ctx, "❌ Redis 配置错误: db 必须在 0-15 之间 (当前值: %d)", redisDB) + return fmt.Errorf("❌ Redis 配置错误: db 必须在 0-15 之间 (当前值: %d)", redisDB) + } + idleTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.idleTimeout", dsName)).String() + redisIdleTimeout, err := time.ParseDuration(idleTimeout) + if err != nil { + g.Log().Errorf(ctx, "❌ Redis idleTimeout 格式错误: %v", err) + return err + } + maxConnLifetime := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.maxConnLifetime", dsName)).String() + redisMaxConnLifetime, err := time.ParseDuration(maxConnLifetime) + if err != nil { + g.Log().Errorf(ctx, "❌ Redis maxConnLifetime 格式错误: %v", err) + return err + } + waitTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.waitTimeout", dsName)).String() + redisWaitTimeout, err := time.ParseDuration(waitTimeout) + if err != nil { + g.Log().Errorf(ctx, "❌ Redis waitTimeout 格式错误: %v", err) + return err + } + dialTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.dialTimeout", dsName)).String() + redisDialTimeout, err := time.ParseDuration(dialTimeout) + if err != nil { + g.Log().Errorf(ctx, "❌ Redis dialTimeout 格式错误: %v", err) + return err + } + readTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.readTimeout", dsName)).String() + redisReadTimeout, err := time.ParseDuration(readTimeout) + if err != nil { + g.Log().Errorf(ctx, "❌ Redis readTimeout 格式错误: %v", err) + return err + } + writeTimeout := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.writeTimeout", dsName)).String() + redisWriteTimeout, err := time.ParseDuration(writeTimeout) + if err != nil { + g.Log().Errorf(ctx, "❌ Redis writeTimeout 格式错误: %v", err) + return err + } + maxActive := g.Cfg().MustGet(ctx, fmt.Sprintf("redis.%s.maxActive", dsName)).Int() + if g.IsEmpty(maxActive) { + g.Log().Errorf(ctx, "❌ Redis maxActive 配置错误: %v", maxActive) + return fmt.Errorf("❌ Redis maxActive 配置错误") + } // 构建 GoFrame Redis 配置 redisConfig := &gredis.Config{ - Address: d.config.address, - Db: d.config.db, - Pass: d.config.pass, + Address: redisAddr, + Db: redisDB, + IdleTimeout: redisIdleTimeout, + MaxConnLifetime: redisMaxConnLifetime, + WaitTimeout: redisWaitTimeout, + DialTimeout: redisDialTimeout, + ReadTimeout: redisReadTimeout, + WriteTimeout: redisWriteTimeout, + MaxActive: maxActive, } + redisConfigs[dsName] = redisConfig // 使用 GoFrame 的 Redis 连接 - redisObj, err := gredis.New(redisConfig) + newRedis, err := gredis.New(redisConfig) if err != nil { - d.mu.Lock() - d.isConnected = false - d.lastError = err - d.lastErrorTime = time.Now() - d.mu.Unlock() - return fmt.Errorf("datasource [%s] connection failed: %w", d.config.name, err) + g.Log().Errorf(ctx, "❌ Redis [%s] 连接失败: %v", dsName, err) + return err + } + // 测试连接(直接调用避免死锁) + _, err = newRedis.Do(ctx, "PING") + if err != nil { + g.Log().Errorf(ctx, "❌ Redis [%s] 连接失败: ping 失败 - %v", dsName, err) + _ = newRedis.Close(ctx) + return err } - d.mu.Lock() - d.client = redisObj - d.mu.Unlock() - - // 测试连接 - if !d.redisPing(ctx) { - d.mu.Lock() - d.isConnected = false - d.lastError = err - d.lastErrorTime = time.Now() - d.mu.Unlock() - return fmt.Errorf("datasource [%s] ping failed: %w", d.config.name, err) - } - - d.mu.Lock() - d.isConnected = true - d.lastError = nil - d.mu.Unlock() - glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.name) + redisConns[dsName] = newRedis + g.Log().Infof(ctx, "✅ Redis [%s] 连接成功: %s (DB: %d)", dsName, redisAddr, redisDB) return nil } -func (d *baseRedisDataSource) redisReconnect(ctx context.Context) error { - glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.name) - return d.redisConnect(ctx) -} - -func (d *baseRedisDataSource) redisClose(ctx context.Context) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.client != nil { - if err := d.client.Close(ctx); err != nil { - return fmt.Errorf("datasource [%s] close failed: %w", d.config.name, err) - } +// redisPing 检测 Redis 连接状态(带超时保护) +func redisPing(ctx context.Context, name string) bool { + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name } - d.isConnected = false - glog.Infof(ctx, "datasource [%s] closed", d.config.name) - return nil -} + muRedis.RLock() + defer muRedis.RUnlock() -func (d *baseRedisDataSource) redisPing(ctx context.Context) bool { - d.mu.RLock() - client := d.client - d.mu.RUnlock() - - if client == nil { + rc, exists := redisConns[dsName] + if !exists || rc == nil { + g.Log().Errorf(ctx, "❌ Redis [%s] 连接未建立", dsName) return false } - _, err := client.Do(ctx, "PING") + // 创建带超时的子上下文,避免死锁 + timeoutCtx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + _, err := rc.Do(timeoutCtx, "PING") if err != nil { + g.Log().Errorf(ctx, "❌ Redis [%s] ping 失败: %v", dsName, err) return false } + + g.Log().Infof(ctx, "📊 Redis [%s] 连接正常", dsName) return true } -// ============================================================================= -// Redis 多数据源管理器 -// ============================================================================= - -type redisDataSourceManager struct { - sources map[string]redisDataSource - mu sync.RWMutex - ctx context.Context - cancel context.CancelFunc - started bool - maxRetries int - reconnectCh chan string -} - -var ( - globalRedisManager *redisDataSourceManager - redisManagerOnce sync.Once -) - -// getRedisManager 获取全局 Redis 管理器 -func getRedisManager() *redisDataSourceManager { - redisManagerOnce.Do(func() { - ctx, cancel := context.WithCancel(context.Background()) - globalRedisManager = &redisDataSourceManager{ - sources: make(map[string]redisDataSource), - ctx: ctx, - cancel: cancel, - started: false, - maxRetries: -1, // 默认无限重试 - reconnectCh: make(chan string, 100), - } - }) - return globalRedisManager -} - -// registerDataSource 注册 Redis 数据源 -func (m *redisDataSourceManager) registerDataSource(config *redisDataSourceConfig) error { - m.mu.Lock() - defer m.mu.Unlock() - - if _, exists := m.sources[config.name]; exists { - return fmt.Errorf("datasource [%s] already exists", config.name) +// redisClose 关闭 Redis 连接 +func redisClose(ctx context.Context, name string) error { + // 确定数据源名称 + dsName := "default" + if !g.IsEmpty(name) { + dsName = name } - source := newBaseRedisDataSource(config) - m.sources[config.name] = source + muRedis.Lock() + defer muRedis.Unlock() + + if rc, exists := redisConns[dsName]; exists && rc != nil { + if err := rc.Close(ctx); err != nil { + g.Log().Errorf(ctx, "❌ Redis [%s] 关闭失败: %v", dsName, err) + return err + } + delete(redisConns, dsName) + } + + g.Log().Infof(ctx, "✅ Redis [%s] 连接已关闭", dsName) return nil } -// getDataSource 获取 Redis 数据源 -func (m *redisDataSourceManager) getDataSource(name string) (redisDataSource, error) { - m.mu.RLock() - defer m.mu.RUnlock() - - source, exists := m.sources[name] - if !exists { - return nil, fmt.Errorf("datasource [%s] not found", name) +// getRedisConn 获取 Redis 连接(内部使用) +func getRedisConn(name string) *gredis.Redis { + dsName := "default" + if !g.IsEmpty(name) { + dsName = name } - return source, nil -} - -// getAllDataSourceNames 获取所有 Redis 数据源名称 -func (m *redisDataSourceManager) getAllDataSourceNames() []string { - m.mu.RLock() - defer m.mu.RUnlock() - - names := make([]string, 0, len(m.sources)) - for name := range m.sources { - names = append(names, name) - } - return names -} - -// initializeFromConfig 从配置初始化 Redis 数据源 -func (m *redisDataSourceManager) initializeFromConfig(ctx context.Context) error { - var firstErr error - - // 获取 redis 配置下的所有子键 - redisConfig := g.Cfg().MustGet(ctx, "redis") - if redisConfig.IsNil() { - glog.Warningf(ctx, "no redis configuration found in config.yml") - return nil - } - - // 将配置转换为 map - configMap := redisConfig.Map() - if configMap == nil { - glog.Warningf(ctx, "redis configuration is not a map") - return nil - } - - // 遍历所有 redis 子配置 - for name, subConfig := range configMap { - // 跳过非对象类型的配置 - subMap, ok := subConfig.(map[string]interface{}) - if !ok { - continue - } - - // 检查是否有 address 配置 - address, hasAddress := subMap["address"] - if !hasAddress || gconv.String(address) == "" { - continue - } - - // 构建数据源配置 - config := &redisDataSourceConfig{ - name: name, - address: gconv.String(address), - db: gconv.Int(subMap["db"]), - pass: gconv.String(subMap["pass"]), - maxRetries: gconv.Int(subMap["maxRetries"]), - retryInterval: gconv.Duration(subMap["retryInterval"]), - } - - // 设置默认值 - if config.maxRetries == 0 { - config.maxRetries = -1 // 默认无限重试 - } - if config.retryInterval == 0 { - config.retryInterval = 5 * time.Second - } - - // 注册数据源 - if err := m.registerDataSource(config); err != nil { - glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err) - if firstErr == nil { - firstErr = err - } - continue - } - - // 连接数据源 - source, _ := m.getDataSource(name) - if err := source.redisConnect(ctx); err != nil { - glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err) - if firstErr == nil { - firstErr = err - } - } - } - - return firstErr -} - -// startHealthCheck 启动健康检查 -func (m *redisDataSourceManager) startHealthCheck() { - if m.started { - return - } - m.started = true - - // 启动健康检查循环 - go m.healthCheckLoop() - - // 启动重连处理循环 - go m.reconnectLoop() -} - -// healthCheckLoop 健康检查循环 -func (m *redisDataSourceManager) healthCheckLoop() { - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-m.ctx.Done(): - return - case <-ticker.C: - m.checkConnections() - } - } -} - -// reconnectLoop 重连处理循环 -func (m *redisDataSourceManager) reconnectLoop() { - reconnectCounts := make(map[string]int) - - for { - select { - case <-m.ctx.Done(): - return - case name := <-m.reconnectCh: - go func(dsName string) { - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - source, err := m.getDataSource(dsName) - if err != nil { - glog.Errorf(ctx, "datasource [%s] not found for reconnect", dsName) - return - } - - if err := source.redisReconnect(ctx); err != nil { - glog.Errorf(ctx, "datasource [%s] reconnect failed: %v", dsName, err) - - // 记录重连次数 - reconnectCounts[dsName]++ - - // 检查重连次数限制 - if m.maxRetries > 0 && reconnectCounts[dsName] > m.maxRetries { - glog.Errorf(ctx, "datasource [%s] reconnect count %d exceeds limit %d, stopping auto-reconnect", - dsName, reconnectCounts[dsName], m.maxRetries) - return - } - - // 延迟后重新放入重连队列 - time.Sleep(5 * time.Second) - select { - case m.reconnectCh <- dsName: - default: - // 通道已满,丢弃通知 - } - } else { - // 重连成功,重置计数器 - reconnectCounts[dsName] = 0 - } - }(name) - } - } -} - -// checkConnections 检查连接状态 -func (m *redisDataSourceManager) checkConnections() { - m.mu.RLock() - defer m.mu.RUnlock() - - for name, source := range m.sources { - if !source.getIsConnected() { - glog.Warningf(context.Background(), "datasource [%s] disconnected, queued for reconnect", name) - - // 发送到重连队列 - select { - case m.reconnectCh <- name: - default: - // 通道已满,丢弃通知 - } - } - } -} - -// closeAll 关闭所有 Redis 数据源 -func (m *redisDataSourceManager) closeAll(ctx context.Context) error { - m.cancel() - - m.mu.RLock() - defer m.mu.RUnlock() - - var lastErr error - for name, source := range m.sources { - if err := source.redisClose(ctx); err != nil { - glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err) - lastErr = err - } - } - return lastErr -} - -// ============================================================================= -// 全局初始化 -// ============================================================================= - -var ( - redisManager = getRedisManager() -) - -// init 初始化 Redis 数据源 -func init() { - ctx := context.Background() - - // 从配置初始化多数据源 - if err := redisManager.initializeFromConfig(ctx); err != nil { - glog.Errorf(ctx, "❌ Failed to initialize Redis datasources: %v", err) - } else { - glog.Infof(ctx, "✅ Redis datasources initialized: %v", redisManager.getAllDataSourceNames()) - } - - // 启动健康检查 - redisManager.startHealthCheck() - - // 设置优雅关闭 - setupGracefulShutdown() -} - -// setupGracefulShutdown 设置优雅关闭 -func setupGracefulShutdown() { - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - glog.Info(ctx, "🔄 Shutting down Redis connections...") - if err := redisManager.closeAll(ctx); err != nil { - glog.Errorf(ctx, "❌ Failed to close Redis connections: %v", err) - } else { - glog.Info(ctx, "✅ Redis connections closed successfully") - } - }() -} - -// ============================================================================= -// 私有辅助函数 -// ============================================================================= - -// getDefaultDataSource 获取默认数据源 -func getDefaultDataSource() (redisDataSource, error) { - return redisManager.getDataSource("default") + return redisConns[dsName] } diff --git a/message/msg_interfaces.go b/message/msg_interfaces.go index 125cf81..db1c86d 100644 --- a/message/msg_interfaces.go +++ b/message/msg_interfaces.go @@ -6,6 +6,10 @@ type messagePublishConfig interface { GetPublishMsgType() } +type messagePublishDelayConfig interface { + GetPublishDelayMsgType() +} + type messageSubscribeConfig interface { GetSubscribeMsgType() } @@ -15,12 +19,14 @@ type messageSubscribeConfig interface { type messageUtil interface { // Publish 发布消息 Publish(ctx context.Context, msg messagePublishConfig) error + // PublishDelay 发布延迟消息 + PublishDelay(ctx context.Context, msg messagePublishDelayConfig) error // Subscribe 订阅消息 Subscribe(ctx context.Context, msg messageSubscribeConfig) error // Ping 检测连接状态 - ping(ctx context.Context) bool - // Reconnect 重连 - reconnect(ctx context.Context) error + Ping(ctx context.Context) bool + // Connect 连接 + Connect(ctx context.Context) error // Close 关闭连接 - close(ctx context.Context) error + Close(ctx context.Context) error } diff --git a/message/msg_plugin_manager.go b/message/msg_plugin_manager.go index 84c79eb..8340a8a 100644 --- a/message/msg_plugin_manager.go +++ b/message/msg_plugin_manager.go @@ -3,10 +3,10 @@ package message import ( "context" "fmt" - "sync" "time" "github.com/gogf/gf/v2/frame/g" + "sync" ) // MessageType 消息队列类型 @@ -32,7 +32,6 @@ type pluginManager struct { var ( defaultPluginManager = newPluginManager() - // 不再支持默认插件类型,必须显式指定类型 ) // newPluginManager 创建插件管理器 @@ -42,63 +41,6 @@ func newPluginManager() *pluginManager { } } -// RegisterPlugin 注册消息队列插件 -// 所有插件必须通过此方法注册,自动进行连接检测 -// 只有连接成功的插件才会被注册,连接失败的插件不会被注册 -// 异步无限重连,只有连接成功了才注册 -func registerPlugin(msgType messageType, factory configFactory) error { - if factory == nil { - return fmt.Errorf("factory cannot be nil") - } - - // 创建实例 - instance := factory() - ctx := context.Background() - - // 开启异步连接,无限重连直到成功 - go func() { - retryInterval := 2 * time.Second - maxInterval := 30 * time.Second - - for { - select { - case <-ctx.Done(): - g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType) - return - default: - // 尝试连接(使用Reconnect方法) - if err := instance.reconnect(ctx); err == nil { - // 连接成功,注册插件 - if err := defaultPluginManager.register(msgType, instance); err != nil { - g.Log().Errorf(ctx, "❌ [%s] 注册插件失败: %v", msgType, err) - instance.close(ctx) - } else { - g.Log().Infof(ctx, "✅ [%s] 插件注册成功", msgType) - } - return - } - - // 连接失败,记录日志并等待重试 - g.Log().Warningf(ctx, "⚠️ [%s] 连接失败,%v 后重试...", msgType, retryInterval) - - select { - case <-time.After(retryInterval): - // 增加重试间隔,但不超过最大值 - retryInterval *= 2 - if retryInterval > maxInterval { - retryInterval = maxInterval - } - case <-ctx.Done(): - g.Log().Errorf(ctx, "❌ [%s] 注册被取消", msgType) - return - } - } - } - }() - - return nil -} - // register 注册插件(内部方法) func (m *pluginManager) register(msgType messageType, instance messageUtil) error { m.mu.Lock() @@ -107,27 +49,66 @@ func (m *pluginManager) register(msgType messageType, instance messageUtil) erro return nil } -// GetMsgPlugin 获取消息队列插件 -func GetMsgPlugin(msgType messageType) (messageUtil, error) { - defaultPluginManager.mu.RLock() - instance, ok := defaultPluginManager.instances[msgType] - defaultPluginManager.mu.RUnlock() - - if !ok { - return nil, fmt.Errorf("unsupported message type: %s", msgType) +// RegisterPlugin 注册消息队列插件 +// 所有插件必须通过此方法注册,自动进行连接检测 +// 只有连接成功的插件才会被注册,连接失败的插件不会被注册 +// 异步无限重连,只有连接成功了才注册 +// name: 数据源名称,用于标识不同的连接实例 +func RegisterPlugin(ctx context.Context, name string, msgType messageType, factory configFactory) error { + if factory == nil { + g.Log().Errorf(ctx, "❌ factory cannot be nil") + return fmt.Errorf("factory cannot be nil") } - - return instance, nil + // 开启异步连接,无限重试直到成功 + go func() { + // 创建实例 + instance := factory() + // 创建通知 channel + pluginKey := fmt.Sprintf("%s-%s", msgType, name) + if !instance.Ping(ctx) { + // 使用统一的重连函数 + if err := commonConnect(ctx, msgType, name, func(ctx context.Context) error { + return instance.Connect(ctx) + }, func(ctx context.Context) error { + return instance.Close(ctx) + }); err != nil { + g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", msgType, name, err) + return + } + } + // 连接成功,注册插件 + defaultPluginManager.mu.Lock() + defaultPluginManager.instances[messageType(pluginKey)] = instance + defaultPluginManager.mu.Unlock() + g.Log().Infof(ctx, "✅ [%s][%s] 插件注册成功", msgType, name) + }() + return nil } -// GetSupportedTypes 获取所有已注册的插件类型 -func GetSupportedTypes() []messageType { - defaultPluginManager.mu.RLock() - defer defaultPluginManager.mu.RUnlock() - - types := make([]messageType, 0, len(defaultPluginManager.instances)) - for t := range defaultPluginManager.instances { - types = append(types, t) - } - return types +// GetMsgPlugin 获取消息队列插件(默认数据源),如果未注册则等待 +func GetMsgPlugin(ctx context.Context, msgType messageType) (messageUtil, error) { + return GetMsgPluginWithName(ctx, msgType, "default") +} + +// GetMsgPluginWithName 获取指定数据源的消息队列插件,如果未注册则等待直到超时 +func GetMsgPluginWithName(ctx context.Context, msgType messageType, name string) (messageUtil, error) { + pluginKey := fmt.Sprintf("%s-%s", msgType, name) + + for { + defaultPluginManager.mu.RLock() + instance, ok := defaultPluginManager.instances[messageType(pluginKey)] + defaultPluginManager.mu.RUnlock() + + if ok { + return instance, nil + } + + // 未注册,等待一段时间后重试 + select { + case <-ctx.Done(): + return nil, fmt.Errorf("wait for plugin ready canceled: %s with datasource: %s", msgType, name) + default: + time.Sleep(3 * time.Second) + } + } } diff --git a/message/nats_msg.go b/message/nats_msg.go index 205ac07..365c171 100644 --- a/message/nats_msg.go +++ b/message/nats_msg.go @@ -6,11 +6,16 @@ import ( "fmt" "github.com/gogf/gf/v2/frame/g" "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" "time" ) type NatsPublishMsgConfig struct { + QueueName string + Durable bool + Data any +} + +type NatsPublishDelayMsgConfig struct { QueueName string Durable bool DelayTime int @@ -19,9 +24,9 @@ type NatsPublishMsgConfig struct { type NatsSubscribeMsgConfig struct { QueueName string + ConsumerName string Durable bool DelayTime int - ConsumerName string AutoAck bool PrefetchCount int HandleFunc func(ctx context.Context, message map[string]interface{}) error @@ -31,32 +36,38 @@ func (*NatsPublishMsgConfig) GetPublishMsgType() { } +func (*NatsPublishDelayMsgConfig) GetPublishDelayMsgType() { + +} + func (*NatsSubscribeMsgConfig) GetSubscribeMsgType() { } +type natsMsg struct { + name string // 数据源名称 +} + func init() { - // 注册 Nats 插件,必须使用 RegisterPlugin 确保连接检测 - registerPlugin(MessageNATS, func() messageUtil { - return &natsMsg{} + // 注册 Nats 插件(默认数据源) + RegisterPlugin(context.Background(), "default", MessageNATS, func() messageUtil { + return &natsMsg{name: "default"} }) } -type natsMsg struct{} - -// Ping 检测 NATS 连接状态 -func (c *natsMsg) ping(_ context.Context) bool { - return natsPing() +// Connect 连接 NATS +func (c *natsMsg) Connect(ctx context.Context) error { + return natsConnect(ctx, c.name) } -// Reconnect 重连 NATS -func (c *natsMsg) reconnect(ctx context.Context) error { - return natsReconnect(ctx) +// Ping 检测 NATS 连接状态 +func (c *natsMsg) Ping(ctx context.Context) bool { + return natsPing(ctx, c.name) } // Close 关闭 NATS 连接 -func (c *natsMsg) close(ctx context.Context) error { - return natsClose(ctx) +func (c *natsMsg) Close(ctx context.Context) error { + return natsClose(ctx, c.name) } // Publish 发布消息 @@ -71,13 +82,31 @@ func (c *natsMsg) Publish(ctx context.Context, msgConfig messagePublishConfig) e if g.IsEmpty(cfg.Data) { return fmt.Errorf("必须提供数据") } + return c.createPublish(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data) +} + +// PublishDelay 发布延迟消息 +func (c *natsMsg) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error { + cfg, ok := msgConfig.(*NatsPublishDelayMsgConfig) + if !ok { + return fmt.Errorf("无效的 NATS 配置类型") + } + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("必须提供队列名称") + } + if g.IsEmpty(cfg.DelayTime) { + return fmt.Errorf("延迟时间必须大于 0") + } + if g.IsEmpty(cfg.Data) { + return fmt.Errorf("必须提供数据") + } return c.createPublish(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data) } // Publish 发布消息 func (c *natsMsg) createPublish(ctx context.Context, subject string, durable bool, delayTime int, data any) error { delayMsg := delayTime > 0 - if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil { + if err := c.createStream(ctx, subject, durable, delayMsg); err != nil { return err } payload, err := json.Marshal(data) @@ -85,96 +114,30 @@ func (c *natsMsg) createPublish(ctx context.Context, subject string, durable boo return fmt.Errorf("序列化数据失败: %w", err) } - msg := &nats.Msg{ - Subject: subject, - Data: payload, - } + m := nats.NewMsg(subject) + m.Data = payload // 所有消息都需要设置数据 if delayMsg { - // 计算目标投递时间 - targetTime := time.Now().Add(time.Duration(delayTime) * time.Second) - delayNs := time.Until(targetTime).Nanoseconds() - if delayNs < 0 { - delayNs = 0 - } - - g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%d秒, TargetTime=%v, DelayNs=%d纳秒(%.2f秒)", - delayTime, targetTime.Format("2006-01-02 15:04:05"), delayNs, float64(delayNs)/float64(time.Second.Nanoseconds())) - - // NATS JetStream 延迟消息使用 Nats-Msg-Delay Header(纳秒数) - msg.Header = nats.Header{ - "Nats-Msg-Delay": []string{fmt.Sprintf("%d", delayNs)}, - } - g.Log().Infof(ctx, "📅 NATS 延迟消息 Header: %v", msg.Header) - - // 获取 Stream 配置验证 - streamName, _ := getStreamInfo(durable, delayMsg) - stream, err := js.Stream(ctx, streamName) - if err == nil { - info, _ := stream.Info(ctx) - g.Log().Infof(ctx, "📅 Stream 配置: AllowMsgSchedules=%v, Storage=%v", - info.Config.AllowMsgSchedules, info.Config.Storage) - if !info.Config.AllowMsgSchedules { - g.Log().Errorf(ctx, "❌ Stream 不支持延迟消息!AllowMsgSchedules=false") - } - } + // 使用 @at 指定具体延迟时间,而不是 @every 重复执行 + futureTime := time.Now().Add(time.Duration(delayTime) * time.Second).Format(time.RFC3339Nano) + m.Header.Set("Nats-Schedule", fmt.Sprintf("@at %s", futureTime)) + m.Subject = subject + ".schedule" + m.Header.Set("Nats-Schedule-Target", subject) + g.Log().Infof(ctx, "📅 NATS 延迟消息配置: DelayTime=%ds, Schedule=@at %s, Header=%s", delayTime, futureTime, m.Header) } // 发布消息到 JetStream - ack, err := js.PublishMsg(ctx, msg) + js := getNatsJS(c.name) + if js == nil { + g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name) + return fmt.Errorf("NATS JetStream 不存在") + } + ack, err := js.PublishMsg(m) if err != nil { - g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v", err) + g.Log().Errorf(ctx, "❌ NATS 发布消息失败: err=%v, Subject=%s", err, m.Subject) return err } - - g.Log().Infof(ctx, "✅ NATS 发布消息成功: StreamSeq=%d, Domain=%s", ack.Sequence, ack.Domain) - return nil -} - -// createStreamGroup 内部创建消费组 -func (c *natsMsg) createStreamGroupInternal(ctx context.Context, subject string, durable, delayMsg bool) error { - streamName, storage := getStreamInfo(durable, delayMsg) - - // 先检查 Stream 是否存在 - stream, err := js.Stream(ctx, streamName) - if err == nil { - // Stream 已存在,检查配置是否匹配 - info, _ := stream.Info(ctx) - if info.Config.AllowMsgSchedules != delayMsg || info.Config.Storage != storage { - g.Log().Infof(ctx, "🔄 Stream 配置不匹配,正在重新创建: stream=%s, 当前AllowMsgSchedules=%v, 需要%v", - streamName, info.Config.AllowMsgSchedules, delayMsg) - // 删除旧 Stream - if err := js.DeleteStream(ctx, streamName); err != nil { - g.Log().Warningf(ctx, "删除旧 Stream 失败: %v", err) - } - } else { - g.Log().Infof(ctx, "✅ Stream 已存在且配置正确: stream=%s", streamName) - return nil - } - } - - // 构建流配置 - jsConfig := jetstream.StreamConfig{ - Name: streamName, - Subjects: []string{subject}, - AllowMsgSchedules: delayMsg, // 延迟消息核心开关 - Storage: storage, - Discard: jetstream.DiscardOld, // 达到上限删除旧消息 - } - - stream, err = js.CreateStream(ctx, jsConfig) - if err != nil { - return fmt.Errorf("创建任务流失败: %w", err) - } - - // 获取 Stream 信息验证配置 - info, err := stream.Info(ctx) - if err == nil { - g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s, AllowMsgSchedules=%v, Storage=%v", - streamName, info.Config.AllowMsgSchedules, info.Config.Storage) - } - - g.Log().Infof(ctx, "✅ NATS 队列初始化成功: stream=%s", streamName) + g.Log().Infof(ctx, "✅ NATS 发布消息成功: Stream=%v, StreamSeq=%d", ack.Stream, ack.Sequence) return nil } @@ -196,110 +159,215 @@ func (c *natsMsg) Subscribe(ctx context.Context, msgConfig messageSubscribeConfi if g.IsEmpty(cfg.PrefetchCount) { cfg.PrefetchCount = 1 } - return c.createSubscribeInternal(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.AutoAck, cfg.Durable, cfg.DelayTime, cfg.HandleFunc) + return c.createSubscribe(ctx, cfg.QueueName, cfg.ConsumerName, cfg.PrefetchCount, cfg.DelayTime, cfg.AutoAck, cfg.Durable, cfg.HandleFunc) } // createSubscribe 内部订阅消息 -func (c *natsMsg) createSubscribeInternal(ctx context.Context, subject, consumerName string, prefetchCount int, autoAck, durable bool, delayTime int, handler func(ctx context.Context, message map[string]interface{}) error) error { +func (c *natsMsg) createSubscribe(ctx context.Context, subject, consumerName string, prefetchCount, delayTime int, autoAck, durable bool, handler func(ctx context.Context, message map[string]any) error) error { g.Log().Infof(ctx, "🔔 NATS 开始订阅: QueueName=%s, ConsumerName=%s", subject, consumerName) - delayMsg := delayTime > 0 - streamName, _ := getStreamInfo(durable, delayMsg) - - // 确保 Stream 存在,如果不存在则创建 - if err := c.createStreamGroupInternal(ctx, subject, durable, delayMsg); err != nil { - g.Log().Errorf(ctx, "创建 Stream 失败: %v", err) - return fmt.Errorf("创建 Stream 失败: %w", err) - } - - // Stream 不存在,创建新的 - ackPolicy := jetstream.AckExplicitPolicy - if autoAck { - ackPolicy = jetstream.AckNonePolicy - } - jsConfig := jetstream.ConsumerConfig{ - Name: consumerName, - Durable: consumerName, - FilterSubject: subject, - AckPolicy: ackPolicy, - MaxDeliver: 3, - MaxAckPending: prefetchCount, - } - // 创建新消费者 - consumer, err := js.CreateOrUpdateConsumer(ctx, streamName, jsConfig) - if err != nil { - g.Log().Errorf(ctx, "创建消费者失败: %v", err) - return err - } - - // 获取消费者信息验证 - if cInfo, err := consumer.Info(ctx); err == nil { - g.Log().Infof(ctx, "🔔 消费者创建成功: %s, AckPolicy=%v, MaxAckPending=%d", - cInfo.Name, cInfo.Config.AckPolicy, cInfo.Config.MaxAckPending) - } - - // 创建消息处理函数 - msgHandler := func(msg jetstream.Msg) { - // 记录消息接收时间 - now := time.Now() - meta, err := msg.Metadata() - if err == nil { - g.Log().Infof(ctx, "📨 收到消息: StreamSeq=%d, Published=%v, Received=%v, 距离发布=%.2f秒", - meta.Sequence.Stream, - meta.Timestamp.Format("2006-01-02 15:04:05"), - now.Format("2006-01-02 15:04:05"), - now.Sub(meta.Timestamp).Seconds()) - } - - // 解析消息 + // 创建推送订阅的回调函数 + msgHandler := func(msg *nats.Msg) { var data map[string]any - if err := json.Unmarshal(msg.Data(), &data); err != nil { - g.Log().Errorf(ctx, "解析消息失败: %v", err) - if err := msg.Nak(); err != nil { - g.Log().Errorf(ctx, "Nak 失败: %v", err) - } + if err := json.Unmarshal(msg.Data, &data); err != nil { + g.Log().Errorf(ctx, "❌ 解析消息失败: %v", err) return } + g.Log().Infof(ctx, "📨 收到消息: Subject=%s, Data=%v", msg.Subject, data) + // 处理业务逻辑 if err := handler(ctx, data); err != nil { - g.Log().Errorf(ctx, "处理消息失败: %v", err) - if err := msg.Nak(); err != nil { - g.Log().Errorf(ctx, "Nak 失败: %v", err) + g.Log().Errorf(ctx, "❌ 处理消息失败: %v", err) + if !autoAck { + if err := msg.Nak(); err != nil { + g.Log().Errorf(ctx, "❌ Nak 失败: %v", err) + return + } + return } + } else { + g.Log().Infof(ctx, "✅ 处理消息成功") + } + if err := msg.Ack(); err != nil { + g.Log().Errorf(ctx, "❌ Ack 失败: %v", err) + } + } + delayMsg := delayTime > 0 + // 创建流 + if err := c.createStream(ctx, subject, durable, delayMsg); err != nil { + return err + } + // 获取 JetStream 上下文 + js := getNatsJS(c.name) + if js == nil { + g.Log().Errorf(ctx, "❌ NATS [%s] JetStream 不存在", c.name) + return fmt.Errorf("NATS JetStream 不存在") + } + // 创建推送订阅 + var sub *nats.Subscription + var err error + // 配置订阅选项 - 使用 DeliverSubject 创建 Push Consumer + subOpts := []nats.SubOpt{ + nats.Durable(consumerName), + nats.MaxAckPending(prefetchCount), + nats.DeliverSubject(consumerName), + } + if !autoAck { + subOpts = append(subOpts, nats.ManualAck()) + } + // 使用 Subscribe 创建推送订阅 + sub, err = js.Subscribe(subject, msgHandler, subOpts...) + if err != nil { + g.Log().Errorf(ctx, "创建推送订阅失败: %v", err) + return err + } + g.Log().Infof(ctx, "✅ NATS 推送订阅成功: Consumer=%s", consumerName) + // 启动后台 goroutine 监听上下文取消,用于清理订阅 + go func() { + <-ctx.Done() + g.Log().Infof(ctx, "订阅上下文取消,取消订阅") + if err := sub.Unsubscribe(); err != nil { return } - g.Log().Infof(ctx, "处理消息成功") - if !autoAck { - if err := msg.Ack(); err != nil { - g.Log().Errorf(ctx, "Ack 失败: %v", err) - } - } - } - - // 开始消费 - _, err = consumer.Consume(msgHandler) - if err != nil { - return fmt.Errorf("开始消费失败: %w", err) - } - - g.Log().Infof(ctx, "✅ NATS 订阅成功") + }() return nil } -func getStreamInfo(durable, delayMsg bool) (string, jetstream.StorageType) { +// createStream 内部创建消费组 +func (c *natsMsg) createStream(ctx context.Context, subject string, durable, delayMsg bool) error { + streamName, storage := getStreamInfo(durable, delayMsg) + // 构建流配置 + // 如果是延迟消息,需要包含两个 subjects: + // 1. subject.schedule - 用于发送调度消息 + // 2. subject - 用于实际投递目标 + subjects := []string{subject} + if delayMsg { + subjects = []string{subject, subject + ".schedule"} + } + jsConfig := &StreamConfig{ + Name: streamName, + Subjects: subjects, + AllowMsgSchedules: delayMsg, // 延迟消息核心开关 + Storage: storage, + Discard: DiscardNew, // 达到上限删除旧消息 + } + nc := getNatsConn(c.name) + if !c.Ping(ctx) { + // 使用统一的重连函数 + if err := commonConnect(ctx, MessageNATS, c.name, func(ctx context.Context) error { + return c.Connect(ctx) + }, func(ctx context.Context) error { + return c.Close(ctx) + }); err != nil { + g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageNATS, c.name, err) + return err + } + } + if nc == nil { + g.Log().Errorf(ctx, "❌ NATS [%s] 连接不存在", c.name) + return fmt.Errorf("NATS 连接不存在") + } + err := jsStreamCreate(nc, jsConfig) + if err != nil { + g.Log().Errorf(ctx, "❌ 创建 Stream 失败: err=%v", err) + return err + } + g.Log().Infof(ctx, "✅ 创建 Stream 成功: stream=%s, subjects=%v, allowSchedules=%v", streamName, subjects, delayMsg) + return nil +} + +func getStreamInfo(durable, delayMsg bool) (string, StorageType) { // Stream 不存在,创建新的 streamName := "ordinary_msg_memory" - storage := jetstream.MemoryStorage + storage := MemoryStorage // 延迟消息必须使用 FileStorage(NATS 官方要求) if delayMsg { - streamName = "delay_msg_file" - storage = jetstream.FileStorage + if durable { + streamName = "delay_msg_file" + storage = FileStorage + } else { + streamName = "delay_msg_memory" + storage = MemoryStorage + } } else { if durable { streamName = "ordinary_msg_file" - storage = jetstream.FileStorage + storage = FileStorage } } return streamName, storage } + +const ( + // JSApiStreamCreateT is the endpoint to create new streams. + // Will return JSON response. + JSApiStreamCreateT = "$JS.API.STREAM.CREATE.%s" + + // JSApiStreamUpdateT is the endpoint to update existing streams. + // Will return JSON response. + JSApiStreamUpdateT = "$JS.API.STREAM.UPDATE.%s" +) + +// jsStreamCreate is for sending a stream create for fields that nats.go does not know about yet. +func jsStreamCreate(nc *nats.Conn, cfg *StreamConfig) error { + j, err := json.Marshal(cfg) + if err != nil { + return err + } + + msg, err := nc.Request(fmt.Sprintf(JSApiStreamCreateT, cfg.Name), j, time.Second*3) + if err != nil { + return err + } + + // 检查 API 响应中的错误 + var resp struct { + Error *struct { + Code int `json:"code"` + ErrCode int `json:"err_code"` + Description string `json:"description"` + } `json:"error,omitempty"` + } + if err := json.Unmarshal(msg.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + // 如果 Stream 已存在,尝试更新 + if resp.Error.ErrCode == 10058 { // JSStreamNameExistErr + return jsStreamUpdate(nc, cfg) + } + return fmt.Errorf("JS API error: %s", resp.Error.Description) + } + + return nil +} + +// jsStreamUpdate is for sending a stream create for fields that nats.go does not know about yet. +func jsStreamUpdate(nc *nats.Conn, cfg *StreamConfig) error { + j, err := json.Marshal(cfg) + if err != nil { + return err + } + msg, err := nc.Request(fmt.Sprintf(JSApiStreamUpdateT, cfg.Name), j, time.Second*3) + if err != nil { + return err + } + + // 检查 API 响应中的错误 + var resp struct { + Error *struct { + Code int `json:"code"` + ErrCode int `json:"err_code"` + Description string `json:"description"` + } `json:"error,omitempty"` + } + if err := json.Unmarshal(msg.Data, &resp); err != nil { + return err + } + if resp.Error != nil { + return fmt.Errorf("JS API error: %s", resp.Error.Description) + } + + return nil +} diff --git a/message/nats_rpc.go b/message/nats_rpc.go index e204743..8e09938 100644 --- a/message/nats_rpc.go +++ b/message/nats_rpc.go @@ -31,6 +31,9 @@ var ( traceCancelMu sync.RWMutex // 取消主题前缀 cancelSubjectPrefix = "ctx.cancel.otel." + + // RPC 使用的默认数据源名称 + rpcDefaultDatasource = "default" ) // rpcHandler RPC 处理函数类型 @@ -42,7 +45,7 @@ type rpcHandler func(ctx context.Context, req []byte) (any, error) // serviceName: 服务名称,调用方通过此名称调用服务 // handler: 服务处理函数,接收请求并返回响应 func registerRPCService(serviceName string, handler rpcHandler) (err error) { - if !natsPing() { + if !natsPing(context.Background(), rpcDefaultDatasource) { return fmt.Errorf("NATS 未连接") } @@ -63,6 +66,11 @@ func registerRPCService(serviceName string, handler rpcHandler) (err error) { rpcServicesMu.Unlock() // 订阅服务主题 + nc := getNatsConn(rpcDefaultDatasource) + if nc == nil { + return fmt.Errorf("NATS 连接不存在") + } + subject := fmt.Sprintf("rpc.%s", serviceName) sub, err := nc.Subscribe(subject, func(msg *nats.Msg) { // 执行处理函数 @@ -84,7 +92,7 @@ func registerRPCService(serviceName string, handler rpcHandler) (err error) { // queueName: 队列组名,同一队列组的实例共享请求 // handler: 服务处理函数 func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) (err error) { - if !natsPing() { + if !natsPing(context.Background(), rpcDefaultDatasource) { return fmt.Errorf("NATS 未连接") } @@ -111,6 +119,11 @@ func registerQueueRPCService(serviceName, queueName string, handler rpcHandler) queueRPCMu.Unlock() // 订阅服务主题(队列模式) + nc := getNatsConn(rpcDefaultDatasource) + if nc == nil { + return fmt.Errorf("NATS 连接不存在") + } + subject := fmt.Sprintf("rpc.%s", serviceName) sub, err := nc.QueueSubscribe(subject, queueName, func(msg *nats.Msg) { // 执行处理函数 @@ -209,7 +222,7 @@ func createCancelContext(ctx context.Context, traceID string) context.Context { // // sub, err := nats.SetupCancelListener(ctx) func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { - if !natsPing() { + if !natsPing(ctx, rpcDefaultDatasource) { return nil, fmt.Errorf("NATS 未连接") } @@ -219,6 +232,11 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { // 修复问题3:订阅取消主题,格式: ctx.cancel.otel.* // 使用 * 通配符而不是 >,因为 TraceID 是最后一部分 + nc := getNatsConn(rpcDefaultDatasource) + if nc == nil { + return nil, fmt.Errorf("NATS 连接不存在") + } + cancelSubject := cancelSubjectPrefix + "*" sub, err := nc.Subscribe(cancelSubject, func(msg *nats.Msg) { // 从主题中解析 TraceID (去除前缀) @@ -261,7 +279,7 @@ func setupCancelListener(ctx context.Context) (*nats.Subscription, error) { // // err := nats.publishCancel(ctx, traceID) func publishCancel(ctx context.Context, traceID string) error { - if !natsPing() { + if !natsPing(ctx, rpcDefaultDatasource) { return fmt.Errorf("NATS 未连接") } @@ -269,6 +287,11 @@ func publishCancel(ctx context.Context, traceID string) error { return fmt.Errorf("TraceID 不能为空") } + nc := getNatsConn(rpcDefaultDatasource) + if nc == nil { + return fmt.Errorf("NATS 连接不存在") + } + cancelSubject := cancelSubjectPrefix + traceID err := nc.Publish(cancelSubject, nil) if err != nil { @@ -303,7 +326,7 @@ func cleanupTraceCancel(traceID string) { // req: 请求数据 // 返回: 响应数据(任意类型)和错误 func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err error) { - if !natsPing() { + if !natsPing(ctx, rpcDefaultDatasource) { return fmt.Errorf("NATS 未连接") } @@ -406,6 +429,11 @@ func CallRPC(ctx context.Context, serviceName string, req any, resp any) (err er } // 发送请求 + nc := getNatsConn(rpcDefaultDatasource) + if nc == nil { + return fmt.Errorf("NATS 连接不存在") + } + responseMsg, err := nc.RequestMsgWithContext(ctx, msg) // 关闭 done channel,通知 goroutine 退出 @@ -475,7 +503,7 @@ func WithExcludeMethods(methods ...string) registerServiceOption { // }, WithQueueGroup("order-group")) func AutoRegisterServices(ctx context.Context, serviceInstances map[string]interface{}, options ...registerServiceOption) error { // 先注册 RPC 服务(如果 NATS 不可用则记录警告但不阻塞启动) - if !natsPing() { + if !natsPing(ctx, rpcDefaultDatasource) { return fmt.Errorf("NATS 未连接,RPC 服务未注册") } @@ -512,7 +540,7 @@ func AutoRegisterServices(ctx context.Context, serviceInstances map[string]inter // registerService 注册单个服务的所有公开方法(内部函数) func registerService(service interface{}, serviceNamePrefix string, options ...registerServiceOption) (err error) { - if !natsPing() { + if !natsPing(context.Background(), rpcDefaultDatasource) { return fmt.Errorf("NATS 未连接") } diff --git a/message/rabbitmq_msg.go b/message/rabbitmq_msg.go index 0a65e20..3eab2a8 100644 --- a/message/rabbitmq_msg.go +++ b/message/rabbitmq_msg.go @@ -11,6 +11,12 @@ import ( ) type RabbitMQPublishMsgConfig struct { + QueueName string + Durable bool + Data any +} + +type RabbitMQPublishDelayMsgConfig struct { QueueName string Durable bool DelayTime int @@ -19,8 +25,6 @@ type RabbitMQPublishMsgConfig struct { type RabbitMQSubscribeMsgConfig struct { QueueName string - Durable bool - DelayTime int ConsumerName string AutoAck bool PrefetchCount int @@ -31,32 +35,36 @@ func (*RabbitMQPublishMsgConfig) GetPublishMsgType() { } +func (*RabbitMQPublishDelayMsgConfig) GetPublishDelayMsgType() {} + func (*RabbitMQSubscribeMsgConfig) GetSubscribeMsgType() { } -func init() { - // 注册 RabbitMQ 插件,必须使用 RegisterPlugin 确保连接检测 - //registerPlugin(MessageRabbitMQ, func() messageUtil { - // return &rabbitMQ{} - //}) +type rabbitMQ struct { + name string // 数据源名称 } -type rabbitMQ struct{} +func init() { + // 注册 RabbitMQ 插件(默认数据源) + RegisterPlugin(context.Background(), "default", MessageRabbitMQ, func() messageUtil { + return &rabbitMQ{name: "default"} + }) +} + +// Connect 连接 RabbitMQ +func (c *rabbitMQ) Connect(ctx context.Context) error { + return rabbitmqConnect(ctx, c.name) +} // Ping 检测 RabbitMQ 连接状态 -func (c *rabbitMQ) ping(ctx context.Context) bool { - return rabbitmqPing() -} - -// Reconnect 重连 RabbitMQ -func (c *rabbitMQ) reconnect(ctx context.Context) error { - return rabbitmqReconnect(ctx) +func (c *rabbitMQ) Ping(ctx context.Context) bool { + return rabbitmqPing(ctx, c.name) } // Close 关闭 RabbitMQ 连接 -func (c *rabbitMQ) close(ctx context.Context) error { - return rabbitmqClose(ctx) +func (c *rabbitMQ) Close(ctx context.Context) error { + return rabbitmqClose(ctx, c.name) } // Publish 发布消息 @@ -71,11 +79,43 @@ func (c *rabbitMQ) Publish(ctx context.Context, msgConfig messagePublishConfig) if cfg.Data == nil { return fmt.Errorf("数据不能为空") } + return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, 0, cfg.Data) +} + +// PublishDelay 发布延迟消息 +func (c *rabbitMQ) PublishDelay(ctx context.Context, msgConfig messagePublishDelayConfig) error { + cfg, ok := msgConfig.(*RabbitMQPublishDelayMsgConfig) + if !ok { + return fmt.Errorf("无效的 RabbitMQ 配置类型") + } + if g.IsEmpty(cfg.QueueName) { + return fmt.Errorf("队列名称不能为空") + } + if cfg.Data == nil { + return fmt.Errorf("数据不能为空") + } return c.publishMessageInternal(ctx, cfg.QueueName, cfg.Durable, cfg.DelayTime, cfg.Data) } // publishMessage 发布消息内部实现 func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, durable bool, delayTime int, data interface{}) error { + if !c.Ping(ctx) { + if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error { + return c.Connect(ctx) + }, func(ctx context.Context) error { + return c.Close(ctx) + }); err != nil { + g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err) + return err + } + } + + channel := getRabbitMQChannel(c.name) + if channel == nil || channel.IsClosed() { + g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name) + return fmt.Errorf("RabbitMQ Channel 不存在或已关闭") + } + delayMsg := delayTime > 0 // 1. 决定 Exchange 类型 @@ -86,12 +126,12 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, if delayMsg { exchangeType = "x-delayed-message" exchangeName = queueName + ".delayed" - args["x-delayed-type"] = "fanout" // 底层用 topic + args["x-delayed-type"] = "fanout" } - // 2. 声明 Exchange(只声明一次) + // 2. 声明 Exchange(使用 exchangeName 而不是 queueName) if err := channel.ExchangeDeclare( - queueName, // exchange 交换机名称 + exchangeName, // 修复:使用正确的交换机名称 exchangeType, durable, false, // autoDelete @@ -99,7 +139,8 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, false, // noWait args, ); err != nil { - return fmt.Errorf("声明 Exchange 失败: %w", err) + g.Log().Errorf(ctx, "❌ 声明 Exchange 失败: %v", err) + return err } // 3. 声明队列 @@ -111,7 +152,8 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, false, // noWait nil, // args ); err != nil { - return fmt.Errorf("声明队列失败: %w", err) + g.Log().Errorf(ctx, "❌ 声明队列失败: %v", err) + return err } // 4. 绑定队列 @@ -122,13 +164,15 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, false, // noWait nil, // args ); err != nil { - return fmt.Errorf("绑定队列失败: %w", err) + g.Log().Errorf(ctx, "❌ 绑定队列失败: %v", err) + return err } // 5. 序列化数据 body, err := json.Marshal(data) if err != nil { - return fmt.Errorf("序列化数据失败: %w", err) + g.Log().Errorf(ctx, "❌ 序列化数据失败: %v", err) + return err } // 6. 发布消息 deliveryMode := amqp.Transient @@ -142,9 +186,9 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, Timestamp: time.Now(), } if delayMsg { - duration := time.Duration(delayTime) * time.Minute + duration := delayTime * 1000 // 延迟时间(毫秒)= 秒 * 1000 publishing.Headers = amqp.Table{ - "x-delay": duration, // 延迟时间(毫秒) + "x-delay": duration, } } err = channel.PublishWithContext( @@ -154,6 +198,11 @@ func (c *rabbitMQ) publishMessageInternal(ctx context.Context, queueName string, false, false, publishing, ) + if err != nil { + g.Log().Errorf(ctx, "❌ 发布消息失败: %v", err) + return err + } + g.Log().Infof(ctx, "📨 发布消息成功: queueName=%s, data=%v", queueName, data) return err } @@ -180,10 +229,28 @@ func (c *rabbitMQ) Subscribe(ctx context.Context, msgConfig messageSubscribeConf // createSubscribe 内部订阅消息 func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { - g.Log().Infof(ctx, "🔔 RabbitMQ 开始订阅: queueName=%s, consumerName=%s", queueName, consumerName) + g.Log().Infof(ctx, "🔔 RabbitMQ [%s] 开始订阅: queueName=%s, consumerName=%s", c.name, queueName, consumerName) + + if !c.Ping(ctx) { + if err := commonConnect(ctx, MessageRabbitMQ, c.name, func(ctx context.Context) error { + return c.Connect(ctx) + }, func(ctx context.Context) error { + return c.Close(ctx) + }); err != nil { + g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRabbitMQ, c.name, err) + return err + } + } + + channel := getRabbitMQChannel(c.name) + if channel == nil || channel.IsClosed() { + g.Log().Errorf(ctx, "❌ RabbitMQ [%s] Channel 不存在或已关闭", c.name) + return fmt.Errorf("RabbitMQ Channel 不存在或已关闭") + } if err := channel.Qos(prefetchCount, 0, false); err != nil { - return fmt.Errorf("设置 Qos 失败: %w", err) + g.Log().Errorf(ctx, "❌ 设置 Qos 失败: %v", err) + return err } g.Log().Infof(ctx, "📊 设置 Prefetch Count: %d", prefetchCount) @@ -197,97 +264,48 @@ func (c *rabbitMQ) createSubscribeInternal(ctx context.Context, queueName, consu nil, // args ) if err != nil { - return fmt.Errorf("注册消费者失败: %w", err) + g.Log().Errorf(ctx, "❌ 消费消息失败: %v", err) + return err } - - go func() { - defer func() { - if r := recover(); r != nil { - g.Log().Errorf(ctx, "❌ RabbitMQ 消费者 panic: %v", r) + g.Log().Infof(ctx, "👀 开始监听消息") + for { + select { + case <-ctx.Done(): + // Context 取消,退出 + g.Log().Infof(ctx, "context cancel 监听消息退出") + return nil + case m, ok := <-msg: + if !ok { + // Channel 关闭,退出 + g.Log().Infof(ctx, "channel close 监听消息退出") + return nil } - }() + g.Log().Infof(ctx, "📨 收到消息: %s", string(m.Body)) - // 并发控制信号量 - semaphore := make(chan struct{}, 10) // 限制最大并发数为 10 - - for { - select { - case <-ctx.Done(): - g.Log().Infof(ctx, "🔕 RabbitMQ 消费者停止: queueName=%s, consumerName=%s", queueName, consumerName) - return - case msg, ok := <-msg: - if !ok { - g.Log().Warningf(ctx, "⚠️ RabbitMQ 消息通道关闭") - return + var data map[string]interface{} + if err := json.Unmarshal(m.Body, &data); err != nil { + // 如果不是 JSON,直接使用原始内容 + data = map[string]interface{}{ + "data": string(m.Body), } - - // 获取并发控制槽位 - semaphore <- struct{}{} - - go func(m amqp.Delivery) { - defer func() { - <-semaphore // 释放槽位 - if r := recover(); r != nil { - g.Log().Errorf(ctx, "❌ 消息处理 panic: %v", r) - } - }() - - if err := c.handleMessageWithRetryInternal(ctx, m, handler, autoAck); err != nil { - g.Log().Errorf(ctx, "❌ 消息处理失败(重试次数耗尽): %v", err) - - // 仅在手动 ACK 模式下拒绝消息 - if !autoAck { - // 拒绝消息不再重新入队(避免死循环) - m.Nack(false, false) - } - return - } - - // 仅在手动 ACK 模式下确认消息 - if autoAck { - if err := m.Ack(false); err != nil { - g.Log().Errorf(ctx, "❌ ACK 消息失败: %v", err) - } - } - }(msg) + } + err := handler(ctx, data) + if err != nil { + g.Log().Errorf(ctx, "❌ 消息处理失败: %v", err) + // 仅在手动 ACK 模式下拒绝消息 + if !autoAck { + // 拒绝消息不再重新入队(避免死循环) + m.Nack(false, false) + continue + } + } + g.Log().Infof(ctx, "✅ 消息处理成功: %v", err) + // 仅在手动 ACK 模式下确认消息 + if err := m.Ack(false); err != nil { + g.Log().Errorf(ctx, "❌ AUTO ACK 消息失败: %v", err) + } else { + g.Log().Infof(ctx, "✅ AUTO ACK 消息成功") } } - }() - - return nil -} - -// handleMessageWithRetry 处理消息(支持重试) -func (c *rabbitMQ) handleMessageWithRetryInternal(ctx context.Context, msg amqp.Delivery, handler func(ctx context.Context, message map[string]interface{}) error, autoAck bool) error { - var data map[string]interface{} - - if err := json.Unmarshal(msg.Body, &data); err != nil { - // 如果不是 JSON,直接使用原始内容 - data = map[string]interface{}{ - "data": string(msg.Body), - } } - - // 重试逻辑 - const maxRetry = 3 - for attempt := 0; attempt <= maxRetry; attempt++ { - if attempt > 0 { - g.Log().Infof(ctx, "🔄 消息处理重试 (第%d次)", attempt) - // 指数退避 - time.Sleep(time.Duration(attempt) * time.Second) - } - - err := handler(ctx, data) - if err == nil { - return nil // 成功 - } - - g.Log().Warningf(ctx, "⚠️ 消息处理失败 (第%d次): %v", attempt+1, err) - - if attempt == maxRetry { - return fmt.Errorf("达到最大重试次数 %d: %w", maxRetry, err) - } - } - - return nil } diff --git a/message/reconnect.go b/message/reconnect.go new file mode 100644 index 0000000..f042397 --- /dev/null +++ b/message/reconnect.go @@ -0,0 +1,73 @@ +package message + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/gogf/gf/v2/frame/g" +) + +// connectFunc 连接函数类型 +type connectFunc func(ctx context.Context) error + +// closeFunc 关闭函数类型 +type closeFunc func(ctx context.Context) error + +// reconnectOption 重连选项 +type reconnectOption struct { + maxRetries int // 最大重试次数,0 表示无限重试 + interval time.Duration // 重试间隔 + componentType messageType // 组件类型(nats/redis/rabbitmq) + componentName string // 组件名称(数据源名称) +} + +// defaultReconnectOption 默认重连选项 +func defaultReconnectOption(componentType messageType, componentName string) *reconnectOption { + return &reconnectOption{ + maxRetries: 0, // 无限重试 + interval: 3 * time.Second, + componentType: componentType, + componentName: componentName, + } +} + +// commonReconnect 重连函数(NATS、Redis、RabbitMQ 共用) +func commonReconnect(ctx context.Context, connectFn connectFunc, closeFn closeFunc, opt *reconnectOption) error { + if opt == nil { + opt = defaultReconnectOption("unknown", "default") + } + + for attempt := 0; opt.maxRetries == 0 || attempt < opt.maxRetries; attempt++ { + err := connectFn(ctx) + if err == nil { + g.Log().Infof(ctx, "✅ 连接成功: type=%s, name=%s, attempt=%d", + opt.componentType, opt.componentName, attempt+1) + return nil + } + // 记录失败日志 + g.Log().Warningf(ctx, "⚠️ 连接失败: type=%s, name=%s, attempt=%d, err=%v, 重试中...", + opt.componentType, opt.componentName, attempt+1, err) + // 如果错误信息中包含 "does not exist",则认为是连接失败,不再重试 + if strings.Contains(err.Error(), "does not exist") { + return err + } + // 等待一段时间再重试 + select { + case <-time.After(opt.interval): + case <-ctx.Done(): + if err = closeFn(ctx); err != nil { + return err + } + return ctx.Err() + } + } + return fmt.Errorf("连接失败,已达最大重试次数") +} + +// connect 连接函数,直接调用 commonReconnect +func commonConnect(ctx context.Context, componentType messageType, name string, connectFn func(ctx context.Context) error, closeFn closeFunc) error { + opt := defaultReconnectOption(componentType, name) + return commonReconnect(ctx, connectFn, closeFn, opt) +} diff --git a/message/redis_msg.go b/message/redis_msg.go index 38dd331..343acb9 100644 --- a/message/redis_msg.go +++ b/message/redis_msg.go @@ -3,11 +3,11 @@ package message import ( "context" "fmt" + "github.com/gogf/gf/v2/os/glog" "strings" "time" "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/gconv" ) @@ -16,6 +16,9 @@ type RedisPublishMsgConfig struct { Data any } +type RedisPublishDelayMsgConfig struct { +} + type RedisSubscribeMsgConfig struct { QueueName string ConsumerName string @@ -28,18 +31,22 @@ func (*RedisPublishMsgConfig) GetPublishMsgType() { } +func (*RedisPublishDelayMsgConfig) GetPublishDelayMsgType() {} + func (*RedisSubscribeMsgConfig) GetSubscribeMsgType() { } -func init() { - // 注册 Redis 插件(连接由 RegisterPlugin 异步处理) - registerPlugin(MessageRedis, func() messageUtil { - return &redis{} - }) +type redis struct { + name string // 数据源名称 } -type redis struct{} +func init() { + // 注册 Redis 插件(默认数据源) + RegisterPlugin(context.Background(), "default", MessageRedis, func() messageUtil { + return &redis{name: "default"} + }) +} // RedisStreamMessage Redis Stream 消息结构 type redisStreamMessage struct { @@ -47,41 +54,19 @@ type redisStreamMessage struct { Values map[string]interface{} } -// Ping 检测 Redis 连接状态 -func (c *redis) ping(ctx context.Context) bool { - conn, err := getDefaultDataSource() - if err != nil { - return false - } - return conn.redisPing(ctx) +// Connect 连接 Redis +func (c *redis) Connect(ctx context.Context) error { + return redisConnect(ctx, c.name) } -// Reconnect 重连 Redis -func (c *redis) reconnect(ctx context.Context) error { - conn, err := getDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认连接失败: %w", err) - } - - if err := conn.redisReconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } - - return nil +// Ping 检测 Redis 连接状态 +func (c *redis) Ping(ctx context.Context) bool { + return redisPing(ctx, c.name) } // Close 关闭 Redis 连接 -func (c *redis) close(ctx context.Context) error { - conn, err := getDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认连接失败: %w", err) - } - - if err := conn.redisClose(ctx); err != nil { - return fmt.Errorf("关闭redis连接失败: %w", err) - } - - return nil +func (c *redis) Close(ctx context.Context) error { + return redisClose(ctx, c.name) } // Publish 发布消息 @@ -96,14 +81,16 @@ func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) err if g.IsEmpty(cfg.Data) { return fmt.Errorf("数据不能为空") } - conn, err := getDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认连接失败: %w", err) - } - if !conn.getIsConnected() { - if err := conn.redisReconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) + rc := getRedisConn(c.name) + if !c.Ping(ctx) { + if err := commonConnect(ctx, MessageRedis, c.name, func(ctx context.Context) error { + return c.Connect(ctx) + }, func(ctx context.Context) error { + return c.Close(ctx) + }); err != nil { + g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRedis, c.name, err) + return err } } @@ -113,7 +100,7 @@ func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) err for key, val := range values { args = append(args, key, val) } - result, err := conn.getClient().Do(ctx, "XADD", args...) + result, err := rc.Do(ctx, "XADD", args...) if err != nil { g.Log().Errorf(ctx, "❌ Redis 发布消息失败: key=%s, err=%v", cfg.QueueName, err) return err @@ -122,6 +109,12 @@ func (c *redis) Publish(ctx context.Context, msgConfig messagePublishConfig) err return nil } +// PublishDelay 发布延迟消息 +func (c *redis) PublishDelay(ctx context.Context, _ messagePublishDelayConfig) error { + g.Log().Errorf(ctx, "❌ Redis 不支持延迟消息") + return fmt.Errorf("❌ Redis 不支持延迟消息") +} + // Subscribe 订阅消息 func (c *redis) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) error { cfg, ok := msgConfig.(*RedisSubscribeMsgConfig) @@ -142,162 +135,92 @@ func (c *redis) Subscribe(ctx context.Context, msgConfig messageSubscribeConfig) // createSubscribe 内部订阅消息 func (c *redis) createSubscribe(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { - go func() { - defer func() { - if r := recover(); r != nil { - g.Log().Errorf(ctx, "❌ Redis 消费者 panic: %v", r) - } - }() - retryTicker := time.NewTicker(time.Second) - defer retryTicker.Stop() +LOOP: + err := c.consumeMessages(ctx, key, consumerName, prefetchCount, autoAck, handler) + if err != nil { + // 对于超时错误,返回nil继续循环,而不是返回错误 + if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || + strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") { - // 重试计数器 - var consecutiveErrors int - const maxConsecutiveErrors = 3 - - for { - select { - case <-ctx.Done(): - g.Log().Infof(ctx, "🔕 Redis 消费者停止: topic=%s", key) - return - case <-retryTicker.C: - err := c.consumeMessages(ctx, key, consumerName, prefetchCount, autoAck, handler) - if err != nil { - // 对于超时错误,返回nil继续循环,而不是返回错误 - if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || - strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") { - - consecutiveErrors++ - if consecutiveErrors > maxConsecutiveErrors { - g.Log().Errorf(ctx, "Max retries exceeded, giving up") - return - } - backoffTime := 5 * time.Second - g.Log().Warningf(ctx, "⚠️ 等待 %v 后重试...", backoffTime) - - time.Sleep(backoffTime) - } else { - // 非超时错误(严重错误) - consecutiveErrors = 0 // 重置计数 - g.Log().Errorf(ctx, "严重错误,立即重试: %v", err) - - // 短暂等待后重试 - select { - case <-ctx.Done(): - return - case <-time.After(time.Second): - // 继续循环 - } - } - } else { - // 成功时重置错误计数器 - consecutiveErrors = 0 - } - } + time.Sleep(time.Second) + goto LOOP + } else { + g.Log().Errorf(ctx, "❌ 严重错误: %v", err) } - }() - return nil + } + time.Sleep(time.Second) + goto LOOP } // consumeMessages 消费消息 func (c *redis) consumeMessages(ctx context.Context, key, consumerName string, prefetchCount int, autoAck bool, handler func(ctx context.Context, message map[string]interface{}) error) error { - conn, err := getDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认连接失败: %w", err) + if !c.Ping(ctx) { + if err := commonConnect(ctx, MessageRedis, c.name, func(ctx context.Context) error { + return c.Connect(ctx) + }, func(ctx context.Context) error { + return c.Close(ctx) + }); err != nil { + g.Log().Errorf(ctx, "❌ [%s][%s] 连接失败: %v", MessageRedis, c.name, err) + return err + } } - if !conn.getIsConnected() { - if err := conn.redisReconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } + rc := getRedisConn(c.name) + if rc == nil { + g.Log().Errorf(ctx, "❌ Redis [%s] 连接不存在", c.name) + return fmt.Errorf("Redis 连接不存在") } // 检查消费者组是否存在 - if err := c.createStreamGroup(ctx, key); err != nil { - return fmt.Errorf("create stream group failed: %w", err) + groupName := "default" + _, err := rc.Do(ctx, "XGROUP", "CREATE", key, groupName, "0", "MKSTREAM") + if err != nil { + errStr := err.Error() + if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") { + glog.Infof(ctx, "✅ Redis [%s] 消费者组已存在: %s", c.name, key) + return nil + } + g.Log().Errorf(ctx, "❌ 创建消费组失败: key=%s, err=%v", key, err) + return err } + glog.Infof(ctx, "✅ Redis [%s] 消费者组创建成功: %s", c.name, key) // 使用带重试的命令执行 - result, err := conn.getClient().Do(ctx, "XREADGROUP", "GROUP", "default", consumerName, "COUNT", prefetchCount, "BLOCK", 0, "STREAMS", key, ">") + result, err := rc.Do(ctx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", prefetchCount, "BLOCK", 0, "STREAMS", key, ">") if err != nil { - if strings.Contains(err.Error(), "i/o timeout") || strings.Contains(err.Error(), "timeout") || - strings.Contains(err.Error(), "context deadline exceeded") || strings.Contains(err.Error(), "context canceled") { - - } return err } messages, err := c.parseStreamResult(result) if err != nil { + g.Log().Errorf(ctx, "❌ 解析消息失败: %v", err) return err } for _, msg := range messages { // 处理消息 if err := handler(ctx, msg.Values); err != nil { g.Log().Errorf(ctx, "❌ 消息处理失败: messageID=%s, err=%v", msg.ID, err) - continue - } - - // ACK 消息 - if autoAck { - if err := c.ackMessage(ctx, key, "default", msg.ID); err != nil { - g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err) + // 如果不是自动ACK,则跳过当前消息 + if !autoAck { + continue } + } else { + g.Log().Infof(ctx, "✅ 消息处理成功: messageID=%s", msg.ID) + } + // ACK 消息 + args := make([]interface{}, 0, len(msg.ID)+2) + args = append(args, key, groupName, msg.ID) + _, err = rc.Do(ctx, "XACK", args...) + if err != nil { + g.Log().Errorf(ctx, "❌ ACK 消息失败: messageID=%s, err=%v", msg.ID, err) + } else { + g.Log().Infof(ctx, "✅ ACK 消息成功: messageID=%s", msg.ID) } } return nil } -// createStreamGroup 内部单个创建消费组 -func (c *redis) createStreamGroup(ctx context.Context, key string) error { - conn, err := getDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认连接失败: %w", err) - } - - if !conn.getIsConnected() { - if err := conn.redisReconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } - } - - groupName := "default" - _, err = conn.getClient().Do(ctx, "XGROUP", "CREATE", key, groupName, "0", "MKSTREAM") - if err != nil { - errStr := err.Error() - if strings.Contains(errStr, "BUSYGROUP") && strings.Contains(errStr, "already exists") { - glog.Infof(ctx, "✅ Redis 消费者组已存在: %s", groupName) - return nil - } - return fmt.Errorf("初始化消费者组失败: %w", err) - } - glog.Infof(ctx, "✅ Redis 消费者组创建成功: %s", groupName) - return nil -} - -// ackMessage ACK 消息 -func (c *redis) ackMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { - conn, err := getDefaultDataSource() - if err != nil { - return fmt.Errorf("获取默认连接失败: %w", err) - } - - if !conn.getIsConnected() { - if err := conn.redisReconnect(ctx); err != nil { - return fmt.Errorf("redis重连失败: %w", err) - } - } - - args := make([]interface{}, 0, len(messageIDs)+2) - args = append(args, streamKey, groupName) - for _, id := range messageIDs { - args = append(args, id) - } - _, err = conn.getClient().Do(ctx, "XACK", args...) - return err -} - // parseStreamResult 解析 Stream 结果 func (c *redis) parseStreamResult(result interface{}) ([]redisStreamMessage, error) { if result == nil { diff --git a/message/store.go b/message/store.go new file mode 100644 index 0000000..15f8bc5 --- /dev/null +++ b/message/store.go @@ -0,0 +1,125 @@ +// Copyright 2019-2025 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package message + +import "fmt" + +type RetentionPolicy int + +const ( + // LimitsPolicy (default) means that messages are retained until any given limit is reached. + // This could be one of MaxMsgs, MaxBytes, or MaxAge. + LimitsPolicy RetentionPolicy = iota + // InterestPolicy specifies that when all known consumers have acknowledged a message it can be removed. + InterestPolicy + // WorkQueuePolicy specifies that when the first worker or subscriber acknowledges the message it can be removed. + WorkQueuePolicy +) + +// MarshalJSON 将 RetentionPolicy 序列化为字符串 +func (rp RetentionPolicy) MarshalJSON() ([]byte, error) { + switch rp { + case LimitsPolicy: + return []byte(`"limits"`), nil + case InterestPolicy: + return []byte(`"interest"`), nil + case WorkQueuePolicy: + return []byte(`"workqueue"`), nil + default: + return nil, fmt.Errorf("can not marshal %v", rp) + } +} + +// UnmarshalJSON 将字符串反序列化为 RetentionPolicy +func (rp *RetentionPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case `"limits"`: + *rp = LimitsPolicy + case `"interest"`: + *rp = InterestPolicy + case `"workqueue"`: + *rp = WorkQueuePolicy + default: + return fmt.Errorf("unknown retention policy: %s", string(data)) + } + return nil +} + +type DiscardPolicy int + +const ( + // DiscardOld will remove older messages to return to the limits. + DiscardOld = iota + // DiscardNew will error on a StoreMsg call + DiscardNew +) + +// MarshalJSON 将 DiscardPolicy 序列化为字符串 +func (dp DiscardPolicy) MarshalJSON() ([]byte, error) { + switch dp { + case DiscardOld: + return []byte(`"old"`), nil + case DiscardNew: + return []byte(`"new"`), nil + default: + return nil, fmt.Errorf("can not marshal %v", dp) + } +} + +// UnmarshalJSON 将字符串反序列化为 DiscardPolicy +func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error { + switch string(data) { + case `"old"`: + *dp = DiscardOld + case `"new"`: + *dp = DiscardNew + default: + return fmt.Errorf("unknown discard policy: %s", string(data)) + } + return nil +} + +type StorageType int + +const ( + // FileStorage specifies on disk, designated by the JetStream config StoreDir. + FileStorage = StorageType(22) + // MemoryStorage specifies in memory only. + MemoryStorage = StorageType(33) +) + +// MarshalJSON 将 StorageType 序列化为字符串 +func (st StorageType) MarshalJSON() ([]byte, error) { + switch st { + case MemoryStorage: + return []byte(`"memory"`), nil + case FileStorage: + return []byte(`"file"`), nil + default: + return nil, fmt.Errorf("can not marshal %v", st) + } +} + +// UnmarshalJSON 将字符串反序列化为 StorageType +func (st *StorageType) UnmarshalJSON(data []byte) error { + switch string(data) { + case `"memory"`: + *st = MemoryStorage + case `"file"`: + *st = FileStorage + default: + return fmt.Errorf("unknown storage type: %s", string(data)) + } + return nil +} diff --git a/message/stream.go b/message/stream.go new file mode 100644 index 0000000..4f2a7c1 --- /dev/null +++ b/message/stream.go @@ -0,0 +1,212 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package message + +import ( + "fmt" + "time" +) + +// StreamConfig will determine the name, subjects and retention policy +// for a given stream. If subjects is empty the name will be used. +type StreamConfig struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Subjects []string `json:"subjects,omitempty"` + Retention RetentionPolicy `json:"retention"` + MaxConsumers int `json:"max_consumers"` + MaxMsgs int64 `json:"max_msgs"` + MaxBytes int64 `json:"max_bytes"` + MaxAge time.Duration `json:"max_age"` + MaxMsgsPer int64 `json:"max_msgs_per_subject"` + MaxMsgSize int32 `json:"max_msg_size,omitempty"` + Discard DiscardPolicy `json:"discard"` + Storage StorageType `json:"storage"` + Replicas int `json:"num_replicas"` + NoAck bool `json:"no_ack,omitempty"` + Duplicates time.Duration `json:"duplicate_window,omitempty"` + Placement *Placement `json:"placement,omitempty"` + Mirror *StreamSource `json:"mirror,omitempty"` + Sources []*StreamSource `json:"sources,omitempty"` + Compression StoreCompression `json:"compression"` + FirstSeq uint64 `json:"first_seq,omitempty"` + + // Allow applying a subject transform to incoming messages before doing anything else + SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` + + // Allow republish of the message after being sequenced and stored. + RePublish *RePublish `json:"republish,omitempty"` + + // Allow higher performance, direct access to get individual messages. E.g. KeyValue + AllowDirect bool `json:"allow_direct"` + // Allow higher performance and unified direct access for mirrors as well. + MirrorDirect bool `json:"mirror_direct"` + + // Allow KV like semantics to also discard new on a per subject basis + DiscardNewPer bool `json:"discard_new_per_subject,omitempty"` + + // Optional qualifiers. These can not be modified after set to true. + + // Sealed will seal a stream so no messages can get out or in. + Sealed bool `json:"sealed"` + // DenyDelete will restrict the ability to delete messages. + DenyDelete bool `json:"deny_delete"` + // DenyPurge will restrict the ability to purge messages. + DenyPurge bool `json:"deny_purge"` + // AllowRollup allows messages to be placed into the system and purge + // all older messages using a special msg header. + AllowRollup bool `json:"allow_rollup_hdrs"` + + // The following defaults will apply to consumers when created against + // this stream, unless overridden manually. + // TODO(nat): Can/should we name these better? + ConsumerLimits StreamConsumerLimits `json:"consumer_limits"` + + // AllowMsgTTL allows header initiated per-message TTLs. If disabled, + // then the `NATS-TTL` header will be ignored. + AllowMsgTTL bool `json:"allow_msg_ttl"` + + // SubjectDeleteMarkerTTL sets the TTL of delete marker messages left behind by + // subject delete markers. + SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"` + + // AllowMsgCounter allows a stream to use (only) counter CRDTs. + AllowMsgCounter bool `json:"allow_msg_counter,omitempty"` + + // AllowAtomicPublish allows atomic batch publishing into the stream. + AllowAtomicPublish bool `json:"allow_atomic,omitempty"` + + // AllowMsgSchedules allows the scheduling of messages. + AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty"` + + // PersistMode allows to opt-in to different persistence mode settings. + PersistMode PersistModeType `json:"persist_mode,omitempty"` + + // Metadata is additional metadata for the Stream. + Metadata map[string]string `json:"metadata,omitempty"` +} + +// Used to guide placement of streams and meta controllers in clustered JetStream. +type Placement struct { + Cluster string `json:"cluster,omitempty"` + Tags []string `json:"tags,omitempty"` + Preferred string `json:"preferred,omitempty"` +} + +// StreamSource dictates how streams can source from other streams. +type StreamSource struct { + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + + // Internal + iname string // For indexing when stream names are the same for multiple sources. +} + +// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received +type SubjectTransformConfig struct { + Source string `json:"src"` + Destination string `json:"dest"` +} + +// ExternalStream allows you to qualify access to a stream source in another account or domain. +type ExternalStream struct { + ApiPrefix string `json:"api"` + DeliverPrefix string `json:"deliver"` +} + +// RePublish is for republishing messages once committed to a stream. +type RePublish struct { + Source string `json:"src,omitempty"` + Destination string `json:"dest"` + HeadersOnly bool `json:"headers_only,omitempty"` +} + +type StreamConsumerLimits struct { + InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` + MaxAckPending int `json:"max_ack_pending,omitempty"` +} + +// PersistModeType determines what persistence mode the stream uses. +type PersistModeType int + +const ( + // DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed. + // The publish acknowledgement will be sent after the persisting completes. + DefaultPersistMode = PersistModeType(iota) + // AsyncPersistMode specifies writes to the stream will be flushed asynchronously. + // The publish acknowledgement may be sent before the persisting completes. + // This means writes could be lost if they weren't flushed prior to a hard kill of the server. + AsyncPersistMode +) + +// MarshalJSON 将 PersistModeType 序列化为字符串 +func (pm PersistModeType) MarshalJSON() ([]byte, error) { + switch pm { + case DefaultPersistMode: + return []byte(`"default"`), nil + case AsyncPersistMode: + return []byte(`"async"`), nil + default: + return nil, fmt.Errorf("can not marshal %v", pm) + } +} + +// UnmarshalJSON 将字符串反序列化为 PersistModeType +func (pm *PersistModeType) UnmarshalJSON(data []byte) error { + switch string(data) { + case `"default"`: + *pm = DefaultPersistMode + case `"async"`: + *pm = AsyncPersistMode + default: + return fmt.Errorf("unknown persist mode: %s", string(data)) + } + return nil +} + +type StoreCompression uint8 + +const ( + NoCompression StoreCompression = iota + S2Compression +) + +// MarshalJSON 将 StoreCompression 序列化为字符串 +func (sc StoreCompression) MarshalJSON() ([]byte, error) { + switch sc { + case NoCompression: + return []byte(`"none"`), nil + case S2Compression: + return []byte(`"s2"`), nil + default: + return nil, fmt.Errorf("can not marshal %v", sc) + } +} + +// UnmarshalJSON 将字符串反序列化为 StoreCompression +func (sc *StoreCompression) UnmarshalJSON(data []byte) error { + switch string(data) { + case `"none"`: + *sc = NoCompression + case `"s2"`: + *sc = S2Compression + default: + return fmt.Errorf("unknown store compression: %s", string(data)) + } + return nil +}