diff --git a/mongo/mongo.go b/mongo/mongo.go index baabbe5..d02c9ad 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -261,31 +261,58 @@ func GetTenantInfo(ctx context.Context) (user do.User, err error) { return } - // 2. token 获取失败,尝试从请求参数获取 customerServiceId + // 2. token 获取失败,尝试从请求参数或context获取 accountName + var accountName string + + // 2.1 尝试从request获取(HTTP请求场景) req := g.RequestFromCtx(ctx) - if req == nil { - return user, gerror.New("无法获取租户信息:无 token 且无 request") + if req != nil { + accountName = req.Get("accountName").String() + if accountName == "" { + accountName = req.Get("account_name").String() + } + // 兼容旧参数名 + if accountName == "" { + accountName = req.Get("customerServiceId").String() + } + if accountName == "" { + accountName = req.Get("customer_service_id").String() + } } - customerServiceId := req.Get("customerServiceId").String() - if customerServiceId == "" { - customerServiceId = req.Get("customer_service_id").String() + // 2.2 request不存在或未获取到,尝试从context.Value获取(WebSocket场景) + if accountName == "" { + if val := ctx.Value("accountName"); val != nil { + if str, ok := val.(string); ok { + accountName = str + } + } + // 兼容旧参数名 + if accountName == "" { + if val := ctx.Value("customerServiceId"); val != nil { + if str, ok := val.(string); ok { + accountName = str + } + } + } } - if customerServiceId == "" { - return user, gerror.New("无法获取租户信息:无 token 且无 customerServiceId 参数") + + if accountName == "" { + return user, gerror.New("无法获取租户信息:无 token 且无 accountName 参数") } // 3. 直接查询 customer_service_account 表获取 tenantId - filter := bson.M{"customerServiceId": customerServiceId, "isDeleted": false} + filter := bson.M{"accountName": accountName, "isDeleted": false} var account struct { TenantId interface{} `bson:"tenantId"` } if findErr := db.Collection("customer_service_account").FindOne(ctx, filter).Decode(&account); findErr != nil { - return user, gerror.Newf("通过 customerServiceId 查询租户失败: %v", findErr) + return user, gerror.Newf("通过 accountName 查询租户失败: %v", findErr) } user.TenantId = account.TenantId - user.UserName = customerServiceId + user.UserName = accountName + err = nil // 清空之前从token获取时的错误 return } diff --git a/rabbitmq/consumer_manager.go b/rabbitmq/consumer_manager.go index 28695ca..54648ce 100644 --- a/rabbitmq/consumer_manager.go +++ b/rabbitmq/consumer_manager.go @@ -104,9 +104,13 @@ func (cm *ConsumerManager) Init() (err error) { } glog.Info(cm.ctx, "RabbitMQ 连接已初始化") - // 设置响应队列(RAGFlow 响应消息) - if err = SetupResponseQueue(cm.ctx); err != nil { - glog.Fatalf(cm.ctx, "设置响应队列失败: %v", err) + // 声明响应Exchange(队列由各消费者自己声明和绑定) + if err = DeclareExchange(cm.ctx, &ExchangeConfig{ + Name: "ragflow.response", + Type: "topic", + Durable: true, + }); err != nil { + glog.Fatalf(cm.ctx, "声明响应Exchange失败: %v", err) return } diff --git a/ragflow/document.go b/ragflow/document.go index 5913572..0d819a7 100644 --- a/ragflow/document.go +++ b/ragflow/document.go @@ -5,10 +5,10 @@ package ragflow import ( "bytes" "context" + "encoding/json" "mime/multipart" "strings" - "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/errors/gerror" ) @@ -44,6 +44,11 @@ type UploadDocumentReq struct { FilePaths []string // 本地文件路径列表 } +// UploadDocumentRes 上传文档响应 +type UploadDocumentRes struct { + Id string `json:"id"` // 文档ID +} + // ListDocumentsReq 列出文档请求 type ListDocumentsReq struct { Page int `json:"page,omitempty"` // 页码,默认 1 @@ -190,28 +195,25 @@ func (c *Client) UploadDocumentFromText(ctx context.Context, datasetId, content, defer resp.Close() // 解析响应 - var result struct { - Code int `json:"code"` - Message string `json:"message"` - Data struct { - Id string `json:"id"` - } `json:"data"` + var response struct { + Code int `json:"code"` + Message string `json:"message"` + Data []UploadDocumentRes `json:"data"` // RAGFlow返回数组 } - bodyBytes := resp.ReadAll() - if err = gjson.DecodeTo(bodyBytes, &result); err != nil { - return "", err + if err := json.Unmarshal(resp.ReadAll(), &response); err != nil { + return "", gerror.Newf("json Decode failed: %v", err) } - if result.Code != 0 { - return "", gerror.Newf("上传文档失败 (code=%d): %s", result.Code, result.Message) + if len(response.Data) == 0 { + return "", gerror.New("上传文档返回data为空") } - if result.Data.Id == "" { - return "", gerror.New("上传成功但未返回文档ID") + if response.Code != 0 { + return "", gerror.Newf("上传文档失败 (code=%d): %s", response.Code, response.Message) } - return result.Data.Id, nil + return response.Data[0].Id, nil } // UploadDocument 上传文档(保留兼容) diff --git a/redis/redis.go b/redis/redis.go index 1ebfe1c..278f912 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -487,9 +487,10 @@ const ( // UserState 用户会话状态(阶段+对话计数+咨询方向,统一5分钟过期) type UserState struct { - Stage int `json:"stage"` // 用户阶段:5=未选择方向 0=AI模型 1=打招呼 2=业务 3=发卡片 - Count int64 `json:"count"` // 对话计数 - Direction string `json:"direction"` // 用户选择的咨询方向(如:产品咨询、售后服务) + Stage int `json:"stage"` // 当前阶段 + Direction string `json:"direction"` // 咨询方向 + Count int64 `json:"count"` // 对话计数(v5.2卡片触发) + CustomerServiceId string `json:"customerServiceId"` // 用户选择的方向对应的客服账号ID } // GetUserState 获取用户状态(阶段+计数) @@ -528,6 +529,17 @@ func SetUserStage(ctx context.Context, userId, platform string, stage int) error return err } +// SetUserCustomerServiceId 设置用户对应的客服账号ID,并刷新过期时间 +func SetUserCustomerServiceId(ctx context.Context, userId, platform, customerServiceId string) error { + key := UserStateKeyPrefix + userId + "_" + platform + _, err := redisClient.Do(ctx, "HSET", key, "customerServiceId", customerServiceId) + if err != nil { + return err + } + _, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) + return err +} + // SetUserDirection 设置用户选择的咨询方向,并刷新过期时间 func SetUserDirection(ctx context.Context, userId, platform, direction string) error { key := UserStateKeyPrefix + userId + "_" + platform diff --git a/redis/types.go b/redis/types.go index a6a3e36..1ceab65 100644 --- a/redis/types.go +++ b/redis/types.go @@ -10,15 +10,17 @@ type HistoryMessage struct { // SendStreamMessage 发送到 Redis Stream 的消息结构 type SendStreamMessage struct { - UserId string `json:"user_id"` // 用户ID - Content string `json:"content"` // 消息内容 - Timestamp int64 `json:"timestamp"` // 时间戳(秒) - MessageId string `json:"message_id"` // 消息唯一ID - Platform string `json:"platform,omitempty"` // 平台标识 - AccountId string `json:"account_id,omitempty"` // 账号ID - TenantId string `json:"tenant_id,omitempty"` // 租户ID(数据隔离) - ReplyQueue string `json:"reply_queue,omitempty"` // 响应队列名称(支持多实例独立队列) - History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带) + UserId string `json:"user_id"` // 用户ID + Content string `json:"content"` // 消息内容 + Timestamp int64 `json:"timestamp"` // 时间戳(秒) + MessageId string `json:"message_id"` // 消息唯一ID + Platform string `json:"platform,omitempty"` // 平台标识 + AccountId string `json:"account_id,omitempty"` // 账号ID + TenantId string `json:"tenant_id,omitempty"` // 租户ID(数据隔离) + CustomerServiceId string `json:"customer_service_id,omitempty"` // 客服账号ID + ChatId string `json:"chat_id,omitempty"` // RAGFlow Chat ID(从ragflow_config查询) + ReplyQueue string `json:"reply_queue,omitempty"` // 响应队列名称(支持多实例独立队列) + History []HistoryMessage `json:"history,omitempty"` // 历史对话(归档后恢复时携带) } // BatchStreamMessage 批量消息结构