// ============================================================================= // Meilisearch 数据源连接管理 // 负责数据源的连接、重连、健康检查和优雅关闭 // ============================================================================= package meilisearch import ( "context" "fmt" "os" "os/signal" "sync" "syscall" "time" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/util/gconv" ms "github.com/meilisearch/meilisearch-go" ) // ============================================================================= // 数据源配置结构 // ============================================================================= type DataSourceConfig struct { Name string `json:"name"` Host string `json:"host"` Port int `json:"port"` APIKey string `json:"apiKey"` Timeout time.Duration `json:"timeout"` } // ============================================================================= // 单个数据源接口 // ============================================================================= type DataSource interface { Name() string Client() interface{} IsConnected() bool Connect(ctx context.Context) error Reconnect(ctx context.Context) error Close(ctx context.Context) error } // ============================================================================= // 数据源实现 // ============================================================================= type BaseDataSource struct { config *DataSourceConfig client interface{} isConnected bool mu sync.RWMutex lastError error lastErrorTime time.Time } func NewBaseDataSource(config *DataSourceConfig) *BaseDataSource { return &BaseDataSource{ config: config, isConnected: false, } } func (d *BaseDataSource) Name() string { return d.config.Name } func (d *BaseDataSource) Client() interface{} { d.mu.RLock() defer d.mu.RUnlock() return d.client } func (d *BaseDataSource) IsConnected() bool { d.mu.RLock() defer d.mu.RUnlock() return d.isConnected && d.client != nil } func (d *BaseDataSource) Connect(ctx context.Context) error { d.mu.Lock() defer d.mu.Unlock() // 构建客户端 host := d.config.Host if d.config.Port > 0 { host = fmt.Sprintf("%s:%d", d.config.Host, d.config.Port) } d.client = ms.New(host, ms.WithAPIKey(d.config.APIKey)) // 测试连接 if err := d.healthCheck(ctx); err != nil { d.isConnected = false d.lastError = err d.lastErrorTime = time.Now() return fmt.Errorf("datasource [%s] connection failed: %w", d.config.Name, err) } d.isConnected = true d.lastError = nil glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.Name) return nil } // healthCheck 健康检查 func (d *BaseDataSource) healthCheck(ctx context.Context) error { if d.client == nil { return fmt.Errorf("client is nil") } // 获取版本信息来测试连接 if c, ok := d.client.(interface{ GetVersion() (*ms.Version, error) }); ok { status, err := c.GetVersion() if err != nil { return err } glog.Debugf(ctx, "Meilisearch version: %s", status.PkgVersion) } else { // 如果没有GetVersion方法,尝试其他方法验证连接 if _, ok := d.client.(interface { GetIndexes(interface{}) (interface{}, error) }); ok { return nil } } return nil } func (d *BaseDataSource) Reconnect(ctx context.Context) error { glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.Name) return d.Connect(ctx) } func (d *BaseDataSource) Close(ctx context.Context) error { d.mu.Lock() defer d.mu.Unlock() // Meilisearch 客户端不需要显式关闭,只重置状态 d.client = nil d.isConnected = false glog.Infof(ctx, "datasource [%s] closed", d.config.Name) return nil } // ============================================================================= // 多数据源管理器 // ============================================================================= type DataSourceManager struct { sources map[string]DataSource mu sync.RWMutex ctx context.Context cancel context.CancelFunc started bool maxRetries int } var ( globalManager *DataSourceManager managerOnce sync.Once ) // GetManager 获取全局管理器 func GetManager() *DataSourceManager { managerOnce.Do(func() { ctx, cancel := context.WithCancel(context.Background()) globalManager = &DataSourceManager{ sources: make(map[string]DataSource), ctx: ctx, cancel: cancel, started: false, maxRetries: 3, } }) return globalManager } // RegisterDataSource 注册数据源 func (m *DataSourceManager) RegisterDataSource(config *DataSourceConfig) error { m.mu.Lock() defer m.mu.Unlock() if _, exists := m.sources[config.Name]; exists { return fmt.Errorf("datasource [%s] already exists", config.Name) } source := NewBaseDataSource(config) m.sources[config.Name] = source return nil } // GetDataSource 获取数据源 func (m *DataSourceManager) GetDataSource(name string) (DataSource, error) { m.mu.RLock() defer m.mu.RUnlock() source, exists := m.sources[name] if !exists { return nil, fmt.Errorf("datasource [%s] not found", name) } return source, nil } // GetAllDataSourceNames 获取所有数据源名称 func (m *DataSourceManager) GetAllDataSourceNames() []string { m.mu.RLock() defer m.mu.RUnlock() names := make([]string, 0, len(m.sources)) for name := range m.sources { names = append(names, name) } return names } // init 初始化多数据源 func init() { ctx := context.Background() // 从配置初始化多数据源 if err := manager.InitializeFromConfig(ctx); err != nil { glog.Errorf(ctx, "❌ Failed to initialize Meilisearch datasources: %v", err) } else { glog.Infof(ctx, "✅ Meilisearch datasources initialized: %v", manager.GetAllDataSourceNames()) } // 启动健康检查 manager.StartHealthCheck() // 设置优雅关闭 setupGracefulShutdown() } // InitializeFromConfig 从配置初始化数据源 // 动态读取 config.yml 中 meilisearch 下的所有配置项 func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error { var firstErr error // 获取 meilisearch 配置下的所有子键 meilisearchConfig := g.Cfg().MustGet(ctx, "meilisearch") if meilisearchConfig.IsNil() { glog.Warningf(ctx, "no meilisearch configuration found in config.yml") return nil } // 将配置转换为 map configMap := meilisearchConfig.Map() if configMap == nil { glog.Warningf(ctx, "meilisearch configuration is not a map") return nil } // 遍历所有 meilisearch 子配置 for name, subConfig := range configMap { // 跳过非对象类型的配置 subMap, ok := subConfig.(map[string]interface{}) if !ok { continue } // 检查是否有 host 配置 host, hasHost := subMap["host"] if !hasHost || gconv.String(host) == "" { continue } // 构建数据源配置 config := &DataSourceConfig{ Name: name, Host: gconv.String(host), Port: int(gconv.Int(subMap["port"])), APIKey: gconv.String(subMap["apiKey"]), } // 设置默认值 if config.Port == 0 { config.Port = 7700 } if config.Host == "" { config.Host = "http://localhost" } // 可选:从配置读取超时时间 if timeoutVal, hasTimeout := subMap["timeout"]; hasTimeout { config.Timeout = gconv.Duration(timeoutVal) } else { config.Timeout = 10 * time.Second } // 注册数据源 if err := m.RegisterDataSource(config); err != nil { glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err) if firstErr == nil { firstErr = err } continue } // 连接数据源 source, _ := m.GetDataSource(name) if err := source.Connect(ctx); err != nil { glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err) if firstErr == nil { firstErr = err } } } return firstErr } // StartHealthCheck 启动健康检查 func (m *DataSourceManager) StartHealthCheck() { if m.started { return } m.started = true go m.healthCheckLoop() } // healthCheckLoop 健康检查循环 func (m *DataSourceManager) healthCheckLoop() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-m.ctx.Done(): return case <-ticker.C: m.checkAndReconnect() } } } // checkAndReconnect 检查并重新连接 func (m *DataSourceManager) checkAndReconnect() { m.mu.RLock() defer m.mu.RUnlock() for name, source := range m.sources { if !source.IsConnected() { glog.Warningf(context.Background(), "datasource [%s] disconnected, attempting reconnect", name) reconnectCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := source.Reconnect(reconnectCtx); err != nil { glog.Errorf(reconnectCtx, "datasource [%s] reconnect failed: %v", name, err) } else { glog.Infof(reconnectCtx, "✅ datasource [%s] reconnected successfully", name) } } } } // CloseAll 关闭所有数据源 func (m *DataSourceManager) CloseAll(ctx context.Context) error { m.cancel() m.mu.RLock() defer m.mu.RUnlock() var lastErr error for name, source := range m.sources { if err := source.Close(ctx); err != nil { glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err) lastErr = err } } return lastErr } // setupGracefulShutdown 设置优雅关闭 func setupGracefulShutdown() { go func() { sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) <-sigCh ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() glog.Info(ctx, "🔄 Shutting down Meilisearch connections...") if err := manager.CloseAll(ctx); err != nil { glog.Errorf(ctx, "❌ Failed to close Meilisearch connections: %v", err) } else { glog.Info(ctx, "✅ Meilisearch connections closed successfully") } }() }