Files
common/db/mongo/mongo.go

661 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// =============================================================================
// MongoDB 业务操作封装
// 提供向后兼容的 CRUD 操作方法,支持多数据源
// =============================================================================
package mongo
import (
"context"
"errors"
"fmt"
"time"
"gitea.com/red-future/common/log/consts"
"go.mongodb.org/mongo-driver/v2/event"
"gitea.com/red-future/common/beans"
"gitea.com/red-future/common/log/model/entity"
"gitea.com/red-future/common/redis"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/container/gvar"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
// =============================================================================
// 向后兼容的MongoDB结构体
// =============================================================================
// mongoDB is the backward-compatible entry point for the MongoDB CRUD helpers.
// It only carries per-call options, which are configured through DB and the
// fluent setters WithDataSource, NoCache and NoTenantId.
type mongoDB struct {
	noCache    bool   // when true, Count/Find/FindOne bypass the Redis cache
	dataSource string // data source name; "default" unless overridden
	noTenantId bool   // when true, Count/Find/FindOne do not add the tenantId filter
}

// DB returns a new helper bound to the "default" data source with tenant
// filtering enabled.
//
// The optional cache argument controls the Redis cache: omitting it or passing
// true keeps caching enabled, passing false disables it (equivalent to calling
// NoCache). Only the first value is considered. Previously the argument was
// accepted but ignored, so DB(false) still served cached data.
func DB(cache ...bool) *mongoDB {
	return &mongoDB{
		noCache:    len(cache) > 0 && !cache[0],
		dataSource: "default",
		noTenantId: false,
	}
}
// commandMonitor builds a driver command monitor that prints every MongoDB
// command to stdout when it starts, succeeds or fails.
//
// Elapsed time is taken from the Duration field the driver reports on the
// finished events. The callbacks cannot hand data to each other through ctx:
// ctx is passed by value, so a context.WithValue assignment made inside
// Started is never visible to Succeeded (the previous implementation relied on
// that and therefore never printed successful commands).
func commandMonitor() *event.CommandMonitor {
	const timeLayout = "2006-01-02 15:04:05"

	return &event.CommandMonitor{
		// Started fires before the command is executed.
		Started: func(_ context.Context, evt *event.CommandStartedEvent) {
			// The collection name is not printed separately; it is part of the
			// command document itself. The format string must have exactly
			// one verb per argument.
			fmt.Printf("[%s] 开始执行命令 | 数据库: %s | 命令: %+v\n",
				time.Now().Format(timeLayout),
				evt.DatabaseName,
				evt.Command,
			)
		},
		// Succeeded fires after the command completed successfully.
		Succeeded: func(_ context.Context, evt *event.CommandSucceededEvent) {
			fmt.Printf("[%s] 命令执行成功 | 耗时: %s | 结果: %+v\n",
				time.Now().Format(timeLayout),
				evt.Duration,
				evt.Reply,
			)
		},
		// Failed fires after the command returned an error.
		Failed: func(_ context.Context, evt *event.CommandFailedEvent) {
			fmt.Printf("[%s] 命令执行失败 | 错误: %s | 耗时: %s\n",
				time.Now().Format(timeLayout),
				evt.Failure,
				evt.Duration,
			)
		},
	}
}
// WithDataSource sets the name of the data source that later operations on
// this helper resolve through getDataSource. It modifies the receiver and
// returns it so calls can be chained. An empty name is replaced by "default"
// when the data source is resolved.
func (m *mongoDB) WithDataSource(name string) *mongoDB {
m.dataSource = name
return m
}
// NoCache disables the Redis cache for this helper: Count, Find and FindOne
// neither read cached results nor store new ones. It modifies the receiver and
// returns it so calls can be chained.
func (m *mongoDB) NoCache() *mongoDB {
m.noCache = true
return m
}
// NoTenantId disables the automatic tenantId filter that Count, Find and
// FindOne otherwise add from the current user. It modifies the receiver and
// returns it so calls can be chained.
// NOTE(review): Update, Delete and SaveOrUpdate do not consult this flag.
func (m *mongoDB) NoTenantId() *mongoDB {
m.noTenantId = true
return m
}
// =============================================================================
// 向后兼容的全局变量和方法
// =============================================================================
// Package-level state shared by the backward-compatible helpers.
var (
// manager is the data-source manager, obtained from GetManager when the
// package is initialized; every helper resolves its data source through it.
manager = GetManager()
// logPool is not referenced in this part of the file — presumably a
// goroutine pool for asynchronous log writing (TODO confirm).
logPool *grpool.Pool
// serverName is recorded as ServiceName on every operation log entry.
// It is assigned outside this part of the file.
serverName string
// LogRedisKey is the Redis stream key operation log entries are appended to.
// It is assigned outside this part of the file.
LogRedisKey string
)
// FieldInfo pairs a document field name with the value assigned to it.
// Update collects one FieldInfo per field found in the update operators and
// attaches the list to the operation log.
type FieldInfo struct {
	FieldName  string // name of the field
	FieldValue any    // value carried for that field
}
// PageSize is the number of documents Find returns when the caller supplies no
// usable paging information.
const PageSize = 20
// GetDB returns the database handle of the "default" data source
// (backward-compatible accessor). It returns nil when that data source cannot
// be resolved, so callers must check the result before using it.
func GetDB() *mongo.Database {
	if defaultSource, err := manager.GetDataSource("default"); err == nil {
		return defaultSource.Database()
	}
	return nil
}
// =============================================================================
// MongoDB 操作方法(支持多数据源)
// =============================================================================
// getDataSource resolves the data source this helper is configured for.
// A blank name is normalized to "default" on the receiver before the lookup.
func (m *mongoDB) getDataSource() (DataSource, error) {
	if len(m.dataSource) == 0 {
		m.dataSource = "default"
	}
	name := m.dataSource
	return manager.GetDataSource(name)
}
// Count returns the number of documents in collection that match filter.
//
// The caller's filter is never modified; all adjustments are made on a private
// copy:
//   - isDeleted defaults to false when the caller left it empty (same rule as
//     Find, so the total Find reports matches the query it runs);
//   - tenantId is taken from the current user unless NoTenantId was requested
//     or the user has no tenant.
//
// Unless NoCache was requested the result is read from and written to Redis.
// The cache key is built from the user's tenant id, the collection and the
// filter without its tenantId entry.
// NOTE(review): the key does not reflect NoTenantId, so tenant-scoped and
// cross-tenant counts for the same filter share one cache entry.
//
// A Redis read/write failure is returned as an error; on a write failure the
// freshly computed count is still returned alongside it.
func (m *mongoDB) Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) {
	source, err := m.getDataSource()
	if err != nil {
		return 0, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}

	// Work on a copy: Find passes its own filter in and keeps using it
	// afterwards, so deleting tenantId from the shared map (and returning early
	// on a cache hit) would strip the tenant restriction from Find's query.
	query := make(bson.M, len(filter)+2)
	for k, v := range filter {
		query[k] = v
	}
	if g.IsEmpty(query["isDeleted"]) {
		query["isDeleted"] = false
	}
	// The tenant id is a separate component of the key, so it is left out of
	// the filter part.
	delete(query, "tenantId")
	redisKey := fmt.Sprintf(redis.Count, user.TenantId, collection, fmt.Sprintf("%+v", query))

	if !m.noCache {
		var cached *gvar.Var
		cached, err = redis.RedisClient().Get(ctx, redisKey)
		if err != nil {
			return 0, err
		}
		if !g.IsEmpty(cached) {
			return gconv.Int64(cached), nil
		}
	}

	if !m.noTenantId && !g.IsEmpty(user.TenantId) {
		query["tenantId"] = user.TenantId
	}
	count, err = source.Database().Collection(collection).CountDocuments(ctx, query)
	if err != nil {
		// Do not fall through to the cache write: it would overwrite err and
		// store a bogus zero count.
		return 0, err
	}

	if !m.noCache {
		// SetEX is assumed to take its TTL in seconds (GoFrame gredis
		// convention); int64(time.Hour) would be 3.6e12 seconds.
		ttlSeconds := int64(time.Hour / time.Second)
		if err = redis.RedisClient().SetEX(ctx, redisKey, count, ttlSeconds); err != nil {
			return count, err
		}
	}
	return count, nil
}
// Find loads the documents of collection that match filter into result and
// reports how many documents match in total.
//
// Parameters:
//   - filter: query document. It is modified in place: isDeleted is defaulted
//     to false when the caller left it empty, and (on a cache miss) tenantId
//     is set from the current user unless NoTenantId was requested or the user
//     has no tenant. A nil filter is treated as empty.
//   - result: destination; must pass utils.ValidStructPtr and be accepted by
//     Cursor.All.
//   - page: optional paging. Nil or zero PageNum/PageSize selects the first
//     PageSize documents; PageSize -1 disables paging.
//   - orderBy: sort fields; when empty, documents are sorted by createdAt
//     descending.
//
// Returns:
//   - total: with paging, the number of matching documents as reported by
//     Count (for both cache hits and misses); without paging, the number of
//     documents returned.
//   - err: any validation, Redis or MongoDB error.
//
// Unless NoCache was requested the page is read from and written to Redis.
// NOTE(review): the cache key does not reflect NoTenantId, so tenant-scoped and
// cross-tenant reads of the same filter share one cache entry.
func (m *mongoDB) Find(ctx context.Context, filter bson.M, result interface{}, collection string, page *beans.Page, orderBy []beans.OrderBy) (total int64, err error) {
	source, err := m.getDataSource()
	if err != nil {
		return 0, err
	}
	if err = utils.ValidStructPtr(result); err != nil {
		return 0, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}
	if filter == nil {
		filter = bson.M{}
	}
	if g.IsEmpty(filter["isDeleted"]) {
		filter["isDeleted"] = false
	}

	filterKey := fmt.Sprintf("%+v", filter)
	optionsKey := fmt.Sprintf("%+v%+v", page, orderBy)
	redisKey := fmt.Sprintf(redis.List, user.TenantId, collection, filterKey, optionsKey)

	// Resolve paging before the cache lookup so both the hit and the miss path
	// know whether total must come from Count.
	limit, skip := int64(PageSize), int64(0)
	if page != nil && !g.IsEmpty(page.PageNum) && !g.IsEmpty(page.PageSize) {
		limit = page.PageSize
		if limit != -1 {
			skip = (page.PageNum - 1) * limit
		}
	}
	unlimited := limit == -1

	// countMatching hands Count a snapshot of the filter, so nothing Count does
	// to its argument can alter the query executed below.
	countMatching := func() (int64, error) {
		snapshot := make(bson.M, len(filter))
		for k, v := range filter {
			snapshot[k] = v
		}
		return m.Count(ctx, snapshot, collection)
	}

	if !m.noCache {
		var cached *gvar.Var
		cached, err = redis.RedisClient().Get(ctx, redisKey)
		if err != nil {
			return 0, err
		}
		if !cached.IsEmpty() {
			if err = cached.Structs(result); err != nil {
				return 0, err
			}
			if unlimited {
				return int64(len(cached.Array())), nil
			}
			// A cached page only holds one page worth of documents; its
			// length is not the number of matching documents.
			total, err = countMatching()
			return total, err
		}
	}

	if !m.noTenantId && !g.IsEmpty(user.TenantId) {
		filter["tenantId"] = user.TenantId
	}

	opt := options.Find().SetSkip(skip)
	if !unlimited {
		opt.SetLimit(limit)
		total, err = countMatching()
		if err != nil || total == 0 {
			return total, err
		}
	}

	if len(orderBy) == 0 {
		opt.SetSort(bson.M{"createdAt": -1})
	} else {
		sortDoc := make(bson.D, 0, len(orderBy))
		for _, o := range orderBy {
			direction := -1
			if o.Order == beans.Asc {
				direction = 1
			}
			sortDoc = append(sortDoc, bson.E{Key: o.Field, Value: direction})
		}
		opt.SetSort(sortDoc)
	}

	coll := source.Database().Collection(collection)
	cur, err := coll.Find(ctx, filter, opt)
	if err != nil {
		return total, err
	}
	defer cur.Close(ctx)

	// Without paging, total is the size of the result set.
	// RemainingBatchLength only covers the first batch, so it is exact only
	// when the server cursor is already exhausted (ID 0); otherwise the
	// documents are counted explicitly.
	countAfterRead := false
	if unlimited {
		if cur.ID() == 0 {
			total = int64(cur.RemainingBatchLength())
		} else {
			countAfterRead = true
		}
	}

	if err = cur.All(ctx, result); err != nil {
		return total, err
	}
	if countAfterRead {
		if total, err = coll.CountDocuments(ctx, filter); err != nil {
			return 0, err
		}
	}

	if !m.noCache {
		// SetEX is assumed to take its TTL in seconds (GoFrame gredis
		// convention); int64(time.Hour) would be 3.6e12 seconds.
		ttlSeconds := int64(time.Hour / time.Second)
		if err = redis.RedisClient().SetEX(ctx, redisKey, result, ttlSeconds); err != nil {
			return total, err
		}
	}
	return total, nil
}
// FindOne loads a single document of collection that matches filter into
// result.
//
// Parameters:
//   - filter: non-empty query document. It is modified in place: isDeleted is
//     forced to false and (on a cache miss) tenantId is set from the current
//     user unless NoTenantId was requested or the user has no tenant.
//   - result: destination; must pass utils.ValidStructPtr.
//   - opts: driver FindOne options. NOTE(review): they are not part of the
//     cache key, so calls differing only in opts share one cache entry.
//
// When no document matches, nil is returned and result is left untouched.
// Such misses are not cached: Insert only invalidates the single-document key
// of the empty filter, so a cached miss would keep hiding a document inserted
// later.
//
// Unless NoCache was requested a found document is read from and written to
// Redis.
func (m *mongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) {
	source, err := m.getDataSource()
	if err != nil {
		return err
	}
	if len(filter) == 0 {
		return gerror.New("缺少查询条件")
	}
	if err = utils.ValidStructPtr(result); err != nil {
		return err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return err
	}

	filter["isDeleted"] = false
	redisKey := fmt.Sprintf(redis.One, user.TenantId, collection, fmt.Sprintf("%+v", filter))

	if !m.noCache {
		var cached *gvar.Var
		cached, err = redis.RedisClient().Get(ctx, redisKey)
		if err != nil {
			return err
		}
		if !g.IsEmpty(cached) {
			return gconv.Scan(cached, result)
		}
	}

	if !m.noTenantId && !g.IsEmpty(user.TenantId) {
		filter["tenantId"] = user.TenantId
	}

	err = source.Database().Collection(collection).FindOne(ctx, filter, opts...).Decode(result)
	if errors.Is(err, mongo.ErrNoDocuments) {
		return nil
	}
	if err != nil {
		// Return before the cache write, which would otherwise overwrite err
		// and store a partially decoded value.
		return err
	}

	if !m.noCache {
		// SetEX is assumed to take its TTL in seconds (GoFrame gredis
		// convention); int64(time.Hour) would be 3.6e12 seconds.
		ttlSeconds := int64(time.Hour / time.Second)
		if err = redis.RedisClient().SetEX(ctx, redisKey, result, ttlSeconds); err != nil {
			return err
		}
	}
	return nil
}
// getDeletedData loads every document of collection that matches filter and
// returns the documents together with their ObjectIDs. It is used to capture
// the affected documents for the operation log.
//
// The lookup goes through Find, so filter is adjusted in place the same way
// Find adjusts it, and the helper's cache setting applies.
//
// Documents whose _id is neither an ObjectID nor a valid ObjectID hex string
// are returned in deletedData but contribute no entry to deletedIDs.
func (m *mongoDB) getDeletedData(ctx context.Context, filter bson.M, collection string) (deletedIDs []bson.ObjectID, deletedData []bson.M, err error) {
	// PageSize -1 switches Find to unlimited mode; a nil page would silently
	// cap the result at the default page size.
	everything := &beans.Page{PageNum: 1, PageSize: -1}
	if _, err = m.Find(ctx, filter, &deletedData, collection, everything, nil); err != nil {
		return nil, nil, err
	}

	if len(deletedData) > 0 {
		deletedIDs = make([]bson.ObjectID, 0, len(deletedData))
	}
	for _, doc := range deletedData {
		switch id := doc["_id"].(type) {
		case bson.ObjectID:
			deletedIDs = append(deletedIDs, id)
		case string:
			// Documents served from the Redis cache are decoded from their
			// serialized form, where the ObjectID is a hex string.
			if oid, convErr := bson.ObjectIDFromHex(id); convErr == nil {
				deletedIDs = append(deletedIDs, oid)
			}
		}
	}
	return deletedIDs, deletedData, nil
}
// CleanRedis invalidates the cached query results of collection for tenantId:
// every key matching the list pattern (redis.CleanList), every key matching
// the count pattern (redis.CleanCount), and the single-document key
// (redis.One) derived from filter.
//
// The single-document key is computed the way FindOne computes it: from the
// filter with isDeleted forced to false and without its tenantId entry. The
// caller's filter is not modified, and a nil filter is accepted.
// NOTE(review): only the single-document entry for this exact filter is
// removed; entries cached under other filters are left in place.
//
// It stops at and returns the first Redis error.
func (m *mongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) {
	client := redis.RedisClient()

	// purge deletes every key that matches pattern.
	purge := func(pattern string) error {
		keys, keysErr := client.Keys(ctx, pattern)
		if keysErr != nil {
			return keysErr
		}
		for _, key := range keys {
			if _, delErr := client.Del(ctx, key); delErr != nil {
				return delErr
			}
		}
		return nil
	}

	if err = purge(fmt.Sprintf(redis.CleanList, tenantId, collection)); err != nil {
		return err
	}
	if err = purge(fmt.Sprintf(redis.CleanCount, tenantId, collection)); err != nil {
		return err
	}

	// Build the key from a copy so the caller's filter keeps its tenantId and
	// isDeleted entries.
	keyFilter := make(bson.M, len(filter)+1)
	for k, v := range filter {
		keyFilter[k] = v
	}
	keyFilter["isDeleted"] = false
	delete(keyFilter, "tenantId")

	oneKey := fmt.Sprintf(redis.One, tenantId, collection, fmt.Sprintf("%+v", keyFilter))
	_, err = client.Del(ctx, oneKey)
	return err
}
// log appends an operation log entry to the Redis stream LogRedisKey.
//
// Parameters:
//   - ids: ObjectIDs of the affected documents; used as given for insert and
//     delete operations.
//   - filter: for every other operation the affected ids are derived from it:
//     an ObjectID (value or non-nil pointer) under "_id" is used directly,
//     anything else triggers a lookup through getDeletedData.
//   - data: payload stored with the entry.
//   - userName, tenantId: recorded as creator/updater and tenant of the entry.
//
// Failures are logged and otherwise ignored, so logging never fails the
// calling operation.
func (m *mongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, collection string, data interface{}, userName, tenantId interface{}, operationType consts.OperationType) {
	// Read the client IP first, while the request attached to ctx is available.
	var ipAddress string
	if request := g.RequestFromCtx(ctx); request != nil {
		ipAddress = request.GetClientIp()
	}

	if operationType != consts.OperationInsert && operationType != consts.OperationDelete {
		resolved := true
		// Checked type switch: "_id" may hold an ObjectID value, a pointer, or
		// an operator document such as {"$in": ...}; an unchecked assertion to
		// *bson.ObjectID panics for everything but the pointer form.
		switch id := filter["_id"].(type) {
		case bson.ObjectID:
			ids = append(ids, id)
		case *bson.ObjectID:
			if id != nil {
				ids = append(ids, *id)
			} else {
				resolved = false
			}
		default:
			resolved = false
		}
		if !resolved {
			var err error
			if ids, _, err = m.getDeletedData(ctx, filter, collection); err != nil {
				glog.Errorf(ctx, "mongoLog-getDeletedData err: %v", err)
				return
			}
		}
	}

	now := &gtime.Now().Time
	entry := &entity.OperationLog{
		ServiceName:  serverName,
		Collection:   collection,
		CollectionID: ids,
		Operation:    string(operationType),
		IPAddress:    ipAddress,
		Data:         data,
	}
	entry.Creator = userName
	entry.Updater = userName
	entry.CreatedAt = now
	entry.UpdatedAt = now
	entry.TenantId = tenantId

	// The stream write uses the caller's ctx.
	if _, err := redis.AddToStream(ctx, LogRedisKey, entry); err != nil {
		// Errorf, not Error: the message is a printf-style format string.
		glog.Errorf(ctx, "mongoLog-AddToStream err: %v", err)
	}
}
// Insert stores documents in collection and returns the inserted ids.
//
// Every document is converted to a map with gconv.Map and then normalized:
//   - the "id" key is removed;
//   - creator, updater and tenantId are filled from the current user when the
//     document leaves them empty and the user provides a value;
//   - createdAt and updatedAt default to the current time;
//   - isDeleted is forced to false.
//
// After a successful insert the collection's cached query results are
// invalidated and, unless collection is the operation-log collection itself,
// an operation log entry is written.
//
// Returns an error when the data source or user cannot be resolved, when a
// document cannot be converted to a map, or when the insert fails. A cache
// invalidation failure is returned together with the (valid) ids.
func (m *mongoDB) Insert(ctx context.Context, documents []interface{}, collection string, opts ...options.Lister[options.InsertManyOptions]) (ids []interface{}, err error) {
	source, err := m.getDataSource()
	if err != nil {
		return nil, err
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}

	// One timestamp for the whole batch keeps createdAt and updatedAt equal.
	now := gtime.Now().Time
	docs := make([]interface{}, 0, len(documents))
	for i, document := range documents {
		doc := gconv.Map(document)
		if doc == nil {
			// gconv.Map yields nil for nil or unconvertible input; writing to
			// that nil map below would panic.
			return nil, gerror.Newf("第%d个文档无法转换为map", i+1)
		}
		delete(doc, "id")
		if !g.IsEmpty(user.UserName) {
			if g.IsEmpty(doc["creator"]) {
				doc["creator"] = user.UserName
			}
			if g.IsEmpty(doc["updater"]) {
				doc["updater"] = user.UserName
			}
		}
		if !g.IsEmpty(user.TenantId) && g.IsEmpty(doc["tenantId"]) {
			doc["tenantId"] = user.TenantId
		}
		if g.IsEmpty(doc["createdAt"]) {
			doc["createdAt"] = now
		}
		if g.IsEmpty(doc["updatedAt"]) {
			doc["updatedAt"] = now
		}
		doc["isDeleted"] = false
		docs = append(docs, doc)
	}

	r, err := source.Database().Collection(collection).InsertMany(ctx, docs, opts...)
	if err != nil {
		return nil, err
	}
	ids = r.InsertedIDs

	err = m.CleanRedis(ctx, bson.M{}, user.TenantId, collection)

	// Write the operation log, except for inserts into the log collection.
	if collection != consts.OperationLogCollection {
		objectIDs := make([]bson.ObjectID, 0, len(ids))
		for _, id := range ids {
			// Documents may carry a caller-supplied _id of another type;
			// only ObjectIDs can be recorded in the log entry.
			if oid, ok := id.(bson.ObjectID); ok {
				objectIDs = append(objectIDs, oid)
			}
		}
		m.log(ctx, objectIDs, nil, collection, nil, user.UserName, user.TenantId, consts.OperationInsert)
	}
	return ids, err
}
// Delete physically removes the documents of collection that match filter and
// returns how many were deleted.
//
// filter must not be empty. Its tenantId entry is always overwritten with the
// current user's tenant id, and the preliminary lookup of the affected
// documents (getDeletedData) may adjust it further before the delete runs.
// NOTE(review): unlike Update, the tenant id is applied even when it is empty,
// and NoTenantId is not consulted.
//
// After the delete the collection's cached query results are invalidated and
// an operation log entry carrying the removed documents is written. A cache
// invalidation failure is returned together with the (valid) count.
func (m *mongoDB) Delete(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.DeleteManyOptions]) (count int64, err error) {
	src, err := m.getDataSource()
	if err != nil {
		return 0, err
	}
	database := src.Database()

	if len(filter) == 0 {
		return 0, gerror.New("缺少查询条件")
	}

	currentUser, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}
	filter["tenantId"] = currentUser.TenantId

	// Capture the documents before they disappear so they can be logged.
	removedIDs, removedDocs, err := m.getDeletedData(ctx, filter, collection)
	if err != nil {
		return 0, err
	}

	outcome, err := database.Collection(collection).DeleteMany(ctx, filter, opts...)
	if err != nil {
		return 0, err
	}
	count = outcome.DeletedCount

	// Invalidate cached results, then record the operation.
	err = m.CleanRedis(ctx, filter, currentUser.TenantId, collection)
	m.log(ctx, removedIDs, nil, collection, removedDocs, currentUser.UserName, currentUser.TenantId, consts.OperationDelete)
	return count, err
}
// DeleteSoft marks the documents matching filter as deleted instead of
// removing them: it delegates to Update with {"$set": {"isDeleted": true}} and
// returns Update's result unchanged.
func (m *mongoDB) DeleteSoft(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) {
	return m.Update(ctx, filter, bson.M{"$set": bson.M{"isDeleted": true}}, collection, opts...)
}
// Update applies update to every document of collection that matches filter
// and returns the number of modified documents.
//
// Parameters:
//   - filter: non-empty query document. It is modified in place: isDeleted is
//     forced to false and tenantId is set from the current user when the user
//     has one.
//   - update: update document with operators such as "$set". Its "$set" part
//     may be a bson.M, a map[string]interface{} or a bson.D (normalized to
//     bson.M); updater and updatedAt are added to it. A nil update is treated
//     as empty.
//
// After the update the collection's cached query results are invalidated and
// an operation log entry is written: a soft-delete entry when "$set" turns
// isDeleted on, otherwise an update entry listing the caller-supplied fields.
//
// Returns an error for an empty filter, an unsupported "$set" type, or any
// data source, user or MongoDB failure. A cache invalidation failure is
// returned together with the (valid) modified count.
func (m *mongoDB) Update(ctx context.Context, filter bson.M, update bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) {
	source, err := m.getDataSource()
	if err != nil {
		return 0, err
	}
	if len(filter) == 0 {
		return 0, gerror.New("缺少查询条件")
	}
	filter["isDeleted"] = false

	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return 0, err
	}
	if !g.IsEmpty(user.TenantId) {
		filter["tenantId"] = user.TenantId
	}

	if update == nil {
		// Writing "$set" into a nil map would panic.
		update = bson.M{}
	}

	// Normalize "$set" with a checked type switch; an unchecked assertion to
	// bson.M panics for any other representation.
	var setDoc bson.M
	switch current := update["$set"].(type) {
	case nil:
		setDoc = bson.M{}
	case bson.M:
		setDoc = current
	case map[string]interface{}:
		setDoc = bson.M(current)
	case bson.D:
		setDoc = make(bson.M, len(current)+2)
		for _, e := range current {
			setDoc[e.Key] = e.Value
		}
	default:
		return 0, gerror.Newf("$set 类型不支持: %T", current)
	}
	if setDoc == nil {
		setDoc = bson.M{}
	}
	update["$set"] = setDoc

	// Collect the caller-supplied fields of every operator for the operation
	// log, before the bookkeeping fields are added below.
	fieldList := make([]FieldInfo, 0)
	for _, operand := range update {
		fields, ok := operand.(bson.M)
		if !ok {
			continue
		}
		for fieldName, fieldValue := range fields {
			fieldList = append(fieldList, FieldInfo{
				FieldName:  fieldName,
				FieldValue: fieldValue,
			})
		}
	}

	if !g.IsEmpty(user.UserName) {
		setDoc["updater"] = user.UserName
	}
	setDoc["updatedAt"] = gtime.Now().Time

	result, err := source.Database().Collection(collection).UpdateMany(ctx, filter, update, opts...)
	if err != nil {
		return 0, err
	}
	modifiedCount = result.ModifiedCount

	// Invalidate cached results.
	err = m.CleanRedis(ctx, filter, user.TenantId, collection)

	// Write the operation log.
	if !g.IsEmpty(setDoc["isDeleted"]) && gconv.Bool(setDoc["isDeleted"]) {
		// The documents are soft-deleted now, so the log lookup must search
		// for deleted documents.
		filter["isDeleted"] = true
		m.log(ctx, nil, filter, collection, nil, user.UserName, user.TenantId, consts.OperationDeleteSoft)
	} else {
		m.log(ctx, nil, filter, collection, fieldList, user.UserName, user.TenantId, consts.OperationUpdate)
	}
	return modifiedCount, err
}
// SaveOrUpdate performs a batch of update-one operations in a single unordered
// bulk write; filter[i] selects the document that update[i] is applied to.
//
// Parameters:
//   - filter, update: non-empty slices of equal length. Each filter is
//     modified in place (isDeleted forced to false, tenantId set from the
//     current user when the user has one). Each update's "$set" part may be a
//     bson.M, a map[string]interface{} or a bson.D (normalized to bson.M);
//     updater and updatedAt are added to it. Nil elements are treated as
//     empty documents.
//   - opts: only the Upsert setting is honoured. Operations upsert by default;
//     the last option that sets Upsert wins.
//
// After a successful bulk write the cached query results are invalidated for
// every filter; invalidation failures are logged as warnings and do not fail
// the call. No operation log entry is written.
//
// Returns an error for mismatched or empty input, an unsupported "$set" type,
// a failing option, or any data source, user or MongoDB failure.
func (m *mongoDB) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) {
	source, err := m.getDataSource()
	if err != nil {
		return nil, err
	}
	if len(filter) == 0 || len(update) == 0 {
		return nil, gerror.New("缺少查询条件或更新数据")
	}
	if len(filter) != len(update) {
		return nil, gerror.New("查询条件和更新数据的数量必须一致")
	}
	user, err := utils.GetUserInfo(ctx)
	if err != nil {
		return nil, err
	}

	// The options are the same for every element, so resolve them once.
	upsert := true
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		var updateOpts options.UpdateManyOptions
		for _, apply := range opt.List() {
			if err = apply(&updateOpts); err != nil {
				return nil, err
			}
		}
		if updateOpts.Upsert != nil {
			upsert = *updateOpts.Upsert
		}
	}

	now := gtime.Now().Time
	models := make([]mongo.WriteModel, 0, len(filter))
	for i := range filter {
		// Writing into a nil map would panic.
		if filter[i] == nil {
			filter[i] = bson.M{}
		}
		if update[i] == nil {
			update[i] = bson.M{}
		}

		filter[i]["isDeleted"] = false
		if !g.IsEmpty(user.TenantId) {
			filter[i]["tenantId"] = user.TenantId
		}

		// Normalize "$set" instead of replacing anything that is not exactly a
		// bson.M, which would silently drop the caller's fields.
		var setDoc bson.M
		switch current := update[i]["$set"].(type) {
		case nil:
			setDoc = bson.M{}
		case bson.M:
			setDoc = current
		case map[string]interface{}:
			setDoc = bson.M(current)
		case bson.D:
			setDoc = make(bson.M, len(current)+2)
			for _, e := range current {
				setDoc[e.Key] = e.Value
			}
		default:
			return nil, gerror.Newf("$set 类型不支持: %T", current)
		}
		if setDoc == nil {
			setDoc = bson.M{}
		}
		if !g.IsEmpty(user.UserName) {
			setDoc["updater"] = user.UserName
		}
		setDoc["updatedAt"] = now
		update[i]["$set"] = setDoc

		model := mongo.NewUpdateOneModel()
		model.SetFilter(filter[i])
		model.SetUpdate(update[i])
		model.SetUpsert(upsert)
		models = append(models, model)
	}

	bulkOpts := options.BulkWrite().SetOrdered(false)
	bulkResult, err := source.Database().Collection(collection).BulkWrite(ctx, models, bulkOpts)
	if err != nil {
		return nil, err
	}

	for _, filterItem := range filter {
		if cleanErr := m.CleanRedis(ctx, filterItem, user.TenantId, collection); cleanErr != nil {
			glog.Warning(ctx, "清理Redis缓存失败:", cleanErr)
		}
	}
	return bulkResult, nil
}
// BuildUpdateData converts req to a map with gconv.Map and returns a bson.M
// holding every entry whose key is not "id" and whose value is not empty
// according to g.IsEmpty. ctx is unused and the returned error is always nil.
func BuildUpdateData(ctx context.Context, req interface{}) (filter bson.M, err error) {
	_ = ctx

	filter = bson.M{}
	for key, value := range gconv.Map(req) {
		if key == "id" || g.IsEmpty(value) {
			continue
		}
		filter[key] = value
	}
	return filter, nil
}