.gitignore
This commit is contained in:
@@ -8,7 +8,6 @@ package mongo
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"gitee.com/red-future---jilin-g/common/log/consts"
|
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -16,6 +15,8 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gitee.com/red-future---jilin-g/common/log/consts"
|
||||||
|
|
||||||
"github.com/gogf/gf/v2/frame/g"
|
"github.com/gogf/gf/v2/frame/g"
|
||||||
"github.com/gogf/gf/v2/os/glog"
|
"github.com/gogf/gf/v2/os/glog"
|
||||||
"github.com/gogf/gf/v2/os/grpool"
|
"github.com/gogf/gf/v2/os/grpool"
|
||||||
@@ -120,7 +121,7 @@ func (d *BaseDataSource) Connect(ctx context.Context) error {
|
|||||||
SetHeartbeatInterval(10 * time.Second).
|
SetHeartbeatInterval(10 * time.Second).
|
||||||
SetMaxConnIdleTime(60 * time.Second).
|
SetMaxConnIdleTime(60 * time.Second).
|
||||||
SetRetryWrites(true).
|
SetRetryWrites(true).
|
||||||
SetRetryReads(true)
|
SetRetryReads(true).SetMonitor(commandMonitor())
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
d.client, err = mongo.Connect(opt)
|
d.client, err = mongo.Connect(opt)
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitee.com/red-future---jilin-g/common/log/consts"
|
"gitee.com/red-future---jilin-g/common/log/consts"
|
||||||
|
"go.mongodb.org/mongo-driver/v2/event"
|
||||||
|
|
||||||
"gitee.com/red-future---jilin-g/common/beans"
|
"gitee.com/red-future---jilin-g/common/beans"
|
||||||
"gitee.com/red-future---jilin-g/common/log/model/entity"
|
"gitee.com/red-future---jilin-g/common/log/model/entity"
|
||||||
@@ -33,34 +34,81 @@ import (
|
|||||||
// 向后兼容的MongoDB结构体
|
// 向后兼容的MongoDB结构体
|
||||||
// =============================================================================
|
// =============================================================================
|
||||||
|
|
||||||
type MongoDB struct {
|
type mongoDB struct {
|
||||||
noCache bool
|
noCache bool
|
||||||
dataSource string // 数据源名称,默认为 "default"
|
dataSource string // 数据源名称,默认为 "default"
|
||||||
noTenantId bool // 是否跳过租户过滤
|
noTenantId bool // 是否跳过租户过滤
|
||||||
}
|
}
|
||||||
|
|
||||||
func DB(cache ...bool) *MongoDB {
|
func DB(cache ...bool) *mongoDB {
|
||||||
return &MongoDB{
|
return &mongoDB{
|
||||||
noCache: false,
|
noCache: false,
|
||||||
dataSource: "default",
|
dataSource: "default",
|
||||||
noTenantId: false,
|
noTenantId: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// commandMonitor 命令监控器
|
||||||
|
func commandMonitor() *event.CommandMonitor {
|
||||||
|
return &event.CommandMonitor{
|
||||||
|
// 命令执行前触发
|
||||||
|
Started: func(ctx context.Context, evt *event.CommandStartedEvent) {
|
||||||
|
// 执行前的处理逻辑示例:记录开始时间、打印执行的命令
|
||||||
|
fmt.Printf("[%s] 开始执行命令 | 数据库: %s | 集合: %s | 命令: %+v\n",
|
||||||
|
time.Now().Format("2006-01-02 15:04:05"),
|
||||||
|
evt.DatabaseName,
|
||||||
|
evt.Command.Lookup("collection").StringValue(), // 获取集合名
|
||||||
|
evt.Command,
|
||||||
|
)
|
||||||
|
|
||||||
|
// 也可以在这里添加:参数校验、权限检查、链路追踪埋点等
|
||||||
|
// 例如:将开始时间存入ctx,供后续结束时计算耗时
|
||||||
|
ctx = context.WithValue(ctx, "cmd_start_time", time.Now())
|
||||||
|
},
|
||||||
|
|
||||||
|
// 命令执行成功后触发
|
||||||
|
Succeeded: func(ctx context.Context, evt *event.CommandSucceededEvent) {
|
||||||
|
// 从ctx中获取开始时间,计算执行耗时
|
||||||
|
startTime, ok := ctx.Value("cmd_start_time").(time.Time)
|
||||||
|
if ok {
|
||||||
|
elapsed := time.Since(startTime)
|
||||||
|
fmt.Printf("[%s] 命令执行成功 | 耗时: %s | 结果: %+v\n",
|
||||||
|
time.Now().Format("2006-01-02 15:04:05"),
|
||||||
|
elapsed,
|
||||||
|
evt.Reply,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 也可以在这里添加:日志入库、性能指标上报、结果校验等
|
||||||
|
},
|
||||||
|
|
||||||
|
// 命令执行失败后触发
|
||||||
|
Failed: func(ctx context.Context, evt *event.CommandFailedEvent) {
|
||||||
|
fmt.Printf("[%s] 命令执行失败 | 错误: %s | 耗时: %s\n",
|
||||||
|
time.Now().Format("2006-01-02 15:04:05"),
|
||||||
|
evt.Failure,
|
||||||
|
evt.Duration,
|
||||||
|
)
|
||||||
|
|
||||||
|
// 也可以在这里添加:错误告警、重试逻辑、异常日志记录等
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithDataSource 指定使用的数据源
|
// WithDataSource 指定使用的数据源
|
||||||
func (m *MongoDB) WithDataSource(name string) *MongoDB {
|
func (m *mongoDB) WithDataSource(name string) *mongoDB {
|
||||||
m.dataSource = name
|
m.dataSource = name
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// NoCache 不使用缓存
|
// NoCache 不使用缓存
|
||||||
func (m *MongoDB) NoCache() *MongoDB {
|
func (m *mongoDB) NoCache() *mongoDB {
|
||||||
m.noCache = true
|
m.noCache = true
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// NoTenantId 不使用租户过滤
|
// NoTenantId 不使用租户过滤
|
||||||
func (m *MongoDB) NoTenantId() *MongoDB {
|
func (m *mongoDB) NoTenantId() *mongoDB {
|
||||||
m.noTenantId = true
|
m.noTenantId = true
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
@@ -98,7 +146,7 @@ func GetDB() *mongo.Database {
|
|||||||
// =============================================================================
|
// =============================================================================
|
||||||
|
|
||||||
// getDataSource 获取当前使用的数据源
|
// getDataSource 获取当前使用的数据源
|
||||||
func (m *MongoDB) getDataSource() (DataSource, error) {
|
func (m *mongoDB) getDataSource() (DataSource, error) {
|
||||||
if m.dataSource == "" {
|
if m.dataSource == "" {
|
||||||
m.dataSource = "default"
|
m.dataSource = "default"
|
||||||
}
|
}
|
||||||
@@ -106,7 +154,7 @@ func (m *MongoDB) getDataSource() (DataSource, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Count 查询总数
|
// Count 查询总数
|
||||||
func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) {
|
func (m *mongoDB) Count(ctx context.Context, filter bson.M, collection string) (count int64, err error) {
|
||||||
source, err := m.getDataSource()
|
source, err := m.getDataSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@@ -147,7 +195,7 @@ func (m *MongoDB) Count(ctx context.Context, filter bson.M, collection string) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Find 查询多条记录
|
// Find 查询多条记录
|
||||||
func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, collection string, page *beans.Page, orderBy []beans.OrderBy) (total int64, err error) {
|
func (m *mongoDB) Find(ctx context.Context, filter bson.M, result interface{}, collection string, page *beans.Page, orderBy []beans.OrderBy) (total int64, err error) {
|
||||||
source, err := m.getDataSource()
|
source, err := m.getDataSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@@ -238,7 +286,7 @@ func (m *MongoDB) Find(ctx context.Context, filter bson.M, result interface{}, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
// FindOne 查询1条记录
|
// FindOne 查询1条记录
|
||||||
func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) {
|
func (m *mongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}, collection string, opts ...options.Lister[options.FindOneOptions]) (err error) {
|
||||||
source, err := m.getDataSource()
|
source, err := m.getDataSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -292,7 +340,7 @@ func (m *MongoDB) FindOne(ctx context.Context, filter bson.M, result interface{}
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getDeletedData 获取要删除的数据
|
// getDeletedData 获取要删除的数据
|
||||||
func (m *MongoDB) getDeletedData(ctx context.Context, filter bson.M, collection string) (deletedIDs []bson.ObjectID, deletedData []bson.M, err error) {
|
func (m *mongoDB) getDeletedData(ctx context.Context, filter bson.M, collection string) (deletedIDs []bson.ObjectID, deletedData []bson.M, err error) {
|
||||||
// 查询要删除的数据
|
// 查询要删除的数据
|
||||||
_, err = m.Find(ctx, filter, &deletedData, collection, nil, nil)
|
_, err = m.Find(ctx, filter, &deletedData, collection, nil, nil)
|
||||||
// 从查询结果中获取 _id
|
// 从查询结果中获取 _id
|
||||||
@@ -302,7 +350,7 @@ func (m *MongoDB) getDeletedData(ctx context.Context, filter bson.M, collection
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) {
|
func (m *mongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interface{}, collection string) (err error) {
|
||||||
listKeys := fmt.Sprintf(redis.CleanList, tenantId, collection)
|
listKeys := fmt.Sprintf(redis.CleanList, tenantId, collection)
|
||||||
keys, err := redis.RedisClient().Keys(ctx, listKeys)
|
keys, err := redis.RedisClient().Keys(ctx, listKeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -336,7 +384,7 @@ func (m *MongoDB) CleanRedis(ctx context.Context, filter bson.M, tenantId interf
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, collection string, data interface{}, userName, tenantId interface{}, operationType consts.OperationType) {
|
func (m *mongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, collection string, data interface{}, userName, tenantId interface{}, operationType consts.OperationType) {
|
||||||
// 提前获取 IP 地址,避免异步任务执行时请求已结束
|
// 提前获取 IP 地址,避免异步任务执行时请求已结束
|
||||||
var ipAddress string
|
var ipAddress string
|
||||||
if request := g.RequestFromCtx(ctx); request != nil {
|
if request := g.RequestFromCtx(ctx); request != nil {
|
||||||
@@ -375,7 +423,7 @@ func (m *MongoDB) log(ctx context.Context, ids []bson.ObjectID, filter bson.M, c
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert 插入多条记录
|
// Insert 插入多条记录
|
||||||
func (m *MongoDB) Insert(ctx context.Context, documents []interface{}, collection string, opts ...options.Lister[options.InsertManyOptions]) (ids []interface{}, err error) {
|
func (m *mongoDB) Insert(ctx context.Context, documents []interface{}, collection string, opts ...options.Lister[options.InsertManyOptions]) (ids []interface{}, err error) {
|
||||||
source, err := m.getDataSource()
|
source, err := m.getDataSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -426,7 +474,7 @@ func (m *MongoDB) Insert(ctx context.Context, documents []interface{}, collectio
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Delete 删除记录
|
// Delete 删除记录
|
||||||
func (m *MongoDB) Delete(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.DeleteManyOptions]) (count int64, err error) {
|
func (m *mongoDB) Delete(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.DeleteManyOptions]) (count int64, err error) {
|
||||||
source, err := m.getDataSource()
|
source, err := m.getDataSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@@ -461,13 +509,13 @@ func (m *MongoDB) Delete(ctx context.Context, filter bson.M, collection string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DeleteSoft 假删除记录
|
// DeleteSoft 假删除记录
|
||||||
func (m *MongoDB) DeleteSoft(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) {
|
func (m *mongoDB) DeleteSoft(ctx context.Context, filter bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) {
|
||||||
update := bson.M{"$set": bson.M{"isDeleted": true}}
|
update := bson.M{"$set": bson.M{"isDeleted": true}}
|
||||||
return m.Update(ctx, filter, update, collection, opts...)
|
return m.Update(ctx, filter, update, collection, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update 修改记录
|
// Update 修改记录
|
||||||
func (m *MongoDB) Update(ctx context.Context, filter bson.M, update bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) {
|
func (m *mongoDB) Update(ctx context.Context, filter bson.M, update bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (modifiedCount int64, err error) {
|
||||||
source, err := m.getDataSource()
|
source, err := m.getDataSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
@@ -526,7 +574,7 @@ func (m *MongoDB) Update(ctx context.Context, filter bson.M, update bson.M, coll
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SaveOrUpdate 批量增加或修改
|
// SaveOrUpdate 批量增加或修改
|
||||||
func (m *MongoDB) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) {
|
func (m *mongoDB) SaveOrUpdate(ctx context.Context, filter []bson.M, update []bson.M, collection string, opts ...options.Lister[options.UpdateManyOptions]) (result *mongo.BulkWriteResult, err error) {
|
||||||
source, err := m.getDataSource()
|
source, err := m.getDataSource()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
Reference in New Issue
Block a user