修复redis和queueprocessor
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitee.com/red-future---jilin-g/common/redis"
|
||||
@@ -16,14 +17,15 @@ const defaultBatchSize = 200
|
||||
|
||||
// QueueProcessor Stream 处理器,批量读取消息并发送到 RAGFlow
|
||||
type QueueProcessor struct {
|
||||
streamKey string // Stream 键名
|
||||
groupName string // 消费者组名称
|
||||
consumerName string // 消费者名称
|
||||
timeout int64 // 阻塞超时时间(毫秒)
|
||||
batchSize int64 // 最大并发数(协程池大小)
|
||||
stopChan chan struct{} // 停止信号
|
||||
pool *grpool.Pool // GoFrame协程池
|
||||
handleFunc func(ctx context.Context, message map[string]interface{}) error
|
||||
streamKey string // Stream 键名
|
||||
groupName string // 消费者组名称
|
||||
consumerName string // 消费者名称
|
||||
timeout int64 // 阻塞超时时间(毫秒)
|
||||
batchSize int64 // 最大并发数(协程池大小)
|
||||
stopChan chan struct{} // 停止信号
|
||||
pool *grpool.Pool // GoFrame协程池
|
||||
handleFunc func(ctx context.Context, message map[string]interface{}) error
|
||||
processingMsgs sync.Map // 正在处理的消息ID(去重用)
|
||||
}
|
||||
|
||||
// NewQueueProcessor 创建 Stream 处理器
|
||||
@@ -84,14 +86,27 @@ func (q *QueueProcessor) Start(ctx context.Context) error {
|
||||
|
||||
glog.Infof(ctx, "✅ 从Stream读取到 %d 条消息,开始处理", len(messages))
|
||||
|
||||
// 2. 使用协程池提交任务:复用goroutine,避免频繁创建销毁
|
||||
// 2. 去重+立即ACK:对话场景优先实时性,失败不重试
|
||||
for i, msg := range messages {
|
||||
m := msg // 捕获循环变量
|
||||
msgIndex := i + 1
|
||||
|
||||
// 去重:如果消息正在处理,跳过
|
||||
if _, exists := q.processingMsgs.LoadOrStore(m.ID, true); exists {
|
||||
glog.Debugf(ctx, "⏭️ 跳过正在处理的消息 - ID: %s", m.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
// 立即ACK:对话场景不需要重试,避免重复消费
|
||||
if err := redis.AckMessage(ctx, q.streamKey, q.groupName, m.ID); err != nil {
|
||||
glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, m.ID)
|
||||
}
|
||||
|
||||
glog.Infof(ctx, "📨 准备处理第 %d/%d 条消息 - ID: %s", msgIndex, len(messages), m.ID)
|
||||
|
||||
// 提交到协程池,池满时会阻塞等待空闲worker
|
||||
q.pool.Add(ctx, func(ctx context.Context) {
|
||||
defer q.processingMsgs.Delete(m.ID) // 处理完成后移除标记
|
||||
q.processMessage(ctx, m)
|
||||
})
|
||||
}
|
||||
@@ -127,10 +142,8 @@ func (q *QueueProcessor) processMessage(ctx context.Context, message redis.Strea
|
||||
glog.Infof(ctx, "✅ 消息处理成功 - ID: %s", message.ID)
|
||||
}
|
||||
|
||||
// 无论成功失败都 ACK(避免重复消费)
|
||||
if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil {
|
||||
glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID)
|
||||
}
|
||||
// ACK已在读取后立即执行,此处无需重复ACK
|
||||
// 对话场景:失败直接丢弃,不重试(实时性优先)
|
||||
}
|
||||
|
||||
// Stop 停止队列处理器
|
||||
|
||||
Reference in New Issue
Block a user