Files
common/ragflow/worker_pool.go
2026-03-12 08:51:07 +08:00

113 lines
3.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package ragflow
import (
"context"
"strings"
"time"
"gitee.com/red-future---jilin-g/common/redis"
"github.com/gogf/gf/v2/os/glog"
)
// defaultBatchSize is the batch size NewQueueProcessor falls back to when it
// is given a non-positive batchSize. It is the number of messages read from
// Redis per call, which is also the maximum number of concurrent handlers.
const defaultBatchSize = 200

// QueueProcessor consumes a Redis Stream through a consumer group. It reads
// messages in batches and hands each one to handleFunc on its own goroutine;
// in this package handleFunc forwards the message to RAGFlow.
type QueueProcessor struct {
	streamKey    string        // Stream key name
	groupName    string        // consumer group name
	consumerName string        // consumer name within the group
	timeout      int64         // blocking read timeout (milliseconds), passed to redis.ReadFromStream
	batchSize    int64         // messages per read AND maximum concurrent handlers (semaphore capacity)
	stopChan     chan struct{} // closed by Stop to signal shutdown
	semaphore    chan struct{} // counting semaphore bounding the number of concurrent handlers
	handleFunc   func(ctx context.Context, message map[string]interface{}) error
}

// NewQueueProcessor builds a QueueProcessor for the given stream, consumer
// group and consumer.
//
// timeout is the blocking read timeout in milliseconds.
//
// batchSize is both the number of messages read per batch and the maximum
// number of messages handled concurrently. A value <= 0 is replaced by
// defaultBatchSize, because:
//   - a negative capacity would make the semaphore allocation panic;
//   - a zero capacity would leave Start blocked forever sending on an
//     unbuffered semaphore.
//
// handleFunc is called once per message with the message's field values.
func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor {
	if batchSize <= 0 {
		batchSize = defaultBatchSize
	}
	return &QueueProcessor{
		streamKey:    streamKey,
		groupName:    groupName,
		consumerName: consumerName,
		timeout:      timeout,
		batchSize:    batchSize,
		stopChan:     make(chan struct{}),
		semaphore:    make(chan struct{}, batchSize), // capacity = max concurrency
		handleFunc:   handleFunc,
	}
}
// streamRetryDelay is how long Start waits before retrying a failed Redis
// call (consumer-group creation or a stream read).
const streamRetryDelay = time.Second

// Start runs the processor and blocks until Stop is called or ctx is done.
//
// Load levelling: it first makes sure the consumer group exists. It then loops,
// reading up to batchSize messages at a time and handing each message to its
// own goroutine (processMessage). The semaphore caps in-flight goroutines at
// batchSize, so the next read starts as soon as the current batch has been
// dispatched.
//
// A batch that has been read is always dispatched in full before the stop
// signal is checked again. Before returning, Start waits for every in-flight
// processMessage goroutine to finish.
//
// It returns nil after Stop, or ctx.Err() if ctx is cancelled first.
func (q *QueueProcessor) Start(ctx context.Context) error {
	glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 批量大小: %d",
		q.streamKey, q.groupName, q.consumerName, q.batchSize)

	// Don't let in-flight handlers outlive Start. Otherwise a caller that exits
	// right after Start returns would cut them off before they ACK.
	defer q.drain()

	if done, err := q.ensureConsumerGroup(ctx); done {
		return err
	}

	for {
		select {
		case <-q.stopChan:
			glog.Info(ctx, "Stream 处理器收到停止信号")
			return nil
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		// 1. Read the next batch from the stream.
		messages, err := redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout)
		if err != nil {
			glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err)
			// Back off: retrying immediately turns a Redis outage into a
			// busy loop that floods the log.
			if done, stopErr := q.pause(ctx, streamRetryDelay); done {
				return stopErr
			}
			continue
		}
		if len(messages) == 0 {
			continue
		}

		glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages))

		// 2. Dispatch every message. Acquiring a semaphore slot blocks while
		// batchSize handlers are already running; each handler frees its slot
		// when it finishes.
		for _, msg := range messages {
			q.semaphore <- struct{}{}
			go func(m redis.StreamMessage) {
				defer func() { <-q.semaphore }()
				q.processMessage(ctx, m)
			}(msg)
		}
		// 3. Loop straight back to the next read; the semaphore bounds concurrency.
	}
}

// ensureConsumerGroup creates the consumer group, retrying every
// streamRetryDelay until creation succeeds or the group already exists
// (BUSYGROUP).
//
// It returns true, plus the error Start should return, if Stop or ctx
// cancellation interrupts the retries.
func (q *QueueProcessor) ensureConsumerGroup(ctx context.Context) (bool, error) {
	for {
		err := redis.CreateConsumerGroup(ctx, q.streamKey, q.groupName)
		switch {
		case err == nil:
			glog.Infof(ctx, "Consumer Group 创建成功")
			return false, nil
		case strings.Contains(err.Error(), "BUSYGROUP"):
			// BUSYGROUP means the group already exists, which is not an error.
			glog.Debugf(ctx, "Consumer Group 已存在")
			return false, nil
		}

		glog.Warningf(ctx, "创建 Consumer Group 失败: %v, 1秒后重试", err)
		if done, stopErr := q.pause(ctx, streamRetryDelay); done {
			return true, stopErr
		}
	}
}

// pause waits for d. It returns true, plus the error Start should return, if
// the wait is interrupted before d elapses:
//   - by Stop, with a nil error;
//   - by ctx cancellation, with ctx.Err().
func (q *QueueProcessor) pause(ctx context.Context, d time.Duration) (bool, error) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	select {
	case <-timer.C:
		return false, nil
	case <-q.stopChan:
		glog.Info(ctx, "Stream 处理器收到停止信号")
		return true, nil
	case <-ctx.Done():
		return true, ctx.Err()
	}
}

// drain blocks until every goroutine started by Start has released its
// semaphore slot, i.e. until all in-flight messages have been processed.
// It does so by taking every slot itself and then handing them all back.
func (q *QueueProcessor) drain() {
	for i := 0; i < cap(q.semaphore); i++ {
		q.semaphore <- struct{}{}
	}
	for i := 0; i < cap(q.semaphore); i++ {
		<-q.semaphore
	}
}
// processMessage hands one message to handleFunc and then acknowledges it.
// Start runs it on its own goroutine, one per message.
//
// The message is acknowledged whether handleFunc succeeds, fails or panics,
// so a failing message is not redelivered; the failure is only logged.
func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) {
	q.handleSafely(ctx, message)

	// ACK regardless of the outcome to avoid consuming the message again.
	if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil {
		glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID)
	}
}

// handleSafely calls handleFunc and logs either its error or a recovered panic.
// An unrecovered panic here would occur on a bare goroutine and terminate
// the whole process.
func (q *QueueProcessor) handleSafely(ctx context.Context, message redis.StreamMessage) {
	defer func() {
		if r := recover(); r != nil {
			glog.Errorf(ctx, "消息处理 panic: %v, 消息ID: %s", r, message.ID)
		}
	}()

	if err := q.handleFunc(ctx, message.Values); err != nil {
		glog.Errorf(ctx, "消息处理失败: %v, 消息ID: %s", err, message.ID)
	}
}
// Stop signals Start to return by closing stopChan.
//
// Calling Stop again after it has returned is a no-op; previously a second
// close would panic. Two goroutines calling Stop at the very same moment can
// still race, so callers should not rely on that.
func (q *QueueProcessor) Stop() {
	select {
	case <-q.stopChan:
		// Already stopped; closing the channel again would panic.
	default:
		close(q.stopChan)
	}
}