diff --git a/redis/redis.go b/redis/redis.go index 7b215d0..d8dd359 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -147,7 +147,7 @@ type StreamMessage struct { // 使用 gredis Do() 方法执行 XGROUP CREATE 命令 func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { // XGROUP CREATE streamKey groupName 0 MKSTREAM - _, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") + _, err := getClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") if err != nil { // 如果组已存在,忽略错误 errStr := err.Error() @@ -173,7 +173,7 @@ func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messag args = append(args, key, val) } - result, err := redisClient.Do(ctx, "XADD", args...) + result, err := getClient().Do(ctx, "XADD", args...) if err != nil { return } @@ -186,7 +186,7 @@ func AddToStream(ctx context.Context, streamKey string, msg interface{}) (messag // XGROUP CREATE streamKey groupName 0 MKSTREAM // 使用0作为起始ID,从Stream开头读取所有未消费消息 func CreateConsumerGroup(ctx context.Context, streamKey, groupName string) error { - _, err := redisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") + _, err := getClient().Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM") return err } @@ -202,7 +202,7 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri RECONNECT: // 先尝试读取pending消息(ID=0),处理积压 - result, err := redisClient.Do(execCtx, + result, err := getClient().Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, "BLOCK", 0, // 不阻塞,立即返回 @@ -232,7 +232,7 @@ RECONNECT: // 如果没有pending消息,读取新消息 if !hasPending { - result, err = redisClient.Do(execCtx, + result, err = getClient().Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, "BLOCK", blockMs, @@ -354,7 +354,7 @@ func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ... args = append(args, id) } - _, err := redisClient.Do(ctx, "XACK", args...) + _, err := getClient().Do(ctx, "XACK", args...) return err } @@ -362,7 +362,7 @@ func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ... // 使用 gredis Do() 方法执行 XLEN 命令 func GetStreamLength(ctx context.Context, streamKey string) (int64, error) { // XLEN streamKey - result, err := redisClient.Do(ctx, "XLEN", streamKey) + result, err := getClient().Do(ctx, "XLEN", streamKey) if err != nil { return 0, err } @@ -383,7 +383,7 @@ type PendingMessage struct { // 使用 gredis Do() 方法执行 XPENDING 命令 func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) { // XPENDING streamKey groupName start end count - result, err := redisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count) + result, err := getClient().Do(ctx, "XPENDING", streamKey, groupName, start, end, count) if err != nil { return nil, err } @@ -425,7 +425,7 @@ func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName args = append(args, id) } - result, err := redisClient.Do(ctx, "XCLAIM", args...) + result, err := getClient().Do(ctx, "XCLAIM", args...) if err != nil { return nil, err } @@ -479,7 +479,7 @@ func SetSessionLastActive(ctx context.Context, userId string) error { timestamp := gtime.Now().Timestamp() // SETEX key 7200 value (7200秒 = 2小时) - _, err := redisClient.Do(ctx, "SETEX", key, 7200, timestamp) + _, err := getClient().Do(ctx, "SETEX", key, 7200, timestamp) return err } @@ -487,7 +487,7 @@ func SetSessionLastActive(ctx context.Context, userId string) error { // 使用 gredis Get 方法 func GetSessionLastActive(ctx context.Context, userId string) (int64, error) { key := SessionLastActiveKeyPrefix + userId + ":last_active" - result, err := redisClient.Get(ctx, key) + result, err := getClient().Get(ctx, key) if err != nil { return 0, err } @@ -531,7 +531,7 @@ func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, erro // windowSeconds: 时间窗口(秒) func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count int64, err error) { fullKey := RateLimitKeyPrefix + key - result, err := redisClient.Do(ctx, "INCR", fullKey) + result, err := getClient().Do(ctx, "INCR", fullKey) if err != nil { return } @@ -539,7 +539,7 @@ func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count // 首次设置过期时间 if count == 1 { - redisClient.Do(ctx, "EXPIRE", fullKey, windowSeconds) + getClient().Do(ctx, "EXPIRE", fullKey, windowSeconds) } return } @@ -547,7 +547,7 @@ func IncrRateLimit(ctx context.Context, key string, windowSeconds int64) (count // GetRateLimit 获取当前限流计数 func GetRateLimit(ctx context.Context, key string) (count int64, err error) { fullKey := RateLimitKeyPrefix + key - result, err := redisClient.Get(ctx, fullKey) + result, err := getClient().Get(ctx, fullKey) if err != nil { return } @@ -562,14 +562,14 @@ func GetRateLimit(ctx context.Context, key string) (count int64, err error) { func SetSessionCache(ctx context.Context, tenantId, userId, sessionId string) error { key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" // SETEX key 7200 value (7200秒 = 2小时,与last_active保持一致) - _, err := redisClient.Do(ctx, "SETEX", key, 7200, sessionId) + _, err := getClient().Do(ctx, "SETEX", key, 7200, sessionId) return err } // GetSessionCache 获取缓存的 RAGFlow Session ID(租户+用户隔离) func GetSessionCache(ctx context.Context, tenantId, userId string) (string, error) { key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" - result, err := redisClient.Get(ctx, key) + result, err := getClient().Get(ctx, key) if err != nil { return "", err } @@ -584,7 +584,7 @@ func GetSessionCache(ctx context.Context, tenantId, userId string) (string, erro // DelSessionCache 删除缓存的 RAGFlow Session ID(归档时调用,租户+用户隔离) func DelSessionCache(ctx context.Context, tenantId, userId string) error { key := SessionLastActiveKeyPrefix + tenantId + ":" + userId + ":session_id" - _, err := redisClient.Del(ctx, key) + _, err := getClient().Del(ctx, key) return err } @@ -594,7 +594,7 @@ func DelSessionCache(ctx context.Context, tenantId, userId string) error { // 返回 true 表示获取成功,false 表示锁已被其他节点持有 func TryLock(ctx context.Context, key string, expireSeconds int) bool { // SET key value NX EX expireSeconds - result, err := redisClient.Do(ctx, "SET", key, gtime.Now().String(), "NX", "EX", expireSeconds) + result, err := getClient().Do(ctx, "SET", key, gtime.Now().String(), "NX", "EX", expireSeconds) if err != nil { glog.Errorf(ctx, "获取分布式锁失败: %v", err) return false @@ -604,7 +604,7 @@ func TryLock(ctx context.Context, key string, expireSeconds int) bool { // Unlock 释放分布式锁 func Unlock(ctx context.Context, key string) { - if _, err := redisClient.Del(ctx, key); err != nil { + if _, err := getClient().Del(ctx, key); err != nil { glog.Errorf(ctx, "释放分布式锁失败: %v", err) } } @@ -629,7 +629,7 @@ type UserState struct { // GetUserState 获取用户状态(阶段+计数) func GetUserState(ctx context.Context, userId, platform string) (state *UserState, err error) { key := UserStateKeyPrefix + userId + "_" + platform - result, err := redisClient.Do(ctx, "HGETALL", key) + result, err := getClient().Do(ctx, "HGETALL", key) if err != nil { return } @@ -654,52 +654,52 @@ func GetUserState(ctx context.Context, userId, platform string) (state *UserStat // SetUserStage 设置用户阶段,并刷新过期时间 func SetUserStage(ctx context.Context, userId, platform string, stage int) error { key := UserStateKeyPrefix + userId + "_" + platform - _, err := redisClient.Do(ctx, "HSET", key, "stage", stage) + _, err := getClient().Do(ctx, "HSET", key, "stage", stage) if err != nil { return err } - _, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) + _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return err } // SetUserAccountName 设置用户对应的客服账号名称,并刷新过期时间 func SetUserAccountName(ctx context.Context, userId, platform, accountName string) error { key := UserStateKeyPrefix + userId + "_" + platform - _, err := redisClient.Do(ctx, "HSET", key, "accountName", accountName) + _, err := getClient().Do(ctx, "HSET", key, "accountName", accountName) if err != nil { return err } - _, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) + _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return err } // SetUserDirection 设置用户选择的咨询方向,并刷新过期时间 func SetUserDirection(ctx context.Context, userId, platform, direction string) error { key := UserStateKeyPrefix + userId + "_" + platform - _, err := redisClient.Do(ctx, "HSET", key, "direction", direction) + _, err := getClient().Do(ctx, "HSET", key, "direction", direction) if err != nil { return err } - _, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) + _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return err } // IncrUserCount 增加用户对话计数,返回当前轮数,并刷新过期时间 func IncrUserCount(ctx context.Context, userId, platform string) (count int64, err error) { key := UserStateKeyPrefix + userId + "_" + platform - result, err := redisClient.Do(ctx, "HINCRBY", key, "count", 1) + result, err := getClient().Do(ctx, "HINCRBY", key, "count", 1) if err != nil { return } count = result.Int64() - _, err = redisClient.Do(ctx, "EXPIRE", key, UserStateExpireSeconds) + _, err = getClient().Do(ctx, "EXPIRE", key, UserStateExpireSeconds) return } // ResetUserState 重置用户状态(归档时调用) func ResetUserState(ctx context.Context, userId, platform string) error { key := UserStateKeyPrefix + userId + "_" + platform - _, err := redisClient.Del(ctx, key) + _, err := getClient().Del(ctx, key) return err } @@ -715,18 +715,18 @@ const ( // CacheConversation 缓存单条对话到Redis List(按sessionId存储) func CacheConversation(ctx context.Context, sessionId string, data []byte) error { key := ConversationCacheKeyPrefix + sessionId - _, err := redisClient.Do(ctx, "RPUSH", key, string(data)) + _, err := getClient().Do(ctx, "RPUSH", key, string(data)) if err != nil { return err } - _, err = redisClient.Do(ctx, "EXPIRE", key, ConversationCacheExpireSeconds) + _, err = getClient().Do(ctx, "EXPIRE", key, ConversationCacheExpireSeconds) return err } // GetCachedConversations 获取缓存的对话列表并清空(按sessionId查询) func GetCachedConversations(ctx context.Context, sessionId string) (list []string, err error) { key := ConversationCacheKeyPrefix + sessionId - result, err := redisClient.Do(ctx, "LRANGE", key, 0, -1) + result, err := getClient().Do(ctx, "LRANGE", key, 0, -1) if err != nil { return } @@ -735,14 +735,14 @@ func GetCachedConversations(ctx context.Context, sessionId string) (list []strin } list = result.Strings() // 清空缓存 - redisClient.Del(ctx, key) + getClient().Del(ctx, key) return } // GetCachedConversationCount 获取缓存的对话数量(按sessionId查询) func GetCachedConversationCount(ctx context.Context, sessionId string) (count int64, err error) { key := ConversationCacheKeyPrefix + sessionId - result, err := redisClient.Do(ctx, "LLEN", key) + result, err := getClient().Do(ctx, "LLEN", key) if err != nil { return } @@ -752,7 +752,7 @@ func GetCachedConversationCount(ctx context.Context, sessionId string) (count in // ClearCachedConversations 清空对话缓存(归档时调用,按sessionId) func ClearCachedConversations(ctx context.Context, sessionId string) error { key := ConversationCacheKeyPrefix + sessionId - _, err := redisClient.Del(ctx, key) + _, err := getClient().Del(ctx, key) return err }