170 lines
4.9 KiB
Go
170 lines
4.9 KiB
Go
package dao
|
||
|
||
import (
|
||
"assets/consts/public"
|
||
dto "assets/model/dto/sync"
|
||
entity "assets/model/entity/sync"
|
||
"context"
|
||
|
||
"gitea.com/red-future/common/db/mongo"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/gtime"
|
||
"github.com/gogf/gf/v2/util/gconv"
|
||
"go.mongodb.org/mongo-driver/v2/bson"
|
||
)
|
||
|
||
// SyncTask is the package-level instance through which the sync task
// data-access methods are called.
var SyncTask = &syncTask{}

// syncTask is a stateless receiver type for the sync task data-access methods.
type syncTask struct{}
|
||
|
||
// Insert 插入同步任务
|
||
func (d *syncTask) Insert(ctx context.Context, task *entity.SyncTask) (err error) {
|
||
_, err = mongo.DB().Insert(ctx, []interface{}{task}, task.CollectionName())
|
||
return
|
||
}
|
||
|
||
// GetOne 获取单个同步任务
|
||
func (d *syncTask) GetOne(ctx context.Context, id *bson.ObjectID) (task *entity.SyncTask, err error) {
|
||
filter := bson.M{"_id": id}
|
||
|
||
task = &entity.SyncTask{}
|
||
err = mongo.DB().FindOne(ctx, filter, task, task.CollectionName())
|
||
return task, err
|
||
}
|
||
|
||
// List 获取同步任务列表
|
||
func (d *syncTask) List(ctx context.Context, req *dto.ListSyncTaskReq) (list []*entity.SyncTask, total int64, err error) {
|
||
// 构建查询过滤条件
|
||
filter := d.buildListFilter(req)
|
||
|
||
// 调用 common/db/mongo 的 Find 方法,不使用排序
|
||
total, err = mongo.DB().Find(ctx, filter, &list, "sync_task", &req.Page, nil)
|
||
return
|
||
}
|
||
|
||
// Update 更新同步任务
|
||
func (d *syncTask) Update(ctx context.Context, id string, updateData *entity.SyncTask) (err error) {
|
||
objectId, err := bson.ObjectIDFromHex(id)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
filter := bson.M{"_id": objectId}
|
||
|
||
if !g.IsEmpty(updateData) {
|
||
// 直接使用 struct 转 map,不需要额外的转换
|
||
update := bson.M{"$set": gconv.Map(updateData)}
|
||
_, err = mongo.DB().Update(ctx, filter, update, "sync_task")
|
||
}
|
||
return err
|
||
}
|
||
|
||
// UpdateStatus 更新同步任务状态
|
||
func (d *syncTask) UpdateStatus(ctx context.Context, id *bson.ObjectID, status public.SyncStatus, errorMessage string) (err error) {
|
||
filter := bson.M{"_id": id}
|
||
|
||
updateData := bson.M{
|
||
"status": status,
|
||
"errorMessage": errorMessage,
|
||
}
|
||
|
||
if status == public.SyncStatusSyncing {
|
||
updateData["startedAt"] = gtime.Now()
|
||
} else if status == public.SyncStatusSuccess || status == public.SyncStatusFailed {
|
||
updateData["finishedAt"] = gtime.Now()
|
||
}
|
||
|
||
update := bson.M{"$set": updateData}
|
||
_, err = mongo.DB().Update(ctx, filter, update, "sync_task")
|
||
return err
|
||
}
|
||
|
||
// UpdateErrorCount 更新错误计数
|
||
func (d *syncTask) UpdateErrorCount(ctx context.Context, id string, increment int) (err error) {
|
||
objectId, err := bson.ObjectIDFromHex(id)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
filter := bson.M{"_id": objectId}
|
||
update := bson.M{"$inc": bson.M{"errorCount": increment}}
|
||
_, err = mongo.DB().Update(ctx, filter, update, "sync_task")
|
||
return err
|
||
}
|
||
|
||
// Delete 删除同步任务
|
||
func (d *syncTask) Delete(ctx context.Context, id string) (err error) {
|
||
objectId, err := bson.ObjectIDFromHex(id)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
filter := bson.M{"_id": objectId}
|
||
_, err = mongo.DB().Delete(ctx, filter, "sync_task")
|
||
return err
|
||
}
|
||
|
||
// GetPendingTasks 获取待处理的同步任务
|
||
func (d *syncTask) GetPendingTasks(ctx context.Context, limit int) (tasks []*entity.SyncTask, err error) {
|
||
filter := bson.M{"status": public.SyncStatusPending}
|
||
|
||
// 调用 common/db/mongo 的 Find 方法,不使用排序
|
||
_, err = mongo.DB().Find(ctx, filter, &tasks, "sync_task", nil, nil)
|
||
return tasks, err
|
||
}
|
||
|
||
// buildListFilter 构建列表查询的过滤条件
|
||
func (d *syncTask) buildListFilter(req *dto.ListSyncTaskReq) bson.M {
|
||
filter := bson.M{}
|
||
|
||
if !g.IsEmpty(req.Platform) {
|
||
filter["platform"] = req.Platform
|
||
}
|
||
if !g.IsEmpty(req.Status) {
|
||
filter["status"] = req.Status
|
||
}
|
||
if req.StartTime != nil {
|
||
filter["createdAt"] = bson.M{"$gte": req.StartTime}
|
||
}
|
||
if req.EndTime != nil {
|
||
if existingFilter, exists := filter["createdAt"]; exists {
|
||
if existingTimeRange, ok := existingFilter.(bson.M); ok {
|
||
existingTimeRange["$lte"] = req.EndTime
|
||
filter["createdAt"] = existingTimeRange
|
||
}
|
||
} else {
|
||
filter["createdAt"] = bson.M{"$lte": req.EndTime}
|
||
}
|
||
}
|
||
|
||
return filter
|
||
}
|
||
|
||
// SyncConfig is the package-level instance through which the sync
// configuration data-access methods are called.
var SyncConfig = &syncConfig{}

// syncConfig is a stateless receiver type for the sync configuration
// data-access methods.
type syncConfig struct{}
|
||
|
||
// GetByPlatform 根据平台获取同步配置
|
||
func (d *syncConfig) GetByPlatform(ctx context.Context, platform public.SyncPlatform) (config *entity.ChannelConfig, err error) {
|
||
filter := bson.M{"platform": platform}
|
||
config = &entity.ChannelConfig{}
|
||
err = mongo.DB().FindOne(ctx, filter, config, "sync_config")
|
||
return config, err
|
||
}
|
||
|
||
// List 获取同步配置列表
|
||
func (d *syncConfig) List(ctx context.Context) (configs []*entity.ChannelConfig, err error) {
|
||
_, err = mongo.DB().Find(ctx, bson.M{}, &configs, "sync_config", nil, nil)
|
||
return configs, err
|
||
}
|
||
|
||
// Update 更新同步配置
|
||
func (d *syncConfig) Update(ctx context.Context, platform public.SyncPlatform, updateData *entity.ChannelConfig) (err error) {
|
||
filter := bson.M{"platform": platform}
|
||
|
||
if !g.IsEmpty(updateData) {
|
||
// 直接使用 struct 转 map
|
||
update := bson.M{"$set": gconv.Map(updateData)}
|
||
_, err = mongo.DB().Update(ctx, filter, update, "sync_config")
|
||
}
|
||
return err
|
||
}
|