package sync import ( "context" "time" dao "dataengine/dao/dict" dto "dataengine/model/dto/dict" "gitea.redpowerfuture.com/red-future/common/beans" "github.com/gogf/gf/v2/frame/g" "github.com/sirupsen/logrus" ) // StartAutoSync 启动自动同步(独立 goroutine,每次完成后等待 interval 再执行下一次) func StartAutoSync(ctx context.Context) { interval := GetSyncInterval(ctx) logrus.Infof("自动同步调度器启动,间隔: %d 分钟(完成一次后开始计时)", interval) for { runAutoSync(ctx) logrus.Infof("自动同步完成,等待 %d 分钟后执行下一次", interval) time.Sleep(time.Duration(interval) * time.Minute) } } func runAutoSync(ctx context.Context) { logrus.Info("=== 开始自动同步 ===") // 注入用户上下文(ORM 框架需要用于租户隔离) ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) // 查询所有 ACTIVE 平台 platforms, _, err := dao.DatasourcePlatform.List(ctx, &dto.ListDatasourcePlatformReq{ Status: "ACTIVE", }) if err != nil { logrus.Errorf("查询平台列表失败: %v", err) return } for _, p := range platforms { // 查询该平台下有 table_definition 的接口 interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{ PlatformId: p.ID, Status: "active", }) if err != nil { logrus.Errorf("查询接口列表失败 [platform=%s]: %v", p.PlatformCode, err) continue } for _, iface := range interfaces { if iface.TableDefinition == nil || len(iface.TableDefinition) == 0 { continue } logrus.Infof("自动同步: %s / %s", p.PlatformCode, iface.Code) // isFullSync=false 表示去查 sync_tracker: // 有记录 → 增量,无记录 → lastSyncTime=0 → 全量 _, err := SyncByConfig(ctx, p.PlatformCode, iface.Code, false) if err != nil { logrus.Errorf("自动同步失败 [%s/%s]: %v", p.PlatformCode, iface.Code, err) } } } logrus.Info("=== 自动同步完成 ===") } // InitAndStartAutoSync 在 main 中调用:初始化配置后启动自动同步和补偿 func InitAndStartAutoSync(ctx context.Context) { // 读取配置中的同步开关 enabled := g.Cfg().MustGet(ctx, "sync.auto_sync_enabled", false).Bool() if enabled { go StartAutoSync(ctx) } else { logrus.Info("自动同步已关闭") } // 补偿调度器独立启动,不受 auto_sync_enabled 控制 go StartCompensation(ctx) }