Files
common/rabbitmq/consumer_manager.go
2026-03-12 08:51:13 +08:00

172 lines
4.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package rabbitmq 提供 RabbitMQ 消费者管理功能
//
// 本文件实现消费者统一管理,简化业务层的启动逻辑
package rabbitmq
import (
"context"
"sync"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
)
// ManagedConsumer 托管消费者(包含启动和停止函数)
type ManagedConsumer struct {
Name string // 消费者名称
Start func(ctx context.Context) error // 启动函数
Stop func(ctx context.Context) // 停止函数
}
// ConsumerManager RabbitMQ 消费者管理器
//
// 职责:
// 1. 统一管理所有 RabbitMQ 消费者的生命周期
// 2. 初始化 RabbitMQ 连接和队列
// 3. 启动/停止所有消费者
// 4. 协调消费者的优雅退出
//
// 使用示例:
//
// mgr := rabbitmq.NewConsumerManager(ctx)
// mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop)
// mgr.Init()
// defer mgr.Stop()
type ConsumerManager struct {
ctx context.Context // 全局上下文
consumers []*ManagedConsumer // 消费者列表
wg sync.WaitGroup // 等待所有消费者协程退出
}
// NewConsumerManager 创建消费者管理器
//
// 参数:
//
// ctx: 上下文
//
// 返回:
//
// *ConsumerManager: 消费者管理器实例
func NewConsumerManager(ctx context.Context) *ConsumerManager {
return &ConsumerManager{
ctx: ctx,
consumers: make([]*ManagedConsumer, 0),
}
}
// Register 注册消费者
//
// 参数:
//
// name: 消费者名称(用于日志)
// startFunc: 启动函数
// stopFunc: 停止函数
//
// 使用示例:
//
// consumer := service.NewResponseConsumer(ctx)
// mgr.Register("响应消费者", consumer.Start, consumer.Stop)
func (cm *ConsumerManager) Register(name string, startFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) {
cm.consumers = append(cm.consumers, &ManagedConsumer{
Name: name,
Start: startFunc,
Stop: stopFunc,
})
}
// Init 初始化并启动所有消费者
//
// 执行流程:
// 1. 检查 RabbitMQ 配置(未配置则跳过)
// 2. 初始化 RabbitMQ 连接
// 3. 声明并绑定队列(响应队列、延时落库队列)
// 4. 异步启动所有已注册的消费者
//
// 返回:
//
// err: 错误信息,成功返回 nil
//
// 注意:
// - 如果 RabbitMQ 未配置,不会报错,只是跳过初始化
// - 响应队列初始化失败会导致 Fatal 退出
// - 延时落库队列失败只会 Warning不影响主流程
func (cm *ConsumerManager) Init() (err error) {
// 检查配置文件中是否配置了 RabbitMQ
if g.Cfg().MustGet(cm.ctx, "rabbitmq").IsEmpty() {
glog.Info(cm.ctx, "RabbitMQ未配置跳过消费者初始化")
return
}
// 初始化 RabbitMQ 连接(从 config.yml 读取配置)
if err = InitFromConfig(cm.ctx); err != nil {
glog.Fatalf(cm.ctx, "初始化 RabbitMQ 失败: %v", err)
return
}
glog.Info(cm.ctx, "RabbitMQ 连接已初始化")
// 设置响应队列RAGFlow 响应消息)
if err = SetupResponseQueue(cm.ctx); err != nil {
glog.Fatalf(cm.ctx, "设置响应队列失败: %v", err)
return
}
// 设置延时落库队列(对话缓存兜底机制)
// 失败不影响主流程,只记录 Warning
if err = SetupDelayedFlushQueue(cm.ctx); err != nil {
glog.Warningf(cm.ctx, "设置延时落库队列失败: %v", err)
}
// 异步启动所有已注册的消费者
cm.startConsumers()
return
}
// startConsumers 启动所有消费者(内部方法)
//
// 实现:
// 1. 遍历已注册的消费者
// 2. 每个消费者在独立的 goroutine 中运行
// 3. 使用 WaitGroup 追踪所有消费者协程
func (cm *ConsumerManager) startConsumers() {
for _, c := range cm.consumers {
cm.wg.Add(1)
go func(consumer *ManagedConsumer) {
defer cm.wg.Done()
if err := consumer.Start(cm.ctx); err != nil {
glog.Errorf(cm.ctx, "%s启动失败: %v", consumer.Name, err)
}
}(c)
glog.Infof(cm.ctx, "%s已启动", c.Name)
}
}
// Stop 停止所有消费者(优雅退出)
//
// 执行流程:
// 1. 依次停止所有消费者(调用各自的 Stop 方法)
// 2. 等待所有消费者协程退出WaitGroup.Wait
// 3. 关闭 RabbitMQ 连接
//
// 使用场景:
// - 收到 SIGINT/SIGTERM 信号时
// - 程序正常退出时
// - defer mgr.Stop()
//
// 注意:
// - Stop 方法会阻塞直到所有消费者完全退出
// - 确保消费者能正确响应 Stop 信号
func (cm *ConsumerManager) Stop() {
// 依次停止所有消费者
for _, c := range cm.consumers {
c.Stop(cm.ctx)
glog.Infof(cm.ctx, "%s已停止", c.Name)
}
// 等待所有消费者协程退出
cm.wg.Wait()
// 关闭 RabbitMQ 连接
Close(cm.ctx)
glog.Info(cm.ctx, "所有消费者已停止RabbitMQ连接已关闭")
}