更新goroutine方式处理for循环
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"gitee.com/red-future---jilin-g/common/redis"
|
||||
"github.com/gogf/gf/v2/os/glog"
|
||||
"github.com/gogf/gf/v2/os/grpool"
|
||||
)
|
||||
|
||||
// 默认批量大小(每次从 Redis 读取并发送的消息数)
|
||||
@@ -18,14 +19,17 @@ type QueueProcessor struct {
|
||||
groupName string // 消费者组名称
|
||||
consumerName string // 消费者名称
|
||||
timeout int64 // 阻塞超时时间(毫秒)
|
||||
batchSize int64 // 最大并发数(信号量容量)
|
||||
batchSize int64 // 最大并发数(协程池大小)
|
||||
stopChan chan struct{} // 停止信号
|
||||
semaphore chan struct{} // 并发信号量(控制最大并发)
|
||||
pool *grpool.Pool // GoFrame协程池
|
||||
handleFunc func(ctx context.Context, message map[string]interface{}) error
|
||||
}
|
||||
|
||||
// NewQueueProcessor 创建 Stream 处理器
|
||||
func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor {
|
||||
// 创建协程池:固定大小,避免频繁创建销毁goroutine
|
||||
pool := grpool.New(int(batchSize))
|
||||
|
||||
return &QueueProcessor{
|
||||
streamKey: streamKey,
|
||||
groupName: groupName,
|
||||
@@ -33,7 +37,7 @@ func NewQueueProcessor(streamKey, groupName, consumerName string, timeout, batch
|
||||
timeout: timeout,
|
||||
batchSize: batchSize,
|
||||
stopChan: make(chan struct{}),
|
||||
semaphore: make(chan struct{}, batchSize), // 信号量容量 = 最大并发数
|
||||
pool: pool, // 使用GoFrame协程池
|
||||
handleFunc: handleFunc,
|
||||
}
|
||||
}
|
||||
@@ -79,16 +83,15 @@ func (q *QueueProcessor) Start(ctx context.Context) error {
|
||||
|
||||
glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages))
|
||||
|
||||
// 2. 用信号量控制并发:获取信号量后发送,完成后释放
|
||||
// 2. 使用协程池提交任务:复用goroutine,避免频繁创建销毁
|
||||
for _, msg := range messages {
|
||||
// 获取信号量(阻塞直到有空位)
|
||||
q.semaphore <- struct{}{}
|
||||
go func(m redis.StreamMessage) {
|
||||
defer func() { <-q.semaphore }() // 完成后释放信号量
|
||||
m := msg // 捕获循环变量
|
||||
// 提交到协程池,池满时会阻塞等待空闲worker
|
||||
q.pool.Add(ctx, func(ctx context.Context) {
|
||||
q.processMessage(ctx, m)
|
||||
}(msg)
|
||||
})
|
||||
}
|
||||
// 3. 立刻读下一批(不等待,信号量自动控制并发数)
|
||||
// 3. 立刻读下一批(不等待,协程池自动控制并发数)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -109,4 +112,6 @@ func (q *QueueProcessor) processMessage(ctx context.Context, message redis.Strea
|
||||
// Stop 停止队列处理器
|
||||
func (q *QueueProcessor) Stop() {
|
||||
close(q.stopChan)
|
||||
// 关闭协程池,等待所有任务完成
|
||||
q.pool.Close()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user