From 5f232b0ebddb42dac1f4f96297e8978277a289ee Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Fri, 26 Dec 2025 18:11:00 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0goroutine=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=E5=A4=84=E7=90=86for=E5=BE=AA=E7=8E=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ragflow/worker_pool.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/ragflow/worker_pool.go b/ragflow/worker_pool.go index 98a1b48..f7635b2 100644 --- a/ragflow/worker_pool.go +++ b/ragflow/worker_pool.go @@ -7,6 +7,7 @@ import ( "gitee.com/red-future---jilin-g/common/redis" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/grpool" ) // 默认批量大小(每次从 Redis 读取并发送的消息数) @@ -18,14 +19,17 @@ type QueueProcessor struct { groupName string // 消费者组名称 consumerName string // 消费者名称 timeout int64 // 阻塞超时时间(毫秒) - batchSize int64 // 最大并发数(信号量容量) + batchSize int64 // 最大并发数(协程池大小) stopChan chan struct{} // 停止信号 - semaphore chan struct{} // 并发信号量(控制最大并发) + pool *grpool.Pool // GoFrame协程池 handleFunc func(ctx context.Context, message map[string]interface{}) error } // NewQueueProcessor 创建 Stream 处理器 func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor { + // 创建协程池:固定大小,避免频繁创建销毁goroutine + pool := grpool.New(int(batchSize)) + return &QueueProcessor{ streamKey: streamKey, groupName: groupName, @@ -33,7 +37,7 @@ func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batch timeout: timeout, batchSize: batchSize, stopChan: make(chan struct{}), - semaphore: make(chan struct{}, batchSize), // 信号量容量 = 最大并发数 + pool: pool, // 使用GoFrame协程池 handleFunc: handleFunc, } } @@ -79,16 +83,15 @@ func (q *QueueProcessor) Start(ctx context.Context) error { glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages)) - // 2. 用信号量控制并发:获取信号量后发送,完成后释放 + // 2. 使用协程池提交任务:复用goroutine,避免频繁创建销毁 for _, msg := range messages { - // 获取信号量(阻塞直到有空位) - q.semaphore <- struct{}{} - go func(m redis.StreamMessage) { - defer func() { <-q.semaphore }() // 完成后释放信号量 + m := msg // 捕获循环变量 + // 提交到协程池,池满时会阻塞等待空闲worker + q.pool.Add(ctx, func(ctx context.Context) { q.processMessage(ctx, m) - }(msg) + }) } - // 3. 立刻读下一批(不等待,信号量自动控制并发数) + // 3. 立刻读下一批(不等待,协程池自动控制并发数) } } } @@ -109,4 +112,6 @@ func (q *QueueProcessor) processMessage(ctx context.Context, message redis.Strea // Stop 停止队列处理器 func (q *QueueProcessor) Stop() { close(q.stopChan) + // 关闭协程池,等待所有任务完成 + q.pool.Close() }