补偿机制任务协程处理

This commit is contained in:
2026-04-09 15:16:43 +08:00
parent eb5e0af308
commit ee713c5c0f

View File

@@ -7,12 +7,15 @@ import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"gitea.com/red-future/common/beans"
_ "github.com/gogf/gf/contrib/drivers/pgsql/v2"
"github.com/gogf/gf/v2/os/gctx"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
// CompensationScheduler 补偿调度器,负责扫描和补偿失败的分页同步任务
@@ -58,49 +61,71 @@ func (s *CompensationScheduler) processCompensation(ctx context.Context) {
return
}
logrus.Infof("发现 %d 个需要补偿的失败分页任务,开始逐个处理...", len(failedPageTasks))
logrus.Infof("发现 %d 个需要补偿的失败分页任务,开始并发处理...", len(failedPageTasks))
successCount := 0
failCount := 0
maxConcurrency := 5
var successCount int64
var failCount int64
var manualReviewCount int64
sem := semaphore.NewWeighted(int64(maxConcurrency))
var wg sync.WaitGroup
for _, pageTask := range failedPageTasks {
if pageTask.RetryCount >= pageTask.MaxRetry {
logrus.Warnf("⚠ 分页任务 %s 已达到最大重试次数 %d标记为需人工处理", pageTask.TaskID, pageTask.MaxRetry)
wg.Add(1)
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: pageTask.Id,
Status: "manual_review",
ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", pageTask.MaxRetry),
ErrorCode: "MAX_RETRY_EXCEEDED",
go func(task *taskDto.SyncTaskLogItem) {
defer wg.Done()
if err := sem.Acquire(ctx, 1); err != nil {
logrus.Errorf("获取信号量失败:%v", err)
atomic.AddInt64(&failCount, 1)
return
}
dao.SyncTaskLog.Update(ctx, updateReq)
defer sem.Release(1)
s.sendAlert(pageTask)
failCount++
continue
}
if task.RetryCount >= task.MaxRetry {
logrus.Warnf("⚠ 分页任务 %s 已达到最大重试次数 %d标记为需人工处理", task.TaskID, task.MaxRetry)
logrus.Infof("▶ 开始补偿分页任务:%s (广告主=%d, 第 %d/%d 次重试)",
pageTask.TaskID, pageTask.AdvertiserID, pageTask.RetryCount+1, pageTask.MaxRetry)
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: task.Id,
Status: "manual_review",
ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", task.MaxRetry),
ErrorCode: "MAX_RETRY_EXCEEDED",
}
dao.SyncTaskLog.Update(ctx, updateReq)
if s.compensatePageTask(ctx, pageTask) {
successCount++
logrus.Infof("✓ 分页任务 %s 补偿成功", pageTask.TaskID)
parentTaskID := s.extractParentTaskID(pageTask.TaskID)
if parentTaskID != "" {
s.checkAndUpdateParentTaskStatus(ctx, parentTaskID)
s.sendAlert(task)
atomic.AddInt64(&manualReviewCount, 1)
return
}
} else {
failCount++
logrus.Warnf("✗ 分页任务 %s 补偿失败", pageTask.TaskID)
}
time.Sleep(1 * time.Second)
logrus.Infof("▶ 开始补偿分页任务:%s (广告主=%d, 第 %d/%d 次重试)",
task.TaskID, task.AdvertiserID, task.RetryCount+1, task.MaxRetry)
if s.compensatePageTask(ctx, task) {
atomic.AddInt64(&successCount, 1)
logrus.Infof("✓ 分页任务 %s 补偿成功", task.TaskID)
parentTaskID := s.extractParentTaskID(task.TaskID)
if parentTaskID != "" {
s.checkAndUpdateParentTaskStatus(ctx, parentTaskID)
}
} else {
atomic.AddInt64(&failCount, 1)
logrus.Warnf("✗ 分页任务 %s 补偿失败", task.TaskID)
}
}(pageTask)
}
logrus.Infof("=== 补偿任务执行完成:总计=%d, 成功=%d, 失败=%d ===",
len(failedPageTasks), successCount, failCount)
wg.Wait()
finalSuccess := atomic.LoadInt64(&successCount)
finalFail := atomic.LoadInt64(&failCount)
finalManualReview := atomic.LoadInt64(&manualReviewCount)
logrus.Infof("=== 补偿任务执行完成:总计=%d, 成功=%d, 失败=%d, 需人工处理=%d ===",
len(failedPageTasks), finalSuccess, finalFail, finalManualReview)
}
// compensatePageTask 补偿单个分页任务重新请求API并插入数据
@@ -262,7 +287,6 @@ func (s *CompensationScheduler) extractPageNumber(taskID string) int {
// extractPageSize 从任务日志的 PageInfo 或 RequestParams 字段中提取每页大小
// 优先级PageInfo.page_size > RequestParams.page_size > 默认值10
func (s *CompensationScheduler) extractPageSize(pageTask *taskDto.SyncTaskLogItem) int {
// 在 extractPageSize 方法开头添加
logrus.Infof("DEBUG - PageInfo 类型: %T, 值: %+v", pageTask.PageInfo, pageTask.PageInfo)
logrus.Infof("DEBUG - RequestParams 类型: %T, 值: %+v", pageTask.RequestParams, pageTask.RequestParams)