Files
data-engine/service/sync/sync_scheduler.go
2026-06-11 13:06:54 +08:00

83 lines
2.4 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
"context"
"time"
dao "dataengine/dao/dict"
dto "dataengine/model/dto/dict"
"gitea.redpowerfuture.com/red-future/common/beans"
"github.com/gogf/gf/v2/frame/g"
"github.com/sirupsen/logrus"
)
// StartAutoSync 启动自动同步(独立 goroutine每次完成后等待 interval 再执行下一次)
func StartAutoSync(ctx context.Context) {
interval := GetSyncInterval(ctx)
logrus.Infof("自动同步调度器启动,间隔: %d 分钟(完成一次后开始计时)", interval)
for {
runAutoSync(ctx)
logrus.Infof("自动同步完成,等待 %d 分钟后执行下一次", interval)
time.Sleep(time.Duration(interval) * time.Minute)
}
}
func runAutoSync(ctx context.Context) {
logrus.Info("=== 开始自动同步 ===")
// 注入用户上下文ORM 框架需要用于租户隔离)
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1})
// 查询所有 ACTIVE 平台
platforms, _, err := dao.DatasourcePlatform.List(ctx, &dto.ListDatasourcePlatformReq{
Status: "ACTIVE",
})
if err != nil {
logrus.Errorf("查询平台列表失败: %v", err)
return
}
for _, p := range platforms {
// 查询该平台下有 table_definition 的接口
interfaces, _, err := dao.ApiInterface.List(ctx, &dto.ListApiInterfaceReq{
PlatformId: p.ID,
Status: "active",
})
if err != nil {
logrus.Errorf("查询接口列表失败 [platform=%s]: %v", p.PlatformCode, err)
continue
}
for _, iface := range interfaces {
if iface.TableDefinition == nil || len(iface.TableDefinition) == 0 {
continue
}
logrus.Infof("自动同步: %s / %s", p.PlatformCode, iface.Code)
// isFullSync=false 表示去查 sync_tracker
// 有记录 → 增量,无记录 → lastSyncTime=0 → 全量
_, err := SyncByConfig(ctx, p.PlatformCode, iface.Code, false)
if err != nil {
logrus.Errorf("自动同步失败 [%s/%s]: %v", p.PlatformCode, iface.Code, err)
}
}
}
logrus.Info("=== 自动同步完成 ===")
}
// InitAndStartAutoSync 在 main 中调用:初始化配置后启动自动同步和补偿
func InitAndStartAutoSync(ctx context.Context) {
// 读取配置中的同步开关
enabled := g.Cfg().MustGet(ctx, "sync.auto_sync_enabled", false).Bool()
if enabled {
go StartAutoSync(ctx)
} else {
logrus.Info("自动同步已关闭")
}
// 补偿调度器独立启动,不受 auto_sync_enabled 控制
go StartCompensation(ctx)
}