feat: HTTP模式支持 - 优化Stream消费逻辑,添加pending消息优先读取

This commit is contained in:
Cold
2025-12-31 16:10:23 +08:00
committed by 张斌
parent 3b8980bb24
commit 4269b4fd79
2 changed files with 49 additions and 9 deletions

View File

@@ -2,6 +2,7 @@ package ragflow
import ( import (
"context" "context"
"runtime/debug"
"strings" "strings"
"time" "time"
@@ -81,11 +82,14 @@ func (q *QueueProcessor) Start(ctx context.Context) error {
continue continue
} }
glog.Debugf(ctx, "读取 %d 条消息,开始发送", len(messages)) glog.Infof(ctx, "✅ 从Stream读取 %d 条消息,开始处理", len(messages))
// 2. 使用协程池提交任务复用goroutine避免频繁创建销毁 // 2. 使用协程池提交任务复用goroutine避免频繁创建销毁
for _, msg := range messages { for i, msg := range messages {
m := msg // 捕获循环变量 m := msg // 捕获循环变量
msgIndex := i + 1
glog.Infof(ctx, "📨 准备处理第 %d/%d 条消息 - ID: %s", msgIndex, len(messages), m.ID)
// 提交到协程池池满时会阻塞等待空闲worker // 提交到协程池池满时会阻塞等待空闲worker
q.pool.Add(ctx, func(ctx context.Context) { q.pool.Add(ctx, func(ctx context.Context) {
q.processMessage(ctx, m) q.processMessage(ctx, m)
@@ -98,9 +102,29 @@ func (q *QueueProcessor) Start(ctx context.Context) error {
// processMessage 处理单条消息(异步执行) // processMessage 处理单条消息(异步执行)
func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) { func (q *QueueProcessor) processMessage(ctx context.Context, message redis.StreamMessage) {
// 捕获panic防止协程崩溃
defer func() {
if r := recover(); r != nil {
glog.Errorf(ctx, "❌ PANIC: 消息处理发生panic - 消息ID: %s, panic内容: %v\n堆栈:\n%s",
message.ID, r, debug.Stack())
}
}()
glog.Infof(ctx, "🔄 开始处理消息 - ID: %s", message.ID)
// 打印实际字段名(调试用)
var fieldNames []string
for key := range message.Values {
fieldNames = append(fieldNames, key)
}
glog.Infof(ctx, "📋 消息字段名列表: %v", fieldNames)
glog.Infof(ctx, "📦 消息完整内容: %+v", message.Values)
// 调用处理函数发送到 RAGFlow // 调用处理函数发送到 RAGFlow
if err := q.handleFunc(ctx, message.Values); err != nil { if err := q.handleFunc(ctx, message.Values); err != nil {
glog.Errorf(ctx, "消息处理失败: %v, 消息ID: %s", err, message.ID) glog.Errorf(ctx, "消息处理失败: %v, 消息ID: %s", err, message.ID)
} else {
glog.Infof(ctx, "✅ 消息处理成功 - ID: %s", message.ID)
} }
// 无论成功失败都 ACK避免重复消费 // 无论成功失败都 ACK避免重复消费

View File

@@ -190,9 +190,6 @@ func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error
// ReadFromStream 从 Stream 读取消息(消费者组模式) // ReadFromStream 从 Stream 读取消息(消费者组模式)
// 使用 gredis Do() 方法执行 XREADGROUP 命令 // 使用 gredis Do() 方法执行 XREADGROUP 命令
func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) { func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) {
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >",
groupName, consumerName, count, blockMs, streamKey)
// 检查是否需要记录trace避免轮询产生大量trace // 检查是否需要记录trace避免轮询产生大量trace
execCtx := ctx execCtx := ctx
if !g.Cfg().MustGet(ctx, "jaeger.traceStream", true).Bool() { if !g.Cfg().MustGet(ctx, "jaeger.traceStream", true).Bool() {
@@ -201,17 +198,36 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
} }
RECONNECT: RECONNECT:
// XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey > // 先尝试读取pending消息ID=0处理积压
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK 0 STREAMS %s 0",
groupName, consumerName, count, streamKey)
result, err := redisClient.Do(execCtx, result, err := redisClient.Do(execCtx,
"XREADGROUP", "GROUP", groupName, consumerName, "XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count, "COUNT", count,
"BLOCK", blockMs, "BLOCK", 0, // 不阻塞,立即返回
"STREAMS", streamKey, ">", "STREAMS", streamKey, "0", // ID=0 读取pending消息
) )
if err != nil { if err != nil {
goto RECONNECT goto RECONNECT
} }
// 如果没有pending消息读取新消息
if result == nil || result.IsEmpty() {
glog.Debugf(ctx, "[DEBUG Redis] 无pending消息读取新消息 XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >",
groupName, consumerName, count, blockMs, streamKey)
result, err = redisClient.Do(execCtx,
"XREADGROUP", "GROUP", groupName, consumerName,
"COUNT", count,
"BLOCK", blockMs,
"STREAMS", streamKey, ">",
)
if err != nil {
goto RECONNECT
}
}
glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result) glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result)
// 预分配容量,避免动态扩容 // 预分配容量,避免动态扩容