package consumer import ( "context" "strings" "gitea.com/red-future/common/db/mongo" "gitea.com/red-future/common/rabbitmq" "gitea.com/red-future/common/ragflow" "gitea.com/red-future/common/redis" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gfile" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" ) // processorConfig 处理器配置(避免重复读取配置) type processorConfig struct { chatId string // RAGFlow Chat ID(默认) responseExchange string // 响应队列 Exchange responseRoutingKey string // 响应队列 RoutingKey followUpExchange string // 追问队列 Exchange followUpRoutingKey string // 追问队列 RoutingKey archiveExchange string // 归档队列 Exchange archiveRoutingKey string // 归档队列 RoutingKey // Mock模式已删除,不再用于压测 } var cfg *processorConfig // responsePublisher 响应消息发布器(单例) var responsePublisher *rabbitmq.Publisher // followUpPublisher 追问消息发布器(单例) var followUpPublisher *rabbitmq.Publisher // archivePublisher 归档消息发布器(单例) var archivePublisher *rabbitmq.Publisher // Processor 消息处理器 type Processor struct { processor *ragflow.QueueProcessor } // NewProcessor 创建消息处理器 func NewProcessor(ctx context.Context) (*Processor, error) { // 从配置读取参数 streamKey := g.Cfg().MustGet(ctx, "stream.streamKey").String() groupName := g.Cfg().MustGet(ctx, "stream.groupName").String() consumerName := g.Cfg().MustGet(ctx, "stream.consumerName").String() batchSize := g.Cfg().MustGet(ctx, "stream.batchSize", 200).Int64() blockTimeout := g.Cfg().MustGet(ctx, "stream.blockTimeout", 2000).Int64() // 初始化处理器配置(单例) cfg = &processorConfig{ chatId: g.Cfg().MustGet(ctx, "ragflow.chat_id").String(), responseExchange: g.Cfg().MustGet(ctx, "rabbitmq.responseExchange", "ragflow.response").String(), responseRoutingKey: g.Cfg().MustGet(ctx, "rabbitmq.responseRoutingKey", "response").String(), followUpExchange: g.Cfg().MustGet(ctx, "followUp.exchange", "followup.delayed").String(), followUpRoutingKey: g.Cfg().MustGet(ctx, "followUp.routingKey", "followup").String(), archiveExchange: g.Cfg().MustGet(ctx, "archive.exchange", "archive.delayed").String(), archiveRoutingKey: g.Cfg().MustGet(ctx, "archive.routingKey", "archive").String(), } // 咨询方向配置已从Consul自动加载(common/config包init时自动执行) // 初始化响应发布器 responsePublisher = rabbitmq.NewPublisher(cfg.responseExchange, cfg.responseRoutingKey) glog.Infof(ctx, "响应发布器已初始化 - Exchange: %s, RoutingKey: %s", cfg.responseExchange, cfg.responseRoutingKey) // 初始化追问发布器 followUpPublisher = rabbitmq.NewPublisher(cfg.followUpExchange, cfg.followUpRoutingKey) glog.Info(ctx, "追问发布器已初始化") // 初始化归档发布器 archivePublisher = rabbitmq.NewPublisher(cfg.archiveExchange, cfg.archiveRoutingKey) glog.Info(ctx, "归档发布器已初始化") // 创建消息处理器(批量读取 + 并发发送,削峰填谷) return &Processor{ processor: ragflow.NewQueueProcessor( streamKey, groupName, consumerName, blockTimeout, batchSize, handleMessage, ), }, nil } // Start 启动消息处理 func (p *Processor) Start(ctx context.Context) error { glog.Info(ctx, "开始消费消息...") return p.processor.Start(ctx) } // Stop 停止消息处理 func (p *Processor) Stop() { p.processor.Stop() } // getChatIdByAccountName 根据客服账号名称从ragflow_config表查询chat_id func getChatIdByAccountName(ctx context.Context, tenantId, accountName string) string { if accountName == "" { return "" } db := mongo.GetDB() if db == nil { glog.Error(ctx, "MongoDB未初始化") return "" } collection := db.Collection("ragflow_config") // 从MongoDB查询ragflow_config(先尝试字符串tenantId) filter := map[string]interface{}{ "accountName": accountName, "isDeleted": false, } if tenantId != "" { filter["tenantId"] = tenantId } var config struct { ChatId string `json:"chatId" bson:"chatId"` } err := collection.FindOne(ctx, filter).Decode(&config) // 如果未找到且tenantId可以转为数字,尝试用数字查询(兼容MongoDB中存储为int的情况) if err != nil && tenantId != "" { tenantIdInt := gconv.Int(tenantId) if tenantIdInt > 0 { filter["tenantId"] = tenantIdInt err = collection.FindOne(ctx, filter).Decode(&config) } } if err != nil { glog.Warningf(ctx, "未找到客服账号对应的RAGFlow配置 - 账号: %s, tenantId: %s, err: %v", accountName, tenantId, err) return "" } glog.Infof(ctx, "使用客服账号对应的chat_id - 账号: %s, chat_id: %s", accountName, config.ChatId) return config.ChatId } // getChatIdByDirection 根据用户选择的咨询方向获取对应的chat_id(从Consul读取) func getChatIdByDirection(ctx context.Context, userId, platform string) string { // 从Redis获取用户状态 userState, err := redis.GetUserState(ctx, userId, platform) if err != nil || userState.Direction == "" { // 无方向或获取失败,返回默认chat_id if cfg != nil { return cfg.chatId } return "" } // 直接使用accountName查询 chatId := "" if chatId != "" { glog.Infof(ctx, "使用咨询方向对应的chat_id - 用户: %s, 方向: %s, chat_id: %s", userId, userState.Direction, chatId) return chatId } // 未找到匹配方向,返回默认chat_id glog.Warningf(ctx, "未找到方向对应的chat_id,使用默认 - 用户: %s, 方向: %s", userId, userState.Direction) if cfg != nil { return cfg.chatId } return "" } // HandleMessageHTTP 处理HTTP请求的消息(导出供controller调用) func HandleMessageHTTP(ctx context.Context, message map[string]interface{}) error { return handleMessage(ctx, message) } // handleMessage 处理单条消息 func handleMessage(ctx context.Context, message map[string]interface{}) (err error) { // gconv.Map转换结构体时使用驼峰字段名,而非json标签 userId := gconv.String(message["UserId"]) content := gconv.String(message["Content"]) messageId := gconv.String(message["MessageId"]) platform := gconv.String(message["Platform"]) tenantId := gconv.String(message["TenantId"]) accountId := gconv.String(message["AccountId"]) // 客服账号ID accountName := gconv.String(message["AccountName"]) // 客服账号名称(如cs_xhs_qixue) messageChatId := gconv.String(message["ChatId"]) // 消息中携带的chat_id(从ragflow_config查询) if platform == "" { platform = "xiaohongshu" // 默认平台 } // 解析历史对话(由 customerservice 从 MongoDB 读取后携带) var history []redis.HistoryMessage if historyData := message["History"]; historyData != nil { _ = gjson.New(historyData).Scan(&history) } glog.Infof(ctx, "处理消息 - 用户: %s, 客服账号: %s, ChatId: %s, 内容: %s, 历史轮数: %d", userId, accountName, messageChatId, content, len(history)) var answer string var sessionId string startTime := gtime.Now() // 调用RAGFlow处理消息 { // 1. 获取chat_id(优先级:消息携带 > 客服账号查询 > 用户方向查询 > 默认) var chatId string if messageChatId != "" { chatId = messageChatId glog.Infof(ctx, "使用消息携带的chat_id: %s", chatId) } else if accountName != "" { chatId = getChatIdByAccountName(ctx, tenantId, accountName) } if chatId == "" { chatId = getChatIdByDirection(ctx, userId, platform) } // 2. 检测chatId是否变更不再需要,因为缓存key已包含chatId // 不同chatId自动使用不同的session缓存 } // 3. 获取或创建 Session(使用正确的chat_id,包含租户隔离) var isNewSession bool sessionId, isNewSession, err = getOrCreateSession(ctx, tenantId, userId, chatId) if err != nil { // 检测Chat权限错误(assistant/chat不属于当前API Key) if strings.Contains(err.Error(), "do not own the assistant") || strings.Contains(err.Error(), "don't own the chat") || strings.Contains(err.Error(), "doesn't exist") || strings.Contains(err.Error(), "not found") { glog.Warningf(ctx, "创建Session时检测到Chat权限错误,尝试自动重建 - 用户: %s, accountName: %s, chat_id: %s", userId, accountName, chatId) // 调用重建逻辑 if accountName != "" { if recreateErr := recreateChatIfNeeded(ctx, tenantId, accountName, platform); recreateErr != nil { glog.Errorf(ctx, "自动重建Chat失败: %v", recreateErr) return } // 重新查询chat_id newChatId := getChatIdByAccountName(ctx, tenantId, accountName) if newChatId == "" { glog.Errorf(ctx, "重建Chat后仍无法获取chat_id") return } chatId = newChatId glog.Infof(ctx, "Chat重建成功,新chat_id: %s", chatId) // 清理旧session缓存 redis.DelSessionCache(ctx, tenantId, userId) // 使用新chat_id重新创建session sessionId, isNewSession, err = getOrCreateSession(ctx, tenantId, userId, chatId) if err != nil { glog.Errorf(ctx, "使用新Chat创建Session失败: %v", err) return } } else { glog.Errorf(ctx, "Chat权限错误但缺少accountName,无法自动重建") return } } else { glog.Errorf(ctx, "获取 Session 失败: %v", err) return } } // 3. 调用 RAGFlow API client := ragflow.GetGlobalClient() if client == nil { glog.Error(ctx, "RAGFlow 客户端未初始化") return } // 如果是新 Session 且有历史对话,把历史拼接到问题中调用 RAGFlow if isNewSession && len(history) > 0 { var newSessionId string newSessionId, answer, err = callWithHistory(ctx, client, userId, content, history, chatId) if err != nil { glog.Errorf(ctx, "带历史调用 RAGFlow 失败: %v", err) return } // 更新 Session 缓存(包含租户隔离) if newSessionId != "" { redis.SetSessionCache(ctx, tenantId, userId, newSessionId) sessionId = newSessionId } } else { // 已有 Session 或无历史,使用普通 API(RAGFlow 内部维护上下文) var res *ragflow.ChatCompletionRes res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{ Question: content, SessionId: sessionId, UserId: userId, Stream: false, }) if err != nil { // 检测Chat不存在或权限错误(RAGFlow中Chat被删除或不属于当前API Key) if strings.Contains(err.Error(), "doesn't exist") || strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "don't own the chat") { glog.Warningf(ctx, "检测到Chat不存在,尝试自动重建 - 用户: %s, accountName: %s, chat_id: %s", userId, accountName, chatId) // 调用重建逻辑(需要accountName) if accountName != "" { if recreateErr := recreateChatIfNeeded(ctx, tenantId, accountName, platform); recreateErr != nil { glog.Errorf(ctx, "自动重建Chat失败: %v", recreateErr) return } // 重新查询chat_id newChatId := getChatIdByAccountName(ctx, tenantId, accountName) if newChatId == "" { glog.Errorf(ctx, "重建Chat后仍无法获取chat_id") return } chatId = newChatId glog.Infof(ctx, "Chat重建成功,新chat_id: %s", chatId) // 清理旧session缓存 redis.DelSessionCache(ctx, tenantId, userId) // 重新创建session并调用 sessionId, _, err = getOrCreateSession(ctx, tenantId, userId, chatId) if err != nil { glog.Errorf(ctx, "创建新Session失败: %v", err) return } // 重试调用 res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{ Question: content, SessionId: sessionId, UserId: userId, Stream: false, }) if err != nil { glog.Errorf(ctx, "使用新Chat重试失败: %v", err) return } } else { glog.Errorf(ctx, "Chat不存在但缺少accountName,无法自动重建") return } // 检测session ownership错误(session不属于当前chat_id) } else if strings.Contains(err.Error(), "don't own the session") { glog.Warningf(ctx, "Session不属于当前chat_id,清理缓存并重新创建 - 用户: %s, 旧session: %s, chat_id: %s", userId, sessionId, chatId) // 清理缓存(包含租户隔离) redis.DelSessionCache(ctx, tenantId, userId) // 重新创建session并调用 sessionId, _, err = getOrCreateSession(ctx, tenantId, userId, chatId) if err != nil { glog.Errorf(ctx, "重新创建Session失败: %v", err) return } // 重试调用 res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{ Question: content, SessionId: sessionId, UserId: userId, Stream: false, }) if err != nil { glog.Errorf(ctx, "重试调用RAGFlow失败: %v", err) return } } else { glog.Errorf(ctx, "调用 RAGFlow 失败: %v", err) return } } answer = res.Data.Answer // 更新 Session 缓存(如果 RAGFlow 返回了新的 session_id,包含租户隔离) if res.Data.SessionId != "" && res.Data.SessionId != sessionId { redis.SetSessionCache(ctx, tenantId, userId, res.Data.SessionId) sessionId = res.Data.SessionId } } } // 计算耗时并写入文件 endTime := gtime.Now() elapsed := endTime.Sub(startTime) // 截取回复前20字 answerPreview := answer if len([]rune(answerPreview)) > 20 { answerPreview = string([]rune(answerPreview)[:20]) + "..." } glog.Infof(ctx, "回复 - 用户: %s, 耗时: %s, 回复: %s", userId, elapsed, answerPreview) // 日志格式:发送时间 | 响应时间 | 耗时 | 用户 | 问题 | 回复前20字 // 写入时间日志,失败时记录警告但不影响主流程 logContent := startTime.Format("Y-m-d H:i:s") + "\t" + endTime.Format("Y-m-d H:i:s") + "\t" + elapsed.String() + "\t" + userId + "\t" + content + "\t" + answerPreview + "\n" if err := gfile.PutContentsAppend("timelog/ragflow_time.log", logContent); err != nil { glog.Warningf(ctx, "写入时间日志失败: %v", err) } // 3. 更新用户最后活跃时间 redis.SetSessionLastActive(ctx, userId) // 6. 原样写入 RabbitMQ 结果队列(透传 TenantId、AccountId、AccountName) responseMsg := &redis.ResponseStreamMessage{ UserId: userId, Platform: platform, TenantId: tenantId, AccountId: accountId, AccountName: accountName, Question: content, Content: answer, SessionId: sessionId, Timestamp: gtime.Now().Timestamp(), MessageId: messageId, } // 读取请求中的ReplyQueue字段,支持多实例独立队列 replyQueue := gconv.String(message["reply_queue"]) if replyQueue != "" && replyQueue != cfg.responseRoutingKey { // 使用自定义响应队列(多实例场景) // routing key使用队列名,实现精确路由(避免广播到所有实例) glog.Infof(ctx, "使用自定义响应队列: %s - 用户: %s", replyQueue, userId) customPublisher := rabbitmq.NewPublisher(cfg.responseExchange, replyQueue) if err = customPublisher.PublishWithRoutingKey(ctx, replyQueue, responseMsg); err != nil { glog.Errorf(ctx, "写入自定义响应队列失败: %v", err) return } glog.Infof(ctx, "响应已写入 RabbitMQ - 用户: %s, routingKey: %s (队列名)", userId, replyQueue) } else { // 使用默认响应队列(单实例或旧版本兼容) // routing key使用:tenantId.userId routingKey := tenantId + "." + userId if err = responsePublisher.PublishWithRoutingKey(ctx, routingKey, responseMsg); err != nil { glog.Errorf(ctx, "写入 RabbitMQ 结果队列失败: %v", err) return } glog.Infof(ctx, "响应已写入 RabbitMQ - 用户: %s, routingKey: %s (租户.用户)", userId, routingKey) } // 7. 发送追问消息到延时队列 sendFollowUpMessages(ctx, userId, platform) // 8. 发送归档消息到延时队列(60分钟后) sendArchiveMessage(ctx, userId, platform, sessionId, tenantId) glog.Infof(ctx, "消息处理完成 - 用户: %s", userId) return } // sendArchiveMessage 发送归档消息到延时队列 func sendArchiveMessage(ctx context.Context, userId, platform, sessionId, tenantId string) { msg := &redis.ArchiveMessage{ UserId: userId, Platform: platform, SessionId: sessionId, TenantId: tenantId, Timestamp: gtime.Now().Timestamp(), } if err := archivePublisher.PublishDelayed(ctx, msg, redis.GetArchiveDelay()); err != nil { glog.Errorf(ctx, "发送归档消息失败: %v", err) } } // callWithHistory 带历史上下文调用 RAGFlow(用于新 Session) // history 由 customerservice 从 MongoDB 读取后通过消息携带 // 使用 RAGFlow 原生 API,把历史对话拼接到问题中,保留提示词和知识库功能 // chatId 为用户选择方向对应的chat_id,而非默认chat_id func callWithHistory(ctx context.Context, client *ragflow.Client, userId, content string, history []redis.HistoryMessage, chatId string) (sessionId, answer string, err error) { // 构建带历史上下文的问题 var question string if len(history) > 0 { // 限制历史对话长度:最多3轮或总字符数不超过8000(避免超过RAGFlow输入限制) const maxHistoryRounds = 3 const maxHistoryChars = 8000 var builder strings.Builder builder.WriteString("[以下是之前的对话历史,请参考]\n") // 限制历史轮数 historyToUse := history if len(history) > maxHistoryRounds { historyToUse = history[len(history)-maxHistoryRounds:] // 只取最近3轮 glog.Infof(ctx, "历史对话超过%d轮,截取最近%d轮 - 用户: %s", len(history), maxHistoryRounds, userId) } // 拼接历史对话,同时检查字符数限制 for _, h := range historyToUse { builder.WriteString("用户: ") builder.WriteString(h.Question) builder.WriteString("\nAI: ") builder.WriteString(h.Answer) builder.WriteString("\n") // 检查是否超过字符数限制 if builder.Len() > maxHistoryChars { glog.Warningf(ctx, "历史对话超过%d字符,停止追加 - 用户: %s", maxHistoryChars, userId) break } } builder.WriteString("\n[当前问题]\n") builder.WriteString(content) question = builder.String() } else { question = content } glog.Infof(ctx, "注入 %d 轮历史对话上下文 - chat_id: %s", len(history), chatId) // 先创建新 session(使用方向对应的chat_id) session, err := client.CreateSession(ctx, chatId, &ragflow.CreateSessionReq{ Name: "session_" + userId, UserId: userId, }) if err != nil { glog.Errorf(ctx, "创建 Session 失败: %v", err) return } sessionId = session.Id // 使用 RAGFlow 原生 API(保留提示词和知识库,使用方向对应的chat_id) res, err := client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{ Question: question, SessionId: sessionId, UserId: userId, Stream: false, }) if err != nil { return } answer = res.Data.Answer return } // sendFollowUpMessages 发送追问消息到延时队列 func sendFollowUpMessages(ctx context.Context, userId, platform string) { now := gtime.Now().Timestamp() for followUpType := redis.FollowUpType1; followUpType <= redis.FollowUpType3; followUpType++ { msg := &redis.FollowUpMessage{ UserId: userId, Platform: platform, Content: redis.GetFollowUpContent(followUpType), FollowUpType: followUpType, Timestamp: now, } if err := followUpPublisher.PublishDelayed(ctx, msg, redis.GetFollowUpDelay(followUpType)); err != nil { glog.Errorf(ctx, "发送追问消息失败 - 类型: %d, 错误: %v", followUpType, err) } } } // getOrCreateSession 获取或创建 RAGFlow Session(支持租户隔离) // 返回 isNew=true 表示是新创建的 session,需要注入历史上下文 func getOrCreateSession(ctx context.Context, tenantId, userId, chatId string) (sessionId string, isNew bool, err error) { // 先从缓存获取(包含租户隔离) if sessionId, err = redis.GetSessionCache(ctx, tenantId, userId); err != nil { return } if sessionId != "" { glog.Infof(ctx, "使用缓存的session - 租户: %s, 用户: %s, chat_id: %s, session: %s", tenantId, userId, chatId, sessionId) return // 已有 session,不是新的 } // 缓存不存在,创建新 Session(使用传入的chat_id) client := ragflow.GetGlobalClient() if client == nil { return } glog.Infof(ctx, "创建新session - 租户: %s, 用户: %s, chat_id: %s", tenantId, userId, chatId) session, err := client.CreateSession(ctx, chatId, &ragflow.CreateSessionReq{ Name: "session_" + tenantId + "_" + userId, UserId: userId, }) if err != nil { return } sessionId = session.Id isNew = true // 标记为新创建 // 缓存 Session ID(包含租户隔离) redis.SetSessionCache(ctx, tenantId, userId, sessionId) glog.Infof(ctx, "新session已创建并缓存 - 租户: %s, 用户: %s, session: %s", tenantId, userId, sessionId) return }