Files
common/ragflow/worker_pool.go
2026-03-12 08:51:17 +08:00

118 lines
3.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package ragflow
import (
"context"
"strings"
"time"
"gitee.com/red-future---jilin-g/common/redis"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/grpool"
)
// 默认批量大小(每次从 Redis 读取并发送的消息数)
const defaultBatchSize = 200
// QueueProcessor Stream 处理器,批量读取消息并发送到 RAGFlow
type QueueProcessor struct {
streamKey string // Stream 键名
groupName string // 消费者组名称
consumerName string // 消费者名称
timeout int64 // 阻塞超时时间(毫秒)
batchSize int64 // 最大并发数(协程池大小)
stopChan chan struct{} // 停止信号
pool *grpool.Pool // GoFrame协程池
handleFunc func(ctx context.Context, message map[string]interface{}) error
}
// NewQueueProcessor 创建 Stream 处理器
func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor {
// 创建协程池固定大小避免频繁创建销毁goroutine
pool := grpool.New(int(batchSize))
return &QueueProcessor{
streamKey: streamKey,
groupName: groupName,
consumerName: consumerName,
timeout: timeout,
batchSize: batchSize,
stopChan: make(chan struct{}),
pool: pool, // 使用GoFrame协程池
handleFunc: handleFunc,
}
}
// Start 启动 Stream 处理器
// 削峰填谷:每次读取 batchSize 条消息,并发发送,发完立刻读下一批
func (q *QueueProcessor) Start(ctx context.Context) error {
glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 批量大小: %d",
q.streamKey, q.groupName, q.consumerName, q.batchSize)
// 确保 Consumer Group 存在(重试直到成功)
for {
if err := redis.CreateConsumerGroup(ctx, q.streamKey, q.groupName); err != nil {
// BUSYGROUP 表示已存在,不是错误
if strings.Contains(err.Error(), "BUSYGROUP") {
glog.Debugf(ctx, "Consumer Group 已存在")
break
}
glog.Warningf(ctx, "创建 Consumer Group 失败: %v1秒后重试", err)
time.Sleep(time.Second)
continue
}
glog.Infof(ctx, "Consumer Group 创建成功")
break
}
for {
select {
case <-q.stopChan:
glog.Info(ctx, "Stream 处理器收到停止信号")
return nil
default:
// 1. 从 Redis Stream 读取一批消息
messages, err := redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout)
if err != nil {
glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err)
continue
}
if len(messages) == 0 {
continue
}
glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages))
// 2. 使用协程池提交任务复用goroutine避免频繁创建销毁
for _, msg := range messages {
m := msg // 捕获循环变量
// 提交到协程池池满时会阻塞等待空闲worker
q.pool.Add(ctx, func(ctx context.Context) {
q.processMessage(ctx, m)
})
}
// 3. 立刻读下一批(不等待,协程池自动控制并发数)
}
}
}
// processMessage 处理单条消息(异步执行)
func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) {
// 调用处理函数发送到 RAGFlow
if err := q.handleFunc(ctx, message.Values); err != nil {
glog.Errorf(ctx, "消息处理失败: %v, 消息ID: %s", err, message.ID)
}
// 无论成功失败都 ACK避免重复消费
if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil {
glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID)
}
}
// Stop 停止队列处理器
func (q *QueueProcessor) Stop() {
close(q.stopChan)
// 关闭协程池,等待所有任务完成
q.pool.Close()
}