// ============================================================================= // MongoDB 业务操作封装 // 提供向后兼容的CRUD操作方法,支持多数据源 // ============================================================================= package mongo import ( "context" "errors" "fmt" "gitee.com/red-future---jilin-g/common/log/consts" "time" "gitee.com/red-future---jilin-g/common/beans" "gitee.com/red-future---jilin-g/common/log/model/entity" "gitee.com/red-future---jilin-g/common/redis" "gitee.com/red-future---jilin-g/common/utils" "github.com/gogf/gf/v2/container/gvar" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" "go.mongodb.org/mongo-driver/v2/mongo" "go.mongodb.org/mongo-driver/v2/mongo/options" ) // ============================================================================= // 向后兼容的MongoDB结构体 // ============================================================================= type MongoDB struct { noCache bool dataSource string // 数据源名称,默认为 "default" noTenantId bool // 是否跳过租户过滤 } func DB(cache ...bool) *MongoDB { return &MongoDB{ noCache: false, dataSource: "default", noTenantId: false, } } // WithDataSource 指定使用的数据源 func (m *MongoDB) WithDataSource(name string) *MongoDB { m.dataSource = name return m } // NoCache 不使用缓存 func (m *MongoDB) NoCache() *MongoDB { m.noCache = true return m } // NoTenantId 不使用租户过滤 func (m *MongoDB) NoTenantId() *MongoDB { m.noTenantId = true return m } // ============================================================================= // 向后兼容的全局变量和方法 // ============================================================================= var ( manager = GetManager() logPool *grpool.Pool serverName string LogRedisKey string ) // FieldInfo 定义字段信息结构体 type FieldInfo struct { FieldName string FieldValue interface{} } const PageSize = 20 // GetDB 获取默认数据源的数据库实例(向后兼容) func GetDB() *mongo.Database { source, err := manager.GetDataSource("default") if err != nil { return nil } return source.Database() } // ============================================================================= // MongoDB 操作方法(支持多数据源) // ============================================================================= // getDataSource 获取当前使用的数据源 func (m *MongoDB) getDataSource() (DataSource, error) { if m.dataSource == "" { m.dataSource = "default" } return manager.GetDataSource(m.dataSource) } // Count 查询总数 func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err } db := source.Database() user, err := utils.GetUserInfo(ctx) if err != nil { return } filter["isDeleted"] = false delete(filter, "tenantId") filterKey := fmt.Sprintf("%+v", filter) redisKey := fmt.Sprintf(redis.Count, user.TenantId, collection, filterKey) if !m.noCache { var resultStr *gvar.Var resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { return } if !g.IsEmpty(resultStr) { count = gconv.Int64(resultStr) return } } // 如果没有调用 noTenantId,则添加 tenantId 过滤 if !m.noTenantId && !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } count, err = db.Collection(collection).CountDocuments(ctx, filter) if !m.noCache { err = redis.RedisClient.SetEX(ctx, redisKey, count, int64(time.Hour)) if err != nil { return } } return } // Find 查询多条记录 func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, collection string, page *beans.Page, orderBy []beans.OrderBy) (total int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err } db := source.Database() if err = utils.ValidStructPtr(result); err != nil { return } user, err := utils.GetUserInfo(ctx) if err != nil { return } if g.IsEmpty(filter["isDeleted"]) { filter["isDeleted"] = false } filterKey := fmt.Sprintf("%+v", filter) optionsKey := fmt.Sprintf("%+v%+v", page, orderBy) redisKey := fmt.Sprintf(redis.List, user.TenantId, collection, filterKey, optionsKey) if !m.noCache { var resultStr *gvar.Var resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { return } if !resultStr.IsEmpty() { if err = resultStr.Structs(result); err != nil { return } total = int64(len(resultStr.Array())) return } } // 如果没有调用 noTenantId,则添加 tenantId 过滤 if !m.noTenantId && !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } limit := int64(PageSize) skip := int64(0) if page != nil && !g.IsEmpty(page.PageNum) && !g.IsEmpty(page.PageSize) { limit = page.PageSize if limit == -1 { skip = 0 } else { skip = (page.PageNum - 1) * limit } } opt := options.Find().SetSkip(skip) if limit != -1 { opt.SetLimit(limit) } else { total, err = m.Count(ctx, filter, collection) if err != nil || total == 0 { return } } if orderBy == nil { opt.SetSort(bson.M{"createdAt": -1}) } else { orderBson := bson.D{} for _, v := range orderBy { if v.Order == beans.Asc { orderBson = append(orderBson, bson.E{Key: v.Field, Value: 1}) } else { orderBson = append(orderBson, bson.E{Key: v.Field, Value: -1}) } } opt.SetSort(orderBson) } cur, err := db.Collection(collection).Find(ctx, filter, opt) if err != nil { return } if limit == -1 { total = int64(cur.RemainingBatchLength()) } defer cur.Close(ctx) if err = cur.All(ctx, result); err != nil { return } if !m.noCache { err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return } } return } // FindOne 查询1条记录 func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) { source, err := m.getDataSource() if err != nil { return err } db := source.Database() if len(filter) == 0 { err = gerror.New("缺少查询条件") return } if err = utils.ValidStructPtr(result); err != nil { return } user, err := utils.GetUserInfo(ctx) if err != nil { return } filter["isDeleted"] = false filterKey := fmt.Sprintf("%+v", filter) redisKey := fmt.Sprintf(redis.One, user.TenantId, collection, filterKey) if !m.noCache { var resultStr *gvar.Var resultStr, err = redis.RedisClient.Get(ctx, redisKey) if err != nil { return } if !g.IsEmpty(resultStr) { err = gconv.Scan(resultStr, result) if err != nil { return err } return } } // 如果没有调用 noTenantId,则添加 tenantId 过滤 if !m.noTenantId && !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } cur := db.Collection(collection).FindOne(ctx, filter, opts...) err = cur.Decode(result) if errors.Is(err, mongo.ErrNoDocuments) { err = nil } if !m.noCache { err = redis.RedisClient.SetEX(ctx, redisKey, result, int64(time.Hour)) if err != nil { return err } } return } // getDeletedData 获取要删除的数据 func (m *MongoDB) getDeletedData(ctx context.Context, filter bson.M, collection string) (deletedIDs []bson.ObjectID, deletedData []bson.M, err error) { // 查询要删除的数据 _, err = m.Find(ctx, filter, &deletedData, collection, nil, nil) // 从查询结果中获取 _id for _, doc := range deletedData { deletedIDs = append(deletedIDs, doc["_id"].(bson.ObjectID)) } return } func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) { listKeys := fmt.Sprintf(redis.CleanList, tenantId, collection) keys, err := redis.RedisClient.Keys(ctx, listKeys) if err != nil { return } for _, key := range keys { _, err = redis.RedisClient.Del(ctx, key) if err != nil { return } } countKeys := fmt.Sprintf(redis.CleanCount, tenantId, collection) keys, err = redis.RedisClient.Keys(ctx, countKeys) if err != nil { return } for _, key := range keys { _, err = redis.RedisClient.Del(ctx, key) if err != nil { return } } filter["isDeleted"] = false delete(filter, "tenantId") filterKey := fmt.Sprintf("%+v", filter) oneKey := fmt.Sprintf(redis.One, tenantId, collection, filterKey) _, err = redis.RedisClient.Del(ctx, oneKey) if err != nil { return } return } func (m *MongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, collection string, data interface{}, userName, tenantId interface{}, operationType consts.OperationType) { // 提前获取 IP 地址,避免异步任务执行时请求已结束 var ipAddress string if request := g.RequestFromCtx(ctx); request != nil { ipAddress = request.GetClientIp() } if operationType != consts.OperationInsert && operationType != consts.OperationDelete { if !g.IsEmpty(filter["_id"]) { objectID := filter["_id"].(*bson.ObjectID) ids = append(ids, *objectID) } else { var err error if ids, _, err = m.getDeletedData(ctx, filter, collection); err != nil { return } } } log := &entity.OperationLog{ ServiceName: serverName, Collection: collection, CollectionID: ids, Operation: string(operationType), IPAddress: ipAddress, Data: data, } log.Creator = userName log.Updater = userName now := >ime.Now().Time log.CreatedAt = now log.UpdatedAt = now log.TenantId = tenantId // 使用新的 context 进行 Redis 操作 if _, err := redis.AddToStream(ctx, LogRedisKey, log); err != nil { glog.Error(ctx, "mongoLog-AddToStream err: %v", err) } return } // Insert 插入多条记录 func (m *MongoDB) Insert(ctx context.Context, documents []interface{}, collection string, opts ...options.Lister[options.InsertManyOptions]) (ids []interface{}, err error) { source, err := m.getDataSource() if err != nil { return nil, err } db := source.Database() user, err := utils.GetUserInfo(ctx) if err != nil { return } docs := make([]interface{}, 0, len(documents)) for _, document := range documents { doc := gconv.Map(document) delete(doc, "id") if !g.IsEmpty(user.UserName) && g.IsEmpty(doc["creator"]) { doc["creator"] = user.UserName } if !g.IsEmpty(user.UserName) && g.IsEmpty(doc["updater"]) { doc["updater"] = user.UserName } if !g.IsEmpty(user.TenantId) && g.IsEmpty(doc["tenantId"]) { doc["tenantId"] = user.TenantId } if g.IsEmpty(doc["createdAt"]) { doc["createdAt"] = gtime.Now().Time } if g.IsEmpty(doc["updatedAt"]) { doc["updatedAt"] = gtime.Now().Time } doc["isDeleted"] = false docs = append(docs, doc) } r, err := db.Collection(collection).InsertMany(ctx, docs, opts...) if err != nil { return } ids = r.InsertedIDs err = m.CleanRedis(ctx, bson.M{}, user.TenantId, collection) //写日志 if collection != consts.OperationLogCollection { objectIds := make([]bson.ObjectID, 0) for _, id := range ids { objectIds = append(objectIds, id.(bson.ObjectID)) } m.log(ctx, objectIds, nil, collection, nil, user.UserName, user.TenantId, consts.OperationInsert) } return } // Delete 删除记录 func (m *MongoDB) Delete(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.DeleteManyOptions]) (count int64, err error) { source, err := m.getDataSource() if err != nil { return 0, err } db := source.Database() if len(filter) == 0 { err = gerror.New("缺少查询条件") return } user, err := utils.GetUserInfo(ctx) if err != nil { return } filter["tenantId"] = user.TenantId // 获取要删除的数据 ds, ms, err := m.getDeletedData(ctx, filter, collection) if err != nil { return } // 执行删除操作 r, err := db.Collection(collection).DeleteMany(ctx, filter, opts...) if err != nil { return } count = r.DeletedCount // 清理redis err = m.CleanRedis(ctx, filter, user.TenantId, collection) // 写日志 m.log(ctx, ds, nil, collection, ms, user.UserName, user.TenantId, consts.OperationDelete) return } // DeleteSoft 假删除记录 func (m *MongoDB) DeleteSoft(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.UpdateResult, err error) { update := bson.M{"$set": bson.M{"isDeleted": true}} return m.Update(ctx, filter, update, collection, opts...) } // Update 修改记录 func (m *MongoDB) Update(ctx context.Context, filter bson.M, update bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.UpdateResult, err error) { source, err := m.getDataSource() if err != nil { return nil, err } db := source.Database() if len(filter) == 0 { err = gerror.New("缺少查询条件") return } filter["isDeleted"] = false user, err := utils.GetUserInfo(ctx) if err != nil { return } if !g.IsEmpty(user.TenantId) { filter["tenantId"] = user.TenantId } // 遍历 update 中的所有操作符和字段,存放到 list 中 fieldList := make([]FieldInfo, 0) for _, doc := range update { if m, ok := doc.(bson.M); ok { for fieldName, fieldValue := range m { // 获取到字段名和字段值 fieldList = append(fieldList, FieldInfo{ FieldName: fieldName, FieldValue: fieldValue, }) } } } setDoc := update["$set"].(bson.M) if !g.IsEmpty(user.UserName) { setDoc["updater"] = user.UserName } setDoc["updatedAt"] = gtime.Now().Time update["$set"] = setDoc result, err = db.Collection(collection).UpdateMany(ctx, filter, update, opts...) if err != nil { return } // 清理redis err = m.CleanRedis(ctx, filter, user.TenantId, collection) // 写日志 if !g.IsEmpty(setDoc["isDeleted"]) && gconv.Bool(setDoc["isDeleted"]) { filter["isDeleted"] = true m.log(ctx, nil, filter, collection, nil, user.UserName, user.TenantId, consts.OperationDeleteSoft) } else { m.log(ctx, nil, filter, collection, fieldList, user.UserName, user.TenantId, consts.OperationUpdate) } return } // RandomSoftDelete 随机软删除个文档的 _id func (m *MongoDB) RandomSoftDelete(ctx context.Context, limit int, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.UpdateResult, err error) { source, err := m.getDataSource() if err != nil { return nil, err } db := source.Database() _ = opts pipeline := mongo.Pipeline{ bson.D{{Key: "$addFields", Value: bson.D{{Key: "random", Value: bson.M{"$rand": bson.M{}}}}}}, bson.D{{Key: "$match", Value: bson.D{{Key: "isDeleted", Value: false}}}}, bson.D{{Key: "$sort", Value: bson.D{{Key: "random", Value: -1}}}}, bson.D{{Key: "$limit", Value: limit}}, bson.D{{Key: "$project", Value: bson.D{{Key: "_id", Value: 1}}}}, } cursor, err := db.Collection(collection).Aggregate(ctx, pipeline) if err != nil { return } defer cursor.Close(ctx) var idsToUpdate []bson.ObjectID for cursor.Next(ctx) { var result bson.M if err := cursor.Decode(&result); err != nil { return nil, err } id := result["_id"].(bson.ObjectID) idsToUpdate = append(idsToUpdate, id) } if err := cursor.Err(); err != nil { return nil, err } fmt.Printf("准备更新的随机文档ID: %v\n", idsToUpdate) if len(idsToUpdate) > 0 { filter := bson.D{{Key: "_id", Value: bson.D{{Key: "$in", Value: idsToUpdate}}}} update := bson.D{{Key: "$set", Value: bson.D{{Key: "isDeleted", Value: true}}}} _, err = db.Collection(collection).UpdateMany(ctx, filter, update) if err != nil { return } } return } // SaveOrUpdate 批量增加或修改 func (m *MongoDB) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) { source, err := m.getDataSource() if err != nil { return nil, err } db := source.Database() if len(filter) == 0 || len(update) == 0 { err = gerror.New("缺少查询条件或更新数据") return } if len(filter) != len(update) { err = gerror.New("查询条件和更新数据的数量必须一致") return } user, err := utils.GetUserInfo(ctx) if err != nil { return } var models []mongo.WriteModel for i := 0; i < len(filter); i++ { filter[i]["isDeleted"] = false if !g.IsEmpty(user.TenantId) { filter[i]["tenantId"] = user.TenantId } if setDoc, exists := update[i]["$set"].(bson.M); exists { if !g.IsEmpty(user.UserName) { setDoc["updater"] = user.UserName } setDoc["updatedAt"] = gtime.Now().Time } else { setDoc := bson.M{} if !g.IsEmpty(user.UserName) { setDoc["updater"] = user.UserName } setDoc["updatedAt"] = gtime.Now().Time update[i]["$set"] = setDoc } updateModel := mongo.NewUpdateOneModel() updateModel.SetFilter(filter[i]) updateModel.SetUpdate(update[i]) updateModel.SetUpsert(true) if len(opts) > 0 { for _, opt := range opts { var updateOpts options.UpdateManyOptions optFuncs := opt.List() for _, fn := range optFuncs { fn(&updateOpts) } if updateOpts.Upsert != nil { updateModel.SetUpsert(*updateOpts.Upsert) } } } models = append(models, updateModel) } bulkOpts := options.BulkWrite().SetOrdered(false) bulkResult, err := db.Collection(collection).BulkWrite(ctx, models, bulkOpts) if err != nil { return nil, err } for _, filterItem := range filter { err = m.CleanRedis(ctx, filterItem, user.TenantId, collection) if err != nil { glog.Warning(ctx, "清理Redis缓存失败:", err) } } return bulkResult, nil } func BuildUpdateData(ctx context.Context, req interface{}) (filter bson.M, err error) { _ = ctx filter = bson.M{} reqMap := gconv.Map(req) for mk, mv := range reqMap { if mk != "id" && !g.IsEmpty(mv) { filter[mk] = mv } } return }