96 lines
3.0 KiB
Go
96 lines
3.0 KiB
Go
package dao
|
||
|
||
import (
|
||
"context"
|
||
commonMongo "gitee.com/red-future---jilin-g/common/mongo"
|
||
"github.com/gogf/gf/v2/errors/gerror"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/glog"
|
||
"github.com/gogf/gf/v2/os/gtime"
|
||
"github.com/gogf/gf/v2/util/gconv"
|
||
"go.mongodb.org/mongo-driver/v2/bson"
|
||
"go.mongodb.org/mongo-driver/v2/mongo"
|
||
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
||
)
|
||
|
||
// MongoDAO MongoDB原生查询(不需要token验证)
|
||
var MongoDAO = &mongoDAO{}
|
||
|
||
type mongoDAO struct{}
|
||
|
||
// SaveOrUpdate 原生批量增加或修改
|
||
func (d *mongoDAO) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) {
|
||
db := commonMongo.GetDB()
|
||
if len(filter) == 0 || len(update) == 0 {
|
||
err = gerror.New("缺少查询条件或更新数据")
|
||
return
|
||
}
|
||
if len(filter) != len(update) {
|
||
err = gerror.New("查询条件和更新数据的数量必须一致")
|
||
return
|
||
}
|
||
// 构建批量操作模型
|
||
var models []mongo.WriteModel
|
||
for i := 0; i < len(filter); i++ {
|
||
if g.IsEmpty(filter[i]["tenantId"]) && g.IsEmpty(gconv.Map(update[i]["$set"])["updater"]) {
|
||
return nil, gerror.New("tenantId不能为空")
|
||
}
|
||
if g.IsEmpty(filter[i]["updater"]) && g.IsEmpty(gconv.Map(update[i]["$set"])["updater"]) {
|
||
return nil, gerror.New("updater不能为空")
|
||
}
|
||
// 处理过滤器
|
||
filter[i]["isDeleted"] = false
|
||
// 处理更新数据
|
||
if setDoc, exists := update[i]["$set"].(bson.M); exists {
|
||
setDoc["updater"] = gconv.Map(update[i]["$set"])["updater"]
|
||
setDoc["updatedAt"] = gtime.Now().Time
|
||
} else {
|
||
// 如果没有$set字段,则创建一个
|
||
setDoc := bson.M{}
|
||
setDoc["updater"] = gconv.Map(update[i]["$set"])["updater"]
|
||
setDoc["updatedAt"] = gtime.Now().Time
|
||
update[i]["$set"] = setDoc
|
||
}
|
||
// 创建更新操作模型
|
||
updateModel := mongo.NewUpdateOneModel()
|
||
updateModel.SetFilter(filter[i])
|
||
updateModel.SetUpdate(update[i])
|
||
updateModel.SetUpsert(true) // 默认不插入新文档
|
||
// 处理选项参数
|
||
if len(opts) > 0 {
|
||
for _, opt := range opts {
|
||
var updateOpts options.UpdateManyOptions
|
||
optFuncs := opt.List()
|
||
for _, fn := range optFuncs {
|
||
fn(&updateOpts)
|
||
}
|
||
if updateOpts.Upsert != nil {
|
||
updateModel.SetUpsert(*updateOpts.Upsert)
|
||
}
|
||
}
|
||
}
|
||
models = append(models, updateModel)
|
||
}
|
||
// 执行批量操作,无序执行提高性能
|
||
bulkOpts := options.BulkWrite().SetOrdered(false)
|
||
bulkResult, err := db.Collection(collection).BulkWrite(ctx, models, bulkOpts)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
// 清理相关缓存
|
||
for i := 0; i < len(filter); i++ {
|
||
var tenantId any
|
||
if g.IsEmpty(filter[i]["tenantId"]) {
|
||
tenantId = filter[i]["tenantId"]
|
||
}
|
||
if g.IsEmpty(gconv.Map(update[i]["$set"])["tenantId"]) {
|
||
tenantId = gconv.Map(update[i]["$set"])["tenantId"]
|
||
}
|
||
err = commonMongo.DB().CleanRedis(ctx, filter[i], tenantId, collection)
|
||
if err != nil {
|
||
glog.Warning(ctx, "清理Redis缓存失败:", err)
|
||
}
|
||
}
|
||
return bulkResult, nil
|
||
}
|