package ragflow import ( "context" "sync" "gitee.com/red-future---jilin-g/common/redis" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" ) // WorkerPool RAGFlow 请求处理协程池 type WorkerPool struct { pool *grpool.Pool size int } // 单例模式相关变量 var ( workerPoolInstance *WorkerPool workerPoolOnce sync.Once ) // GetWorkerPoolWithSize 获取指定大小的协程池单例 // 使用 sync.Once 确保只创建一次,size 仅首次调用生效 func GetWorkerPoolWithSize(size int) *WorkerPool { workerPoolOnce.Do(func() { if size <= 0 { size = 200 // 默认大小 } workerPoolInstance = &WorkerPool{ pool: grpool.New(size), size: size, } }) return workerPoolInstance } // GetWorkerPool 获取协程池单例(使用默认大小 200) func GetWorkerPool() *WorkerPool { return GetWorkerPoolWithSize(200) } // NewWorkerPool 创建协程池(兼容旧代码,内部使用单例) // 参数: // - size: 协程池大小,仅首次调用生效 // // 返回: // - *WorkerPool: 协程池单例实例 // - error: 创建失败时返回错误 func NewWorkerPool(size int) (*WorkerPool, error) { if size <= 0 { return nil, gerror.New("协程池大小必须大于0") } return GetWorkerPoolWithSize(size), nil } // Submit 提交任务到协程池 // 参数: // - ctx: 上下文 // - task: 要执行的任务函数 // // 返回:error 提交失败时返回错误 func (w *WorkerPool) Submit(ctx context.Context, task func(ctx context.Context)) error { return w.pool.Add(ctx, func(ctx context.Context) { defer func() { if r := recover(); r != nil { glog.Errorf(ctx, "协程池任务执行 panic: %v", r) } }() task(ctx) }) } // Size 获取协程池大小 func (w *WorkerPool) Size() int { return w.size } // Jobs 获取当前等待执行的任务数量 func (w *WorkerPool) Jobs() int { return w.pool.Jobs() } // Close 关闭协程池 func (w *WorkerPool) Close() { w.pool.Close() } // WorkerStats 协程池统计信息 type WorkerStats struct { PoolSize int // 协程池大小 Jobs int // 等待执行的任务数 } // Stats 获取协程池统计信息 func (w *WorkerPool) Stats() WorkerStats { return WorkerStats{ PoolSize: w.size, Jobs: w.pool.Jobs(), } } // PrintStats 打印协程池统计信息 func (w *WorkerPool) PrintStats(ctx context.Context) { stats := w.Stats() glog.Infof(ctx, "协程池统计 - 池大小: %d, 等待任务: %d", stats.PoolSize, stats.Jobs) } // QueueProcessor Stream 处理器,从 Redis Stream 中取出任务并提交到协程池 type QueueProcessor struct { pool *WorkerPool streamKey string // Stream 键名 groupName string // 消费者组名称 consumerName string // 消费者名称 timeout int64 // 阻塞超时时间(毫秒) batchSize int64 // 每次读取的消息数量 stopChan chan struct{} handleFunc func(ctx context.Context, message map[string]interface{}) error } // NewQueueProcessor 创建 Stream 处理器 // 参数: // - pool: 协程池 // - streamKey: Redis Stream 键名 // - groupName: 消费者组名称 // - consumerName: 消费者名称(唯一标识) // - timeout: 从 Stream 取消息的超时时间(毫秒) // - batchSize: 每次读取的消息数量 // - handleFunc: 消息处理函数 func NewQueueProcessor(pool *WorkerPool, streamKey, groupName, consumerName string, timeout int64, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor { return &QueueProcessor{ pool: pool, streamKey: streamKey, groupName: groupName, consumerName: consumerName, timeout: timeout, batchSize: batchSize, stopChan: make(chan struct{}), handleFunc: handleFunc, } } // Start 启动 Stream 处理器 // 会阻塞运行,持续从 Redis Stream 中取出消息并提交到协程池处理 func (q *QueueProcessor) Start(ctx context.Context) error { glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 超时: %dms", q.streamKey, q.groupName, q.consumerName, q.timeout) for { select { case <-q.stopChan: glog.Info(ctx, "Stream 处理器收到停止信号") return nil default: // 从 Redis Stream 中读取消息 messages, err := q.fetchMessages(ctx) if err != nil { glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err) continue } // 没有新消息,继续等待 if len(messages) == 0 { continue } // 处理每条消息 for _, msg := range messages { // 提交到协程池处理 if err := q.submitTask(ctx, msg); err != nil { glog.Errorf(ctx, "提交任务到协程池失败: %v, 消息ID: %s", err, msg.ID) } } } } } // Stop 停止队列处理器 func (q *QueueProcessor) Stop() { close(q.stopChan) } // fetchMessages 从 Redis Stream 中读取消息 func (q *QueueProcessor) fetchMessages(ctx context.Context) ([]redis.StreamMessage, error) { // 从消费者组读取消息 return redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout) } // submitTask 将消息处理任务提交到协程池 func (q *QueueProcessor) submitTask(ctx context.Context, message redis.StreamMessage) error { return q.pool.Submit(ctx, func(ctx context.Context) { // 处理消息 if err := q.handleFunc(ctx, message.Values); err != nil { glog.Errorf(ctx, "处理消息失败: %v, 消息ID: %s", err, message.ID) return } // 处理成功后确认消息 if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil { glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID) } else { glog.Debugf(ctx, "消息处理完成并已确认: %s", message.ID) } }) }