From 69d2ace17f361a9e9ecacfb7e0cfc6e9d42c2ed2 Mon Sep 17 00:00:00 2001 From: qhd <1766646056@qq.com> Date: Sat, 31 Jan 2026 07:35:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97=E6=A8=A1=E5=9D=97=EF=BC=8C=E6=96=B0=E5=A2=9ENATS?= =?UTF-8?q?=E5=92=8CRabbitMQ=E8=BF=9E=E6=8E=A5=E5=AE=9E=E7=8E=B0=EF=BC=8C?= =?UTF-8?q?=E7=A7=BB=E9=99=A4=E6=97=A7=E7=89=88=E6=B6=88=E6=81=AF=E9=98=9F?= =?UTF-8?q?=E5=88=97=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- message/connection_redis.go | 485 ++++++++++++++++++++++++++++++++++++ 1 file changed, 485 insertions(+) create mode 100644 message/connection_redis.go diff --git a/message/connection_redis.go b/message/connection_redis.go new file mode 100644 index 0000000..465238f --- /dev/null +++ b/message/connection_redis.go @@ -0,0 +1,485 @@ +// ============================================================================= +// Redis 数据源连接管理 +// 负责 Redis 数据源的连接、重连、健康检查和优雅关闭 +// 支持多数据源和无限重连 +// ============================================================================= + +package message + +import ( + "context" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/gogf/gf/v2/database/gredis" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/util/gconv" +) + +// ============================================================================= +// Redis 数据源配置结构 +// ============================================================================= + +type redisDataSourceConfig struct { + name string // 数据源名称 + address string // Redis 地址,如: 127.0.0.1:6379 + db int // 数据库编号 + pass string // 密码 + maxRetries int // 最大重试次数,-1 表示无限重试 + retryInterval time.Duration // 重试间隔 +} + +// ============================================================================= +// Redis 数据源接口 +// ============================================================================= + +type redisDataSource interface { + name() string + getClient() *gredis.Redis + getIsConnected() bool + redisConnect(ctx context.Context) error + redisReconnect(ctx context.Context) error + redisClose(ctx context.Context) error + redisPing(ctx context.Context) bool +} + +// ============================================================================= +// Redis 数据源实现 +// ============================================================================= + +type baseRedisDataSource struct { + config *redisDataSourceConfig + client *gredis.Redis + isConnected bool + mu sync.RWMutex + lastError error + lastErrorTime time.Time + reconnectMu sync.Mutex +} + +func newBaseRedisDataSource(config *redisDataSourceConfig) *baseRedisDataSource { + return &baseRedisDataSource{ + config: config, + isConnected: false, + } +} + +func (d *baseRedisDataSource) name() string { + return d.config.name +} + +func (d *baseRedisDataSource) getClient() *gredis.Redis { + d.mu.RLock() + defer d.mu.RUnlock() + return d.client +} + +func (d *baseRedisDataSource) getIsConnected() bool { + d.mu.RLock() + defer d.mu.RUnlock() + return d.isConnected && d.client != nil +} + +func (d *baseRedisDataSource) redisConnect(ctx context.Context) error { + // 使用互斥锁防止并发重连 + d.reconnectMu.Lock() + defer d.reconnectMu.Unlock() + + d.mu.Lock() + if d.client != nil { + d.client.Close(ctx) + } + d.mu.Unlock() + + // 构建 GoFrame Redis 配置 + redisConfig := &gredis.Config{ + Address: d.config.address, + Db: d.config.db, + Pass: d.config.pass, + } + + // 使用 GoFrame 的 Redis 连接 + redisObj, err := gredis.New(redisConfig) + if err != nil { + d.mu.Lock() + d.isConnected = false + d.lastError = err + d.lastErrorTime = time.Now() + d.mu.Unlock() + return fmt.Errorf("datasource [%s] connection failed: %w", d.config.name, err) + } + + d.mu.Lock() + d.client = redisObj + d.mu.Unlock() + + // 测试连接 + if !d.redisPing(ctx) { + d.mu.Lock() + d.isConnected = false + d.lastError = err + d.lastErrorTime = time.Now() + d.mu.Unlock() + return fmt.Errorf("datasource [%s] ping failed: %w", d.config.name, err) + } + + d.mu.Lock() + d.isConnected = true + d.lastError = nil + d.mu.Unlock() + glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.name) + return nil +} + +func (d *baseRedisDataSource) redisReconnect(ctx context.Context) error { + glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.name) + return d.redisConnect(ctx) +} + +func (d *baseRedisDataSource) redisClose(ctx context.Context) error { + d.mu.Lock() + defer d.mu.Unlock() + + if d.client != nil { + if err := d.client.Close(ctx); err != nil { + return fmt.Errorf("datasource [%s] close failed: %w", d.config.name, err) + } + } + + d.isConnected = false + glog.Infof(ctx, "datasource [%s] closed", d.config.name) + return nil +} + +func (d *baseRedisDataSource) redisPing(ctx context.Context) bool { + d.mu.RLock() + client := d.client + d.mu.RUnlock() + + if client == nil { + return false + } + + _, err := client.Do(ctx, "PING") + if err != nil { + return false + } + return true +} + +// ============================================================================= +// Redis 多数据源管理器 +// ============================================================================= + +type redisDataSourceManager struct { + sources map[string]redisDataSource + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + started bool + maxRetries int + reconnectCh chan string +} + +var ( + globalRedisManager *redisDataSourceManager + redisManagerOnce sync.Once +) + +// getRedisManager 获取全局 Redis 管理器 +func getRedisManager() *redisDataSourceManager { + redisManagerOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + globalRedisManager = &redisDataSourceManager{ + sources: make(map[string]redisDataSource), + ctx: ctx, + cancel: cancel, + started: false, + maxRetries: -1, // 默认无限重试 + reconnectCh: make(chan string, 100), + } + }) + return globalRedisManager +} + +// registerDataSource 注册 Redis 数据源 +func (m *redisDataSourceManager) registerDataSource(config *redisDataSourceConfig) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.sources[config.name]; exists { + return fmt.Errorf("datasource [%s] already exists", config.name) + } + + source := newBaseRedisDataSource(config) + m.sources[config.name] = source + return nil +} + +// getDataSource 获取 Redis 数据源 +func (m *redisDataSourceManager) getDataSource(name string) (redisDataSource, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + source, exists := m.sources[name] + if !exists { + return nil, fmt.Errorf("datasource [%s] not found", name) + } + return source, nil +} + +// getAllDataSourceNames 获取所有 Redis 数据源名称 +func (m *redisDataSourceManager) getAllDataSourceNames() []string { + m.mu.RLock() + defer m.mu.RUnlock() + + names := make([]string, 0, len(m.sources)) + for name := range m.sources { + names = append(names, name) + } + return names +} + +// initializeFromConfig 从配置初始化 Redis 数据源 +func (m *redisDataSourceManager) initializeFromConfig(ctx context.Context) error { + var firstErr error + + // 获取 redis 配置下的所有子键 + redisConfig := g.Cfg().MustGet(ctx, "redis") + if redisConfig.IsNil() { + glog.Warningf(ctx, "no redis configuration found in config.yml") + return nil + } + + // 将配置转换为 map + configMap := redisConfig.Map() + if configMap == nil { + glog.Warningf(ctx, "redis configuration is not a map") + return nil + } + + // 遍历所有 redis 子配置 + for name, subConfig := range configMap { + // 跳过非对象类型的配置 + subMap, ok := subConfig.(map[string]interface{}) + if !ok { + continue + } + + // 检查是否有 address 配置 + address, hasAddress := subMap["address"] + if !hasAddress || gconv.String(address) == "" { + continue + } + + // 构建数据源配置 + config := &redisDataSourceConfig{ + name: name, + address: gconv.String(address), + db: gconv.Int(subMap["db"]), + pass: gconv.String(subMap["pass"]), + maxRetries: gconv.Int(subMap["maxRetries"]), + retryInterval: gconv.Duration(subMap["retryInterval"]), + } + + // 设置默认值 + if config.maxRetries == 0 { + config.maxRetries = -1 // 默认无限重试 + } + if config.retryInterval == 0 { + config.retryInterval = 5 * time.Second + } + + // 注册数据源 + if err := m.registerDataSource(config); err != nil { + glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err) + if firstErr == nil { + firstErr = err + } + continue + } + + // 连接数据源 + source, _ := m.getDataSource(name) + if err := source.redisConnect(ctx); err != nil { + glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err) + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} + +// startHealthCheck 启动健康检查 +func (m *redisDataSourceManager) startHealthCheck() { + if m.started { + return + } + m.started = true + + // 启动健康检查循环 + go m.healthCheckLoop() + + // 启动重连处理循环 + go m.reconnectLoop() +} + +// healthCheckLoop 健康检查循环 +func (m *redisDataSourceManager) healthCheckLoop() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.checkConnections() + } + } +} + +// reconnectLoop 重连处理循环 +func (m *redisDataSourceManager) reconnectLoop() { + reconnectCounts := make(map[string]int) + + for { + select { + case <-m.ctx.Done(): + return + case name := <-m.reconnectCh: + go func(dsName string) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + source, err := m.getDataSource(dsName) + if err != nil { + glog.Errorf(ctx, "datasource [%s] not found for reconnect", dsName) + return + } + + if err := source.redisReconnect(ctx); err != nil { + glog.Errorf(ctx, "datasource [%s] reconnect failed: %v", dsName, err) + + // 记录重连次数 + reconnectCounts[dsName]++ + + // 检查重连次数限制 + if m.maxRetries > 0 && reconnectCounts[dsName] > m.maxRetries { + glog.Errorf(ctx, "datasource [%s] reconnect count %d exceeds limit %d, stopping auto-reconnect", + dsName, reconnectCounts[dsName], m.maxRetries) + return + } + + // 延迟后重新放入重连队列 + time.Sleep(5 * time.Second) + select { + case m.reconnectCh <- dsName: + default: + // 通道已满,丢弃通知 + } + } else { + // 重连成功,重置计数器 + reconnectCounts[dsName] = 0 + } + }(name) + } + } +} + +// checkConnections 检查连接状态 +func (m *redisDataSourceManager) checkConnections() { + m.mu.RLock() + defer m.mu.RUnlock() + + for name, source := range m.sources { + if !source.getIsConnected() { + glog.Warningf(context.Background(), "datasource [%s] disconnected, queued for reconnect", name) + + // 发送到重连队列 + select { + case m.reconnectCh <- name: + default: + // 通道已满,丢弃通知 + } + } + } +} + +// closeAll 关闭所有 Redis 数据源 +func (m *redisDataSourceManager) closeAll(ctx context.Context) error { + m.cancel() + + m.mu.RLock() + defer m.mu.RUnlock() + + var lastErr error + for name, source := range m.sources { + if err := source.redisClose(ctx); err != nil { + glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err) + lastErr = err + } + } + return lastErr +} + +// ============================================================================= +// 全局初始化 +// ============================================================================= + +var ( + redisManager = getRedisManager() +) + +// init 初始化 Redis 数据源 +func init() { + ctx := context.Background() + + // 从配置初始化多数据源 + if err := redisManager.initializeFromConfig(ctx); err != nil { + glog.Errorf(ctx, "❌ Failed to initialize Redis datasources: %v", err) + } else { + glog.Infof(ctx, "✅ Redis datasources initialized: %v", redisManager.getAllDataSourceNames()) + } + + // 启动健康检查 + redisManager.startHealthCheck() + + // 设置优雅关闭 + setupGracefulShutdown() +} + +// setupGracefulShutdown 设置优雅关闭 +func setupGracefulShutdown() { + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + glog.Info(ctx, "🔄 Shutting down Redis connections...") + if err := redisManager.closeAll(ctx); err != nil { + glog.Errorf(ctx, "❌ Failed to close Redis connections: %v", err) + } else { + glog.Info(ctx, "✅ Redis connections closed successfully") + } + }() +} + +// ============================================================================= +// 私有辅助函数 +// ============================================================================= + +// getDefaultDataSource 获取默认数据源 +func getDefaultDataSource() (redisDataSource, error) { + return redisManager.getDataSource("default") +}