Files
common/redis/redis.go
2026-03-12 08:50:55 +08:00

175 lines
4.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package redis
import (
"context"
"time"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
)
var RedisClient *gredis.Redis
func init() {
RedisClient = g.Redis()
}
// 队列操作常量
const (
// RAGFlow 请求队列 Key
RAGFlowRequestQueueKey = "ragflow:request:queue"
// 会话最后活跃时间 Key 前缀
SessionLastActiveKeyPrefix = "ragflow:session:"
)
// PushToQueue 将消息推入队列LPUSH从左侧入队
// 用于 Controller 层将 RAGFlow 请求推入队列,由后台 Goroutine 处理
// 参数:
// - queueKey: 队列键名
// - message: 要推入的消息内容
//
// 返回error 推入失败时返回错误
func PushToQueue(ctx context.Context, queueKey string, message interface{}) error {
_, err := RedisClient.Do(ctx, "LPUSH", queueKey, message)
return err
}
// PopFromQueue 从队列中阻塞取出消息BRPOP从右侧出队
// 后台 Goroutine 使用此方法从队列中取出请求进行处理
// 参数:
// - queueKey: 队列键名
// - timeout: 阻塞超时时间0表示永久阻塞
//
// 返回:
// - string: 取出的消息内容,超时或队列为空返回空字符串
// - error: 操作失败时返回错误
func PopFromQueue(ctx context.Context, queueKey string, timeout int) (string, error) {
result, err := RedisClient.Do(ctx, "BRPOP", queueKey, timeout)
if err != nil {
return "", err
}
// BRPOP 返回 [key, value],我们需要取 value
if result == nil {
return "", nil // 超时返回空
}
// GoFrame gredis 返回的是 *gvar.Var 类型
arr := result.Strings()
if len(arr) >= 2 {
return arr[1], nil // arr[0] 是 keyarr[1] 是 value
}
return "", nil
}
// GetQueueLength 获取队列当前长度
// 用于监控队列积压情况
// 参数:
// - queueKey: 队列键名
//
// 返回:
// - int64: 队列中消息数量
// - error: 操作失败时返回错误
func GetQueueLength(ctx context.Context, queueKey string) (int64, error) {
result, err := RedisClient.Do(ctx, "LLEN", queueKey)
if err != nil {
return 0, err
}
return result.Int64(), nil
}
// SetSessionLastActive 设置用户最后活跃时间
// 用于控制是否发送追问:用户回复后更新活跃时间,避免重复追问
// 过期时间2小时超过2小时未活跃的记录会自动删除
// 参数:
// - userId: 用户ID
//
// 返回error 设置失败时返回错误
func SetSessionLastActive(ctx context.Context, userId string) error {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
timestamp := time.Now().Unix()
// 设置过期时间为 2 小时
return RedisClient.SetEX(ctx, key, timestamp, 7200)
}
// GetSessionLastActive 获取用户最后活跃时间
// 参数:
// - userId: 用户ID
//
// 返回:
// - int64: Unix时间戳未找到返回0
// - error: 操作失败时返回错误
func GetSessionLastActive(ctx context.Context, userId string) (int64, error) {
key := SessionLastActiveKeyPrefix + userId + ":last_active"
result, err := RedisClient.Get(ctx, key)
if err != nil {
return 0, err
}
if result.IsNil() {
return 0, nil // 未找到返回 0
}
return result.Int64(), nil
}
// IsUserActive 检查用户是否在指定时间范围内活跃过
// 用于追问逻辑:如果用户最近活跃过,则不发送追问消息
// 参数:
// - userId: 用户ID
// - seconds: 时间范围例如传入300表示检查5分钟内是否活跃
//
// 返回:
// - bool: true表示用户在指定时间内活跃过
// - error: 操作失败时返回错误
func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, error) {
lastActive, err := GetSessionLastActive(ctx, userId)
if err != nil {
return false, err
}
if lastActive == 0 {
return false, nil // 未找到记录,视为不活跃
}
now := time.Now().Unix()
return (now - lastActive) < seconds, nil
}
// SetSessionCache 缓存用户的 RAGFlow Session ID
// 避免每次请求都创建新 Session提高性能
// 过期时间7天超过7天未使用的Session会自动清理
// 参数:
// - userId: 用户ID
// - sessionId: RAGFlow返回的Session ID
//
// 返回error 设置失败时返回错误
func SetSessionCache(ctx context.Context, userId, sessionId string) error {
key := SessionLastActiveKeyPrefix + userId + ":session_id"
return RedisClient.SetEX(ctx, key, sessionId, 7*24*3600)
}
// GetSessionCache 获取缓存的 RAGFlow Session ID
// 如果缓存中存在则直接使用不存在则需要创建新Session
// 参数:
// - userId: 用户ID
//
// 返回:
// - string: Session ID未找到返回空字符串
// - error: 操作失败时返回错误
func GetSessionCache(ctx context.Context, userId string) (string, error) {
key := SessionLastActiveKeyPrefix + userId + ":session_id"
result, err := RedisClient.Get(ctx, key)
if err != nil {
return "", err
}
if result.IsNil() {
return "", nil
}
return result.String(), nil
}