From 073a098317bde8deace7eb53490d1b8ee3f644c3 Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Wed, 8 Apr 2026 14:30:09 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=81=BF=E9=80=BB=E8=BE=91=E4=BF=AE?= =?UTF-8?q?=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dao/copydata/sync_task_log_dao.go | 35 ++- model/dto/copydata/sync_task_log_dto.go | 2 +- model/entity/copydata/sync_task_log.go | 34 +-- scheduler/run_sync_task_log_task.go | 309 ++++++++---------------- 4 files changed, 151 insertions(+), 229 deletions(-) diff --git a/dao/copydata/sync_task_log_dao.go b/dao/copydata/sync_task_log_dao.go index 5613454..da05ca0 100644 --- a/dao/copydata/sync_task_log_dao.go +++ b/dao/copydata/sync_task_log_dao.go @@ -10,14 +10,21 @@ import ( "gitea.com/red-future/common/db/gfdb" "github.com/gogf/gf/v2/database/gdb" "github.com/gogf/gf/v2/util/gconv" + "github.com/sirupsen/logrus" ) var SyncTaskLog = new(SyncTaskLogDao) type SyncTaskLogDao struct{} -// Create 创建任务日志 +// Create 创建任务日志(如果task_id已存在则返回现有ID) func (d *SyncTaskLogDao) Create(ctx context.Context, req *dto.CreateSyncTaskLogReq) (int64, error) { + existingTask, err := d.GetByTaskID(ctx, req.TaskID, req.TaskType) + if err == nil && existingTask != nil { + logrus.Debugf("任务日志已存在,task_id=%s, task_type=%s, id=%d", req.TaskID, req.TaskType, existingTask.Id) + return existingTask.Id, nil + } + var entityData entity.SyncTaskLog if err := gconv.Struct(req, &entityData); err != nil { return 0, err @@ -124,3 +131,29 @@ func (d *SyncTaskLogDao) GetByTaskID(ctx context.Context, taskID, taskType strin } return result, nil } + +// QueryAllPageTasksByParentID 根据主任务ID查询所有分页任务 +func (d *SyncTaskLogDao) QueryAllPageTasksByParentID(ctx context.Context, parentTaskID string, limit int) ([]*dto.SyncTaskLogItem, error) { + if limit <= 0 { + limit = 1000 + } + + model := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Model + model = model.Where("task_type", "account_report_page") + model = model.WhereLike("task_id", parentTaskID+"_page_%") + model = model.Limit(limit) + + var results []*entity.SyncTaskLog + if err := model.Scan(&results); err != nil { + return nil, err + } + + items := make([]*dto.SyncTaskLogItem, len(results)) + for i, r := range results { + item := &dto.SyncTaskLogItem{} + gconv.Struct(r, item) + items[i] = item + } + + return items, nil +} diff --git a/model/dto/copydata/sync_task_log_dto.go b/model/dto/copydata/sync_task_log_dto.go index 02bd878..e7e57c2 100644 --- a/model/dto/copydata/sync_task_log_dto.go +++ b/model/dto/copydata/sync_task_log_dto.go @@ -36,7 +36,7 @@ type QueryFailedTasksReq struct { // SyncTaskLogItem 同步任务日志项 type SyncTaskLogItem struct { - ID int64 `json:"id"` + Id int64 `json:"id"` TaskID string `json:"taskId"` TaskType string `json:"taskType"` AdvertiserID int64 `json:"advertiserId"` diff --git a/model/entity/copydata/sync_task_log.go b/model/entity/copydata/sync_task_log.go index 4ea952d..7c9d8a5 100644 --- a/model/entity/copydata/sync_task_log.go +++ b/model/entity/copydata/sync_task_log.go @@ -5,23 +5,23 @@ import "gitea.com/red-future/common/beans" // SyncTaskLog 同步任务日志实体 type SyncTaskLog struct { beans.SQLBaseDO `orm:",inherit"` - - TaskID string `orm:"task_id" json:"taskId" description:"任务唯一标识"` - TaskType string `orm:"task_type" json:"taskType" description:"任务类型"` - AdvertiserID int64 `orm:"advertiser_id" json:"advertiserId" description:"广告主ID"` - StartTime interface{} `orm:"start_time" json:"startTime" description:"数据开始时间"` - EndTime interface{} `orm:"end_time" json:"endTime" description:"数据结束时间"` - Status string `orm:"status" json:"status" description:"任务状态"` - RetryCount int `orm:"retry_count" json:"retryCount" description:"已重试次数"` - MaxRetry int `orm:"max_retry" json:"maxRetry" description:"最大重试次数"` - PageInfo interface{} `orm:"page_info" json:"pageInfo" description:"分页信息"` - RequestParams interface{} `orm:"request_params" json:"requestParams" description:"请求参数快照"` - ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"` - ErrorCode string `orm:"error_code" json:"errorCode" description:"错误码"` - ResultSummary interface{} `orm:"result_summary" json:"resultSummary" description:"结果摘要"` - NextRetryTime interface{} `orm:"next_retry_time" json:"nextRetryTime" description:"下次重试时间"` - CompletedAt interface{} `orm:"completed_at" json:"completedAt" description:"完成时间"` - DurationMs int64 `orm:"duration_ms" json:"durationMs" description:"执行耗时毫秒"` + Id int64 `orm:"Id" json:"Id" description:"主键id"` + TaskID string `orm:"task_id" json:"taskId" description:"任务唯一标识"` + TaskType string `orm:"task_type" json:"taskType" description:"任务类型"` + AdvertiserID int64 `orm:"advertiser_id" json:"advertiserId" description:"广告主ID"` + StartTime interface{} `orm:"start_time" json:"startTime" description:"数据开始时间"` + EndTime interface{} `orm:"end_time" json:"endTime" description:"数据结束时间"` + Status string `orm:"status" json:"status" description:"任务状态"` + RetryCount int `orm:"retry_count" json:"retryCount" description:"已重试次数"` + MaxRetry int `orm:"max_retry" json:"maxRetry" description:"最大重试次数"` + PageInfo interface{} `orm:"page_info" json:"pageInfo" description:"分页信息"` + RequestParams interface{} `orm:"request_params" json:"requestParams" description:"请求参数快照"` + ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"` + ErrorCode string `orm:"error_code" json:"errorCode" description:"错误码"` + ResultSummary interface{} `orm:"result_summary" json:"resultSummary" description:"结果摘要"` + NextRetryTime interface{} `orm:"next_retry_time" json:"nextRetryTime" description:"下次重试时间"` + CompletedAt interface{} `orm:"completed_at" json:"completedAt" description:"完成时间"` + DurationMs int64 `orm:"duration_ms" json:"durationMs" description:"执行耗时毫秒"` } // TableName 返回表名 diff --git a/scheduler/run_sync_task_log_task.go b/scheduler/run_sync_task_log_task.go index ffb8a0e..04a8aaf 100644 --- a/scheduler/run_sync_task_log_task.go +++ b/scheduler/run_sync_task_log_task.go @@ -5,7 +5,6 @@ import ( taskDto "cid/model/dto/copydata" "cid/sync" "context" - "encoding/json" "fmt" "strings" "time" @@ -36,149 +35,92 @@ func (s *CompensationScheduler) RunCompensationOnce() { } func (s *CompensationScheduler) processCompensation(ctx context.Context) { - logrus.Info(">>> 开始检查需要同步补偿的任务...") + logrus.Info(">>> 开始扫描需要补偿的失败分页任务...") queryReq := &taskDto.QueryFailedTasksReq{ - Status: []string{"failed", "retrying", "partial_failed"}, - MaxRetries: nil, - Limit: 50, + Status: []string{"failed"}, + TaskType: "account_report_page", + Limit: 50, } - failedTasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, queryReq) + failedPageTasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, queryReq) if err != nil { - logrus.Errorf("查询失败任务异常:%v", err) + logrus.Errorf("查询失败的分页任务异常:%v", err) return } - if len(failedTasks) == 0 { - logrus.Info("✓ 没有需要补偿的任务") + if len(failedPageTasks) == 0 { + logrus.Info("✓ 当前没有需要补偿的失败分页任务") return } - logrus.Infof("发现 %d 个需要补偿的任务", len(failedTasks)) + logrus.Infof("发现 %d 个需要补偿的失败分页任务,开始逐个处理...", len(failedPageTasks)) successCount := 0 failCount := 0 - partialCount := 0 - for _, task := range failedTasks { - if task.RetryCount >= task.MaxRetry { - logrus.Warnf("任务 %s 已达到最大重试次数 %d,标记为需人工处理", task.TaskID, task.MaxRetry) + for _, pageTask := range failedPageTasks { + if pageTask.RetryCount >= pageTask.MaxRetry { + logrus.Warnf("⚠ 分页任务 %s 已达到最大重试次数 %d,标记为需人工处理", pageTask.TaskID, pageTask.MaxRetry) updateReq := &taskDto.UpdateSyncTaskLogReq{ - ID: task.ID, + ID: pageTask.Id, Status: "manual_review", - ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", task.MaxRetry), + ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", pageTask.MaxRetry), ErrorCode: "MAX_RETRY_EXCEEDED", } dao.SyncTaskLog.Update(ctx, updateReq) - s.sendAlert(task) + s.sendAlert(pageTask) failCount++ continue } - logrus.Infof(">>> 开始补偿任务:%s (类型=%s, 第 %d/%d 次重试)", - task.TaskID, task.TaskType, task.RetryCount+1, task.MaxRetry) + logrus.Infof("▶ 开始补偿分页任务:%s (广告主=%d, 第 %d/%d 次重试)", + pageTask.TaskID, pageTask.AdvertiserID, pageTask.RetryCount+1, pageTask.MaxRetry) - if s.compensateTask(ctx, task) { + if s.compensatePageTask(ctx, pageTask) { successCount++ + logrus.Infof("✓ 分页任务 %s 补偿成功", pageTask.TaskID) + + parentTaskID := s.extractParentTaskID(pageTask.TaskID) + if parentTaskID != "" { + s.checkAndUpdateParentTaskStatus(ctx, parentTaskID) + } } else { failCount++ + logrus.Warnf("✗ 分页任务 %s 补偿失败", pageTask.TaskID) } time.Sleep(1 * time.Second) } - logrus.Infof("✓ 补偿任务完成:成功=%d, 部分成功=%d, 失败=%d", - successCount, partialCount, failCount) + logrus.Infof("=== 补偿任务执行完成:总计=%d, 成功=%d, 失败=%d ===", + len(failedPageTasks), successCount, failCount) } -func (s *CompensationScheduler) compensateTask(ctx context.Context, task *taskDto.SyncTaskLogItem) bool { - retryCount := task.RetryCount + 1 +func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask *taskDto.SyncTaskLogItem) bool { + retryCount := pageTask.RetryCount + 1 updateReq := &taskDto.UpdateSyncTaskLogReq{ - ID: task.ID, + ID: pageTask.Id, Status: "retrying", RetryCount: &retryCount, } dao.SyncTaskLog.Update(ctx, updateReq) - startTime := s.parseTime(task.StartTime) - endTime := s.parseTime(task.EndTime) + startTime := s.parseTime(pageTask.StartTime) + endTime := s.parseTime(pageTask.EndTime) - logrus.Infof(">>> 开始补偿任务: %s (advertiser=%d, time=[%s, %s], 第 %d/%d 次重试)", - task.TaskID, task.AdvertiserID, - startTime.Format("2006-01-02"), endTime.Format("2006-01-02"), - retryCount, task.MaxRetry) - - if task.TaskType == "account_report_page" { - return s.compensatePageTask(ctx, task, retryCount) - } - - if task.TaskType == "account_report" && task.Status == "partial_failed" { - return s.compensatePartialFailedTask(ctx, task, startTime, endTime, retryCount) - } - - return s.compensateMainTask(ctx, task, startTime, endTime, retryCount) -} - -func (s *CompensationScheduler) compensatePartialFailedTask(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, retryCount int) bool { - logrus.Infof(">>> 检测到部分失败任务 %s,开始智能补偿(只重试失败的页)", task.TaskID) - - failedPages := s.extractFailedPages(task) - if len(failedPages) == 0 { - logrus.Warnf("任务 %s 标记为部分失败,但未找到失败的页信息,将重新同步所有页", task.TaskID) - return s.compensateMainTask(ctx, task, startTime, endTime, retryCount) - } - - logrus.Infof("任务 %s 共有 %d 个失败的页需要补偿: %v", task.TaskID, len(failedPages), failedPages) - - allSuccess := true - compensatedPages := 0 - - for _, pageNumber := range failedPages { - logrus.Infof(">>> 开始补偿第 %d 页...", pageNumber) - - pageSuccess := s.compensateSinglePage(ctx, task, startTime, endTime, pageNumber, retryCount) - if pageSuccess { - compensatedPages++ - } else { - allSuccess = false - } - - time.Sleep(500 * time.Millisecond) - } - - if allSuccess { - logrus.Infof("✓ 部分失败任务 %s 补偿成功 - 共补偿 %d 个页", task.TaskID, compensatedPages) - - updateReq := &taskDto.UpdateSyncTaskLogReq{ - ID: task.ID, - Status: "success", - } - dao.SyncTaskLog.Update(ctx, updateReq) - return true - } else { - logrus.Warnf("⚠ 部分失败任务 %s 补偿后仍有失败的页 - 成功补偿 %d/%d 个页", - task.TaskID, compensatedPages, len(failedPages)) - - backoffMinutes := s.calculateBackoff(retryCount) - nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) - - updateReq := &taskDto.UpdateSyncTaskLogReq{ - ID: task.ID, - Status: "partial_failed", - NextRetryTime: nextRetry, - } - dao.SyncTaskLog.Update(ctx, updateReq) + pageNumber := s.extractPageNumber(pageTask.TaskID) + if pageNumber == 0 { + logrus.Errorf("无法从任务ID %s 解析页码", pageTask.TaskID) + s.markPageTaskFailed(ctx, pageTask.Id, retryCount, "无法解析页码", "PARSE_PAGE_NUMBER_FAILED") return false } -} -func (s *CompensationScheduler) compensateSinglePage(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, pageNumber int, retryCount int) bool { req := &sync.AccountReportRequest{ - AdvertiserID: task.AdvertiserID, + AdvertiserID: pageTask.AdvertiserID, StartTime: startTime.UnixMilli(), EndTime: endTime.UnixMilli(), SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, @@ -191,146 +133,97 @@ func (s *CompensationScheduler) compensateSinglePage(ctx context.Context, task * } maxRetries := 3 - pageTaskID := fmt.Sprintf("%s_page_%d", task.TaskID, pageNumber) + parentTaskID := s.extractParentTaskID(pageTask.TaskID) + pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, pageNumber) + result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber) if err != nil { - logrus.Errorf("补偿第 %d 页失败:%v", pageNumber, err) + logrus.Errorf("补偿分页任务 %s 失败(第 %d 次):%v", pageTask.TaskID, retryCount, err) + s.markPageTaskFailed(ctx, pageTask.Id, retryCount, err.Error(), "PAGE_COMPENSATION_FAILED") return false } - logrus.Infof("✓ 补偿第 %d 页成功 - 记录数=%d", pageNumber, result.DetailCount) + logrus.Infof("✓ 补偿分页任务 %s 成功 - 记录数=%d", pageTask.TaskID, result.DetailCount) return true } -func (s *CompensationScheduler) extractFailedPages(task *taskDto.SyncTaskLogItem) []int { - if task.ResultSummary == nil { - return nil +func (s *CompensationScheduler) markPageTaskFailed(ctx context.Context, taskID int64, retryCount int, errMsg, errCode string) { + backoffMinutes := s.calculateBackoff(retryCount) + nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) + + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: taskID, + Status: "failed", + ErrorMessage: errMsg, + ErrorCode: errCode, + NextRetryTime: nextRetry, + } + dao.SyncTaskLog.Update(ctx, updateReq) +} + +func (s *CompensationScheduler) checkAndUpdateParentTaskStatus(ctx context.Context, parentTaskID string) { + logrus.Infof(">>> 检查主任务 %s 的所有分页任务状态...", parentTaskID) + + parentTask, err := dao.SyncTaskLog.GetByTaskID(ctx, parentTaskID, "account_report") + if err != nil || parentTask == nil { + logrus.Warnf("未找到主任务 %s,跳过状态更新", parentTaskID) + return } - summaryMap, ok := task.ResultSummary.(map[string]interface{}) - if !ok { - return nil + if parentTask.Status == "success" { + logrus.Infof("主任务 %s 已经是成功状态,无需更新", parentTaskID) + return } - pageResultsRaw, exists := summaryMap["page_results"] - if !exists { - return nil - } - - pageResultsJSON, err := json.Marshal(pageResultsRaw) + allPageTasks, err := dao.SyncTaskLog.QueryAllPageTasksByParentID(ctx, parentTaskID, 1000) if err != nil { - logrus.Errorf("序列化 page_results 失败:%v", err) - return nil + logrus.Errorf("查询主任务 %s 的分页任务失败:%v", parentTaskID, err) + return } - var pageResults []map[string]interface{} - if err := json.Unmarshal(pageResultsJSON, &pageResults); err != nil { - logrus.Errorf("反序列化 page_results 失败:%v", err) - return nil + if len(allPageTasks) == 0 { + logrus.Warnf("主任务 %s 没有找到任何分页任务", parentTaskID) + return } failedPages := make([]int, 0) - for _, pageResult := range pageResults { - success, _ := pageResult["success"].(bool) - pageNumberFloat, _ := pageResult["page_number"].(float64) - pageNumber := int(pageNumberFloat) + successPages := make([]int, 0) - if !success && pageNumber > 0 { + for _, pageTask := range allPageTasks { + pageNumber := s.extractPageNumber(pageTask.TaskID) + if pageTask.Status == "success" { + successPages = append(successPages, pageNumber) + } else if pageTask.Status == "failed" || pageTask.Status == "manual_review" { failedPages = append(failedPages, pageNumber) } } - return failedPages -} + logrus.Infof("主任务 %s 分页状态:总数=%d, 成功=%d, 失败=%d", + parentTaskID, len(allPageTasks), len(successPages), len(failedPages)) -func (s *CompensationScheduler) compensateMainTask(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, retryCount int) bool { - req := &sync.AccountReportRequest{ - AdvertiserID: task.AdvertiserID, - StartTime: startTime.UnixMilli(), - EndTime: endTime.UnixMilli(), - SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, - GroupType: 1, - QueryVersion: 1, - } + if len(failedPages) == 0 { + logrus.Infof("✓ 主任务 %s 的所有分页任务都已成功,更新主任务状态为 success", parentTaskID) - maxRetries := 3 - result, err := s.syncService.SyncAccountReportWithPagination(ctx, req, true, maxRetries) - - if err != nil { - logrus.Errorf("补偿主任务 %s 失败(第 %d 次):%v", task.TaskID, retryCount, err) - - backoffMinutes := s.calculateBackoff(retryCount) - nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) + summary := map[string]interface{}{ + "total_pages": len(allPageTasks), + "success_pages": len(successPages), + "failed_pages": 0, + "compensated": true, + } updateReq := &taskDto.UpdateSyncTaskLogReq{ - ID: task.ID, - Status: "failed", - ErrorMessage: err.Error(), - ErrorCode: "COMPENSATION_FAILED", - NextRetryTime: nextRetry, + ID: parentTask.Id, + Status: "success", + ResultSummary: summary, } - dao.SyncTaskLog.Update(ctx, updateReq) - - return false - } - - logrus.Infof("✓ 补偿主任务 %s 成功 - 汇总ID=%d, 明细成功=%d, 失败=%d, 页数=%d", - task.TaskID, result.SumID, result.DetailSuccessCount, result.DetailFailCount, len(result.PageResults)) - return true -} - -func (s *CompensationScheduler) compensatePageTask(ctx context.Context, task *taskDto.SyncTaskLogItem, retryCount int) bool { - logrus.Infof(">>> 补偿分页任务: %s (重试第 %d 次)", task.TaskID, retryCount) - - parentTaskID := s.extractParentTaskID(task.TaskID) - pageNumber := s.extractPageNumber(task.TaskID) - - if parentTaskID == "" || pageNumber == 0 { - logrus.Errorf("无法解析分页任务ID: %s", task.TaskID) - return false - } - - startTime := s.parseTime(task.StartTime) - endTime := s.parseTime(task.EndTime) - - req := &sync.AccountReportRequest{ - AdvertiserID: task.AdvertiserID, - StartTime: startTime.UnixMilli(), - EndTime: endTime.UnixMilli(), - SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, - GroupType: 1, - QueryVersion: 1, - PageInfo: &sync.PageInfo{ - CurrentPage: pageNumber, - PageSize: 100, - }, - } - - maxRetries := 3 - pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, pageNumber) - result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber) - - if err != nil { - logrus.Errorf("补偿分页任务 %s 失败(第 %d 次):%v", task.TaskID, retryCount, err) - - backoffMinutes := s.calculateBackoff(retryCount) - nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) - - updateReq := &taskDto.UpdateSyncTaskLogReq{ - ID: task.ID, - Status: "failed", - ErrorMessage: err.Error(), - ErrorCode: "PAGE_COMPENSATION_FAILED", - NextRetryTime: nextRetry, + if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil { + logrus.Errorf("更新主任务 %s 状态失败:%v", parentTaskID, err) } - dao.SyncTaskLog.Update(ctx, updateReq) - - return false + } else { + logrus.Infof("⚠ 主任务 %s 仍有 %d 个失败的分页任务:%v,保持部分失败状态", + parentTaskID, len(failedPages), failedPages) } - - logrus.Infof("✓ 补偿分页任务 %s 成功 - 记录数=%d", task.TaskID, result.DetailCount) - return true } func (s *CompensationScheduler) extractParentTaskID(taskID string) string { @@ -370,12 +263,8 @@ func (s *CompensationScheduler) parseTime(t interface{}) time.Time { } func (s *CompensationScheduler) sendAlert(task *taskDto.SyncTaskLogItem) { - logrus.Errorf("【告警】任务 %s 需要人工介入:广告主=%d, 类型=%s, 错误=%s", - task.TaskID, task.AdvertiserID, task.TaskType, task.ErrorMessage) - - // TODO: 集成钉钉/企业微信/邮件告警 - // s.sendDingTalkAlert(task) - // s.sendEmailAlert(task) + logrus.Errorf("【告警】分页任务 %s 需要人工介入:广告主=%d, 错误=%s", + task.TaskID, task.AdvertiserID, task.ErrorMessage) } func main() {