package ragflow import ( "context" "strings" "time" "gitee.com/red-future---jilin-g/common/redis" "github.com/gogf/gf/v2/os/glog" ) // 默认批量大小(每次从 Redis 读取并发送的消息数) const defaultBatchSize = 200 // QueueProcessor Stream 处理器,批量读取消息并发送到 RAGFlow type QueueProcessor struct { streamKey string // Stream 键名 groupName string // 消费者组名称 consumerName string // 消费者名称 timeout int64 // 阻塞超时时间(毫秒) batchSize int64 // 最大并发数(信号量容量) stopChan chan struct{} // 停止信号 semaphore chan struct{} // 并发信号量(控制最大并发) handleFunc func(ctx context.Context, message map[string]interface{}) error } // NewQueueProcessor 创建 Stream 处理器 func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor { return &QueueProcessor{ streamKey: streamKey, groupName: groupName, consumerName: consumerName, timeout: timeout, batchSize: batchSize, stopChan: make(chan struct{}), semaphore: make(chan struct{}, batchSize), // 信号量容量 = 最大并发数 handleFunc: handleFunc, } } // Start 启动 Stream 处理器 // 削峰填谷:每次读取 batchSize 条消息,并发发送,发完立刻读下一批 func (q *QueueProcessor) Start(ctx context.Context) error { glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 批量大小: %d", q.streamKey, q.groupName, q.consumerName, q.batchSize) // 确保 Consumer Group 存在(重试直到成功) for { if err := redis.CreateConsumerGroup(ctx, q.streamKey, q.groupName); err != nil { // BUSYGROUP 表示已存在,不是错误 if strings.Contains(err.Error(), "BUSYGROUP") { glog.Debugf(ctx, "Consumer Group 已存在") break } glog.Warningf(ctx, "创建 Consumer Group 失败: %v,1秒后重试", err) time.Sleep(time.Second) continue } glog.Infof(ctx, "Consumer Group 创建成功") break } for { select { case <-q.stopChan: glog.Info(ctx, "Stream 处理器收到停止信号") return nil default: // 1. 从 Redis Stream 读取一批消息 messages, err := redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout) if err != nil { glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err) continue } if len(messages) == 0 { continue } glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages)) // 2. 用信号量控制并发:获取信号量后发送,完成后释放 for _, msg := range messages { // 获取信号量(阻塞直到有空位) q.semaphore <- struct{}{} go func(m redis.StreamMessage) { defer func() { <-q.semaphore }() // 完成后释放信号量 q.processMessage(ctx, m) }(msg) } // 3. 立刻读下一批(不等待,信号量自动控制并发数) } } } // processMessage 处理单条消息(异步执行) func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) { // 调用处理函数发送到 RAGFlow if err := q.handleFunc(ctx, message.Values); err != nil { glog.Errorf(ctx, "消息处理失败: %v, 消息ID: %s", err, message.ID) } // 无论成功失败都 ACK(避免重复消费) if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil { glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID) } } // Stop 停止队列处理器 func (q *QueueProcessor) Stop() { close(q.stopChan) }