package sync

import (
	"context"
	"fmt"
	"time"

	dao "dataengine/dao/copydata"
	taskDto "dataengine/model/dto/copydata"

	"gitea.redpowerfuture.com/red-future/common/beans"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/sirupsen/logrus"
)

// StartCompensation starts the compensation scheduler.
//
// It runs one compensation scan immediately and then one scan per configured
// interval. The call blocks until ctx is cancelled, so a caller that wants the
// scheduler in the background must invoke it in its own goroutine.
//
// The interval is read from the "sync.compensation_interval_seconds" config
// key (default 300). Values below 10 seconds are replaced by 300 seconds.
func StartCompensation(ctx context.Context) {
	const (
		defaultIntervalSec = 300
		minIntervalSec     = 10
	)

	sec := g.Cfg().MustGet(ctx, "sync.compensation_interval_seconds", defaultIntervalSec).Int()
	if sec < minIntervalSec {
		sec = defaultIntervalSec
	}
	interval := time.Duration(sec) * time.Second
	logrus.Infof("补偿调度器启动,间隔: %v", interval)

	// Every scan runs as a fixed admin identity of tenant 1.
	// NOTE(review): a plain string context key is flagged by go vet/staticcheck;
	// it is kept because code outside this file presumably looks the value up
	// under the same "user" key — confirm before switching to a typed key.
	ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin", TenantId: 1})

	// Scan once at start-up instead of waiting for the first tick.
	runCompensation(ctx)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			runCompensation(ctx)
		case <-ctx.Done():
			logrus.Info("补偿调度器已停止")
			return
		}
	}
}

// compensationRetryDelay returns how long to wait before the next retry once
// the given attempt (1-based) has failed. The schedule is 5, 15, 30, 60 and
// 120 minutes; attempts beyond the table reuse the last entry and attempts
// below 1 use the first entry, so the lookup can never index out of range.
func compensationRetryDelay(attempt int) time.Duration {
	backoffMinutes := [...]int{5, 15, 30, 60, 120}

	idx := attempt - 1
	if idx < 0 {
		idx = 0
	}
	if idx >= len(backoffMinutes) {
		idx = len(backoffMinutes) - 1
	}
	return time.Duration(backoffMinutes[idx]) * time.Minute
}

// runCompensation performs a single compensation scan: it queries up to 50
// task-log rows with status "failed" and re-runs SyncByConfig for each one.
//
// Per task:
//   - RetryCount >= MaxRetry: the row is moved to "manual_review" and skipped.
//   - empty PlatformCode or InterfaceCode: the task is skipped untouched.
//   - otherwise the row is marked "retrying" with an incremented retry count,
//     the sync is re-run, and the row ends up "success" or goes back to
//     "failed" with a next-retry time taken from compensationRetryDelay.
//
// The scan stops early, before starting the next task, once ctx is cancelled.
//
// NOTE(review): the result of every dao.SyncTaskLog.Update call is discarded.
// Its signature is not visible in this file; confirm it and surface failures,
// otherwise a failed status write goes unnoticed.
func runCompensation(ctx context.Context) {
	logrus.Info("=== 开始补偿扫描 ===")

	tasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, &taskDto.QueryFailedTasksReq{
		Status: []string{"failed"},
		Limit:  50,
	})
	if err != nil {
		logrus.Errorf("查询失败任务异常: %v", err)
		return
	}
	if len(tasks) == 0 {
		logrus.Info("当前没有需要补偿的任务")
		return
	}
	logrus.Infof("发现 %d 个失败任务", len(tasks))

	for _, task := range tasks {
		// Do not start another (potentially slow) sync after shutdown was requested.
		if ctxErr := ctx.Err(); ctxErr != nil {
			logrus.Infof("补偿扫描已取消: %v", ctxErr)
			return
		}

		if task.RetryCount >= task.MaxRetry {
			logrus.Warnf("任务 %s 已达最大重试次数 %d", task.TaskID, task.MaxRetry)
			dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{
				ID:           task.Id,
				Status:       "manual_review",
				ErrorMessage: fmt.Sprintf("已达最大重试次数 %d", task.MaxRetry),
			})
			continue
		}

		platformCode := task.PlatformCode
		interfaceCode := task.InterfaceCode
		if platformCode == "" || interfaceCode == "" {
			logrus.Warnf("任务 %s 缺少 platform_code 或 interface_code,跳过", task.TaskID)
			continue
		}

		retryCount := task.RetryCount + 1
		logrus.Infof("补偿: %s/%s (第 %d 次)", platformCode, interfaceCode, retryCount)

		// Record the attempt before running it.
		dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{
			ID:         task.Id,
			Status:     "retrying",
			RetryCount: &retryCount,
		})

		if _, syncErr := SyncByConfig(ctx, platformCode, interfaceCode, false); syncErr != nil {
			logrus.Errorf("补偿失败: %v", syncErr)
			dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{
				ID:            task.Id,
				Status:        "failed",
				ErrorMessage:  syncErr.Error(),
				ErrorCode:     "COMPENSATION_FAILED",
				NextRetryTime: time.Now().Add(compensationRetryDelay(retryCount)),
			})
			continue
		}

		logrus.Infof("补偿成功: %s/%s", platformCode, interfaceCode)
		dao.SyncTaskLog.Update(ctx, &taskDto.UpdateSyncTaskLogReq{
			ID:          task.Id,
			Status:      "success",
			CompletedAt: time.Now(),
		})
	}

	logrus.Info("=== 补偿扫描完成 ===")
}