// Package rabbitmq 提供 RabbitMQ 消费者管理功能 // // 本文件实现消费者统一管理,简化业务层的启动逻辑 package rabbitmq import ( "context" "sync" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" ) // ManagedConsumer 托管消费者(包含启动和停止函数) type ManagedConsumer struct { Name string // 消费者名称 Start func(ctx context.Context) error // 启动函数 Stop func(ctx context.Context) // 停止函数 } // ConsumerManager RabbitMQ 消费者管理器 // // 职责: // 1. 统一管理所有 RabbitMQ 消费者的生命周期 // 2. 初始化 RabbitMQ 连接和队列 // 3. 启动/停止所有消费者 // 4. 协调消费者的优雅退出 // // 使用示例: // // mgr := rabbitmq.NewConsumerManager(ctx) // mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop) // mgr.Init() // defer mgr.Stop() type ConsumerManager struct { ctx context.Context // 全局上下文 consumers []*ManagedConsumer // 消费者列表 wg sync.WaitGroup // 等待所有消费者协程退出 } // NewConsumerManager 创建消费者管理器 // // 参数: // // ctx: 上下文 // // 返回: // // *ConsumerManager: 消费者管理器实例 func NewConsumerManager(ctx context.Context) *ConsumerManager { return &ConsumerManager{ ctx: ctx, consumers: make([]*ManagedConsumer, 0), } } // Register 注册消费者 // // 参数: // // name: 消费者名称(用于日志) // startFunc: 启动函数 // stopFunc: 停止函数 // // 使用示例: // // consumer := service.NewResponseConsumer(ctx) // mgr.Register("响应消费者", consumer.Start, consumer.Stop) func (cm *ConsumerManager) Register(name string, startFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) { cm.consumers = append(cm.consumers, &ManagedConsumer{ Name: name, Start: startFunc, Stop: stopFunc, }) } // Init 初始化并启动所有消费者 // // 执行流程: // 1. 检查 RabbitMQ 配置(未配置则跳过) // 2. 初始化 RabbitMQ 连接 // 3. 声明并绑定队列(响应队列、延时落库队列) // 4. 异步启动所有已注册的消费者 // // 返回: // // err: 错误信息,成功返回 nil // // 注意: // - 如果 RabbitMQ 未配置,不会报错,只是跳过初始化 // - 响应队列初始化失败会导致 Fatal 退出 // - 延时落库队列失败只会 Warning,不影响主流程 func (cm *ConsumerManager) Init() (err error) { // 检查配置文件中是否配置了 RabbitMQ if g.Cfg().MustGet(cm.ctx, "rabbitmq").IsEmpty() { glog.Info(cm.ctx, "RabbitMQ未配置,跳过消费者初始化") return } // 初始化 RabbitMQ 连接(从 config.yml 读取配置) if err = InitFromConfig(cm.ctx); err != nil { glog.Fatalf(cm.ctx, "初始化 RabbitMQ 失败: %v", err) return } glog.Info(cm.ctx, "RabbitMQ 连接已初始化") // 设置响应队列(RAGFlow 响应消息) if err = SetupResponseQueue(cm.ctx); err != nil { glog.Fatalf(cm.ctx, "设置响应队列失败: %v", err) return } // 设置延时落库队列(对话缓存兜底机制) // 失败不影响主流程,只记录 Warning if err = SetupDelayedFlushQueue(cm.ctx); err != nil { glog.Warningf(cm.ctx, "设置延时落库队列失败: %v", err) } // 异步启动所有已注册的消费者 cm.startConsumers() return } // startConsumers 启动所有消费者(内部方法) // // 实现: // 1. 遍历已注册的消费者 // 2. 每个消费者在独立的 goroutine 中运行 // 3. 使用 WaitGroup 追踪所有消费者协程 func (cm *ConsumerManager) startConsumers() { for _, c := range cm.consumers { cm.wg.Add(1) go func(consumer *ManagedConsumer) { defer cm.wg.Done() if err := consumer.Start(cm.ctx); err != nil { glog.Errorf(cm.ctx, "%s启动失败: %v", consumer.Name, err) } }(c) glog.Infof(cm.ctx, "%s已启动", c.Name) } } // Stop 停止所有消费者(优雅退出) // // 执行流程: // 1. 依次停止所有消费者(调用各自的 Stop 方法) // 2. 等待所有消费者协程退出(WaitGroup.Wait) // 3. 关闭 RabbitMQ 连接 // // 使用场景: // - 收到 SIGINT/SIGTERM 信号时 // - 程序正常退出时 // - defer mgr.Stop() // // 注意: // - Stop 方法会阻塞直到所有消费者完全退出 // - 确保消费者能正确响应 Stop 信号 func (cm *ConsumerManager) Stop() { // 依次停止所有消费者 for _, c := range cm.consumers { c.Stop(cm.ctx) glog.Infof(cm.ctx, "%s已停止", c.Name) } // 等待所有消费者协程退出 cm.wg.Wait() // 关闭 RabbitMQ 连接 Close(cm.ctx) glog.Info(cm.ctx, "所有消费者已停止,RabbitMQ连接已关闭") }