From 4d6aa1f38458633fb771fa7d7742a5af9a60f779 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Wed, 21 Jan 2026 10:20:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dredis=E5=92=8Cqueueprocessor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- http/http.go | 4 ---- ragflow/client.go | 7 ------ ragflow/session.go | 3 +++ ragflow/worker_pool.go | 39 ++++++++++++++++++++++------------ redis/redis.go | 48 ++++++++++++++++++++++++++++++++++++------ 5 files changed, 71 insertions(+), 30 deletions(-) diff --git a/http/http.go b/http/http.go index 47c110f..9f0efd7 100644 --- a/http/http.go +++ b/http/http.go @@ -89,9 +89,6 @@ func doRequest(ctx context.Context, method string, url string, headers map[strin defer response.Close() result := response.ReadAll() - // 添加调试日志:打印原始响应 - g.Log().Debugf(ctx, "[HTTP] 原始响应: %s", string(result)) - // 第三方API特例:RAGFlow等第三方API响应格式为{code,data,message}一层结构,直接解析原始JSON到target // 内部API格式为{code:200,message:"",data:{...}}两层结构,需经过DefaultHandlerResponse二次解析 // 判断依据:URL包含/api/v1/(不影响内部API调用) @@ -102,7 +99,6 @@ func doRequest(ctx context.Context, method string, url string, headers map[strin if err = gconv.Struct(result, target); err != nil { return errors.New("第三方API响应解析失败: " + err.Error()) } - g.Log().Debugf(ctx, "[HTTP] 第三方API直接解析target: %+v", target) return } diff --git a/ragflow/client.go b/ragflow/client.go index 128f2fb..502a8c2 100644 --- a/ragflow/client.go +++ b/ragflow/client.go @@ -142,13 +142,6 @@ func (c *Client) request(ctx context.Context, method, path string, body interfac fullURL := endpoint + path - // 添加详细日志:请求信息 - g.Log().Infof(ctx, "RAGFlow请求: %s %s", method, fullURL) - if body != nil { - bodyJSON := g.NewVar(body).String() - g.Log().Infof(ctx, "RAGFlow请求体: %s", bodyJSON) - } - // 使用common/http包 var headers = make(map[string]string) headers["Authorization"] = "Bearer " + c.APIKey diff --git a/ragflow/session.go b/ragflow/session.go index 7659eb2..966d0e8 100644 --- a/ragflow/session.go +++ b/ragflow/session.go @@ -4,6 +4,7 @@ import ( "context" "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" ) // 会话管理 @@ -85,9 +86,11 @@ func (c *Client) CreateSession(ctx context.Context, chatId string, req *CreateSe Msg string `json:"message"` } if err := c.request(ctx, "POST", path, req, &res); err != nil { + g.Log().Errorf(ctx, "❌ CreateSession请求失败: chatId=%s, req=%+v, error=%v", chatId, req, err) return nil, err } if res.Code != 0 { + g.Log().Errorf(ctx, "❌ CreateSession返回失败: chatId=%s, req=%+v, code=%d, msg=%s", chatId, req, res.Code, res.Msg) return nil, gerror.Newf("create session failed: %s", res.Msg) } // 检查响应数据是否为空:防止RAGFlow API返回 {"code":0, "data":null} diff --git a/ragflow/worker_pool.go b/ragflow/worker_pool.go index 32b4b67..608dd81 100644 --- a/ragflow/worker_pool.go +++ b/ragflow/worker_pool.go @@ -4,6 +4,7 @@ import ( "context" "runtime/debug" "strings" + "sync" "time" "gitee.com/red-future---jilin-g/common/redis" @@ -16,14 +17,15 @@ const defaultBatchSize = 200 // QueueProcessor Stream 处理器,批量读取消息并发送到 RAGFlow type QueueProcessor struct { - streamKey string // Stream 键名 - groupName string // 消费者组名称 - consumerName string // 消费者名称 - timeout int64 // 阻塞超时时间(毫秒) - batchSize int64 // 最大并发数(协程池大小) - stopChan chan struct{} // 停止信号 - pool *grpool.Pool // GoFrame协程池 - handleFunc func(ctx context.Context, message map[string]interface{}) error + streamKey string // Stream 键名 + groupName string // 消费者组名称 + consumerName string // 消费者名称 + timeout int64 // 阻塞超时时间(毫秒) + batchSize int64 // 最大并发数(协程池大小) + stopChan chan struct{} // 停止信号 + pool *grpool.Pool // GoFrame协程池 + handleFunc func(ctx context.Context, message map[string]interface{}) error + processingMsgs sync.Map // 正在处理的消息ID(去重用) } // NewQueueProcessor 创建 Stream 处理器 @@ -84,14 +86,27 @@ func (q *QueueProcessor) Start(ctx context.Context) error { glog.Infof(ctx, "✅ 从Stream读取到 %d 条消息,开始处理", len(messages)) - // 2. 使用协程池提交任务:复用goroutine,避免频繁创建销毁 + // 2. 去重+立即ACK:对话场景优先实时性,失败不重试 for i, msg := range messages { m := msg // 捕获循环变量 msgIndex := i + 1 + + // 去重:如果消息正在处理,跳过 + if _, exists := q.processingMsgs.LoadOrStore(m.ID, true); exists { + glog.Debugf(ctx, "⏭️ 跳过正在处理的消息 - ID: %s", m.ID) + continue + } + + // 立即ACK:对话场景不需要重试,避免重复消费 + if err := redis.AckMessage(ctx, q.streamKey, q.groupName, m.ID); err != nil { + glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, m.ID) + } + glog.Infof(ctx, "📨 准备处理第 %d/%d 条消息 - ID: %s", msgIndex, len(messages), m.ID) // 提交到协程池,池满时会阻塞等待空闲worker q.pool.Add(ctx, func(ctx context.Context) { + defer q.processingMsgs.Delete(m.ID) // 处理完成后移除标记 q.processMessage(ctx, m) }) } @@ -127,10 +142,8 @@ func (q *QueueProcessor) processMessage(ctx context.Context, message redis.Strea glog.Infof(ctx, "✅ 消息处理成功 - ID: %s", message.ID) } - // 无论成功失败都 ACK(避免重复消费) - if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil { - glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID) - } + // ACK已在读取后立即执行,此处无需重复ACK + // 对话场景:失败直接丢弃,不重试(实时性优先) } // Stop 停止队列处理器 diff --git a/redis/redis.go b/redis/redis.go index 648bd99..b4556a6 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -180,9 +180,10 @@ func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messag } // CreateConsumerGroup 创建消费者组(如果不存在) -// XGROUP CREATE streamKey groupName $ MKSTREAM +// XGROUP CREATE streamKey groupName 0 MKSTREAM +// 使用0作为起始ID,从Stream开头读取所有未消费消息 func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error { - _, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "$", "MKSTREAM") + _, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") return err } @@ -205,11 +206,29 @@ RECONNECT: "STREAMS", streamKey, "0", // ID=0 读取pending消息 ) if err != nil { + g.Log().Errorf(ctx, "❌ XREADGROUP读取pending失败: stream=%s, error=%v", streamKey, err) + time.Sleep(time.Second) goto RECONNECT } + // 检查pending结果是否为空(需要检查消息数组是否为空) + hasPending := false + if result != nil && !result.IsEmpty() { + // 尝试解析map格式 + if resultVal := result.Val(); resultVal != nil { + if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok { + for _, streamMsgs := range streamsMap { + if msgsArray, ok := streamMsgs.([]interface{}); ok && len(msgsArray) > 0 { + hasPending = true + break + } + } + } + } + } + // 如果没有pending消息,读取新消息 - if result == nil || result.IsEmpty() { + if !hasPending { result, err = redisClient.Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, @@ -217,6 +236,8 @@ RECONNECT: "STREAMS", streamKey, ">", ) if err != nil { + g.Log().Errorf(ctx, "❌ XREADGROUP读取新消息失败: stream=%s, error=%v", streamKey, err) + time.Sleep(time.Second) goto RECONNECT } } @@ -234,19 +255,26 @@ RECONNECT: // 尝试 map 格式(GoFrame gredis 返回) if streamsMap, ok := resultVal.(map[interface{}]interface{}); ok { - for _, streamMsgs := range streamsMap { + for streamKey, streamMsgs := range streamsMap { msgsArray, ok := streamMsgs.([]interface{}) if !ok { + g.Log().Errorf(ctx, "❌ streamMsgs类型转换失败: streamKey=%v, 实际类型=%T", streamKey, streamMsgs) continue } - for _, msgData := range msgsArray { + for i, msgData := range msgsArray { msgArray, ok := msgData.([]interface{}) - if !ok || len(msgArray) < 2 { + if !ok { + g.Log().Errorf(ctx, "❌ msgData类型转换失败: index=%d, 实际类型=%T", i, msgData) + continue + } + if len(msgArray) < 2 { + g.Log().Errorf(ctx, "❌ msgArray长度不足: index=%d, len=%d", i, len(msgArray)) continue } msgID := gconv.String(msgArray[0]) fieldsArray, ok := msgArray[1].([]interface{}) if !ok { + g.Log().Errorf(ctx, "❌ fieldsArray类型转换失败: msgID=%s, msgArray[1]类型=%T", msgID, msgArray[1]) continue } values := make(map[string]interface{}, len(fieldsArray)/2) @@ -262,6 +290,9 @@ RECONNECT: }) } } + if len(messages) == 0 { + g.Log().Errorf(ctx, "❌ [ReadFromStream] map格式解析失败: streamsMap长度=%d, 但未提取到消息", len(streamsMap)) + } return messages, nil } @@ -299,8 +330,13 @@ RECONNECT: }) } } + if len(messages) == 0 { + g.Log().Errorf(ctx, "❌ [ReadFromStream] 数组格式解析失败: streamsArray长度=%d, 但未提取到消息", len(streamsArray)) + } + return messages, nil } + g.Log().Errorf(ctx, "❌ [ReadFromStream] 无法识别的result格式, resultVal类型: %T, 值: %+v", resultVal, resultVal) return messages, nil }