diff --git a/mongo/mongo.go b/mongo/mongo.go index e1e1b91..711b0c0 100644 --- a/mongo/mongo.go +++ b/mongo/mongo.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/gogf/gf/v2/container/gvar" "strings" "sync" "time" @@ -317,7 +318,7 @@ func GetTenantInfo(ctx context.Context) (user do.User, err error) { } // Find 查询多条记录 -func Find(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOptions]) (err error) { +func Find(ctx context.Context, NoCache bool, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOptions]) (err error) { if err = utils.ValidStructPtr(result); err != nil { return } @@ -328,34 +329,39 @@ func Find(ctx context.Context, filter bson.M, result interface{}, collection str filter["isDeleted"] = false filterMap := utils.OrderMap(filter) optsMap := listOptionsToMap(ctx, opts...) - redisKey := fmt.Sprintf(consts.List, user.TenantId, collection, gconv.String(filterMap), gconv.String(optsMap)) - resultStr, err := redis.RedisClient.Get(ctx, redisKey) - if err != nil { - return - } - if !g.IsEmpty(resultStr) { - err = gconv.Scan(resultStr, result) + redisKey := "" + if !NoCache { + redisKey = fmt.Sprintf(consts.List, user.TenantId, collection, gconv.String(filterMap), gconv.String(optsMap)) + var resultStr *gvar.Var + resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { - return err + return + } + if !g.IsEmpty(resultStr) { + err = gconv.Scan(resultStr, result) + if err != nil { + return err + } + return } - return } - filter["tenantId"] = user.TenantId cur, err := db.Collection(collection).Find(ctx, filter, opts...) if err != nil { return } err = cur.All(ctx, result) - err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) - if err != nil { - return err + if !NoCache { + err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) + if err != nil { + return err + } } return } // FindOne 查询1条记录 -func FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { +func FindOne(ctx context.Context, NoCache bool, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { if len(filter) == 0 { err = gerror.New("缺少查询条件") return @@ -369,27 +375,35 @@ func FindOne(ctx context.Context, filter bson.M, result interface{}, collection } filter["isDeleted"] = false filterMap := utils.OrderMap(filter) - redisKey := fmt.Sprintf(consts.One, user.TenantId, collection, gconv.String(filterMap)) - resultStr, err := redis.RedisClient.Get(ctx, redisKey) - if err != nil { - return - } - if !g.IsEmpty(resultStr) { - err = gconv.Scan(resultStr, result) + redisKey := "" + if !NoCache { + redisKey := fmt.Sprintf(consts.One, user.TenantId, collection, gconv.String(filterMap)) + var resultStr *gvar.Var + resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { - return err + return + } + if !g.IsEmpty(resultStr) { + err = gconv.Scan(resultStr, result) + if err != nil { + return err + } + return } - return } - filter["tenantId"] = user.TenantId + if !g.IsEmpty(user.TenantId) { + filter["tenantId"] = user.TenantId + } cur := db.Collection(collection).FindOne(ctx, filter, opts...) err = cur.Decode(result) if errors.Is(err, mongo.ErrNoDocuments) { err = nil } - err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) - if err != nil { - return err + if !NoCache { + err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) + if err != nil { + return err + } } return } @@ -475,13 +489,61 @@ func Update(ctx context.Context, filter bson.M, update bson.M, collection string return } +// RandomSoftDelete 随机软删除个文档的 _id +func RandomSoftDelete(ctx context.Context, limit int, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.UpdateResult, err error) { + // 步骤 1: 使用聚合管道的 $sample 操作符随机抽取5个文档的 _id + pipeline := mongo.Pipeline{ + // 阶段1: 为每个文档添加一个 0-1 之间的随机数字段 'random' + bson.D{{Key: "$addFields", Value: bson.D{{Key: "random", Value: bson.M{"$rand": bson.M{}}}}}}, + // 阶段1: 匹配所有未删除的文档 + bson.D{{Key: "$match", Value: bson.D{{Key: "isDeleted", Value: false}}}}, + // 阶段2: 按随机数降序排序 + bson.D{{Key: "$sort", Value: bson.D{{Key: "random", Value: -1}}}}, + // 阶段3: 只取前5个 + bson.D{{Key: "$limit", Value: limit}}, + // 阶段4: 只投影 _id + bson.D{{Key: "$project", Value: bson.D{{Key: "_id", Value: 1}}}}, + } + cursor, err := db.Collection(collection).Aggregate(ctx, pipeline) + if err != nil { + return + } + defer cursor.Close(ctx) + // 步骤 2: 从聚合结果中提取 _id 到一个切片中 + var idsToUpdate []bson.ObjectID + for cursor.Next(ctx) { + var result bson.M + if err := cursor.Decode(&result); err != nil { + return nil, err + } + // 将 bson.M 中的 _id 断言为 primitive.ObjectID + id := result["_id"].(bson.ObjectID) + idsToUpdate = append(idsToUpdate, id) + } + if err := cursor.Err(); err != nil { + return nil, err + } + fmt.Printf("准备更新的随机文档ID: %v\n", idsToUpdate) + // 步骤 3: 使用 $in 操作符和 UpdateMany 批量更新选定的文档 + if len(idsToUpdate) > 0 { + // 过滤条件:匹配 idsToUpdate 切片中的任意一个 _id + filter := bson.D{{Key: "_id", Value: bson.D{{Key: "$in", Value: idsToUpdate}}}} + // 更新操作:使用 $set 修改字段 + update := bson.D{{Key: "$set", Value: bson.D{{Key: "isDeleted", Value: true}}}} + _, err = db.Collection(collection).UpdateMany(ctx, filter, update) + if err != nil { + return + } + } + return +} + // SaveOrUpdate 批量增加或修改 func SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) { if len(filter) == 0 || len(update) == 0 { err = gerror.New("缺少查询条件或更新数据") return } - if len(filter) != len(update) { err = gerror.New("查询条件和更新数据的数量必须一致") return @@ -490,17 +552,14 @@ func SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collect if err != nil { return } - // 构建批量操作模型 var models []mongo.WriteModel - for i := 0; i < len(filter); i++ { // 处理过滤器 filter[i]["isDeleted"] = false if !g.IsEmpty(user.TenantId) { filter[i]["tenantId"] = user.TenantId } - // 处理更新数据 if setDoc, exists := update[i]["$set"].(bson.M); exists { if !g.IsEmpty(user.UserName) { @@ -516,13 +575,11 @@ func SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collect setDoc["updatedAt"] = gtime.Now().Time update[i]["$set"] = setDoc } - // 创建更新操作模型 updateModel := mongo.NewUpdateOneModel() updateModel.SetFilter(filter[i]) updateModel.SetUpdate(update[i]) updateModel.SetUpsert(true) // 默认不插入新文档 - // 处理选项参数 if len(opts) > 0 { for _, opt := range opts { @@ -536,17 +593,14 @@ func SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collect } } } - models = append(models, updateModel) } - // 执行批量操作,无序执行提高性能 bulkOpts := options.BulkWrite().SetOrdered(false) bulkResult, err := db.Collection(collection).BulkWrite(ctx, models, bulkOpts) if err != nil { return nil, err } - // 清理相关缓存 for _, filterItem := range filter { err = cleanRedis(ctx, filterItem, user.TenantId, collection) @@ -591,27 +645,33 @@ func Insert(ctx context.Context, documents []interface{}, collection string, opt } // Count 查询总数 -func Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) { +func Count(ctx context.Context, NoCache bool, filter bson.M, collection string) (count int64, err error) { user, err := GetTenantInfo(ctx) if err != nil { return } filter["isDeleted"] = false filterMap := utils.OrderMap(filter) - redisKey := fmt.Sprintf(consts.Count, user.TenantId, collection, gconv.String(filterMap)) - resultStr, err := redis.RedisClient.Get(ctx, redisKey) - if err != nil { - return - } - if !g.IsEmpty(resultStr) { - count = gconv.Int64(resultStr) - return + redisKey := "" + if !NoCache { + redisKey = fmt.Sprintf(consts.Count, user.TenantId, collection, gconv.String(filterMap)) + var resultStr *gvar.Var + resultStr, err = redis.RedisClient.Get(ctx, redisKey) + if err != nil { + return + } + if !g.IsEmpty(resultStr) { + count = gconv.Int64(resultStr) + return + } } // 调用驱动的 CountDocuments,在数据库端执行的 count, err = db.Collection(collection).CountDocuments(ctx, filter) - err = redis.RedisClient.SetEX(ctx, redisKey, count, int64(time.Hour)) - if err != nil { - return + if !NoCache { + err = redis.RedisClient.SetEX(ctx, redisKey, count, int64(time.Hour)) + if err != nil { + return + } } return } diff --git a/redis/message.go b/redis/message.go new file mode 100644 index 0000000..e7e6cb0 --- /dev/null +++ b/redis/message.go @@ -0,0 +1,14 @@ +package redis + +import "context" + +type QueueMessage struct { + StreamKey string // Stream 键名 + GroupName string // 消费者组名称 + ConsumerName string // 消费者名称 + Timeout int64 // 阻塞超时时间(毫秒) + BatchSize int64 // 最大并发数(信号量容量) + BlockMs int64 + Block bool + HandleFunc func(ctx context.Context, message map[string]interface{}) error +} diff --git a/redis/redis.go b/redis/redis.go index df2dacb..552d273 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -3,6 +3,7 @@ package redis import ( "context" "errors" + "fmt" "strings" "sync" "time" @@ -33,6 +34,9 @@ func GetRedisClient() *gredis.Redis { return getClient() } +// RedisClient 导出的 Redis 客户端(供 mongo.go 使用,兼容旧代码) +var RedisClient = getClient() + // Lock 分布式锁 func Lock(ctx context.Context, key string, expireSeconds int64, fn func(ctx context.Context) error) (success bool, err error) { limit := 3 @@ -66,8 +70,52 @@ LOOP: } } -// RedisClient 导出的 Redis 客户端(供 mongo.go 使用,兼容旧代码) -var RedisClient = getClient() +func GetReadStream(ctx context.Context, msg ...QueueMessage) error { + for _, t := range msg { + err := GetReadFromStream(ctx, t.StreamKey, t.GroupName, t.ConsumerName, t.BatchSize, t.BlockMs, t.Block, t.HandleFunc) + if err != nil { + return err + } + } + return nil +} + +// GetReadFromStream 读取ReadFromStream数据 +func GetReadFromStream(ctx context.Context, streamKey, groupName, consumerName string, count, blockMs int64, Block bool, fn func(ctx context.Context, message map[string]interface{}) error) (err error) { + glog.Infof(ctx, "初始化 Stream: %s, 消费者组: %s", streamKey, groupName) + err = InitStreamGroup(ctx, streamKey, groupName) + if err != nil { + return err + } + for { + // 从 Redis Stream 读取一批消息 + messages, err := ReadFromStream(ctx, streamKey, groupName, consumerName, count, blockMs) + if err != nil { + glog.Errorf(ctx, "[DEBUG Redis] XREADGROUP 错误: %v", err) + return err + } + // 处理消息 + for _, msg := range messages { + fmt.Printf("消费者 '%s' -> 接收到消息 ID: %s, 内容: %v\n", consumerName, msg.ID, msg.Values) + // 业务处理 + if err = fn(ctx, msg.Values); err != nil { + return err + } + // 确认消息 (ACK) + if Block { + // 处理成功后,必须调用 XAck,否则消息会一直留在 PEL 中 + err = AckMessage(ctx, streamKey, groupName, msg.ID) + if err != nil { + glog.Infof(ctx, "消费者 '%s' 确认消息 ID %s 失败: %v\n", consumerName, msg.ID, err) + } else { + glog.Infof(ctx, "消费者 '%s' -> 已确认消息 ID: %s\n", consumerName, msg.ID) + } + } + + } + } + return +} // Stream 和消费者组常量 const ( @@ -150,6 +198,7 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri execCtx = context.Background() } +RECONNECT: // XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockMs STREAMS streamKey > result, err := redisClient.Do(execCtx, "XREADGROUP", "GROUP", groupName, consumerName, @@ -157,10 +206,8 @@ func ReadFromStream(ctx context.Context, streamKey, groupName, consumerName stri "BLOCK", blockMs, "STREAMS", streamKey, ">", ) - if err != nil { - glog.Errorf(ctx, "[DEBUG Redis] XREADGROUP 错误: %v", err) - return nil, err + goto RECONNECT } glog.Debugf(ctx, "[DEBUG Redis] XREADGROUP 返回: %+v", result)