Files
customer-server/service/conversation_service.go
2026-03-14 10:02:49 +08:00

354 lines
12 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package service - 对话服务
// 功能处理RAGFlow响应、批量落库、卡片触发逻辑
package service
import (
"context"
"customer-server/dao"
"customer-server/model/entity"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/redis"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gtime"
)
// conversationService 对话服务(操作 conversation 表)
type conversationService struct{}
// ConversationService 对话服务单例
var ConversationService = new(conversationService)
// ============== RabbitMQ 消费者RAGFlow 响应消息)==============
// ResponseConsumer RabbitMQ 响应消费者
type ResponseConsumer struct {
queueName string
consumer *rabbitmq.Consumer
}
// NewResponseConsumer 创建响应消费者
// 参数: ctx - 上下文
// 返回: ResponseConsumer - 响应消费者实例
// 功能: 创建当前实例的唯一响应队列,支持多实例部署
func NewResponseConsumer(ctx context.Context) *ResponseConsumer {
// 从配置读取基础队列名(用于生成唯一实例队列名)
baseQueue := GetConfigString(ctx, "rabbitmq.responseQueue")
// 生成当前实例的唯一队列名ragflow.response.queue.{hostname}.{uuid8}
// 支持多实例部署,每个实例有独立响应队列
queueName := rabbitmq.GetInstanceQueueName(baseQueue)
glog.Infof(ctx, "响应队列动态生成 - 实例队列: %s", queueName)
return &ResponseConsumer{
queueName: queueName,
}
}
// Start 启动消费者
// 参数: ctx - 上下文
// 返回: err - 错误信息
// 功能: 声明并绑定当前实例的响应队列开始消费RAGFlow响应消息
func (c *ResponseConsumer) Start(ctx context.Context) (err error) {
glog.Infof(ctx, "RabbitMQ 响应消费者启动 - Queue: %s", c.queueName)
// 声明当前实例的动态响应队列
if err = rabbitmq.DeclareQueue(ctx, &rabbitmq.QueueConfig{
Name: c.queueName,
Durable: true,
}); err != nil {
glog.Errorf(ctx, "声明动态响应队列失败: %v", err)
return err
}
// 绑定队列到 Exchange使用队列名作为routing key实现精确路由
// message发送消息时会使用队列名作为routing key
if err = rabbitmq.BindQueue(ctx, &rabbitmq.BindingConfig{
Queue: c.queueName,
Exchange: "ragflow.response",
RoutingKey: c.queueName, // 使用队列名,只接收发给自己的消息
}); err != nil {
glog.Errorf(ctx, "绑定动态响应队列失败: %v", err)
return err
}
glog.Infof(ctx, "动态响应队列已绑定: %s -> ragflow.response (routingKey=#)", c.queueName)
c.consumer = rabbitmq.NewConsumer(c.queueName, handleResponse)
return c.consumer.Start(ctx)
}
// Stop 停止消费者
// 参数: ctx - 上下文
// 功能: 停止消费RAGFlow响应消息
func (c *ResponseConsumer) Stop(ctx context.Context) {
if c.consumer != nil {
c.consumer.Stop(ctx)
}
}
// ============== 卡片触发配置(待接入小红书卡片接口后修改)==============
//
// 【用户状态存储】
// 使用 Redis Hash 存储用户会话状态(阶段+对话计数统一5分钟TTL
// Key: ragflow:user:state:{userId}_{platform}
// Fields: stage阶段、count对话计数
//
// 【状态定义】
// 状态0走AI模型 | 状态1打招呼 | 状态2业务咨询 | 状态3发卡片
//
// 【卡片触发逻辑】
// 对话轮数>=配置值时更新状态为3并发送卡片消息
// 配置项config.yml中的card.triggerCount默认5轮
//
// 【待接入小红书卡片 API 后修改位置】
// checkAndSendCard() 函数中的 cardMessage 变量
const (
// ConversationFlushDelaySeconds 对话缓存延时落库时间(秒)
ConversationFlushDelaySeconds = 600 // 10分钟
)
// ============== 延时落库消费者 ==============
// DelayedFlushMessage 延时落库消息按sessionId
type DelayedFlushMessage struct {
SessionId string `json:"sessionId"`
}
// DelayedFlushConsumer 延时落库消费者
type DelayedFlushConsumer struct {
queueName string
consumer *rabbitmq.Consumer
}
// NewDelayedFlushConsumer 创建延时落库消费者
func NewDelayedFlushConsumer(ctx context.Context) *DelayedFlushConsumer {
return &DelayedFlushConsumer{
queueName: "conversation.flush.queue",
}
}
// Start 启动消费者
func (c *DelayedFlushConsumer) Start(ctx context.Context) (err error) {
glog.Infof(ctx, "延时落库消费者启动 - Queue: %s", c.queueName)
c.consumer = rabbitmq.NewConsumer(c.queueName, handleDelayedFlush)
return c.consumer.Start(ctx)
}
// Stop 停止消费者
func (c *DelayedFlushConsumer) Stop(ctx context.Context) {
if c.consumer != nil {
c.consumer.Stop(ctx)
}
}
// handleDelayedFlush 处理延时落库消息
func handleDelayedFlush(ctx context.Context, body []byte) error {
var msg DelayedFlushMessage
if err := gjson.DecodeTo(body, &msg); err != nil {
glog.Errorf(ctx, "解析延时落库消息失败: %v", err)
return err
}
glog.Infof(ctx, "收到延时落库消息 - SessionId: %s", msg.SessionId)
// 检查是否有未落库的缓存
count, err := redis.GetCachedConversationCount(ctx, msg.SessionId)
if err != nil {
glog.Errorf(ctx, "获取缓存数量失败: %v", err)
return err
}
if count == 0 {
glog.Debugf(ctx, "无需落库(缓存为空或已落库)- SessionId: %s", msg.SessionId)
return nil
}
// 执行落库
if err = flushConversationCache(ctx, msg.SessionId); err != nil {
glog.Errorf(ctx, "延时落库失败: %v", err)
return err
}
glog.Infof(ctx, "延时落库完成 - SessionId: %s", msg.SessionId)
return nil
}
// 延时落库发布器(单例)
var delayedFlushPublisher *rabbitmq.Publisher
// getDelayedFlushPublisher 获取延时落库发布器
func getDelayedFlushPublisher() *rabbitmq.Publisher {
if delayedFlushPublisher == nil {
delayedFlushPublisher = rabbitmq.NewPublisher("conversation.flush.delayed", "flush")
}
return delayedFlushPublisher
}
// sendDelayedFlushMessage 发送延时落库消息
func sendDelayedFlushMessage(ctx context.Context, sessionId string) error {
msg := &DelayedFlushMessage{
SessionId: sessionId,
}
return getDelayedFlushPublisher().PublishDelayed(ctx, msg, ConversationFlushDelaySeconds)
}
// handleResponse 处理 RabbitMQ 消息(幂等)
// 落库逻辑前5句缓存到Redis第5句时批量落库MongoDB超过5句不落库
func handleResponse(ctx context.Context, body []byte) error {
ctx, span := jaeger.NewSpan(ctx, "consumer.response")
defer span.End()
glog.Infof(ctx, ">>> handleResponse 被调用,消息长度: %d", len(body))
// 解析消息到结构体
var msg redis.ResponseStreamMessage
if err := gjson.DecodeTo(body, &msg); err != nil {
jaeger.RecordError(ctx, err, "解析响应消息失败")
return err
}
glog.Infof(ctx, "收到 RAGFlow 响应 - 用户: %s, MessageId: %s", msg.UserId, msg.MessageId)
// 1. 获取当前对话轮数
state, err := redis.GetUserState(ctx, msg.UserId, msg.Platform)
if err != nil {
jaeger.RecordError(ctx, err, "获取用户状态失败")
}
count := state.Count
// 2. 根据轮数决定落库策略
cardTriggerCount := g.Cfg().MustGet(ctx, "card.triggerCount", 5).Int64()
if count <= cardTriggerCount {
// 前N句缓存到Redis按sessionId
msgTime := gtime.NewFromTimeStamp(msg.Timestamp).Time
conversation := &entity.Conversation{
UserId: msg.UserId,
Platform: msg.Platform,
SessionId: msg.SessionId,
Question: msg.Question,
Answer: msg.Content,
MessageId: msg.MessageId,
MsgTime: &msgTime, // 取地址赋值给指针类型
}
conversation.TenantId = msg.TenantId
// 序列化后缓存使用sessionId作为key
data, _ := gjson.Encode(conversation)
if cacheErr := redis.CacheConversation(ctx, msg.SessionId, data); cacheErr != nil {
jaeger.RecordError(ctx, cacheErr, "缓存对话记录失败")
} else {
glog.Debugf(ctx, "对话已缓存到 Redis - SessionId: %s, 第 %d 轮", msg.SessionId, count)
}
// 第1句时发送10分钟延时落库消息兜底
if count == 1 {
if delayErr := sendDelayedFlushMessage(ctx, msg.SessionId); delayErr != nil {
glog.Warningf(ctx, "发送延时落库消息失败: %v", delayErr)
}
}
// 第N句时立即批量落库
if count == cardTriggerCount {
if flushErr := flushConversationCache(ctx, msg.SessionId); flushErr != nil {
jaeger.RecordError(ctx, flushErr, "批量落库失败")
}
}
} else {
// 超过N句不落库已发卡片
glog.Debugf(ctx, "第 %d 轮(>%d跳过落库 - 用户: %s", count, cardTriggerCount, msg.UserId)
}
// 3. 推送给 WebSocket 用户(无论是否落库都推送)
glog.Infof(ctx, "准备推送 WebSocket - 用户: %s_%s, 内容长度: %d", msg.UserId, msg.Platform, len(msg.Content))
if err = WebSocket.PushRAGFlowResponse(ctx, msg.TenantId, msg.UserId, msg.Platform, msg.Content); err != nil {
jaeger.RecordError(ctx, err, "推送 WebSocket 失败")
} else {
glog.Infof(ctx, "WebSocket 推送成功 - 用户: %s_%s", msg.UserId, msg.Platform)
}
return nil
}
// flushConversationCache 将Redis缓存的对话批量落库到MongoDB
func flushConversationCache(ctx context.Context, sessionId string) error {
// 获取缓存的对话列表
cached, err := redis.GetCachedConversations(ctx, sessionId)
if err != nil {
return err
}
if len(cached) == 0 {
return nil
}
// 反序列化
list := make([]*entity.Conversation, 0, len(cached))
for _, data := range cached {
var conv entity.Conversation
if decErr := gjson.DecodeTo([]byte(data), &conv); decErr != nil {
glog.Warningf(ctx, "反序列化对话失败: %v", decErr)
continue
}
list = append(list, &conv)
}
// 批量插入MongoDB
if len(list) > 0 {
if insertErr := dao.Conversation.BatchInsert(ctx, list); insertErr != nil {
return insertErr
}
glog.Infof(ctx, "批量落库成功 - SessionId: %s, 共 %d 条", sessionId, len(list))
}
return nil
}
// checkCardBeforeProcess 检查对话轮数,达到阈值时发卡片
// 返回 handled=true 表示已处理(发送卡片),调用方应跳过后续话术处理
func checkCardBeforeProcess(ctx context.Context, tenantId, userId, platform string) (handled bool, err error) {
// 获取用户当前状态
state, err := redis.GetUserState(ctx, userId, platform)
if err != nil {
return
}
// 状态5未选择方向时不计数等用户选择方向后再开始计数
if state.Stage == 5 {
glog.Debugf(ctx, "用户 %s_%s 处于状态5未选择方向跳过计数", userId, platform)
return false, nil
}
// 增加对话计数
count, err := redis.IncrUserCount(ctx, userId, platform)
if err != nil {
return
}
glog.Infof(ctx, "用户 %s_%s 当前对话轮数: %d", userId, platform, count)
// 对话>=配置轮数发卡片并跳过话术从配置值开始就发卡片不再调用AI
cardTriggerCount := g.Cfg().MustGet(ctx, "card.triggerCount", 5).Int64()
if count >= cardTriggerCount {
glog.Infof(ctx, "用户 %s_%s 对话第 %d 轮(>=%d触发发送卡片", userId, platform, count, cardTriggerCount)
// 更新用户状态为3发卡片状态
if updateErr := redis.SetUserStage(ctx, userId, platform, 3); updateErr != nil {
jaeger.RecordError(ctx, updateErr, "更新用户状态为3失败")
}
cardMessage := "请加一下卡片的联系方式,进行更专业的咨询" // TODO: 替换为实际卡片发送逻辑
if pushErr := WebSocket.PushRAGFlowResponse(ctx, tenantId, userId, platform, cardMessage); pushErr != nil {
jaeger.RecordError(ctx, pushErr, "推送卡片消息失败")
glog.Errorf(ctx, "推送卡片失败 - 用户: %s_%s, 错误: %v", userId, platform, pushErr)
err = pushErr
return
}
glog.Infof(ctx, "卡片消息已推送 - 用户: %s_%s", userId, platform)
handled = true
}
return
}