From 53c339222701c18f9de566b22a2864ed38d6f887 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Thu, 18 Dec 2025 18:01:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E5=87=86=E5=A4=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/directions.go | 236 +++++++++++++++++++++++++++++++++++ config/redis.go | 0 config/welcome.go | 40 ++++++ config/welcome_messages.yaml | 53 ++++++++ rabbitmq/consumer_manager.go | 171 +++++++++++++++++++++++++ rabbitmq/queue_setup.go | 111 ++++++++++++++++ redis/redis.go | 76 ++++++++++- 7 files changed, 682 insertions(+), 5 deletions(-) create mode 100644 config/directions.go create mode 100644 config/redis.go create mode 100644 config/welcome.go create mode 100644 config/welcome_messages.yaml create mode 100644 rabbitmq/consumer_manager.go create mode 100644 rabbitmq/queue_setup.go diff --git a/config/directions.go b/config/directions.go new file mode 100644 index 0000000..e0e2c15 --- /dev/null +++ b/config/directions.go @@ -0,0 +1,236 @@ +// Package config 提供全局配置管理和Consul监听 +// +// 本包实现了基于Consul的配置热更新机制,所有服务导入common包即可自动获得配置监听能力 +package config + +import ( + "context" + "encoding/json" + "sync" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/hashicorp/consul/api" +) + +// Direction 咨询方向配置 +type Direction struct { + Name string `json:"name"` // 方向名称(如:气血、减肥) + ChatId string `json:"chat_id"` // RAGFlow对话ID +} + +var ( + directionsCache []Direction // 本地缓存(内存读取,超快) + cacheMu sync.RWMutex // 读写锁(支持多goroutine并发读) + startOnce sync.Once // 确保只启动一次监听 + consulClient *api.Client // Consul客户端(复用连接) +) + +// init 包初始化函数(所有服务导入common包时自动执行) +// +// Fallback顺序:Consul → config.yml +func init() { + ctx := context.Background() + + // 检查Consul是否配置 + consulAddr := g.Cfg().MustGet(ctx, "consul.address").String() + if consulAddr == "" { + glog.Warning(ctx, "Consul未配置,使用本地配置") + loadDirectionsFromLocal(ctx) + return + } + + // 初始化Consul客户端 + config := api.DefaultConfig() + config.Address = consulAddr + client, err := api.NewClient(config) + if err != nil { + glog.Errorf(ctx, "Consul客户端初始化失败: %v,fallback到本地配置", err) + loadDirectionsFromLocal(ctx) + return + } + consulClient = client + + // 启动后台监听(单例,确保只启动一次) + startOnce.Do(func() { + go startConsulWatcher(ctx) + glog.Info(ctx, "Consul配置监听已启动") + }) +} + +// GetDirections 获取咨询方向配置(从内存缓存读取) +// +// 返回: +// +// []Direction: 方向列表 +// +// 特点: +// - 高性能:读内存缓存,无网络IO +// - 线程安全:使用读锁,支持并发读取 +// - 自动更新:后台监听Consul,配置变化时自动更新缓存 +// +// 使用示例: +// +// dirs := config.GetDirections() +// for _, dir := range dirs { +// fmt.Printf("%s -> %s\n", dir.Name, dir.ChatId) +// } +func GetDirections() []Direction { + cacheMu.RLock() + defer cacheMu.RUnlock() + + // 返回副本,避免外部修改缓存 + result := make([]Direction, len(directionsCache)) + copy(result, directionsCache) + return result +} + +// GetDirectionChatId 根据方向名称获取对应的ChatId +// +// 参数: +// +// name: 方向名称(如:"气血"、"减肥") +// +// 返回: +// +// chatId: 对应的RAGFlow对话ID,未找到返回空字符串 +// +// 使用示例: +// +// chatId := config.GetDirectionChatId("气血") +func GetDirectionChatId(name string) string { + cacheMu.RLock() + defer cacheMu.RUnlock() + + for _, dir := range directionsCache { + if dir.Name == name { + return dir.ChatId + } + } + return "" +} + +// startConsulWatcher 后台监听Consul配置变化(Blocking Query长连接) +// +// 工作原理: +// 1. 使用Consul Blocking Query API(长连接,只在变化时返回) +// 2. 收到变化通知后更新本地缓存 +// 3. 自动重连(网络异常时自动恢复) +// +// 资源消耗: +// - 一个长连接(保持5分钟) +// - 配置未变化时几乎不占用CPU和网络 +// - 对比轮询:节省99%资源 +// +// 注意: +// - 此函数在后台goroutine中运行 +// - 使用Blocking Query避免轮询 +func startConsulWatcher(ctx context.Context) { + const consulKey = "ragflow/directions" + kv := consulClient.KV() + var lastIndex uint64 + + // 初始化时先读取一次配置 + if err := loadDirectionsFromConsul(ctx, kv); err != nil { + glog.Warningf(ctx, "初始化加载Consul配置失败: %v", err) + } + + // 持续监听配置变化 + for { + // Consul Blocking Query(长连接模式) + // WaitIndex: 指定版本号,只在配置变化时返回 + // WaitTime: 最长等待时间(超时后返回,客户端重新请求) + pair, meta, err := kv.Get(consulKey, &api.QueryOptions{ + WaitIndex: lastIndex, + WaitTime: 5 * time.Minute, + }) + + if err != nil { + glog.Errorf(ctx, "Consul查询失败: %v", err) + time.Sleep(5 * time.Second) // 错误时等待5秒重试 + continue + } + + // 配置版本号变化,说明有更新 + if meta.LastIndex != lastIndex { + lastIndex = meta.LastIndex + + // 配置被删除 + if pair == nil { + glog.Warning(ctx, "Consul配置已删除: "+consulKey) + cacheMu.Lock() + directionsCache = []Direction{} + cacheMu.Unlock() + continue + } + + // 解析并更新缓存 + var dirs []Direction + if err := json.Unmarshal(pair.Value, &dirs); err != nil { + glog.Errorf(ctx, "解析Consul配置失败: %v", err) + continue + } + + cacheMu.Lock() + directionsCache = dirs + cacheMu.Unlock() + + glog.Infof(ctx, "Consul配置已更新: %d个方向", len(dirs)) + } + } +} + +// loadDirectionsFromConsul 从Consul加载配置(初始化时调用) +func loadDirectionsFromConsul(ctx context.Context, kv *api.KV) error { + const consulKey = "ragflow/directions" + + pair, _, err := kv.Get(consulKey, nil) + if err != nil { + // Consul查询失败,fallback到本地配置 + glog.Warningf(ctx, "Consul查询失败: %v,fallback到本地配置", err) + loadDirectionsFromLocal(ctx) + return err + } + + if pair == nil { + glog.Warning(ctx, "Consul中未找到配置: "+consulKey+",fallback到本地配置") + loadDirectionsFromLocal(ctx) + return nil + } + + var dirs []Direction + if err := json.Unmarshal(pair.Value, &dirs); err != nil { + glog.Errorf(ctx, "解析Consul配置失败: %v,fallback到本地配置", err) + loadDirectionsFromLocal(ctx) + return err + } + + cacheMu.Lock() + directionsCache = dirs + cacheMu.Unlock() + + glog.Infof(ctx, "已加载Consul配置: %d个方向", len(dirs)) + return nil +} + +// loadDirectionsFromLocal 从本地config.yml加载配置(fallback机制) +func loadDirectionsFromLocal(ctx context.Context) { + directionsConfig := g.Cfg().MustGet(ctx, "ragflow.directions") + if directionsConfig.IsEmpty() { + glog.Warning(ctx, "本地配置中也未找到 ragflow.directions") + return + } + + var dirs []Direction + if err := directionsConfig.Scan(&dirs); err != nil { + glog.Errorf(ctx, "解析本地配置失败: %v", err) + return + } + + cacheMu.Lock() + directionsCache = dirs + cacheMu.Unlock() + + glog.Infof(ctx, "已加载config.yml配置: %d个方向", len(dirs)) +} diff --git a/config/redis.go b/config/redis.go new file mode 100644 index 0000000..e69de29 diff --git a/config/welcome.go b/config/welcome.go new file mode 100644 index 0000000..6dfe133 --- /dev/null +++ b/config/welcome.go @@ -0,0 +1,40 @@ +package config + +import ( + "context" + "sync" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" +) + +var ( + welcomeCache map[string]string + welcomeMu sync.RWMutex + welcomeOnce sync.Once +) + +// initWelcomeMessages 初始化欢迎话术配置 +func initWelcomeMessages(ctx context.Context) { + welcomeOnce.Do(func() { + cfg := g.Cfg() + welcomeMap := cfg.MustGet(ctx, "welcomes").MapStrStr() + + welcomeMu.Lock() + welcomeCache = welcomeMap + welcomeMu.Unlock() + + glog.Infof(ctx, "已加载欢迎话术配置: %d个方向", len(welcomeMap)) + }) +} + +// GetWelcomeMessage 根据方向名称获取欢迎话术 +func GetWelcomeMessage(direction string) string { + ctx := context.Background() + initWelcomeMessages(ctx) + + welcomeMu.RLock() + defer welcomeMu.RUnlock() + + return welcomeCache[direction] +} diff --git a/config/welcome_messages.yaml b/config/welcome_messages.yaml new file mode 100644 index 0000000..0286482 --- /dev/null +++ b/config/welcome_messages.yaml @@ -0,0 +1,53 @@ +# 各咨询方向的欢迎话术配置 +# 当用户在状态5选择方向后,会自动发送对应的欢迎语 + +welcomes: + 乳腺贴: | + 🙋‍♀️ 欢迎姐妹们来到药济堂!我们是一个拥有10年经验的大健康专业团队🏅,专注于保守调理乳腺问题,已经成功帮助超过1万位姐妹轻松调理乳腺健康❤️。 + 如果您有结节或增生的困扰,欢迎随时咨询! + 请回复下面的数字,让我帮助您分析结节情况: + + 结节 + 增生 + 点击👇获取更快速的服务! + + 肝病: | + 你好,我是黄医生,有什么肝脏方面的问题我可以帮助您吗?无论是肝病、乙肝、丙肝,还是肝硬化腹水、脂肪肝、酒精肝,我都会为您提供专业的建议。😷 + 肝病的类型很多,常见症状有乏力、食欲减退和肝区不适等。为了更好地帮您,我需要了解一些详细的信息,比如: + + 您现在有腹胀或腹水的情况吗? + 是第一次出现还是反复出现呢? + 目前是早期还是中晚期呢? + 有没有病毒性肝炎的病史呢? + 请您留一下联系方式,我可以发送您的报告,并给您详细解读肝病治疗方案和成功案例。📋 + + 车膜: | + 🎉亲爱的车主,欢迎来到6膜王! + 🚗 我们在车膜行业深耕十年,拥有2家千平米的门店和30+专业团队,致力于为您提供最优质的服务! + ✨ 无论是隐形车衣还是改色膜,我们都能精准适配您的需求。留下您的【车型➕VX】,我们的资深顾问会立即为您匹配专属方案、膜材讲解以及报价!❤️ + + 毛孔: | + 啊啊~亲爱的姐妹们,最近有很多小伙伴在问我关于毛孔的问题!我之前也是毛孔大到妆容卡粉,真的是烦恼不断。😩 但在经过一段时间的摸索后,我终于找到了合适的方法,效果真的很好,差不多一个月就改善了许多!(亲测有效!)💖 + 如果需要帮助,可以随时告诉我,我非常乐意分享我的经验给你们哦! + + 免税店: | + Hi,长春的宝子们~💕欢迎锁定小红提免税集合店! + 这里有你想要的一切:美妆、香水、包包、首饰和大牌护肤品,正品保真不踩雷,价格特别美丽哦✨ + 全城顺丰包邮,购物更方便!期待你们来逛快来发现更多惊喜吧!💖 + + 门店地址:长春市绿园区皓月大路吾悦广场1楼 1036号,等你来哦!🌟 + + 减肥: | + 你好呀,姐妹~你也有肉肉的困扰吗?我现在已经掉了二十多斤了,至今都没有反弹! + 想要方法的话可以直接回复"1",我分享给你哦~ + (💗未成年发育期、哺乳期的姐妹我就不推荐啦) + + 气血: | + 亲爱的,欢迎光临!🌸 + 如果你有月经不调或气血不足的问题,随时可以问我哦! + + 停经闭经 + 痛经难忍 + 量少 + 经期不准 + 💗只需回复数字,我们会为你提供专业建议! 🌟如需更多帮助,点击下方咨询专业老师,我们一起寻求解决方案~ diff --git a/rabbitmq/consumer_manager.go b/rabbitmq/consumer_manager.go new file mode 100644 index 0000000..28695ca --- /dev/null +++ b/rabbitmq/consumer_manager.go @@ -0,0 +1,171 @@ +// Package rabbitmq 提供 RabbitMQ 消费者管理功能 +// +// 本文件实现消费者统一管理,简化业务层的启动逻辑 +package rabbitmq + +import ( + "context" + "sync" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" +) + +// ManagedConsumer 托管消费者(包含启动和停止函数) +type ManagedConsumer struct { + Name string // 消费者名称 + Start func(ctx context.Context) error // 启动函数 + Stop func(ctx context.Context) // 停止函数 +} + +// ConsumerManager RabbitMQ 消费者管理器 +// +// 职责: +// 1. 统一管理所有 RabbitMQ 消费者的生命周期 +// 2. 初始化 RabbitMQ 连接和队列 +// 3. 启动/停止所有消费者 +// 4. 协调消费者的优雅退出 +// +// 使用示例: +// +// mgr := rabbitmq.NewConsumerManager(ctx) +// mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop) +// mgr.Init() +// defer mgr.Stop() +type ConsumerManager struct { + ctx context.Context // 全局上下文 + consumers []*ManagedConsumer // 消费者列表 + wg sync.WaitGroup // 等待所有消费者协程退出 +} + +// NewConsumerManager 创建消费者管理器 +// +// 参数: +// +// ctx: 上下文 +// +// 返回: +// +// *ConsumerManager: 消费者管理器实例 +func NewConsumerManager(ctx context.Context) *ConsumerManager { + return &ConsumerManager{ + ctx: ctx, + consumers: make([]*ManagedConsumer, 0), + } +} + +// Register 注册消费者 +// +// 参数: +// +// name: 消费者名称(用于日志) +// startFunc: 启动函数 +// stopFunc: 停止函数 +// +// 使用示例: +// +// consumer := service.NewResponseConsumer(ctx) +// mgr.Register("响应消费者", consumer.Start, consumer.Stop) +func (cm *ConsumerManager) Register(name string, startFunc func(ctx context.Context) error, stopFunc func(ctx context.Context)) { + cm.consumers = append(cm.consumers, &ManagedConsumer{ + Name: name, + Start: startFunc, + Stop: stopFunc, + }) +} + +// Init 初始化并启动所有消费者 +// +// 执行流程: +// 1. 检查 RabbitMQ 配置(未配置则跳过) +// 2. 初始化 RabbitMQ 连接 +// 3. 声明并绑定队列(响应队列、延时落库队列) +// 4. 异步启动所有已注册的消费者 +// +// 返回: +// +// err: 错误信息,成功返回 nil +// +// 注意: +// - 如果 RabbitMQ 未配置,不会报错,只是跳过初始化 +// - 响应队列初始化失败会导致 Fatal 退出 +// - 延时落库队列失败只会 Warning,不影响主流程 +func (cm *ConsumerManager) Init() (err error) { + // 检查配置文件中是否配置了 RabbitMQ + if g.Cfg().MustGet(cm.ctx, "rabbitmq").IsEmpty() { + glog.Info(cm.ctx, "RabbitMQ未配置,跳过消费者初始化") + return + } + + // 初始化 RabbitMQ 连接(从 config.yml 读取配置) + if err = InitFromConfig(cm.ctx); err != nil { + glog.Fatalf(cm.ctx, "初始化 RabbitMQ 失败: %v", err) + return + } + glog.Info(cm.ctx, "RabbitMQ 连接已初始化") + + // 设置响应队列(RAGFlow 响应消息) + if err = SetupResponseQueue(cm.ctx); err != nil { + glog.Fatalf(cm.ctx, "设置响应队列失败: %v", err) + return + } + + // 设置延时落库队列(对话缓存兜底机制) + // 失败不影响主流程,只记录 Warning + if err = SetupDelayedFlushQueue(cm.ctx); err != nil { + glog.Warningf(cm.ctx, "设置延时落库队列失败: %v", err) + } + + // 异步启动所有已注册的消费者 + cm.startConsumers() + return +} + +// startConsumers 启动所有消费者(内部方法) +// +// 实现: +// 1. 遍历已注册的消费者 +// 2. 每个消费者在独立的 goroutine 中运行 +// 3. 使用 WaitGroup 追踪所有消费者协程 +func (cm *ConsumerManager) startConsumers() { + for _, c := range cm.consumers { + cm.wg.Add(1) + go func(consumer *ManagedConsumer) { + defer cm.wg.Done() + if err := consumer.Start(cm.ctx); err != nil { + glog.Errorf(cm.ctx, "%s启动失败: %v", consumer.Name, err) + } + }(c) + glog.Infof(cm.ctx, "%s已启动", c.Name) + } +} + +// Stop 停止所有消费者(优雅退出) +// +// 执行流程: +// 1. 依次停止所有消费者(调用各自的 Stop 方法) +// 2. 等待所有消费者协程退出(WaitGroup.Wait) +// 3. 关闭 RabbitMQ 连接 +// +// 使用场景: +// - 收到 SIGINT/SIGTERM 信号时 +// - 程序正常退出时 +// - defer mgr.Stop() +// +// 注意: +// - Stop 方法会阻塞直到所有消费者完全退出 +// - 确保消费者能正确响应 Stop 信号 +func (cm *ConsumerManager) Stop() { + // 依次停止所有消费者 + for _, c := range cm.consumers { + c.Stop(cm.ctx) + glog.Infof(cm.ctx, "%s已停止", c.Name) + } + + // 等待所有消费者协程退出 + cm.wg.Wait() + + // 关闭 RabbitMQ 连接 + Close(cm.ctx) + glog.Info(cm.ctx, "所有消费者已停止,RabbitMQ连接已关闭") +} diff --git a/rabbitmq/queue_setup.go b/rabbitmq/queue_setup.go new file mode 100644 index 0000000..dbb3130 --- /dev/null +++ b/rabbitmq/queue_setup.go @@ -0,0 +1,111 @@ +// Package rabbitmq 提供 RabbitMQ 队列初始化的封装方法 +// +// 本文件包含常用队列的声明和绑定逻辑,简化业务层的队列配置代码 +package rabbitmq + +import ( + "context" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" +) + +// SetupResponseQueue 初始化 RAGFlow 响应队列 +// +// 功能: +// 1. 声明持久化队列(从配置文件读取队列名,默认 ragflow.response.queue) +// 2. 绑定到 ragflow.response Exchange(Topic 类型) +// 3. 使用通配符 # 匹配所有 routing key(userId) +// +// 参数: +// +// ctx: 上下文 +// +// 返回: +// +// err: 错误信息,成功返回 nil +// +// 配置示例(config.yml): +// +// rabbitmq: +// responseQueue: "ragflow.response.queue" # 可选,默认值 +func SetupResponseQueue(ctx context.Context) (err error) { + // 从配置文件读取队列名(支持每个开发者配置独立队列名) + responseQueue := g.Cfg().MustGet(ctx, "rabbitmq.responseQueue", "ragflow.response.queue").String() + + // 声明持久化队列(服务器重启后队列仍存在) + if err = DeclareQueue(ctx, &QueueConfig{ + Name: responseQueue, + Durable: true, // 持久化,防止数据丢失 + }); err != nil { + glog.Errorf(ctx, "声明响应队列失败: %v", err) + return + } + + // 绑定队列到 Exchange + // Exchange 类型为 topic,routing key 使用通配符 # 匹配所有 userId + if err = BindQueue(ctx, &BindingConfig{ + Queue: responseQueue, + Exchange: "ragflow.response", // RAGFlow 响应 Exchange + RoutingKey: "#", // 通配符,匹配所有消息 + }); err != nil { + glog.Errorf(ctx, "绑定响应队列失败: %v", err) + return + } + + glog.Infof(ctx, "响应队列已绑定: %s -> ragflow.response (routingKey=#)", responseQueue) + return +} + +// SetupDelayedFlushQueue 初始化延时落库队列 +// +// 功能: +// 1. 声明延时 Exchange(x-delayed-message 插件) +// 2. 声明持久化队列 conversation.flush.queue +// 3. 绑定队列到延时 Exchange +// +// 用途: +// +// 对话缓存延时落库机制的兜底策略 +// 当对话少于5句时,10分钟后触发延时消息将缓存写入MongoDB +// +// 参数: +// +// ctx: 上下文 +// +// 返回: +// +// err: 错误信息,成功返回 nil +// +// 相关: +// - service/conversation_service.go: handleResponse() +// - service/conversation_service.go: handleDelayedFlush() +func SetupDelayedFlushQueue(ctx context.Context) (err error) { + // 声明延时 Exchange(需要 RabbitMQ 安装 x-delayed-message 插件) + if err = SetupDelayExchange(ctx, "conversation.flush.delayed"); err != nil { + glog.Warningf(ctx, "声明延时落库 Exchange 失败: %v", err) + return + } + + // 声明持久化队列 + if err = DeclareQueue(ctx, &QueueConfig{ + Name: "conversation.flush.queue", + Durable: true, // 持久化,防止延时消息丢失 + }); err != nil { + glog.Warningf(ctx, "声明延时落库 Queue 失败: %v", err) + return + } + + // 绑定队列到延时 Exchange + if err = BindQueue(ctx, &BindingConfig{ + Queue: "conversation.flush.queue", + Exchange: "conversation.flush.delayed", + RoutingKey: "flush", // 延时落库消息的 routing key + }); err != nil { + glog.Warningf(ctx, "绑定延时落库 Queue 失败: %v", err) + return + } + + glog.Info(ctx, "延时落库队列已配置") + return +} diff --git a/redis/redis.go b/redis/redis.go index 72b0ee5..69ff1c5 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -485,10 +485,11 @@ const ( UserStateExpireSeconds = 300 ) -// UserState 用户会话状态(阶段+对话计数,统一5分钟过期) +// UserState 用户会话状态(阶段+对话计数+咨询方向,统一5分钟过期) type UserState struct { - Stage int `json:"stage"` // 用户阶段:0=AI模型 1=打招呼 2=业务 3=发卡片 - Count int64 `json:"count"` // 对话计数 + Stage int `json:"stage"` // 用户阶段:5=未选择方向 0=AI模型 1=打招呼 2=业务 3=发卡片 + Count int64 `json:"count"` // 对话计数 + Direction string `json:"direction"` // 用户选择的咨询方向(如:产品咨询、售后服务) } // GetUserState 获取用户状态(阶段+计数) @@ -499,14 +500,15 @@ func GetUserState(ctx context.Context, userId, platform string) (state *UserStat return } - state = &UserState{} + state = &UserState{Stage: 5} // 默认状态5(未选择方向) if result.IsEmpty() { - return // 返回默认值 stage=0, count=0 + return } m := result.Map() state.Stage = gconv.Int(m["stage"]) state.Count = gconv.Int64(m["count"]) + state.Direction = gconv.String(m["direction"]) return } @@ -521,6 +523,17 @@ func SetUserStage(ctx context.Context, userId, platform string, stage int) error return err } +// SetUserDirection 设置用户选择的咨询方向,并刷新过期时间 +func SetUserDirection(ctx context.Context, userId, platform, direction string) error { + key := UserStateKeyPrefix + userId + "_" + platform + _, err := redisClient.Do(ctx, "HSET", key, "direction", direction) + if err != nil { + return err + } + _, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) + return err +} + // IncrUserCount 增加用户对话计数,返回当前轮数,并刷新过期时间 func IncrUserCount(ctx context.Context, userId, platform string) (count int64, err error) { key := UserStateKeyPrefix + userId + "_" + platform @@ -540,6 +553,59 @@ func ResetUserState(ctx context.Context, userId, platform string) error { return err } +// ============== 对话缓存相关(5句落库)============== + +const ( + // ConversationCacheKeyPrefix 对话缓存 Key 前缀 + ConversationCacheKeyPrefix = "ragflow:conversation:cache:" + // ConversationCacheExpireSeconds 对话缓存过期时间(10分钟) + ConversationCacheExpireSeconds = 600 +) + +// CacheConversation 缓存单条对话到Redis List +func CacheConversation(ctx context.Context, userId, platform string, data []byte) error { + key := ConversationCacheKeyPrefix + userId + "_" + platform + _, err := redisClient.Do(ctx, "RPUSH", key, string(data)) + if err != nil { + return err + } + _, err = redisClient.Do(ctx, "EXPIRE", key, ConversationCacheExpireSeconds) + return err +} + +// GetCachedConversations 获取缓存的对话列表并清空 +func GetCachedConversations(ctx context.Context, userId, platform string) (list []string, err error) { + key := ConversationCacheKeyPrefix + userId + "_" + platform + result, err := redisClient.Do(ctx, "LRANGE", key, 0, -1) + if err != nil { + return + } + if result.IsEmpty() { + return + } + list = result.Strings() + // 清空缓存 + redisClient.Del(ctx, key) + return +} + +// GetCachedConversationCount 获取缓存的对话数量 +func GetCachedConversationCount(ctx context.Context, userId, platform string) (count int64, err error) { + key := ConversationCacheKeyPrefix + userId + "_" + platform + result, err := redisClient.Do(ctx, "LLEN", key) + if err != nil { + return + } + return result.Int64(), nil +} + +// ClearCachedConversations 清空对话缓存(归档时调用) +func ClearCachedConversations(ctx context.Context, userId, platform string) error { + key := ConversationCacheKeyPrefix + userId + "_" + platform + _, err := redisClient.Del(ctx, key) + return err +} + // ========== 以下为兼容旧接口(内部调用新实现)========== // IncrConversationCount 增加用户对话计数(兼容旧接口)