From 4269b4fd793a8efaf8563ce44f046802c24a096e Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Wed, 31 Dec 2025 16:10:23 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20HTTP=E6=A8=A1=E5=BC=8F=E6=94=AF?= =?UTF-8?q?=E6=8C=81=20-=20=E4=BC=98=E5=8C=96Stream=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E6=B7=BB=E5=8A=A0pending=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E4=BC=98=E5=85=88=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ragflow/worker_pool.go | 30 +++++++++++++++++++++++++++--- redis/redis.go | 28 ++++++++++++++++++++++------ 2 files changed, 49 insertions(+), 9 deletions(-) diff --git a/ragflow/worker_pool.go b/ragflow/worker_pool.go index f7635b2..32b4b67 100644 --- a/ragflow/worker_pool.go +++ b/ragflow/worker_pool.go @@ -2,6 +2,7 @@ package ragflow import ( "context" + "runtime/debug" "strings" "time" @@ -81,11 +82,14 @@ func (q *QueueProcessor) Start(ctx context.Context) error { continue } - glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages)) + glog.Infof(ctx, "✅ 从Stream读取到 %d 条消息,开始处理", len(messages)) // 2. 使用协程池提交任务:复用goroutine,避免频繁创建销毁 - for _, msg := range messages { + for i, msg := range messages { m := msg // 捕获循环变量 + msgIndex := i + 1 + glog.Infof(ctx, "📨 准备处理第 %d/%d 条消息 - ID: %s", msgIndex, len(messages), m.ID) + // 提交到协程池,池满时会阻塞等待空闲worker q.pool.Add(ctx, func(ctx context.Context) { q.processMessage(ctx, m) @@ -98,9 +102,29 @@ func (q *QueueProcessor) Start(ctx context.Context) error { // processMessage 处理单条消息(异步执行) func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) { + // 捕获panic,防止协程崩溃 + defer func() { + if r := recover(); r != nil { + glog.Errorf(ctx, "❌ PANIC: 消息处理发生panic - 消息ID: %s, panic内容: %v\n堆栈:\n%s", + message.ID, r, debug.Stack()) + } + }() + + glog.Infof(ctx, "🔄 开始处理消息 - ID: %s", message.ID) + + // 打印实际字段名(调试用) + var fieldNames []string + for key := range message.Values { + fieldNames = append(fieldNames, key) + } + glog.Infof(ctx, "📋 消息字段名列表: %v", fieldNames) + glog.Infof(ctx, "📦 消息完整内容: %+v", message.Values) + // 调用处理函数发送到 RAGFlow if err := q.handleFunc(ctx, message.Values); err != nil { - glog.Errorf(ctx, "消息处理失败: %v, 消息ID: %s", err, message.ID) + glog.Errorf(ctx, "❌ 消息处理失败: %v, 消息ID: %s", err, message.ID) + } else { + glog.Infof(ctx, "✅ 消息处理成功 - ID: %s", message.ID) } // 无论成功失败都 ACK(避免重复消费) diff --git a/redis/redis.go b/redis/redis.go index f1af3ac..c112d41 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -190,9 +190,6 @@ func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error // ReadFromStream 从 Stream 读取消息(消费者组模式) // 使用 gredis Do() 方法执行 XREADGROUP 命令 func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) { - glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >", - groupName, consumerName, count, blockMs, streamKey) - // 检查是否需要记录trace(避免轮询产生大量trace) execCtx := ctx if !g.Cfg().MustGet(ctx, "jaeger.traceStream", true).Bool() { @@ -201,17 +198,36 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri } RECONNECT: - // XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey > + // 先尝试读取pending消息(ID=0),处理积压 + glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK 0 STREAMS %s 0", + groupName, consumerName, count, streamKey) + result, err := redisClient.Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, - "BLOCK", blockMs, - "STREAMS", streamKey, ">", + "BLOCK", 0, // 不阻塞,立即返回 + "STREAMS", streamKey, "0", // ID=0 读取pending消息 ) if err != nil { goto RECONNECT } + // 如果没有pending消息,读取新消息 + if result == nil || result.IsEmpty() { + glog.Debugf(ctx, "[DEBUG Redis] 无pending消息,读取新消息 XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >", + groupName, consumerName, count, blockMs, streamKey) + + result, err = redisClient.Do(execCtx, + "XREADGROUP", "GROUP", groupName, consumerName, + "COUNT", count, + "BLOCK", blockMs, + "STREAMS", streamKey, ">", + ) + if err != nil { + goto RECONNECT + } + } + glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result) // 预分配容量,避免动态扩容