package main import ( "context" "fmt" "time" dao "dataengine/dao/copydata" taskDto "dataengine/model/dto/copydata" syncSvc "dataengine/service/sync" "gitea.redpowerfuture.com/red-future/common/beans" _ "github.com/gogf/gf/contrib/drivers/pgsql/v2" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gctx" "github.com/sirupsen/logrus" ) // CompensationScheduler 通用补偿调度器 type CompensationScheduler struct { interval time.Duration } // NewCompensationScheduler 创建调度器 func NewCompensationScheduler() *CompensationScheduler { ctx := gctx.New() sec := g.Cfg().MustGet(ctx, "sync.compensation_interval_seconds", 300).Int() if sec < 10 { sec = 300 } return &CompensationScheduler{interval: time.Duration(sec) * time.Second} } // RunOnce 执行一次补偿 func (s *CompensationScheduler) RunOnce() { ctx := gctx.New() ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1}) logrus.Info("=== 开始补偿扫描 ===") // 查询所有 failed 状态的任务,不限类型 tasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, &taskDto.QueryFailedTasksReq{ Status: []string{"failed"}, Limit: 50, }) if err != nil { logrus.Errorf("查询失败任务异常: %v", err) return } if len(tasks) == 0 { logrus.Info("当前没有需要补偿的任务") return } logrus.Infof("发现 %d 个失败任务", len(tasks)) for _, task := range tasks { if task.RetryCount >= task.MaxRetry { logrus.Warnf("任务 %s 已达最大重试次数 %d", task.TaskID, task.MaxRetry) dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{ ID: task.Id, Status: "manual_review", ErrorMessage: fmt.Sprintf("已达最大重试次数 %d", task.MaxRetry), }) continue } platformCode := task.PlatformCode interfaceCode := task.InterfaceCode if platformCode == "" || interfaceCode == "" { logrus.Warnf("任务 %s 缺少 platform_code 或 interface_code,跳过", task.TaskID) continue } logrus.Infof("补偿: %s/%s (第 %d 次)", platformCode, interfaceCode, task.RetryCount+1) // 更新状态为 retrying retryCount := task.RetryCount + 1 dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{ ID: task.Id, Status: "retrying", RetryCount: &retryCount, }) // 执行补偿(增量同步) _, err := syncSvc.SyncByConfig(ctx, platformCode, interfaceCode, false) if err != nil { logrus.Errorf("补偿失败: %v", err) // 更新状态为 failed,设置下次重试时间 backoff := []int{5, 15, 30, 60, 120} waitMin := 5 if retryCount <= len(backoff) { waitMin = backoff[retryCount-1] } else { waitMin = backoff[len(backoff)-1] } nextRetry := time.Now().Add(time.Duration(waitMin) * time.Minute) dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{ ID: task.Id, Status: "failed", ErrorMessage: err.Error(), ErrorCode: "COMPENSATION_FAILED", NextRetryTime: nextRetry, }) } else { logrus.Infof("补偿成功: %s/%s", platformCode, interfaceCode) now := time.Now() dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{ ID: task.Id, Status: "success", CompletedAt: now, }) } } logrus.Info("=== 补偿扫描完成 ===") } // Start 启动 func (s *CompensationScheduler) Start() { logrus.Infof("补偿调度器启动,间隔: %v", s.interval) s.RunOnce() ticker := time.NewTicker(s.interval) defer ticker.Stop() for range ticker.C { s.RunOnce() } } func main() { NewCompensationScheduler().Start() }