更改 redis方法
This commit is contained in:
404
redis/redis.go
404
redis/redis.go
@@ -2,41 +2,24 @@ package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/v2/database/gredis"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/gogf/gf/v2/util/gconv"
|
||||
)
|
||||
|
||||
var RedisClient *redis.Client
|
||||
// GRedisClient GoFrame gredis 客户端,统一使用
|
||||
var GRedisClient *gredis.Redis
|
||||
|
||||
// RedisClient GRedisClient 的别名,保持向后兼容
|
||||
var RedisClient *gredis.Redis
|
||||
|
||||
func init() {
|
||||
// 从 GoFrame 配置读取 Redis 配置
|
||||
ctx := context.Background()
|
||||
|
||||
// 读取 Redis 配置
|
||||
addr := g.Cfg().MustGet(ctx, "redis.default.address").String()
|
||||
|
||||
password := g.Cfg().MustGet(ctx, "redis.default.pass", "").String()
|
||||
db := g.Cfg().MustGet(ctx, "redis.default.db", 0).Int()
|
||||
|
||||
// 读取超时配置
|
||||
dialTimeout := g.Cfg().MustGet(ctx, "redis.default.dialTimeout", "30s").Duration()
|
||||
readTimeout := g.Cfg().MustGet(ctx, "redis.default.readTimeout", "30s").Duration()
|
||||
writeTimeout := g.Cfg().MustGet(ctx, "redis.default.writeTimeout", "30s").Duration()
|
||||
|
||||
// 创建 Redis 客户端
|
||||
RedisClient = redis.NewClient(&redis.Options{
|
||||
Addr: addr,
|
||||
Password: password,
|
||||
DB: db,
|
||||
DialTimeout: dialTimeout,
|
||||
ReadTimeout: readTimeout,
|
||||
WriteTimeout: writeTimeout,
|
||||
// 不设置 Protocol(让 go-redis 自动协商)
|
||||
// Protocol: 2,
|
||||
})
|
||||
// 初始化 GoFrame gredis 客户端
|
||||
GRedisClient = g.Redis()
|
||||
RedisClient = GRedisClient // 别名指向同一个客户端
|
||||
}
|
||||
|
||||
// Stream 和消费者组常量
|
||||
@@ -56,21 +39,14 @@ type StreamMessage struct {
|
||||
}
|
||||
|
||||
// InitStreamGroup 初始化 Stream 和消费者组
|
||||
// 在应用启动时调用一次,创建 Stream 和消费者组
|
||||
// 使用 GoFrame Do() 方法执行 XGROUP CREATE 命令
|
||||
// 参数:
|
||||
// - streamKey: Stream 键名
|
||||
// - groupName: 消费者组名称
|
||||
//
|
||||
// 返回:error 初始化失败时返回错误
|
||||
// 使用 gredis Do() 方法执行 XGROUP CREATE 命令
|
||||
func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
|
||||
// 使用 XGroupCreateMkStream 创建消费者组
|
||||
// 如果 Stream 不存在会自动创建 (MKSTREAM)
|
||||
// "0": 从 Stream 开头开始消费
|
||||
err := RedisClient.XGroupCreateMkStream(ctx, streamKey, groupName, "0").Err()
|
||||
// XGROUP CREATE streamKey groupName 0 MKSTREAM
|
||||
_, err := GRedisClient.Do(ctx, "XGROUP", "CREATE", streamKey, groupName, "0", "MKSTREAM")
|
||||
if err != nil {
|
||||
// 如果组已存在,忽略 BUSYGROUP 错误
|
||||
if err.Error() == "BUSYGROUP Consumer Group name already exists" {
|
||||
// 如果组已存在,忽略错误
|
||||
errStr := err.Error()
|
||||
if strings.Contains(errStr, "BUSYGROUP") || strings.Contains(errStr, "already exists") {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
@@ -79,66 +55,94 @@ func InitStreamGroup(ctx context.Context, streamKey, groupName string) error {
|
||||
}
|
||||
|
||||
// AddToStream 将消息添加到 Stream
|
||||
// 用于 Controller 层将 RAGFlow 请求推入 Stream
|
||||
// 参数:
|
||||
// - streamKey: Stream 键名
|
||||
// - values: 消息内容(键值对)
|
||||
//
|
||||
// 返回:
|
||||
// - string: 消息ID
|
||||
// - error: 添加失败时返回错误
|
||||
// 使用 gredis Do() 方法执行 XADD 命令
|
||||
func AddToStream(ctx context.Context, streamKey string, values map[string]interface{}) (string, error) {
|
||||
// 使用 XAdd 添加消息到 Stream
|
||||
messageID, err := RedisClient.XAdd(ctx, &redis.XAddArgs{
|
||||
Stream: streamKey,
|
||||
Values: values,
|
||||
}).Result()
|
||||
// XADD streamKey * field1 value1 field2 value2 ...
|
||||
args := []interface{}{streamKey, "*"} // "*" 自动生成ID
|
||||
for key, val := range values {
|
||||
args = append(args, key, val)
|
||||
}
|
||||
|
||||
result, err := GRedisClient.Do(ctx, "XADD", args...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
// 返回消息ID
|
||||
messageID := result.String()
|
||||
return messageID, nil
|
||||
}
|
||||
|
||||
// ReadFromStream 从 Stream 读取消息(消费者组模式)
|
||||
// 后台 Goroutine 使用此方法从 Stream 中取出请求进行处理
|
||||
// 参数:
|
||||
// - streamKey: Stream 键名
|
||||
// - groupName: 消费者组名称
|
||||
// - consumerName: 消费者名称(唯一标识)
|
||||
// - count: 每次读取的消息数量
|
||||
// - blockMs: 阻塞时间(毫秒),0表示不阻塞
|
||||
//
|
||||
// 返回:
|
||||
// - []StreamMessage: 消息列表
|
||||
// - error: 读取失败时返回错误
|
||||
// 使用 gredis Do() 方法执行 XREADGROUP 命令
|
||||
func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) {
|
||||
// 使用 XReadGroup 从消费者组读取消息
|
||||
// ">" 表示读取未被消费的新消息(只获取新消息)
|
||||
// 如果使用 "0" 或其他 ID,则返回 Pending 消息(未确认的消息)
|
||||
streams, err := RedisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: groupName,
|
||||
Consumer: consumerName,
|
||||
Streams: []string{streamKey, ">"}, // Stream名称 + 起始ID
|
||||
Count: count,
|
||||
Block: time.Duration(blockMs) * time.Millisecond,
|
||||
}).Result()
|
||||
// XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey >
|
||||
result, err := GRedisClient.Do(ctx,
|
||||
"XREADGROUP", "GROUP", groupName, consumerName,
|
||||
"COUNT", count,
|
||||
"BLOCK", blockMs,
|
||||
"STREAMS", streamKey, ">",
|
||||
)
|
||||
|
||||
// 处理错误:超时或没有数据时返回 redis.Nil
|
||||
if err != nil {
|
||||
if err == redis.Nil {
|
||||
// 超时或没有数据,返回空数组
|
||||
return []StreamMessage{}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 解析返回的消息
|
||||
var messages []StreamMessage
|
||||
for _, stream := range streams {
|
||||
for _, msg := range stream.Messages {
|
||||
// 解析返回值
|
||||
// 格式: [[streamKey, [[msgID, [field1, value1, field2, value2, ...]], ...]]]
|
||||
messages := []StreamMessage{}
|
||||
|
||||
if result == nil {
|
||||
// 超时或没有数据
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
// 类型断言:result.Val() 返回 interface{}
|
||||
streamsArray, ok := result.Val().([]interface{})
|
||||
if !ok || len(streamsArray) == 0 {
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
// 遍历每个 stream
|
||||
for _, streamData := range streamsArray {
|
||||
streamArray, ok := streamData.([]interface{})
|
||||
if !ok || len(streamArray) < 2 {
|
||||
continue
|
||||
}
|
||||
|
||||
// streamArray[0] 是 streamKey, streamArray[1] 是消息数组
|
||||
messagesArray, ok := streamArray[1].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// 解析每条消息
|
||||
for _, msgData := range messagesArray {
|
||||
msgArray, ok := msgData.([]interface{})
|
||||
if !ok || len(msgArray) < 2 {
|
||||
continue
|
||||
}
|
||||
|
||||
// msgArray[0] 是 ID, msgArray[1] 是字段数组
|
||||
msgID := gconv.String(msgArray[0])
|
||||
fieldsArray, ok := msgArray[1].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// 解析字段为 map
|
||||
values := make(map[string]interface{})
|
||||
for i := 0; i < len(fieldsArray); i += 2 {
|
||||
if i+1 < len(fieldsArray) {
|
||||
key := gconv.String(fieldsArray[i])
|
||||
val := fieldsArray[i+1]
|
||||
values[key] = val
|
||||
}
|
||||
}
|
||||
|
||||
messages = append(messages, StreamMessage{
|
||||
ID: msg.ID,
|
||||
Values: msg.Values,
|
||||
ID: msgID,
|
||||
Values: values,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -147,101 +151,125 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri
|
||||
}
|
||||
|
||||
// AckMessage 确认消息已处理
|
||||
// 处理完消息后必须调用此方法确认,否则消息会保留在 Pending List (PEL)
|
||||
// 确认后消息会从 PEL 中移除
|
||||
// 参数:
|
||||
// - streamKey: Stream 键名
|
||||
// - groupName: 消费者组名称
|
||||
// - messageIDs: 要确认的消息ID列表
|
||||
//
|
||||
// 返回:error 确认失败时返回错误
|
||||
// 使用 gredis Do() 方法执行 XACK 命令
|
||||
func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error {
|
||||
// 使用 XAck 确认消息
|
||||
// 返回值是成功确认的消息数量
|
||||
count, err := RedisClient.XAck(ctx, streamKey, groupName, messageIDs...).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
// XACK streamKey groupName messageID1 messageID2 ...
|
||||
args := []interface{}{streamKey, groupName}
|
||||
for _, id := range messageIDs {
|
||||
args = append(args, id)
|
||||
}
|
||||
// 可以检查 count 是否等于 len(messageIDs)
|
||||
_ = count
|
||||
return nil
|
||||
|
||||
_, err := GRedisClient.Do(ctx, "XACK", args...)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetStreamLength 获取 Stream 当前长度
|
||||
// 用于监控 Stream 消息积压情况
|
||||
// 参数:
|
||||
// - streamKey: Stream 键名
|
||||
//
|
||||
// 返回:
|
||||
// - int64: Stream 中消息数量
|
||||
// - error: 操作失败时返回错误
|
||||
// 使用 gredis Do() 方法执行 XLEN 命令
|
||||
func GetStreamLength(ctx context.Context, streamKey string) (int64, error) {
|
||||
// 使用 XLen 获取 Stream 长度
|
||||
length, err := RedisClient.XLen(ctx, streamKey).Result()
|
||||
// XLEN streamKey
|
||||
result, err := GRedisClient.Do(ctx, "XLEN", streamKey)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
length := gconv.Int64(result)
|
||||
return length, nil
|
||||
}
|
||||
|
||||
// GetPendingMessages 获取待处理消息(未确认的消息)
|
||||
// 用于监控和重试失败的消息
|
||||
// 参数:
|
||||
// - streamKey: Stream 键名
|
||||
// - groupName: 消费者组名称
|
||||
// - start: 起始ID,"-" 表示最小ID
|
||||
// - end: 结束ID,"+" 表示最大ID
|
||||
// - count: 返回数量
|
||||
//
|
||||
// 返回:
|
||||
// - []redis.XPendingExt: Pending 消息列表
|
||||
// - error: 操作失败时返回错误
|
||||
func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]redis.XPendingExt, error) {
|
||||
// 使用 XPendingExt 获取详细的 Pending 消息
|
||||
pending, err := RedisClient.XPendingExt(ctx, &redis.XPendingExtArgs{
|
||||
Stream: streamKey,
|
||||
Group: groupName,
|
||||
Start: start,
|
||||
End: end,
|
||||
Count: count,
|
||||
}).Result()
|
||||
// PendingMessage Pending 消息结构
|
||||
type PendingMessage struct {
|
||||
ID string // 消息ID
|
||||
Consumer string // 消费者名称
|
||||
Idle int64 // 空闲时间(毫秒)
|
||||
RetryCount int64 // 重试次数
|
||||
}
|
||||
|
||||
// GetPendingMessages 获取待处理消息
|
||||
// 使用 gredis Do() 方法执行 XPENDING 命令
|
||||
func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]PendingMessage, error) {
|
||||
// XPENDING streamKey groupName start end count
|
||||
result, err := GRedisClient.Do(ctx, "XPENDING", streamKey, groupName, start, end, count)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return pending, nil
|
||||
|
||||
if result == nil {
|
||||
return []PendingMessage{}, nil
|
||||
}
|
||||
|
||||
// 解析返回值:[[ID, consumer, idle, retryCount], ...]
|
||||
pendingArray, ok := result.Val().([]interface{})
|
||||
if !ok {
|
||||
return []PendingMessage{}, nil
|
||||
}
|
||||
|
||||
var messages []PendingMessage
|
||||
for _, item := range pendingArray {
|
||||
itemArray, ok := item.([]interface{})
|
||||
if !ok || len(itemArray) < 4 {
|
||||
continue
|
||||
}
|
||||
|
||||
messages = append(messages, PendingMessage{
|
||||
ID: gconv.String(itemArray[0]),
|
||||
Consumer: gconv.String(itemArray[1]),
|
||||
Idle: gconv.Int64(itemArray[2]),
|
||||
RetryCount: gconv.Int64(itemArray[3]),
|
||||
})
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
// ClaimPendingMessage 认领超时的 Pending 消息
|
||||
// 当某个消费者故障后,其他消费者可以认领其未完成的消息
|
||||
// 参数:
|
||||
// - streamKey: Stream 键名
|
||||
// - groupName: 消费者组名称
|
||||
// - consumerName: 新消费者名称
|
||||
// - minIdleTime: 消息空闲时间(毫秒),超过此时间才能被认领
|
||||
// - messageIDs: 要认领的消息ID列表
|
||||
//
|
||||
// 返回:
|
||||
// - []StreamMessage: 认领的消息列表
|
||||
// - error: 操作失败时返回错误
|
||||
// 使用 gredis Do() 方法执行 XCLAIM 命令
|
||||
func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]StreamMessage, error) {
|
||||
// 使用 XClaim 认领消息
|
||||
msgs, err := RedisClient.XClaim(ctx, &redis.XClaimArgs{
|
||||
Stream: streamKey,
|
||||
Group: groupName,
|
||||
Consumer: consumerName,
|
||||
MinIdle: time.Duration(minIdleTime) * time.Millisecond,
|
||||
Messages: messageIDs,
|
||||
}).Result()
|
||||
// XCLAIM streamKey groupName consumerName minIdleTime messageID1 messageID2 ...
|
||||
args := []interface{}{streamKey, groupName, consumerName, minIdleTime}
|
||||
for _, id := range messageIDs {
|
||||
args = append(args, id)
|
||||
}
|
||||
|
||||
result, err := GRedisClient.Do(ctx, "XCLAIM", args...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 转换为 StreamMessage
|
||||
if result == nil {
|
||||
return []StreamMessage{}, nil
|
||||
}
|
||||
|
||||
// 解析返回值:类似 XREADGROUP
|
||||
messagesArray, ok := result.Val().([]interface{})
|
||||
if !ok {
|
||||
return []StreamMessage{}, nil
|
||||
}
|
||||
|
||||
var messages []StreamMessage
|
||||
for _, msg := range msgs {
|
||||
for _, msgData := range messagesArray {
|
||||
msgArray, ok := msgData.([]interface{})
|
||||
if !ok || len(msgArray) < 2 {
|
||||
continue
|
||||
}
|
||||
|
||||
msgID := gconv.String(msgArray[0])
|
||||
fieldsArray, ok := msgArray[1].([]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
values := make(map[string]interface{})
|
||||
for i := 0; i < len(fieldsArray); i += 2 {
|
||||
if i+1 < len(fieldsArray) {
|
||||
key := gconv.String(fieldsArray[i])
|
||||
val := fieldsArray[i+1]
|
||||
values[key] = val
|
||||
}
|
||||
}
|
||||
|
||||
messages = append(messages, StreamMessage{
|
||||
ID: msg.ID,
|
||||
Values: msg.Values,
|
||||
ID: msgID,
|
||||
Values: values,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -249,42 +277,30 @@ func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName
|
||||
}
|
||||
|
||||
// SetSessionLastActive 设置用户最后活跃时间
|
||||
// 用于控制是否发送追问:用户回复后更新活跃时间,避免重复追问
|
||||
// 过期时间:2小时,超过2小时未活跃的记录会自动删除
|
||||
// 参数:
|
||||
// - userId: 用户ID
|
||||
//
|
||||
// 返回:error 设置失败时返回错误
|
||||
// 使用 gredis SetEX 方法
|
||||
func SetSessionLastActive(ctx context.Context, userId string) error {
|
||||
key := SessionLastActiveKeyPrefix + userId + ":last_active"
|
||||
timestamp := time.Now().Unix()
|
||||
|
||||
// 设置过期时间为 2 小时
|
||||
return RedisClient.Set(ctx, key, timestamp, 2*time.Hour).Err()
|
||||
// SETEX key 7200 value (7200秒 = 2小时)
|
||||
_, err := GRedisClient.Do(ctx, "SETEX", key, 7200, timestamp)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetSessionLastActive 获取用户最后活跃时间
|
||||
// 参数:
|
||||
// - userId: 用户ID
|
||||
//
|
||||
// 返回:
|
||||
// - int64: Unix时间戳,未找到返回0
|
||||
// - error: 操作失败时返回错误
|
||||
// 使用 gredis Get 方法
|
||||
func GetSessionLastActive(ctx context.Context, userId string) (int64, error) {
|
||||
key := SessionLastActiveKeyPrefix + userId + ":last_active"
|
||||
result, err := RedisClient.Get(ctx, key).Result()
|
||||
if err == redis.Nil {
|
||||
return 0, nil // 未找到返回 0
|
||||
}
|
||||
result, err := GRedisClient.Get(ctx, key)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// 将字符串转换为 int64
|
||||
timestamp, err := strconv.ParseInt(result, 10, 64)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
if result.IsEmpty() {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
timestamp := gconv.Int64(result.Val())
|
||||
return timestamp, nil
|
||||
}
|
||||
|
||||
@@ -312,35 +328,27 @@ func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, erro
|
||||
}
|
||||
|
||||
// SetSessionCache 缓存用户的 RAGFlow Session ID
|
||||
// 避免每次请求都创建新 Session,提高性能
|
||||
// 过期时间:7天,超过7天未使用的Session会自动清理
|
||||
// 参数:
|
||||
// - userId: 用户ID
|
||||
// - sessionId: RAGFlow返回的Session ID
|
||||
//
|
||||
// 返回:error 设置失败时返回错误
|
||||
// 使用 gredis SetEX 方法
|
||||
func SetSessionCache(ctx context.Context, userId, sessionId string) error {
|
||||
key := SessionLastActiveKeyPrefix + userId + ":session_id"
|
||||
return RedisClient.Set(ctx, key, sessionId, 7*24*time.Hour).Err()
|
||||
|
||||
// SETEX key 604800 value (604800秒 = 7天)
|
||||
_, err := GRedisClient.Do(ctx, "SETEX", key, 604800, sessionId)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetSessionCache 获取缓存的 RAGFlow Session ID
|
||||
// 如果缓存中存在则直接使用,不存在则需要创建新Session
|
||||
// 参数:
|
||||
// - userId: 用户ID
|
||||
//
|
||||
// 返回:
|
||||
// - string: Session ID,未找到返回空字符串
|
||||
// - error: 操作失败时返回错误
|
||||
// 使用 gredis Get 方法
|
||||
func GetSessionCache(ctx context.Context, userId string) (string, error) {
|
||||
key := SessionLastActiveKeyPrefix + userId + ":session_id"
|
||||
result, err := RedisClient.Get(ctx, key).Result()
|
||||
if err == redis.Nil {
|
||||
return "", nil // 未找到返回空字符串
|
||||
}
|
||||
result, err := GRedisClient.Get(ctx, key)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
if result.IsEmpty() {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return result.String(), nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user