diff --git a/ragflow/worker_pool.go b/ragflow/worker_pool.go index 682786f..8c46c47 100644 --- a/ragflow/worker_pool.go +++ b/ragflow/worker_pool.go @@ -88,57 +88,70 @@ func (w *WorkerPool) PrintStats(ctx context.Context) { glog.Infof(ctx, "协程池统计 - 池大小: %d, 等待任务: %d", stats.PoolSize, stats.Jobs) } -// QueueProcessor 队列处理器,从 Redis 队列中取出任务并提交到协程池 +// QueueProcessor Stream 处理器,从 Redis Stream 中取出任务并提交到协程池 type QueueProcessor struct { - pool *WorkerPool - queueKey string - timeout int - stopChan chan struct{} - handleFunc func(ctx context.Context, message string) error + pool *WorkerPool + streamKey string // Stream 键名 + groupName string // 消费者组名称 + consumerName string // 消费者名称 + timeout int64 // 阻塞超时时间(毫秒) + batchSize int64 // 每次读取的消息数量 + stopChan chan struct{} + handleFunc func(ctx context.Context, message map[string]interface{}) error } -// NewQueueProcessor 创建队列处理器 +// NewQueueProcessor 创建 Stream 处理器 // 参数: // - pool: 协程池 -// - queueKey: Redis 队列键名 -// - timeout: 从队列取消息的超时时间(秒) +// - streamKey: Redis Stream 键名 +// - groupName: 消费者组名称 +// - consumerName: 消费者名称(唯一标识) +// - timeout: 从 Stream 取消息的超时时间(毫秒) +// - batchSize: 每次读取的消息数量 // - handleFunc: 消息处理函数 -func NewQueueProcessor(pool *WorkerPool, queueKey string, timeout int, handleFunc func(ctx context.Context, message string) error) *QueueProcessor { +func NewQueueProcessor(pool *WorkerPool, streamKey, groupName, consumerName string, timeout int64, batchSize int64, handleFunc func(ctx context.Context, message map[string]interface{}) error) *QueueProcessor { return &QueueProcessor{ - pool: pool, - queueKey: queueKey, - timeout: timeout, - stopChan: make(chan struct{}), - handleFunc: handleFunc, + pool: pool, + streamKey: streamKey, + groupName: groupName, + consumerName: consumerName, + timeout: timeout, + batchSize: batchSize, + stopChan: make(chan struct{}), + handleFunc: handleFunc, } } -// Start 启动队列处理器 -// 会阻塞运行,持续从 Redis 队列中取出消息并提交到协程池处理 +// Start 启动 Stream 处理器 +// 会阻塞运行,持续从 Redis Stream 中取出消息并提交到协程池处理 func (q *QueueProcessor) Start(ctx context.Context) error { - glog.Infof(ctx, "队列处理器启动 - 队列: %s, 超时: %ds", q.queueKey, q.timeout) + glog.Infof(ctx, "Stream 处理器启动 - Stream: %s, 消费者组: %s, 消费者: %s, 超时: %dms", + q.streamKey, q.groupName, q.consumerName, q.timeout) for { select { case <-q.stopChan: - glog.Info(ctx, "队列处理器收到停止信号") + glog.Info(ctx, "Stream 处理器收到停止信号") return nil default: - // 从 Redis 队列中取出消息 - message, err := q.fetchMessage(ctx) + // 从 Redis Stream 中读取消息 + messages, err := q.fetchMessages(ctx) if err != nil { - glog.Errorf(ctx, "从队列取消息失败: %v", err) + glog.Errorf(ctx, "从 Stream 读取消息失败: %v", err) continue } - // 队列为空,继续等待 - if message == "" { + // 没有新消息,继续等待 + if len(messages) == 0 { continue } - // 提交到协程池处理 - if err := q.submitTask(ctx, message); err != nil { - glog.Errorf(ctx, "提交任务到协程池失败: %v", err) + // 处理每条消息 + for _, msg := range messages { + // 提交到协程池处理 + if err := q.submitTask(ctx, msg); err != nil { + glog.Errorf(ctx, "提交任务到协程池失败: %v, 消息ID: %s", err, msg.ID) + } } } } @@ -149,17 +162,26 @@ func (q *QueueProcessor) Stop() { close(q.stopChan) } -// fetchMessage 从 Redis 队列中取出消息 -func (q *QueueProcessor) fetchMessage(ctx context.Context) (string, error) { - // 调用 Redis 队列的 PopFromQueue 方法从队列中取出消息 - return redis.PopFromQueue(ctx, q.queueKey, q.timeout) +// fetchMessages 从 Redis Stream 中读取消息 +func (q *QueueProcessor) fetchMessages(ctx context.Context) ([]redis.StreamMessage, error) { + // 从消费者组读取消息 + return redis.ReadFromStream(ctx, q.streamKey, q.groupName, q.consumerName, q.batchSize, q.timeout) } // submitTask 将消息处理任务提交到协程池 -func (q *QueueProcessor) submitTask(ctx context.Context, message string) error { +func (q *QueueProcessor) submitTask(ctx context.Context, message redis.StreamMessage) error { return q.pool.Submit(ctx, func(ctx context.Context) { - if err := q.handleFunc(ctx, message); err != nil { - glog.Errorf(ctx, "处理消息失败: %v, 消息: %s", err, message) + // 处理消息 + if err := q.handleFunc(ctx, message.Values); err != nil { + glog.Errorf(ctx, "处理消息失败: %v, 消息ID: %s", err, message.ID) + return + } + + // 处理成功后确认消息 + if err := redis.AckMessage(ctx, q.streamKey, q.groupName, message.ID); err != nil { + glog.Errorf(ctx, "确认消息失败: %v, 消息ID: %s", err, message.ID) + } else { + glog.Debugf(ctx, "消息处理完成并已确认: %s", message.ID) } }) } diff --git a/redis/redis.go b/redis/redis.go index fb197dd..4e339dd 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -2,81 +2,250 @@ package redis import ( "context" + "strconv" "time" - "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" + "github.com/redis/go-redis/v9" ) -var RedisClient *gredis.Redis +var RedisClient *redis.Client func init() { - RedisClient = g.Redis() + // 从 GoFrame 配置读取 Redis 配置 + ctx := context.Background() + + // 读取 Redis 配置 + addr := g.Cfg().MustGet(ctx, "redis.default.address").String() + + password := g.Cfg().MustGet(ctx, "redis.default.pass", "").String() + db := g.Cfg().MustGet(ctx, "redis.default.db", 0).Int() + + // 读取超时配置 + dialTimeout := g.Cfg().MustGet(ctx, "redis.default.dialTimeout", "30s").Duration() + readTimeout := g.Cfg().MustGet(ctx, "redis.default.readTimeout", "30s").Duration() + writeTimeout := g.Cfg().MustGet(ctx, "redis.default.writeTimeout", "30s").Duration() + + // 创建 Redis 客户端 + RedisClient = redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, + DB: db, + DialTimeout: dialTimeout, + ReadTimeout: readTimeout, + WriteTimeout: writeTimeout, + // 不设置 Protocol(让 go-redis 自动协商) + // Protocol: 2, + }) } -// 队列操作常量 +// Stream 和消费者组常量 const ( - // RAGFlow 请求队列 Key - RAGFlowRequestQueueKey = "ragflow:request:queue" + // RAGFlow 请求 Stream Key + RAGFlowRequestStreamKey = "ragflow:request:stream" + // RAGFlow 消费者组名称 + RAGFlowConsumerGroup = "ragflow:consumer:group" // 会话最后活跃时间 Key 前缀 SessionLastActiveKeyPrefix = "ragflow:session:" ) -// PushToQueue 将消息推入队列(LPUSH从左侧入队) -// 用于 Controller 层将 RAGFlow 请求推入队列,由后台 Goroutine 处理 -// 参数: -// - queueKey: 队列键名 -// - message: 要推入的消息内容 -// -// 返回:error 推入失败时返回错误 -func PushToQueue(ctx context.Context, queueKey string, message interface{}) error { - _, err := RedisClient.Do(ctx, "LPUSH", queueKey, message) - return err +// StreamMessage Redis Stream 消息结构 +type StreamMessage struct { + ID string // 消息ID(自动生成) + Values map[string]interface{} // 消息内容 } -// PopFromQueue 从队列中阻塞取出消息(BRPOP从右侧出队) -// 后台 Goroutine 使用此方法从队列中取出请求进行处理 +// InitStreamGroup 初始化 Stream 和消费者组 +// 在应用启动时调用一次,创建 Stream 和消费者组 +// 使用 GoFrame Do() 方法执行 XGROUP CREATE 命令 // 参数: -// - queueKey: 队列键名 -// - timeout: 阻塞超时时间(秒),0表示永久阻塞 +// - streamKey: Stream 键名 +// - groupName: 消费者组名称 +// +// 返回:error 初始化失败时返回错误 +func InitStreamGroup(ctx context.Context, streamKey, groupName string) error { + // 使用 XGroupCreateMkStream 创建消费者组 + // 如果 Stream 不存在会自动创建 (MKSTREAM) + // "0": 从 Stream 开头开始消费 + err := RedisClient.XGroupCreateMkStream(ctx, streamKey, groupName, "0").Err() + if err != nil { + // 如果组已存在,忽略 BUSYGROUP 错误 + if err.Error() == "BUSYGROUP Consumer Group name already exists" { + return nil + } + return err + } + return nil +} + +// AddToStream 将消息添加到 Stream +// 用于 Controller 层将 RAGFlow 请求推入 Stream +// 参数: +// - streamKey: Stream 键名 +// - values: 消息内容(键值对) // // 返回: -// - string: 取出的消息内容,超时或队列为空返回空字符串 -// - error: 操作失败时返回错误 -func PopFromQueue(ctx context.Context, queueKey string, timeout int) (string, error) { - result, err := RedisClient.Do(ctx, "BRPOP", queueKey, timeout) +// - string: 消息ID +// - error: 添加失败时返回错误 +func AddToStream(ctx context.Context, streamKey string, values map[string]interface{}) (string, error) { + // 使用 XAdd 添加消息到 Stream + messageID, err := RedisClient.XAdd(ctx, &redis.XAddArgs{ + Stream: streamKey, + Values: values, + }).Result() if err != nil { return "", err } - - // BRPOP 返回 [key, value],我们需要取 value - if result == nil { - return "", nil // 超时返回空 - } - - // GoFrame gredis 返回的是 *gvar.Var 类型 - arr := result.Strings() - if len(arr) >= 2 { - return arr[1], nil // arr[0] 是 key,arr[1] 是 value - } - - return "", nil + return messageID, nil } -// GetQueueLength 获取队列当前长度 -// 用于监控队列积压情况 +// ReadFromStream 从 Stream 读取消息(消费者组模式) +// 后台 Goroutine 使用此方法从 Stream 中取出请求进行处理 // 参数: -// - queueKey: 队列键名 +// - streamKey: Stream 键名 +// - groupName: 消费者组名称 +// - consumerName: 消费者名称(唯一标识) +// - count: 每次读取的消息数量 +// - blockMs: 阻塞时间(毫秒),0表示不阻塞 // // 返回: -// - int64: 队列中消息数量 +// - []StreamMessage: 消息列表 +// - error: 读取失败时返回错误 +func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count int64, blockMs int64) ([]StreamMessage, error) { + // 使用 XReadGroup 从消费者组读取消息 + // ">" 表示读取未被消费的新消息(只获取新消息) + // 如果使用 "0" 或其他 ID,则返回 Pending 消息(未确认的消息) + streams, err := RedisClient.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamKey, ">"}, // Stream名称 + 起始ID + Count: count, + Block: time.Duration(blockMs) * time.Millisecond, + }).Result() + + // 处理错误:超时或没有数据时返回 redis.Nil + if err != nil { + if err == redis.Nil { + // 超时或没有数据,返回空数组 + return []StreamMessage{}, nil + } + return nil, err + } + + // 解析返回的消息 + var messages []StreamMessage + for _, stream := range streams { + for _, msg := range stream.Messages { + messages = append(messages, StreamMessage{ + ID: msg.ID, + Values: msg.Values, + }) + } + } + + return messages, nil +} + +// AckMessage 确认消息已处理 +// 处理完消息后必须调用此方法确认,否则消息会保留在 Pending List (PEL) +// 确认后消息会从 PEL 中移除 +// 参数: +// - streamKey: Stream 键名 +// - groupName: 消费者组名称 +// - messageIDs: 要确认的消息ID列表 +// +// 返回:error 确认失败时返回错误 +func AckMessage(ctx context.Context, streamKey, groupName string, messageIDs ...string) error { + // 使用 XAck 确认消息 + // 返回值是成功确认的消息数量 + count, err := RedisClient.XAck(ctx, streamKey, groupName, messageIDs...).Result() + if err != nil { + return err + } + // 可以检查 count 是否等于 len(messageIDs) + _ = count + return nil +} + +// GetStreamLength 获取 Stream 当前长度 +// 用于监控 Stream 消息积压情况 +// 参数: +// - streamKey: Stream 键名 +// +// 返回: +// - int64: Stream 中消息数量 // - error: 操作失败时返回错误 -func GetQueueLength(ctx context.Context, queueKey string) (int64, error) { - result, err := RedisClient.Do(ctx, "LLEN", queueKey) +func GetStreamLength(ctx context.Context, streamKey string) (int64, error) { + // 使用 XLen 获取 Stream 长度 + length, err := RedisClient.XLen(ctx, streamKey).Result() if err != nil { return 0, err } - return result.Int64(), nil + return length, nil +} + +// GetPendingMessages 获取待处理消息(未确认的消息) +// 用于监控和重试失败的消息 +// 参数: +// - streamKey: Stream 键名 +// - groupName: 消费者组名称 +// - start: 起始ID,"-" 表示最小ID +// - end: 结束ID,"+" 表示最大ID +// - count: 返回数量 +// +// 返回: +// - []redis.XPendingExt: Pending 消息列表 +// - error: 操作失败时返回错误 +func GetPendingMessages(ctx context.Context, streamKey, groupName string, start, end string, count int64) ([]redis.XPendingExt, error) { + // 使用 XPendingExt 获取详细的 Pending 消息 + pending, err := RedisClient.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: streamKey, + Group: groupName, + Start: start, + End: end, + Count: count, + }).Result() + if err != nil { + return nil, err + } + return pending, nil +} + +// ClaimPendingMessage 认领超时的 Pending 消息 +// 当某个消费者故障后,其他消费者可以认领其未完成的消息 +// 参数: +// - streamKey: Stream 键名 +// - groupName: 消费者组名称 +// - consumerName: 新消费者名称 +// - minIdleTime: 消息空闲时间(毫秒),超过此时间才能被认领 +// - messageIDs: 要认领的消息ID列表 +// +// 返回: +// - []StreamMessage: 认领的消息列表 +// - error: 操作失败时返回错误 +func ClaimPendingMessage(ctx context.Context, streamKey, groupName, consumerName string, minIdleTime int64, messageIDs ...string) ([]StreamMessage, error) { + // 使用 XClaim 认领消息 + msgs, err := RedisClient.XClaim(ctx, &redis.XClaimArgs{ + Stream: streamKey, + Group: groupName, + Consumer: consumerName, + MinIdle: time.Duration(minIdleTime) * time.Millisecond, + Messages: messageIDs, + }).Result() + if err != nil { + return nil, err + } + + // 转换为 StreamMessage + var messages []StreamMessage + for _, msg := range msgs { + messages = append(messages, StreamMessage{ + ID: msg.ID, + Values: msg.Values, + }) + } + + return messages, nil } // SetSessionLastActive 设置用户最后活跃时间 @@ -91,7 +260,7 @@ func SetSessionLastActive(ctx context.Context, userId string) error { timestamp := time.Now().Unix() // 设置过期时间为 2 小时 - return RedisClient.SetEX(ctx, key, timestamp, 7200) + return RedisClient.Set(ctx, key, timestamp, 2*time.Hour).Err() } // GetSessionLastActive 获取用户最后活跃时间 @@ -103,16 +272,20 @@ func SetSessionLastActive(ctx context.Context, userId string) error { // - error: 操作失败时返回错误 func GetSessionLastActive(ctx context.Context, userId string) (int64, error) { key := SessionLastActiveKeyPrefix + userId + ":last_active" - result, err := RedisClient.Get(ctx, key) + result, err := RedisClient.Get(ctx, key).Result() + if err == redis.Nil { + return 0, nil // 未找到返回 0 + } if err != nil { return 0, err } - if result.IsNil() { - return 0, nil // 未找到返回 0 + // 将字符串转换为 int64 + timestamp, err := strconv.ParseInt(result, 10, 64) + if err != nil { + return 0, err } - - return result.Int64(), nil + return timestamp, nil } // IsUserActive 检查用户是否在指定时间范围内活跃过 @@ -148,7 +321,7 @@ func IsUserActive(ctx context.Context, userId string, seconds int64) (bool, erro // 返回:error 设置失败时返回错误 func SetSessionCache(ctx context.Context, userId, sessionId string) error { key := SessionLastActiveKeyPrefix + userId + ":session_id" - return RedisClient.SetEX(ctx, key, sessionId, 7*24*3600) + return RedisClient.Set(ctx, key, sessionId, 7*24*time.Hour).Err() } // GetSessionCache 获取缓存的 RAGFlow Session ID @@ -161,14 +334,13 @@ func SetSessionCache(ctx context.Context, userId, sessionId string) error { // - error: 操作失败时返回错误 func GetSessionCache(ctx context.Context, userId string) (string, error) { key := SessionLastActiveKeyPrefix + userId + ":session_id" - result, err := RedisClient.Get(ctx, key) + result, err := RedisClient.Get(ctx, key).Result() + if err == redis.Nil { + return "", nil // 未找到返回空字符串 + } if err != nil { return "", err } - if result.IsNil() { - return "", nil - } - - return result.String(), nil + return result, nil }