This commit is contained in:
2026-03-14 10:02:49 +08:00
parent 03b50ef904
commit 830f75a334
75 changed files with 10677 additions and 2 deletions

View File

@@ -0,0 +1,591 @@
package consumer
import (
"context"
"strings"
"gitea.com/red-future/common/db/mongo"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/ragflow"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gfile"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
)
// processorConfig 处理器配置(避免重复读取配置)
type processorConfig struct {
chatId string // RAGFlow Chat ID默认
responseExchange string // 响应队列 Exchange
responseRoutingKey string // 响应队列 RoutingKey
followUpExchange string // 追问队列 Exchange
followUpRoutingKey string // 追问队列 RoutingKey
archiveExchange string // 归档队列 Exchange
archiveRoutingKey string // 归档队列 RoutingKey
// Mock模式已删除不再用于压测
}
var cfg *processorConfig
// responsePublisher 响应消息发布器(单例)
var responsePublisher *rabbitmq.Publisher
// followUpPublisher 追问消息发布器(单例)
var followUpPublisher *rabbitmq.Publisher
// archivePublisher 归档消息发布器(单例)
var archivePublisher *rabbitmq.Publisher
// Processor 消息处理器
type Processor struct {
processor *ragflow.QueueProcessor
}
// NewProcessor 创建消息处理器
func NewProcessor(ctx context.Context) (*Processor, error) {
// 从配置读取参数
streamKey := g.Cfg().MustGet(ctx, "stream.streamKey").String()
groupName := g.Cfg().MustGet(ctx, "stream.groupName").String()
consumerName := g.Cfg().MustGet(ctx, "stream.consumerName").String()
batchSize := g.Cfg().MustGet(ctx, "stream.batchSize", 200).Int64()
blockTimeout := g.Cfg().MustGet(ctx, "stream.blockTimeout", 2000).Int64()
// 初始化处理器配置(单例)
cfg = &processorConfig{
chatId: g.Cfg().MustGet(ctx, "ragflow.chat_id").String(),
responseExchange: g.Cfg().MustGet(ctx, "rabbitmq.responseExchange", "ragflow.response").String(),
responseRoutingKey: g.Cfg().MustGet(ctx, "rabbitmq.responseRoutingKey", "response").String(),
followUpExchange: g.Cfg().MustGet(ctx, "followUp.exchange", "followup.delayed").String(),
followUpRoutingKey: g.Cfg().MustGet(ctx, "followUp.routingKey", "followup").String(),
archiveExchange: g.Cfg().MustGet(ctx, "archive.exchange", "archive.delayed").String(),
archiveRoutingKey: g.Cfg().MustGet(ctx, "archive.routingKey", "archive").String(),
}
// 咨询方向配置已从Consul自动加载common/config包init时自动执行
// 初始化响应发布器
responsePublisher = rabbitmq.NewPublisher(cfg.responseExchange, cfg.responseRoutingKey)
glog.Infof(ctx, "响应发布器已初始化 - Exchange: %s, RoutingKey: %s", cfg.responseExchange, cfg.responseRoutingKey)
// 初始化追问发布器
followUpPublisher = rabbitmq.NewPublisher(cfg.followUpExchange, cfg.followUpRoutingKey)
glog.Info(ctx, "追问发布器已初始化")
// 初始化归档发布器
archivePublisher = rabbitmq.NewPublisher(cfg.archiveExchange, cfg.archiveRoutingKey)
glog.Info(ctx, "归档发布器已初始化")
// 创建消息处理器(批量读取 + 并发发送,削峰填谷)
return &Processor{
processor: ragflow.NewQueueProcessor(
streamKey,
groupName,
consumerName,
blockTimeout,
batchSize,
handleMessage,
),
}, nil
}
// Start 启动消息处理
func (p *Processor) Start(ctx context.Context) error {
glog.Info(ctx, "开始消费消息...")
return p.processor.Start(ctx)
}
// Stop 停止消息处理
func (p *Processor) Stop() {
p.processor.Stop()
}
// getChatIdByAccountName 根据客服账号名称从ragflow_config表查询chat_id
func getChatIdByAccountName(ctx context.Context, tenantId, accountName string) string {
if accountName == "" {
return ""
}
db := mongo.GetDB()
if db == nil {
glog.Error(ctx, "MongoDB未初始化")
return ""
}
collection := db.Collection("ragflow_config")
// 从MongoDB查询ragflow_config先尝试字符串tenantId
filter := map[string]interface{}{
"accountName": accountName,
"isDeleted": false,
}
if tenantId != "" {
filter["tenantId"] = tenantId
}
var config struct {
ChatId string `json:"chatId" bson:"chatId"`
}
err := collection.FindOne(ctx, filter).Decode(&config)
// 如果未找到且tenantId可以转为数字尝试用数字查询兼容MongoDB中存储为int的情况
if err != nil && tenantId != "" {
tenantIdInt := gconv.Int(tenantId)
if tenantIdInt > 0 {
filter["tenantId"] = tenantIdInt
err = collection.FindOne(ctx, filter).Decode(&config)
}
}
if err != nil {
glog.Warningf(ctx, "未找到客服账号对应的RAGFlow配置 - 账号: %s, tenantId: %s, err: %v", accountName, tenantId, err)
return ""
}
glog.Infof(ctx, "使用客服账号对应的chat_id - 账号: %s, chat_id: %s", accountName, config.ChatId)
return config.ChatId
}
// getChatIdByDirection 根据用户选择的咨询方向获取对应的chat_id从Consul读取
func getChatIdByDirection(ctx context.Context, userId, platform string) string {
// 从Redis获取用户状态
userState, err := redis.GetUserState(ctx, userId, platform)
if err != nil || userState.Direction == "" {
// 无方向或获取失败返回默认chat_id
if cfg != nil {
return cfg.chatId
}
return ""
}
// 直接使用accountName查询
chatId := ""
if chatId != "" {
glog.Infof(ctx, "使用咨询方向对应的chat_id - 用户: %s, 方向: %s, chat_id: %s", userId, userState.Direction, chatId)
return chatId
}
// 未找到匹配方向返回默认chat_id
glog.Warningf(ctx, "未找到方向对应的chat_id使用默认 - 用户: %s, 方向: %s", userId, userState.Direction)
if cfg != nil {
return cfg.chatId
}
return ""
}
// HandleMessageHTTP 处理HTTP请求的消息导出供controller调用
func HandleMessageHTTP(ctx context.Context, message map[string]interface{}) error {
return handleMessage(ctx, message)
}
// handleMessage 处理单条消息
func handleMessage(ctx context.Context, message map[string]interface{}) (err error) {
// gconv.Map转换结构体时使用驼峰字段名而非json标签
userId := gconv.String(message["UserId"])
content := gconv.String(message["Content"])
messageId := gconv.String(message["MessageId"])
platform := gconv.String(message["Platform"])
tenantId := gconv.String(message["TenantId"])
accountId := gconv.String(message["AccountId"]) // 客服账号ID
accountName := gconv.String(message["AccountName"]) // 客服账号名称如cs_xhs_qixue
messageChatId := gconv.String(message["ChatId"]) // 消息中携带的chat_id从ragflow_config查询
if platform == "" {
platform = "xiaohongshu" // 默认平台
}
// 解析历史对话(由 customerservice 从 MongoDB 读取后携带)
var history []redis.HistoryMessage
if historyData := message["History"]; historyData != nil {
_ = gjson.New(historyData).Scan(&history)
}
glog.Infof(ctx, "处理消息 - 用户: %s, 客服账号: %s, ChatId: %s, 内容: %s, 历史轮数: %d",
userId, accountName, messageChatId, content, len(history))
var answer string
var sessionId string
startTime := gtime.Now()
// 调用RAGFlow处理消息
{
// 1. 获取chat_id优先级消息携带 > 客服账号查询 > 用户方向查询 > 默认)
var chatId string
if messageChatId != "" {
chatId = messageChatId
glog.Infof(ctx, "使用消息携带的chat_id: %s", chatId)
} else if accountName != "" {
chatId = getChatIdByAccountName(ctx, tenantId, accountName)
}
if chatId == "" {
chatId = getChatIdByDirection(ctx, userId, platform)
}
// 2. 检测chatId是否变更不再需要因为缓存key已包含chatId
// 不同chatId自动使用不同的session缓存 }
// 3. 获取或创建 Session使用正确的chat_id包含租户隔离
var isNewSession bool
sessionId, isNewSession, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
// 检测Chat权限错误assistant/chat不属于当前API Key
if strings.Contains(err.Error(), "do not own the assistant") || strings.Contains(err.Error(), "don't own the chat") || strings.Contains(err.Error(), "doesn't exist") || strings.Contains(err.Error(), "not found") {
glog.Warningf(ctx, "创建Session时检测到Chat权限错误尝试自动重建 - 用户: %s, accountName: %s, chat_id: %s", userId, accountName, chatId)
// 调用重建逻辑
if accountName != "" {
if recreateErr := recreateChatIfNeeded(ctx, tenantId, accountName, platform); recreateErr != nil {
glog.Errorf(ctx, "自动重建Chat失败: %v", recreateErr)
return
}
// 重新查询chat_id
newChatId := getChatIdByAccountName(ctx, tenantId, accountName)
if newChatId == "" {
glog.Errorf(ctx, "重建Chat后仍无法获取chat_id")
return
}
chatId = newChatId
glog.Infof(ctx, "Chat重建成功新chat_id: %s", chatId)
// 清理旧session缓存
redis.DelSessionCache(ctx, tenantId, userId)
// 使用新chat_id重新创建session
sessionId, isNewSession, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
glog.Errorf(ctx, "使用新Chat创建Session失败: %v", err)
return
}
} else {
glog.Errorf(ctx, "Chat权限错误但缺少accountName无法自动重建")
return
}
} else {
glog.Errorf(ctx, "获取 Session 失败: %v", err)
return
}
}
// 3. 调用 RAGFlow API
client := ragflow.GetGlobalClient()
if client == nil {
glog.Error(ctx, "RAGFlow 客户端未初始化")
return
}
// 如果是新 Session 且有历史对话,把历史拼接到问题中调用 RAGFlow
if isNewSession && len(history) > 0 {
var newSessionId string
newSessionId, answer, err = callWithHistory(ctx, client, userId, content, history, chatId)
if err != nil {
glog.Errorf(ctx, "带历史调用 RAGFlow 失败: %v", err)
return
}
// 更新 Session 缓存(包含租户隔离)
if newSessionId != "" {
redis.SetSessionCache(ctx, tenantId, userId, newSessionId)
sessionId = newSessionId
}
} else {
// 已有 Session 或无历史,使用普通 APIRAGFlow 内部维护上下文)
var res *ragflow.ChatCompletionRes
res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
Question: content,
SessionId: sessionId,
UserId: userId,
Stream: false,
})
if err != nil {
// 检测Chat不存在或权限错误RAGFlow中Chat被删除或不属于当前API Key
if strings.Contains(err.Error(), "doesn't exist") || strings.Contains(err.Error(), "not found") || strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "don't own the chat") {
glog.Warningf(ctx, "检测到Chat不存在尝试自动重建 - 用户: %s, accountName: %s, chat_id: %s", userId, accountName, chatId)
// 调用重建逻辑需要accountName
if accountName != "" {
if recreateErr := recreateChatIfNeeded(ctx, tenantId, accountName, platform); recreateErr != nil {
glog.Errorf(ctx, "自动重建Chat失败: %v", recreateErr)
return
}
// 重新查询chat_id
newChatId := getChatIdByAccountName(ctx, tenantId, accountName)
if newChatId == "" {
glog.Errorf(ctx, "重建Chat后仍无法获取chat_id")
return
}
chatId = newChatId
glog.Infof(ctx, "Chat重建成功新chat_id: %s", chatId)
// 清理旧session缓存
redis.DelSessionCache(ctx, tenantId, userId)
// 重新创建session并调用
sessionId, _, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
glog.Errorf(ctx, "创建新Session失败: %v", err)
return
}
// 重试调用
res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
Question: content,
SessionId: sessionId,
UserId: userId,
Stream: false,
})
if err != nil {
glog.Errorf(ctx, "使用新Chat重试失败: %v", err)
return
}
} else {
glog.Errorf(ctx, "Chat不存在但缺少accountName无法自动重建")
return
}
// 检测session ownership错误session不属于当前chat_id
} else if strings.Contains(err.Error(), "don't own the session") {
glog.Warningf(ctx, "Session不属于当前chat_id清理缓存并重新创建 - 用户: %s, 旧session: %s, chat_id: %s", userId, sessionId, chatId)
// 清理缓存(包含租户隔离)
redis.DelSessionCache(ctx, tenantId, userId)
// 重新创建session并调用
sessionId, _, err = getOrCreateSession(ctx, tenantId, userId, chatId)
if err != nil {
glog.Errorf(ctx, "重新创建Session失败: %v", err)
return
}
// 重试调用
res, err = client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
Question: content,
SessionId: sessionId,
UserId: userId,
Stream: false,
})
if err != nil {
glog.Errorf(ctx, "重试调用RAGFlow失败: %v", err)
return
}
} else {
glog.Errorf(ctx, "调用 RAGFlow 失败: %v", err)
return
}
}
answer = res.Data.Answer
// 更新 Session 缓存(如果 RAGFlow 返回了新的 session_id包含租户隔离
if res.Data.SessionId != "" && res.Data.SessionId != sessionId {
redis.SetSessionCache(ctx, tenantId, userId, res.Data.SessionId)
sessionId = res.Data.SessionId
}
}
}
// 计算耗时并写入文件
endTime := gtime.Now()
elapsed := endTime.Sub(startTime)
// 截取回复前20字
answerPreview := answer
if len([]rune(answerPreview)) > 20 {
answerPreview = string([]rune(answerPreview)[:20]) + "..."
}
glog.Infof(ctx, "回复 - 用户: %s, 耗时: %s, 回复: %s", userId, elapsed, answerPreview)
// 日志格式:发送时间 | 响应时间 | 耗时 | 用户 | 问题 | 回复前20字
// 写入时间日志,失败时记录警告但不影响主流程
logContent := startTime.Format("Y-m-d H:i:s") + "\t" + endTime.Format("Y-m-d H:i:s") + "\t" + elapsed.String() + "\t" + userId + "\t" + content + "\t" + answerPreview + "\n"
if err := gfile.PutContentsAppend("timelog/ragflow_time.log", logContent); err != nil {
glog.Warningf(ctx, "写入时间日志失败: %v", err)
}
// 3. 更新用户最后活跃时间
redis.SetSessionLastActive(ctx, userId)
// 6. 原样写入 RabbitMQ 结果队列(透传 TenantId、AccountId、AccountName
responseMsg := &redis.ResponseStreamMessage{
UserId: userId,
Platform: platform,
TenantId: tenantId,
AccountId: accountId,
AccountName: accountName,
Question: content,
Content: answer,
SessionId: sessionId,
Timestamp: gtime.Now().Timestamp(),
MessageId: messageId,
}
// 读取请求中的ReplyQueue字段支持多实例独立队列
replyQueue := gconv.String(message["reply_queue"])
if replyQueue != "" && replyQueue != cfg.responseRoutingKey {
// 使用自定义响应队列(多实例场景)
// routing key使用队列名实现精确路由避免广播到所有实例
glog.Infof(ctx, "使用自定义响应队列: %s - 用户: %s", replyQueue, userId)
customPublisher := rabbitmq.NewPublisher(cfg.responseExchange, replyQueue)
if err = customPublisher.PublishWithRoutingKey(ctx, replyQueue, responseMsg); err != nil {
glog.Errorf(ctx, "写入自定义响应队列失败: %v", err)
return
}
glog.Infof(ctx, "响应已写入 RabbitMQ - 用户: %s, routingKey: %s (队列名)", userId, replyQueue)
} else {
// 使用默认响应队列(单实例或旧版本兼容)
// routing key使用tenantId.userId
routingKey := tenantId + "." + userId
if err = responsePublisher.PublishWithRoutingKey(ctx, routingKey, responseMsg); err != nil {
glog.Errorf(ctx, "写入 RabbitMQ 结果队列失败: %v", err)
return
}
glog.Infof(ctx, "响应已写入 RabbitMQ - 用户: %s, routingKey: %s (租户.用户)", userId, routingKey)
}
// 7. 发送追问消息到延时队列
sendFollowUpMessages(ctx, userId, platform)
// 8. 发送归档消息到延时队列60分钟后
sendArchiveMessage(ctx, userId, platform, sessionId, tenantId)
glog.Infof(ctx, "消息处理完成 - 用户: %s", userId)
return
}
// sendArchiveMessage 发送归档消息到延时队列
func sendArchiveMessage(ctx context.Context, userId, platform, sessionId, tenantId string) {
msg := &redis.ArchiveMessage{
UserId: userId,
Platform: platform,
SessionId: sessionId,
TenantId: tenantId,
Timestamp: gtime.Now().Timestamp(),
}
if err := archivePublisher.PublishDelayed(ctx, msg, redis.GetArchiveDelay()); err != nil {
glog.Errorf(ctx, "发送归档消息失败: %v", err)
}
}
// callWithHistory 带历史上下文调用 RAGFlow用于新 Session
// history 由 customerservice 从 MongoDB 读取后通过消息携带
// 使用 RAGFlow 原生 API把历史对话拼接到问题中保留提示词和知识库功能
// chatId 为用户选择方向对应的chat_id而非默认chat_id
func callWithHistory(ctx context.Context, client *ragflow.Client, userId, content string, history []redis.HistoryMessage, chatId string) (sessionId, answer string, err error) {
// 构建带历史上下文的问题
var question string
if len(history) > 0 {
// 限制历史对话长度最多3轮或总字符数不超过8000避免超过RAGFlow输入限制
const maxHistoryRounds = 3
const maxHistoryChars = 8000
var builder strings.Builder
builder.WriteString("[以下是之前的对话历史,请参考]\n")
// 限制历史轮数
historyToUse := history
if len(history) > maxHistoryRounds {
historyToUse = history[len(history)-maxHistoryRounds:] // 只取最近3轮
glog.Infof(ctx, "历史对话超过%d轮截取最近%d轮 - 用户: %s", len(history), maxHistoryRounds, userId)
}
// 拼接历史对话,同时检查字符数限制
for _, h := range historyToUse {
builder.WriteString("用户: ")
builder.WriteString(h.Question)
builder.WriteString("\nAI: ")
builder.WriteString(h.Answer)
builder.WriteString("\n")
// 检查是否超过字符数限制
if builder.Len() > maxHistoryChars {
glog.Warningf(ctx, "历史对话超过%d字符停止追加 - 用户: %s", maxHistoryChars, userId)
break
}
}
builder.WriteString("\n[当前问题]\n")
builder.WriteString(content)
question = builder.String()
} else {
question = content
}
glog.Infof(ctx, "注入 %d 轮历史对话上下文 - chat_id: %s", len(history), chatId)
// 先创建新 session使用方向对应的chat_id
session, err := client.CreateSession(ctx, chatId, &ragflow.CreateSessionReq{
Name: "session_" + userId,
UserId: userId,
})
if err != nil {
glog.Errorf(ctx, "创建 Session 失败: %v", err)
return
}
sessionId = session.Id
// 使用 RAGFlow 原生 API保留提示词和知识库使用方向对应的chat_id
res, err := client.ChatCompletion(ctx, chatId, &ragflow.ChatCompletionReq{
Question: question,
SessionId: sessionId,
UserId: userId,
Stream: false,
})
if err != nil {
return
}
answer = res.Data.Answer
return
}
// sendFollowUpMessages 发送追问消息到延时队列
func sendFollowUpMessages(ctx context.Context, userId, platform string) {
now := gtime.Now().Timestamp()
for followUpType := redis.FollowUpType1; followUpType <= redis.FollowUpType3; followUpType++ {
msg := &redis.FollowUpMessage{
UserId: userId,
Platform: platform,
Content: redis.GetFollowUpContent(followUpType),
FollowUpType: followUpType,
Timestamp: now,
}
if err := followUpPublisher.PublishDelayed(ctx, msg, redis.GetFollowUpDelay(followUpType)); err != nil {
glog.Errorf(ctx, "发送追问消息失败 - 类型: %d, 错误: %v", followUpType, err)
}
}
}
// getOrCreateSession 获取或创建 RAGFlow Session支持租户隔离
// 返回 isNew=true 表示是新创建的 session需要注入历史上下文
func getOrCreateSession(ctx context.Context, tenantId, userId, chatId string) (sessionId string, isNew bool, err error) {
// 先从缓存获取(包含租户隔离)
if sessionId, err = redis.GetSessionCache(ctx, tenantId, userId); err != nil {
return
}
if sessionId != "" {
glog.Infof(ctx, "使用缓存的session - 租户: %s, 用户: %s, chat_id: %s, session: %s", tenantId, userId, chatId, sessionId)
return // 已有 session不是新的
}
// 缓存不存在,创建新 Session使用传入的chat_id
client := ragflow.GetGlobalClient()
if client == nil {
return
}
glog.Infof(ctx, "创建新session - 租户: %s, 用户: %s, chat_id: %s", tenantId, userId, chatId)
session, err := client.CreateSession(ctx, chatId, &ragflow.CreateSessionReq{
Name: "session_" + tenantId + "_" + userId,
UserId: userId,
})
if err != nil {
return
}
sessionId = session.Id
isNew = true // 标记为新创建
// 缓存 Session ID包含租户隔离
redis.SetSessionCache(ctx, tenantId, userId, sessionId)
glog.Infof(ctx, "新session已创建并缓存 - 租户: %s, 用户: %s, session: %s", tenantId, userId, sessionId)
return
}