diff --git a/mongo/mongo.go b/mongo/mongo.go index 22dc1a4..c510ea5 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -109,7 +109,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c redisKey := fmt.Sprintf(redis.List, user.TenantId, collection, filterKey, optionsKey) if m.Cache { var resultStr *gvar.Var - resultStr, err = redis.RedisClient().Get(ctx, redisKey) + resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { return } @@ -168,7 +168,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c return } if m.Cache { - err = redis.RedisClient().SetEX(ctx, redisKey, result, int64(time.Hour)) + err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return } @@ -200,7 +200,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{} redisKey := fmt.Sprintf(redis.One, user.TenantId, collection, filterKey) if m.Cache { var resultStr *gvar.Var - resultStr, err = redis.RedisClient().Get(ctx, redisKey) + resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { return } @@ -221,7 +221,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{} err = nil } if m.Cache { - err = redis.RedisClient().SetEX(ctx, redisKey, result, int64(time.Hour)) + err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return err } @@ -231,23 +231,23 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{} func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) { listKeys := fmt.Sprintf(redis.CleanList, tenantId, collection) - keys, err := redis.RedisClient().Keys(ctx, listKeys) + keys, err := redis.RedisClient.Keys(ctx, listKeys) if err != nil { return } for _, key := range keys { - _, err = redis.RedisClient().Del(ctx, key) + _, err = redis.RedisClient.Del(ctx, key) if err != nil { return } } countKeys := fmt.Sprintf(redis.CleanCount, tenantId, collection) - keys, err = redis.RedisClient().Keys(ctx, countKeys) + keys, err = redis.RedisClient.Keys(ctx, countKeys) if err != nil { return } for _, key := range keys { - _, err = redis.RedisClient().Del(ctx, key) + _, err = redis.RedisClient.Del(ctx, key) if err != nil { return } @@ -256,7 +256,7 @@ func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interf delete(filter, "tenantId") filterKey := fmt.Sprintf("%+v", filter) oneKey := fmt.Sprintf(redis.One, tenantId, collection, filterKey) - _, err = redis.RedisClient().Del(ctx, oneKey) + _, err = redis.RedisClient.Del(ctx, oneKey) if err != nil { return } @@ -525,7 +525,7 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) ( redisKey := fmt.Sprintf(redis.Count, user.TenantId, collection, filterKey) if m.Cache { var resultStr *gvar.Var - resultStr, err = redis.RedisClient().Get(ctx, redisKey) + resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { return } @@ -536,7 +536,7 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) ( } count, err = db.Collection(collection).CountDocuments(ctx, filter) if m.Cache { - err = redis.RedisClient().SetEX(ctx, redisKey, count, int64(time.Hour)) + err = redis.RedisClient.SetEX(ctx, redisKey, count, int64(time.Hour)) if err != nil { return } diff --git a/redis/redis.go b/redis/redis.go index aae782b..648bd99 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -2,6 +2,7 @@ package redis import ( "context" + "errors" "strings" "sync" "time" @@ -14,27 +15,60 @@ import ( ) var ( + // redisClient 内部使用的 Redis 客户端(单例模式) redisClient *gredis.Redis redisOnce sync.Once ) -// RedisClient 获取Redis客户端(支持重试3次,每次间隔2秒) -func RedisClient() *gredis.Redis { +// getClient 获取 Redis 客户端(延迟初始化) +func getClient() *gredis.Redis { redisOnce.Do(func() { - for i := 0; i < 3; i++ { - redisClient = g.Redis() - if redisClient != nil { - ctx := context.Background() - if _, err := redisClient.Do(ctx, "PING"); err == nil { - return - } - } - time.Sleep(2 * time.Second) - } + redisClient = g.Redis() }) return redisClient } +// GetRedisClient 获取 Redis 客户端(供外部使用) +func GetRedisClient() *gredis.Redis { + return getClient() +} + +// RedisClient 导出的 Redis 客户端(供 mongo.go 使用,兼容旧代码) +var RedisClient = getClient() + +// Lock 分布式锁 +func Lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) { + limit := 3 +LOOP: + if limit < 0 { + return false, errors.New("锁重试次数耗尽") + } + limit-- + if val, err := RedisClient.Set(ctx, key, true, gredis.SetOption{ + TTLOption: gredis.TTLOption{ + EX: &expireSeconds, + }, + NX: true, + }); err != nil { + return false, err + } else { + if val.Bool() { + defer func(RedisClient *gredis.Redis, ctx context.Context, key string) { + if _, err = RedisClient.Del(ctx, key); err != nil { + glog.Errorf(ctx, "RedisClient.Del error: %v", err) + } + }(RedisClient, ctx, key) + if err = fn(ctx); err != nil { + return false, err + } + return true, nil + } else { + time.Sleep(time.Second) + goto LOOP + } + } +} + func GetReadStream(ctx context.Context, msg ...QueueMessage) error { for _, t := range msg { err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.AutoAck, t.HandleFunc) @@ -164,9 +198,6 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri RECONNECT: // 先尝试读取pending消息(ID=0),处理积压 - glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP GROUP %s %s COUNT %d BLOCK 0 STREAMS %s 0", - groupName, consumerName, count, streamKey) - result, err := redisClient.Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, @@ -179,9 +210,6 @@ RECONNECT: // 如果没有pending消息,读取新消息 if result == nil || result.IsEmpty() { - glog.Debugf(ctx, "[DEBUG Redis] 无pending消息,读取新消息 XREADGROUP GROUP %s %s COUNT %d BLOCK %d STREAMS %s >", - groupName, consumerName, count, blockMs, streamKey) - result, err = redisClient.Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, "COUNT", count, @@ -193,8 +221,6 @@ RECONNECT: } } - glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result) - // 预分配容量,避免动态扩容 messages := make([]StreamMessage, 0, int(count)) diff --git a/redis/types.go b/redis/types.go index 2ccfbde..f3dc0c7 100644 --- a/redis/types.go +++ b/redis/types.go @@ -116,3 +116,13 @@ func GetHistoryContextLimit() int64 { ctx := context.Background() return g.Cfg().MustGet(ctx, "history.contextLimit", 5).Int64() // 默认5轮对话 } + +// DocSyncMessage 文档同步消息结构(RAGFlow与MongoDB同步) +type DocSyncMessage struct { + DocId string `json:"docId"` // MongoDB文档ID + RagflowDocId string `json:"ragflowDocId"` // RAGFlow文档ID + TenantId string `json:"tenantId"` // 租户ID + DocType string `json:"docType"` // 文档类型:speechcraft/product + Action string `json:"action"` // 操作类型:sync_ragflow_id + Timestamp int64 `json:"timestamp"` // 时间戳 +}