From ee713c5c0f1b5e520dbd679a4b268bbcb675e2fd Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Thu, 9 Apr 2026 15:16:43 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=81=BF=E6=9C=BA=E5=88=B6=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E5=8D=8F=E7=A8=8B=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scheduler/run_sync_task_log_task.go | 88 ++++++++++++++++++----------- 1 file changed, 56 insertions(+), 32 deletions(-) diff --git a/scheduler/run_sync_task_log_task.go b/scheduler/run_sync_task_log_task.go index 1906638..8585808 100644 --- a/scheduler/run_sync_task_log_task.go +++ b/scheduler/run_sync_task_log_task.go @@ -7,12 +7,15 @@ import ( "context" "fmt" "strings" + "sync" + "sync/atomic" "time" "gitea.com/red-future/common/beans" _ "github.com/gogf/gf/contrib/drivers/pgsql/v2" "github.com/gogf/gf/v2/os/gctx" "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) // CompensationScheduler 补偿调度器,负责扫描和补偿失败的分页同步任务 @@ -58,49 +61,71 @@ func (s *CompensationScheduler) processCompensation(ctx context.Context) { return } - logrus.Infof("发现 %d 个需要补偿的失败分页任务,开始逐个处理...", len(failedPageTasks)) + logrus.Infof("发现 %d 个需要补偿的失败分页任务,开始并发处理...", len(failedPageTasks)) - successCount := 0 - failCount := 0 + maxConcurrency := 5 + var successCount int64 + var failCount int64 + var manualReviewCount int64 + + sem := semaphore.NewWeighted(int64(maxConcurrency)) + var wg sync.WaitGroup for _, pageTask := range failedPageTasks { - if pageTask.RetryCount >= pageTask.MaxRetry { - logrus.Warnf("⚠ 分页任务 %s 已达到最大重试次数 %d,标记为需人工处理", pageTask.TaskID, pageTask.MaxRetry) + wg.Add(1) - updateReq := &taskDto.UpdateSyncTaskLogReq{ - ID: pageTask.Id, - Status: "manual_review", - ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", pageTask.MaxRetry), - ErrorCode: "MAX_RETRY_EXCEEDED", + go func(task *taskDto.SyncTaskLogItem) { + defer wg.Done() + + if err := sem.Acquire(ctx, 1); err != nil { + logrus.Errorf("获取信号量失败:%v", err) + atomic.AddInt64(&failCount, 1) + return } - dao.SyncTaskLog.Update(ctx, updateReq) + defer sem.Release(1) - s.sendAlert(pageTask) - failCount++ - continue - } + if task.RetryCount >= task.MaxRetry { + logrus.Warnf("⚠ 分页任务 %s 已达到最大重试次数 %d,标记为需人工处理", task.TaskID, task.MaxRetry) - logrus.Infof("▶ 开始补偿分页任务:%s (广告主=%d, 第 %d/%d 次重试)", - pageTask.TaskID, pageTask.AdvertiserID, pageTask.RetryCount+1, pageTask.MaxRetry) + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: task.Id, + Status: "manual_review", + ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", task.MaxRetry), + ErrorCode: "MAX_RETRY_EXCEEDED", + } + dao.SyncTaskLog.Update(ctx, updateReq) - if s.compensatePageTask(ctx, pageTask) { - successCount++ - logrus.Infof("✓ 分页任务 %s 补偿成功", pageTask.TaskID) - - parentTaskID := s.extractParentTaskID(pageTask.TaskID) - if parentTaskID != "" { - s.checkAndUpdateParentTaskStatus(ctx, parentTaskID) + s.sendAlert(task) + atomic.AddInt64(&manualReviewCount, 1) + return } - } else { - failCount++ - logrus.Warnf("✗ 分页任务 %s 补偿失败", pageTask.TaskID) - } - time.Sleep(1 * time.Second) + logrus.Infof("▶ 开始补偿分页任务:%s (广告主=%d, 第 %d/%d 次重试)", + task.TaskID, task.AdvertiserID, task.RetryCount+1, task.MaxRetry) + + if s.compensatePageTask(ctx, task) { + atomic.AddInt64(&successCount, 1) + logrus.Infof("✓ 分页任务 %s 补偿成功", task.TaskID) + + parentTaskID := s.extractParentTaskID(task.TaskID) + if parentTaskID != "" { + s.checkAndUpdateParentTaskStatus(ctx, parentTaskID) + } + } else { + atomic.AddInt64(&failCount, 1) + logrus.Warnf("✗ 分页任务 %s 补偿失败", task.TaskID) + } + }(pageTask) } - logrus.Infof("=== 补偿任务执行完成:总计=%d, 成功=%d, 失败=%d ===", - len(failedPageTasks), successCount, failCount) + wg.Wait() + + finalSuccess := atomic.LoadInt64(&successCount) + finalFail := atomic.LoadInt64(&failCount) + finalManualReview := atomic.LoadInt64(&manualReviewCount) + + logrus.Infof("=== 补偿任务执行完成:总计=%d, 成功=%d, 失败=%d, 需人工处理=%d ===", + len(failedPageTasks), finalSuccess, finalFail, finalManualReview) } // compensatePageTask 补偿单个分页任务:重新请求API并插入数据 @@ -262,7 +287,6 @@ func (s *CompensationScheduler) extractPageNumber(taskID string) int { // extractPageSize 从任务日志的 PageInfo 或 RequestParams 字段中提取每页大小 // 优先级:PageInfo.page_size > RequestParams.page_size > 默认值10 func (s *CompensationScheduler) extractPageSize(pageTask *taskDto.SyncTaskLogItem) int { - // 在 extractPageSize 方法开头添加 logrus.Infof("DEBUG - PageInfo 类型: %T, 值: %+v", pageTask.PageInfo, pageTask.PageInfo) logrus.Infof("DEBUG - RequestParams 类型: %T, 值: %+v", pageTask.RequestParams, pageTask.RequestParams)