From 3f4ac12f52848505bc9157f53487f8d4667f2364 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Thu, 4 Dec 2025 17:39:31 +0800 Subject: [PATCH] redis --- http/http.go | 9 ++- ragflow/worker_pool.go | 165 +++++++++++++++++++++++++++++++++++++++++ redis/redis.go | 162 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 332 insertions(+), 4 deletions(-) create mode 100644 ragflow/worker_pool.go diff --git a/http/http.go b/http/http.go index 5997f90..0023e64 100644 --- a/http/http.go +++ b/http/http.go @@ -4,6 +4,11 @@ import ( "context" "errors" "fmt" + "net/http" + "reflect" + "regexp" + "strings" + _ "gitee.com/red-future---jilin-g/common/consul" "gitee.com/red-future---jilin-g/common/jaeger" "gitee.com/red-future---jilin-g/common/utils" @@ -13,10 +18,6 @@ import ( "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" - "net/http" - "reflect" - "regexp" - "strings" ) type ResponseEmpty struct { diff --git a/ragflow/worker_pool.go b/ragflow/worker_pool.go new file mode 100644 index 0000000..682786f --- /dev/null +++ b/ragflow/worker_pool.go @@ -0,0 +1,165 @@ +package ragflow + +import ( + "context" + + "gitee.com/red-future---jilin-g/common/redis" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/os/grpool" +) + +// WorkerPool RAGFlow 请求处理协程池 +type WorkerPool struct { + pool *grpool.Pool + size int +} + +// NewWorkerPool 创建协程池 +// 参数: +// - size: 协程池大小,建议设置为 CPU 核心数的 2-4 倍 +// +// 返回: +// - *WorkerPool: 协程池实例 +// - error: 创建失败时返回错误 +func NewWorkerPool(size int) (*WorkerPool, error) { + if size <= 0 { + return nil, gerror.New("协程池大小必须大于0") + } + + pool := grpool.New(size) + + return &WorkerPool{ + pool: pool, + size: size, + }, nil +} + +// Submit 提交任务到协程池 +// 参数: +// - ctx: 上下文 +// - task: 要执行的任务函数 +// +// 返回:error 提交失败时返回错误 +func (w *WorkerPool) Submit(ctx context.Context, task func(ctx context.Context)) error { + return w.pool.Add(ctx, func(ctx context.Context) { + defer func() { + if r := recover(); r != nil { + glog.Errorf(ctx, "协程池任务执行 panic: %v", r) + } + }() + + task(ctx) + }) +} + +// Size 获取协程池大小 +func (w *WorkerPool) Size() int { + return w.size +} + +// Jobs 获取当前等待执行的任务数量 +func (w *WorkerPool) Jobs() int { + return w.pool.Jobs() +} + +// Close 关闭协程池 +func (w *WorkerPool) Close() { + w.pool.Close() +} + +// WorkerStats 协程池统计信息 +type WorkerStats struct { + PoolSize int // 协程池大小 + Jobs int // 等待执行的任务数 +} + +// Stats 获取协程池统计信息 +func (w *WorkerPool) Stats() WorkerStats { + return WorkerStats{ + PoolSize: w.size, + Jobs: w.pool.Jobs(), + } +} + +// PrintStats 打印协程池统计信息 +func (w *WorkerPool) PrintStats(ctx context.Context) { + stats := w.Stats() + glog.Infof(ctx, "协程池统计 - 池大小: %d, 等待任务: %d", stats.PoolSize, stats.Jobs) +} + +// QueueProcessor 队列处理器,从 Redis 队列中取出任务并提交到协程池 +type QueueProcessor struct { + pool *WorkerPool + queueKey string + timeout int + stopChan chan struct{} + handleFunc func(ctx context.Context, message string) error +} + +// NewQueueProcessor 创建队列处理器 +// 参数: +// - pool: 协程池 +// - queueKey: Redis 队列键名 +// - timeout: 从队列取消息的超时时间(秒) +// - handleFunc: 消息处理函数 +func NewQueueProcessor(pool *WorkerPool, queueKey string, timeout int, handleFunc func(ctx context.Context, message string) error) *QueueProcessor { + return &QueueProcessor{ + pool: pool, + queueKey: queueKey, + timeout: timeout, + stopChan: make(chan struct{}), + handleFunc: handleFunc, + } +} + +// Start 启动队列处理器 +// 会阻塞运行,持续从 Redis 队列中取出消息并提交到协程池处理 +func (q *QueueProcessor) Start(ctx context.Context) error { + glog.Infof(ctx, "队列处理器启动 - 队列: %s, 超时: %ds", q.queueKey, q.timeout) + + for { + select { + case <-q.stopChan: + glog.Info(ctx, "队列处理器收到停止信号") + return nil + default: + // 从 Redis 队列中取出消息 + message, err := q.fetchMessage(ctx) + if err != nil { + glog.Errorf(ctx, "从队列取消息失败: %v", err) + continue + } + + // 队列为空,继续等待 + if message == "" { + continue + } + + // 提交到协程池处理 + if err := q.submitTask(ctx, message); err != nil { + glog.Errorf(ctx, "提交任务到协程池失败: %v", err) + } + } + } +} + +// Stop 停止队列处理器 +func (q *QueueProcessor) Stop() { + close(q.stopChan) +} + +// fetchMessage 从 Redis 队列中取出消息 +func (q *QueueProcessor) fetchMessage(ctx context.Context) (string, error) { + // 调用 Redis 队列的 PopFromQueue 方法从队列中取出消息 + return redis.PopFromQueue(ctx, q.queueKey, q.timeout) +} + +// submitTask 将消息处理任务提交到协程池 +func (q *QueueProcessor) submitTask(ctx context.Context, message string) error { + return q.pool.Submit(ctx, func(ctx context.Context) { + if err := q.handleFunc(ctx, message); err != nil { + glog.Errorf(ctx, "处理消息失败: %v, 消息: %s", err, message) + } + }) +} diff --git a/redis/redis.go b/redis/redis.go index 5f5b962..fb197dd 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -1,6 +1,9 @@ package redis import ( + "context" + "time" + "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" ) @@ -10,3 +13,162 @@ var RedisClient *gredis.Redis func init() { RedisClient = g.Redis() } + +// 队列操作常量 +const ( + // RAGFlow 请求队列 Key + RAGFlowRequestQueueKey = "ragflow:request:queue" + // 会话最后活跃时间 Key 前缀 + SessionLastActiveKeyPrefix = "ragflow:session:" +) + +// PushToQueue 将消息推入队列(LPUSH从左侧入队) +// 用于 Controller 层将 RAGFlow 请求推入队列,由后台 Goroutine 处理 +// 参数: +// - queueKey: 队列键名 +// - message: 要推入的消息内容 +// +// 返回:error 推入失败时返回错误 +func PushToQueue(ctx context.Context, queueKey string, message interface{}) error { + _, err := RedisClient.Do(ctx, "LPUSH", queueKey, message) + return err +} + +// PopFromQueue 从队列中阻塞取出消息(BRPOP从右侧出队) +// 后台 Goroutine 使用此方法从队列中取出请求进行处理 +// 参数: +// - queueKey: 队列键名 +// - timeout: 阻塞超时时间(秒),0表示永久阻塞 +// +// 返回: +// - string: 取出的消息内容,超时或队列为空返回空字符串 +// - error: 操作失败时返回错误 +func PopFromQueue(ctx context.Context, queueKey string, timeout int) (string, error) { + result, err := RedisClient.Do(ctx, "BRPOP", queueKey, timeout) + if err != nil { + return "", err + } + + // BRPOP 返回 [key, value],我们需要取 value + if result == nil { + return "", nil // 超时返回空 + } + + // GoFrame gredis 返回的是 *gvar.Var 类型 + arr := result.Strings() + if len(arr) >= 2 { + return arr[1], nil // arr[0] 是 key,arr[1] 是 value + } + + return "", nil +} + +// GetQueueLength 获取队列当前长度 +// 用于监控队列积压情况 +// 参数: +// - queueKey: 队列键名 +// +// 返回: +// - int64: 队列中消息数量 +// - error: 操作失败时返回错误 +func GetQueueLength(ctx context.Context, queueKey string) (int64, error) { + result, err := RedisClient.Do(ctx, "LLEN", queueKey) + if err != nil { + return 0, err + } + return result.Int64(), nil +} + +// SetSessionLastActive 设置用户最后活跃时间 +// 用于控制是否发送追问:用户回复后更新活跃时间,避免重复追问 +// 过期时间:2小时,超过2小时未活跃的记录会自动删除 +// 参数: +// - userId: 用户ID +// +// 返回:error 设置失败时返回错误 +func SetSessionLastActive(ctx context.Context, userId string) error { + key := SessionLastActiveKeyPrefix + userId + ":last_active" + timestamp := time.Now().Unix() + + // 设置过期时间为 2 小时 + return RedisClient.SetEX(ctx, key, timestamp, 7200) +} + +// GetSessionLastActive 获取用户最后活跃时间 +// 参数: +// - userId: 用户ID +// +// 返回: +// - int64: Unix时间戳,未找到返回0 +// - error: 操作失败时返回错误 +func GetSessionLastActive(ctx context.Context, userId string) (int64, error) { + key := SessionLastActiveKeyPrefix + userId + ":last_active" + result, err := RedisClient.Get(ctx, key) + if err != nil { + return 0, err + } + + if result.IsNil() { + return 0, nil // 未找到返回 0 + } + + return result.Int64(), nil +} + +// IsUserActive 检查用户是否在指定时间范围内活跃过 +// 用于追问逻辑:如果用户最近活跃过,则不发送追问消息 +// 参数: +// - userId: 用户ID +// - seconds: 时间范围(秒),例如传入300表示检查5分钟内是否活跃 +// +// 返回: +// - bool: true表示用户在指定时间内活跃过 +// - error: 操作失败时返回错误 +func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, error) { + lastActive, err := GetSessionLastActive(ctx, userId) + if err != nil { + return false, err + } + + if lastActive == 0 { + return false, nil // 未找到记录,视为不活跃 + } + + now := time.Now().Unix() + return (now - lastActive) < seconds, nil +} + +// SetSessionCache 缓存用户的 RAGFlow Session ID +// 避免每次请求都创建新 Session,提高性能 +// 过期时间:7天,超过7天未使用的Session会自动清理 +// 参数: +// - userId: 用户ID +// - sessionId: RAGFlow返回的Session ID +// +// 返回:error 设置失败时返回错误 +func SetSessionCache(ctx context.Context, userId, sessionId string) error { + key := SessionLastActiveKeyPrefix + userId + ":session_id" + return RedisClient.SetEX(ctx, key, sessionId, 7*24*3600) +} + +// GetSessionCache 获取缓存的 RAGFlow Session ID +// 如果缓存中存在则直接使用,不存在则需要创建新Session +// 参数: +// - userId: 用户ID +// +// 返回: +// - string: Session ID,未找到返回空字符串 +// - error: 操作失败时返回错误 +func GetSessionCache(ctx context.Context, userId string) (string, error) { + key := SessionLastActiveKeyPrefix + userId + ":session_id" + result, err := RedisClient.Get(ctx, key) + if err != nil { + return "", err + } + + if result.IsNil() { + return "", nil + } + + return result.String(), nil +}