diff --git a/mongo/link.go b/mongo/link.go new file mode 100644 index 0000000..77f1cec --- /dev/null +++ b/mongo/link.go @@ -0,0 +1,420 @@ +// ============================================================================= +// MongoDB 数据源连接管理 +// 负责数据源的连接、重连、健康检查和优雅关闭 +// ============================================================================= + +package mongo + +import ( + "context" + "fmt" + "os" + "os/signal" + "strings" + "sync" + "syscall" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/grpool" + "github.com/gogf/gf/v2/text/gstr" + "github.com/gogf/gf/v2/util/gconv" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" +) + +// ============================================================================= +// 数据源配置结构 +// ============================================================================= + +type DataSourceConfig struct { + Name string `json:"name"` + Address string `json:"address"` + Database string `json:"database"` + MaxPoolSize int32 `json:"maxPoolSize"` + MinPoolSize int32 `json:"minPoolSize"` + ConnectTimeout time.Duration `json:"connectTimeout"` +} + +// ============================================================================= +// 单个数据源接口 +// ============================================================================= + +type DataSource interface { + Name() string + Database() *mongo.Database + Client() *mongo.Client + IsConnected() bool + Connect(ctx context.Context) error + Reconnect(ctx context.Context) error + Close(ctx context.Context) error +} + +// ============================================================================= +// 数据源实现 +// ============================================================================= + +type BaseDataSource struct { + config *DataSourceConfig + client *mongo.Client + database *mongo.Database + isConnected bool + mu sync.RWMutex + lastError error + lastErrorTime time.Time +} + +func NewBaseDataSource(config *DataSourceConfig) *BaseDataSource { + return &BaseDataSource{ + config: config, + isConnected: false, + } +} + +func (d *BaseDataSource) Name() string { + return d.config.Name +} + +func (d *BaseDataSource) Database() *mongo.Database { + d.mu.RLock() + defer d.mu.RUnlock() + return d.database +} + +func (d *BaseDataSource) Client() *mongo.Client { + d.mu.RLock() + defer d.mu.RUnlock() + return d.client +} + +func (d *BaseDataSource) IsConnected() bool { + d.mu.RLock() + defer d.mu.RUnlock() + return d.isConnected && d.client != nil +} + +func (d *BaseDataSource) Connect(ctx context.Context) error { + d.mu.Lock() + defer d.mu.Unlock() + + if d.client != nil { + d.client.Disconnect(ctx) + } + + // 解析数据库名 + dbName := d.config.Database + if strings.Contains(dbName, "?") { + dbName = gstr.SubStr(dbName, 0, strings.Index(dbName, "?")) + } + + // 构建连接选项 + opt := options.Client(). + ApplyURI(d.config.Address). + SetMaxPoolSize(uint64(d.config.MaxPoolSize)). + SetMinPoolSize(uint64(d.config.MinPoolSize)). + SetConnectTimeout(d.config.ConnectTimeout). + SetMaxConnecting(10). + SetServerSelectionTimeout(10 * time.Second). + SetHeartbeatInterval(10 * time.Second). + SetMaxConnIdleTime(60 * time.Second). + SetRetryWrites(true). + SetRetryReads(true) + + var err error + d.client, err = mongo.Connect(opt) + if err != nil { + d.isConnected = false + d.lastError = err + d.lastErrorTime = time.Now() + return fmt.Errorf("datasource [%s] connection failed: %w", d.config.Name, err) + } + + // 测试连接 + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err = d.client.Ping(pingCtx, nil); err != nil { + d.isConnected = false + d.lastError = err + d.lastErrorTime = time.Now() + return fmt.Errorf("datasource [%s] ping failed: %w", d.config.Name, err) + } + + d.database = d.client.Database(dbName) + d.isConnected = true + d.lastError = nil + glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.Name) + return nil +} + +func (d *BaseDataSource) Reconnect(ctx context.Context) error { + glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.Name) + return d.Connect(ctx) +} + +func (d *BaseDataSource) Close(ctx context.Context) error { + d.mu.Lock() + defer d.mu.Unlock() + + if d.client != nil { + disconnectCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + if err := d.client.Disconnect(disconnectCtx); err != nil { + return fmt.Errorf("datasource [%s] close failed: %w", d.config.Name, err) + } + } + + d.isConnected = false + glog.Infof(ctx, "datasource [%s] closed", d.config.Name) + return nil +} + +// ============================================================================= +// 多数据源管理器 +// ============================================================================= + +type DataSourceManager struct { + sources map[string]DataSource + mu sync.RWMutex + ctx context.Context + cancel context.CancelFunc + started bool + maxRetries int +} + +var ( + globalManager *DataSourceManager + managerOnce sync.Once +) + +// GetManager 获取全局管理器 +func GetManager() *DataSourceManager { + managerOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + globalManager = &DataSourceManager{ + sources: make(map[string]DataSource), + ctx: ctx, + cancel: cancel, + started: false, + maxRetries: 3, + } + }) + return globalManager +} + +// RegisterDataSource 注册数据源 +func (m *DataSourceManager) RegisterDataSource(config *DataSourceConfig) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, exists := m.sources[config.Name]; exists { + return fmt.Errorf("datasource [%s] already exists", config.Name) + } + + source := NewBaseDataSource(config) + m.sources[config.Name] = source + return nil +} + +// GetDataSource 获取数据源 +func (m *DataSourceManager) GetDataSource(name string) (DataSource, error) { + m.mu.RLock() + defer m.mu.RUnlock() + + source, exists := m.sources[name] + if !exists { + return nil, fmt.Errorf("datasource [%s] not found", name) + } + return source, nil +} + +// GetAllDataSourceNames 获取所有数据源名称 +func (m *DataSourceManager) GetAllDataSourceNames() []string { + m.mu.RLock() + defer m.mu.RUnlock() + + names := make([]string, 0, len(m.sources)) + for name := range m.sources { + names = append(names, name) + } + return names +} + +// init 初始化多数据源 +func init() { + logPool = grpool.New(1) + serverName = g.Cfg().MustGet(context.TODO(), "server.name").String() + logRedisKey = fmt.Sprintf("log:%s", serverName) + + ctx := context.Background() + + // 从配置初始化多数据源 + if err := manager.InitializeFromConfig(ctx); err != nil { + glog.Errorf(ctx, "❌ Failed to initialize MongoDB datasources: %v", err) + } else { + glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", manager.GetAllDataSourceNames()) + } + + // 启动健康检查 + manager.StartHealthCheck() + + // 设置优雅关闭 + setupGracefulShutdown() +} + +// InitializeFromConfig 从配置初始化数据源 +// 动态读取 config.yml 中 mongo 下的所有配置项 +func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error { + var firstErr error + + // 获取 mongo 配置下的所有子键 + mongoConfig := g.Cfg().MustGet(ctx, "mongo") + if mongoConfig.IsNil() { + glog.Warningf(ctx, "no mongo configuration found in config.yml") + return nil + } + + // 将配置转换为 map + configMap := mongoConfig.Map() + if configMap == nil { + glog.Warningf(ctx, "mongo configuration is not a map") + return nil + } + + // 遍历所有 mongo 子配置 + for name, subConfig := range configMap { + // 跳过非对象类型的配置 + subMap, ok := subConfig.(map[string]interface{}) + if !ok { + continue + } + + // 检查是否有 address 配置 + address, hasAddress := subMap["address"] + if !hasAddress || gconv.String(address) == "" { + continue + } + + // 构建数据源配置 + config := &DataSourceConfig{ + Name: name, + Address: gconv.String(address), + Database: gconv.String(subMap["database"]), + MaxPoolSize: int32(gconv.Int(subMap["maxPoolSize"])), + MinPoolSize: int32(gconv.Int(subMap["minPoolSize"])), + ConnectTimeout: gconv.Duration(subMap["connectTimeout"]), + } + + // 设置默认值 + if config.MaxPoolSize == 0 { + config.MaxPoolSize = 100 + } + if config.MinPoolSize == 0 { + config.MinPoolSize = 10 + } + if config.ConnectTimeout == 0 { + config.ConnectTimeout = 10 * time.Second + } + + // 注册数据源 + if err := m.RegisterDataSource(config); err != nil { + glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err) + if firstErr == nil { + firstErr = err + } + continue + } + + // 连接数据源 + source, _ := m.GetDataSource(name) + if err := source.Connect(ctx); err != nil { + glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err) + if firstErr == nil { + firstErr = err + } + } + } + + return firstErr +} + +// StartHealthCheck 启动健康检查 +func (m *DataSourceManager) StartHealthCheck() { + if m.started { + return + } + m.started = true + go m.healthCheckLoop() +} + +// healthCheckLoop 健康检查循环 +func (m *DataSourceManager) healthCheckLoop() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.checkAndReconnect() + } + } +} + +// checkAndReconnect 检查并重新连接 +func (m *DataSourceManager) checkAndReconnect() { + m.mu.RLock() + defer m.mu.RUnlock() + + for name, source := range m.sources { + if !source.IsConnected() { + glog.Warningf(context.Background(), "datasource [%s] disconnected, attempting reconnect", name) + + reconnectCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := source.Reconnect(reconnectCtx); err != nil { + glog.Errorf(reconnectCtx, "datasource [%s] reconnect failed: %v", name, err) + } else { + glog.Infof(reconnectCtx, "✅ datasource [%s] reconnected successfully", name) + } + } + } +} + +// CloseAll 关闭所有数据源 +func (m *DataSourceManager) CloseAll(ctx context.Context) error { + m.cancel() + + m.mu.RLock() + defer m.mu.RUnlock() + + var lastErr error + for name, source := range m.sources { + if err := source.Close(ctx); err != nil { + glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err) + lastErr = err + } + } + return lastErr +} + +// setupGracefulShutdown 设置优雅关闭 +func setupGracefulShutdown() { + go func() { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + glog.Info(ctx, "🔄 Shutting down MongoDB connections...") + if err := manager.CloseAll(ctx); err != nil { + glog.Errorf(ctx, "❌ Failed to close MongoDB connections: %v", err) + } else { + glog.Info(ctx, "✅ MongoDB connections closed successfully") + } + }() +} diff --git a/mongo/mongo.go b/mongo/mongo.go index 3a04d72..c510ea5 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -1,7 +1,6 @@ // ============================================================================= -// MongoDB 多数据源支持 -// 支持多数据源配置、自动重连、优雅关闭 -// 向后兼容原有的单数据源API +// MongoDB 业务操作封装 +// 提供向后兼容的CRUD操作方法,支持多数据源 // ============================================================================= package mongo @@ -10,12 +9,7 @@ import ( "context" "errors" "fmt" - "os" - "os/signal" "reflect" - "strings" - "sync" - "syscall" "time" "gitee.com/red-future---jilin-g/common/beans" @@ -28,367 +22,12 @@ import ( "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtime" - "github.com/gogf/gf/v2/text/gstr" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" ) -// ============================================================================= -// 数据源配置结构 -// ============================================================================= - -type DataSourceConfig struct { - Name string `json:"name"` - Address string `json:"address"` - Database string `json:"database"` - MaxPoolSize int32 `json:"maxPoolSize"` - MinPoolSize int32 `json:"minPoolSize"` - ConnectTimeout time.Duration `json:"connectTimeout"` -} - -// ============================================================================= -// 单个数据源接口 -// ============================================================================= - -type DataSource interface { - Name() string - Database() *mongo.Database - Client() *mongo.Client - IsConnected() bool - Connect(ctx context.Context) error - Reconnect(ctx context.Context) error - Close(ctx context.Context) error -} - -// ============================================================================= -// 数据源实现 -// ============================================================================= - -type BaseDataSource struct { - config *DataSourceConfig - client *mongo.Client - database *mongo.Database - isConnected bool - mu sync.RWMutex - lastError error - lastErrorTime time.Time -} - -func NewBaseDataSource(config *DataSourceConfig) *BaseDataSource { - return &BaseDataSource{ - config: config, - isConnected: false, - } -} - -func (d *BaseDataSource) Name() string { - return d.config.Name -} - -func (d *BaseDataSource) Database() *mongo.Database { - d.mu.RLock() - defer d.mu.RUnlock() - return d.database -} - -func (d *BaseDataSource) Client() *mongo.Client { - d.mu.RLock() - defer d.mu.RUnlock() - return d.client -} - -func (d *BaseDataSource) IsConnected() bool { - d.mu.RLock() - defer d.mu.RUnlock() - return d.isConnected && d.client != nil -} - -func (d *BaseDataSource) Connect(ctx context.Context) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.client != nil { - d.client.Disconnect(ctx) - } - - // 解析数据库名 - dbName := d.config.Database - if strings.Contains(dbName, "?") { - dbName = gstr.SubStr(dbName, 0, strings.Index(dbName, "?")) - } - - // 构建连接选项 - opt := options.Client(). - ApplyURI(d.config.Address). - SetMaxPoolSize(uint64(d.config.MaxPoolSize)). - SetMinPoolSize(uint64(d.config.MinPoolSize)). - SetConnectTimeout(d.config.ConnectTimeout). - SetMaxConnecting(10). - SetServerSelectionTimeout(10 * time.Second). - SetHeartbeatInterval(10 * time.Second). - SetMaxConnIdleTime(60 * time.Second). - SetRetryWrites(true). - SetRetryReads(true) - - var err error - d.client, err = mongo.Connect(opt) - if err != nil { - d.isConnected = false - d.lastError = err - d.lastErrorTime = time.Now() - return fmt.Errorf("datasource [%s] connection failed: %w", d.config.Name, err) - } - - // 测试连接 - pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err = d.client.Ping(pingCtx, nil); err != nil { - d.isConnected = false - d.lastError = err - d.lastErrorTime = time.Now() - return fmt.Errorf("datasource [%s] ping failed: %w", d.config.Name, err) - } - - d.database = d.client.Database(dbName) - d.isConnected = true - d.lastError = nil - glog.Infof(ctx, "✅ datasource [%s] connected successfully", d.config.Name) - return nil -} - -func (d *BaseDataSource) Reconnect(ctx context.Context) error { - glog.Infof(ctx, "🔄 reconnecting datasource [%s]", d.config.Name) - return d.Connect(ctx) -} - -func (d *BaseDataSource) Close(ctx context.Context) error { - d.mu.Lock() - defer d.mu.Unlock() - - if d.client != nil { - disconnectCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err := d.client.Disconnect(disconnectCtx); err != nil { - return fmt.Errorf("datasource [%s] close failed: %w", d.config.Name, err) - } - } - - d.isConnected = false - glog.Infof(ctx, "datasource [%s] closed", d.config.Name) - return nil -} - -// ============================================================================= -// 多数据源管理器 -// ============================================================================= - -type DataSourceManager struct { - sources map[string]DataSource - mu sync.RWMutex - ctx context.Context - cancel context.CancelFunc - started bool - maxRetries int -} - -var ( - globalManager *DataSourceManager - managerOnce sync.Once -) - -// GetManager 获取全局管理器 -func GetManager() *DataSourceManager { - managerOnce.Do(func() { - ctx, cancel := context.WithCancel(context.Background()) - globalManager = &DataSourceManager{ - sources: make(map[string]DataSource), - ctx: ctx, - cancel: cancel, - started: false, - maxRetries: 3, - } - }) - return globalManager -} - -// RegisterDataSource 注册数据源 -func (m *DataSourceManager) RegisterDataSource(config *DataSourceConfig) error { - m.mu.Lock() - defer m.mu.Unlock() - - if _, exists := m.sources[config.Name]; exists { - return fmt.Errorf("datasource [%s] already exists", config.Name) - } - - source := NewBaseDataSource(config) - m.sources[config.Name] = source - return nil -} - -// GetDataSource 获取数据源 -func (m *DataSourceManager) GetDataSource(name string) (DataSource, error) { - m.mu.RLock() - defer m.mu.RUnlock() - - source, exists := m.sources[name] - if !exists { - return nil, fmt.Errorf("datasource [%s] not found", name) - } - return source, nil -} - -// GetAllDataSourceNames 获取所有数据源名称 -func (m *DataSourceManager) GetAllDataSourceNames() []string { - m.mu.RLock() - defer m.mu.RUnlock() - - names := make([]string, 0, len(m.sources)) - for name := range m.sources { - names = append(names, name) - } - return names -} - -// InitializeFromConfig 从配置初始化数据源 -// 动态读取 config.yml 中 mongo 下的所有配置项 -func (m *DataSourceManager) InitializeFromConfig(ctx context.Context) error { - var firstErr error - - // 获取 mongo 配置下的所有子键 - mongoConfig := g.Cfg().MustGet(ctx, "mongo") - if mongoConfig.IsNil() { - glog.Warningf(ctx, "no mongo configuration found in config.yml") - return nil - } - - // 将配置转换为 map - configMap := mongoConfig.Map() - if configMap == nil { - glog.Warningf(ctx, "mongo configuration is not a map") - return nil - } - - // 遍历所有 mongo 子配置 - for name, subConfig := range configMap { - // 跳过非对象类型的配置 - subMap, ok := subConfig.(map[string]interface{}) - if !ok { - continue - } - - // 检查是否有 address 配置 - address, hasAddress := subMap["address"] - if !hasAddress || gconv.String(address) == "" { - continue - } - - // 构建数据源配置 - config := &DataSourceConfig{ - Name: name, - Address: gconv.String(address), - Database: gconv.String(subMap["database"]), - MaxPoolSize: int32(gconv.Int(subMap["maxPoolSize"])), - MinPoolSize: int32(gconv.Int(subMap["minPoolSize"])), - ConnectTimeout: gconv.Duration(subMap["connectTimeout"]), - } - - // 设置默认值 - if config.MaxPoolSize == 0 { - config.MaxPoolSize = 100 - } - if config.MinPoolSize == 0 { - config.MinPoolSize = 10 - } - if config.ConnectTimeout == 0 { - config.ConnectTimeout = 10 * time.Second - } - - // 注册数据源 - if err := m.RegisterDataSource(config); err != nil { - glog.Errorf(ctx, "failed to register datasource [%s]: %v", name, err) - if firstErr == nil { - firstErr = err - } - continue - } - - // 连接数据源 - source, _ := m.GetDataSource(name) - if err := source.Connect(ctx); err != nil { - glog.Errorf(ctx, "failed to initialize datasource [%s]: %v", name, err) - if firstErr == nil { - firstErr = err - } - } - } - - return firstErr -} - -// StartHealthCheck 启动健康检查 -func (m *DataSourceManager) StartHealthCheck() { - if m.started { - return - } - m.started = true - go m.healthCheckLoop() -} - -// healthCheckLoop 健康检查循环 -func (m *DataSourceManager) healthCheckLoop() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-m.ctx.Done(): - return - case <-ticker.C: - m.checkAndReconnect() - } - } -} - -// checkAndReconnect 检查并重新连接 -func (m *DataSourceManager) checkAndReconnect() { - m.mu.RLock() - defer m.mu.RUnlock() - - for name, source := range m.sources { - if !source.IsConnected() { - glog.Warningf(context.Background(), "datasource [%s] disconnected, attempting reconnect", name) - - reconnectCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - if err := source.Reconnect(reconnectCtx); err != nil { - glog.Errorf(reconnectCtx, "datasource [%s] reconnect failed: %v", name, err) - } else { - glog.Infof(reconnectCtx, "✅ datasource [%s] reconnected successfully", name) - } - } - } -} - -// CloseAll 关闭所有数据源 -func (m *DataSourceManager) CloseAll(ctx context.Context) error { - m.cancel() - - m.mu.RLock() - defer m.mu.RUnlock() - - var lastErr error - for name, source := range m.sources { - if err := source.Close(ctx); err != nil { - glog.Errorf(ctx, "failed to close datasource [%s]: %v", name, err) - lastErr = err - } - } - return lastErr -} - // ============================================================================= // 向后兼容的MongoDB结构体 // ============================================================================= @@ -437,47 +76,6 @@ func GetDB() *mongo.Database { return source.Database() } -// init 初始化多数据源 -func init() { - logPool = grpool.New(1) - serverName = g.Cfg().MustGet(context.TODO(), "server.name").String() - logRedisKey = fmt.Sprintf("log:%s", serverName) - - ctx := context.Background() - - // 从配置初始化多数据源 - if err := manager.InitializeFromConfig(ctx); err != nil { - glog.Errorf(ctx, "❌ Failed to initialize MongoDB datasources: %v", err) - } else { - glog.Infof(ctx, "✅ MongoDB datasources initialized: %v", manager.GetAllDataSourceNames()) - } - - // 启动健康检查 - manager.StartHealthCheck() - - // 设置优雅关闭 - setupGracefulShutdown() -} - -// setupGracefulShutdown 设置优雅关闭 -func setupGracefulShutdown() { - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - glog.Info(ctx, "🔄 Shutting down MongoDB connections...") - if err := manager.CloseAll(ctx); err != nil { - glog.Errorf(ctx, "❌ Failed to close MongoDB connections: %v", err) - } else { - glog.Info(ctx, "✅ MongoDB connections closed successfully") - } - }() -} - // ============================================================================= // MongoDB 操作方法(支持多数据源) // =============================================================================