From 556ec5075ece4b4492514c91c22c0da4088fad5b Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Thu, 9 Apr 2026 09:48:22 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=81=BF=E6=95=B0=E6=8D=AE=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- model/dto/copydata/sync_task_log_dto.go | 2 + scheduler/run_sync_task_log_task.go | 82 ++++++++++++++++++++++++- sync/sync_service.go | 22 +++++-- 3 files changed, 99 insertions(+), 7 deletions(-) diff --git a/model/dto/copydata/sync_task_log_dto.go b/model/dto/copydata/sync_task_log_dto.go index e7e57c2..dac0cfa 100644 --- a/model/dto/copydata/sync_task_log_dto.go +++ b/model/dto/copydata/sync_task_log_dto.go @@ -48,6 +48,8 @@ type SyncTaskLogItem struct { ErrorMessage string `json:"errorMessage,omitempty"` ErrorCode string `json:"errorCode,omitempty"` ResultSummary interface{} `json:"resultSummary,omitempty"` + PageInfo interface{} `json:"pageInfo,omitempty"` + RequestParams interface{} `json:"requestParams,omitempty"` NextRetryTime interface{} `json:"nextRetryTime,omitempty"` CompletedAt interface{} `json:"completedAt,omitempty"` DurationMs int64 `json:"durationMs"` diff --git a/scheduler/run_sync_task_log_task.go b/scheduler/run_sync_task_log_task.go index 04a8aaf..6c4d067 100644 --- a/scheduler/run_sync_task_log_task.go +++ b/scheduler/run_sync_task_log_task.go @@ -15,16 +15,19 @@ import ( "github.com/sirupsen/logrus" ) +// CompensationScheduler 补偿调度器,负责扫描和补偿失败的分页同步任务 type CompensationScheduler struct { syncService *sync.SyncService } +// NewCompensationScheduler 创建补偿调度器实例 func NewCompensationScheduler() *CompensationScheduler { return &CompensationScheduler{ syncService: sync.NewSyncService(), } } +// RunCompensationOnce 执行一次补偿任务(用于手动触发或定时任务调用) func (s *CompensationScheduler) RunCompensationOnce() { ctx := gctx.New() ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"}) @@ -34,6 +37,7 @@ func (s *CompensationScheduler) RunCompensationOnce() { logrus.Info("=== 补偿任务执行完毕 ===") } +// processCompensation 处理补偿逻辑:扫描失败的分页任务并逐个补偿 func (s *CompensationScheduler) processCompensation(ctx context.Context) { logrus.Info(">>> 开始扫描需要补偿的失败分页任务...") @@ -99,6 +103,8 @@ func (s *CompensationScheduler) processCompensation(ctx context.Context) { len(failedPageTasks), successCount, failCount) } +// compensatePageTask 补偿单个分页任务:重新请求API并插入数据 +// 返回 true 表示补偿成功,false 表示补偿失败 func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask *taskDto.SyncTaskLogItem) bool { retryCount := pageTask.RetryCount + 1 @@ -107,7 +113,10 @@ func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask Status: "retrying", RetryCount: &retryCount, } - dao.SyncTaskLog.Update(ctx, updateReq) + if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil { + logrus.Errorf("更新任务状态为 retrying 失败:%v", err) + return false + } startTime := s.parseTime(pageTask.StartTime) endTime := s.parseTime(pageTask.EndTime) @@ -119,6 +128,8 @@ func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask return false } + pageSize := s.extractPageSize(pageTask) + req := &sync.AccountReportRequest{ AdvertiserID: pageTask.AdvertiserID, StartTime: startTime.UnixMilli(), @@ -128,7 +139,7 @@ func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask QueryVersion: 1, PageInfo: &sync.PageInfo{ CurrentPage: pageNumber, - PageSize: 100, + PageSize: pageSize, }, } @@ -136,7 +147,7 @@ func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask parentTaskID := s.extractParentTaskID(pageTask.TaskID) pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, pageNumber) - result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber) + result, _, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber) if err != nil { logrus.Errorf("补偿分页任务 %s 失败(第 %d 次):%v", pageTask.TaskID, retryCount, err) @@ -148,6 +159,7 @@ func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask return true } +// markPageTaskFailed 标记分页任务为失败状态,并设置下次重试时间(指数退避策略) func (s *CompensationScheduler) markPageTaskFailed(ctx context.Context, taskID int64, retryCount int, errMsg, errCode string) { backoffMinutes := s.calculateBackoff(retryCount) nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) @@ -162,6 +174,7 @@ func (s *CompensationScheduler) markPageTaskFailed(ctx context.Context, taskID i dao.SyncTaskLog.Update(ctx, updateReq) } +// checkAndUpdateParentTaskStatus 检查主任务的所有分页任务状态,如果全部成功则更新主任务状态 func (s *CompensationScheduler) checkAndUpdateParentTaskStatus(ctx context.Context, parentTaskID string) { logrus.Infof(">>> 检查主任务 %s 的所有分页任务状态...", parentTaskID) @@ -226,6 +239,8 @@ func (s *CompensationScheduler) checkAndUpdateParentTaskStatus(ctx context.Conte } } +// extractParentTaskID 从分页任务ID中提取主任务ID +// 例如:从 "12345_1234567890_account_page_2" 提取 "12345_1234567890_account" func (s *CompensationScheduler) extractParentTaskID(taskID string) string { if idx := strings.LastIndex(taskID, "_page_"); idx > 0 { return taskID[:idx] @@ -233,6 +248,8 @@ func (s *CompensationScheduler) extractParentTaskID(taskID string) string { return "" } +// extractPageNumber 从分页任务ID中提取页码 +// 例如:从 "12345_1234567890_account_page_2" 提取 2 func (s *CompensationScheduler) extractPageNumber(taskID string) int { if idx := strings.LastIndex(taskID, "_page_"); idx > 0 { var pageNum int @@ -242,6 +259,63 @@ func (s *CompensationScheduler) extractPageNumber(taskID string) int { return 0 } +// extractPageSize 从任务日志的 PageInfo 或 RequestParams 字段中提取每页大小 +// 优先级:PageInfo.page_size > RequestParams.page_size > 默认值10 +func (s *CompensationScheduler) extractPageSize(pageTask *taskDto.SyncTaskLogItem) int { + // 在 extractPageSize 方法开头添加 + logrus.Infof("DEBUG - PageInfo 类型: %T, 值: %+v", pageTask.PageInfo, pageTask.PageInfo) + logrus.Infof("DEBUG - RequestParams 类型: %T, 值: %+v", pageTask.RequestParams, pageTask.RequestParams) + + // 优先从 PageInfo 中提取 + if pageTask.PageInfo != nil { + switch v := pageTask.PageInfo.(type) { + case map[string]interface{}: + // 尝试 float64 类型(JSON 数字默认类型) + if pageSize, ok := v["page_size"].(float64); ok { + return int(pageSize) + } + // 尝试 string 类型 + if pageSizeStr, ok := v["page_size"].(string); ok { + var pageSize int + fmt.Sscanf(pageSizeStr, "%d", &pageSize) + if pageSize > 0 { + return pageSize + } + } + case map[string]string: + if pageSizeStr, ok := v["page_size"]; ok { + var pageSize int + fmt.Sscanf(pageSizeStr, "%d", &pageSize) + if pageSize > 0 { + return pageSize + } + } + } + } + + // 其次从 RequestParams 中提取 + if pageTask.RequestParams != nil { + switch v := pageTask.RequestParams.(type) { + case map[string]interface{}: + if pageSize, ok := v["page_size"].(float64); ok { + return int(pageSize) + } + if pageSizeStr, ok := v["page_size"].(string); ok { + var pageSize int + fmt.Sscanf(pageSizeStr, "%d", &pageSize) + if pageSize > 0 { + return pageSize + } + } + } + } + + // 默认值改为 10 + return 10 +} + +// calculateBackoff 根据重试次数计算退避时间(分钟) +// 重试次数:1->5分钟, 2->15分钟, 3->30分钟, 4->60分钟, 5+->120分钟 func (s *CompensationScheduler) calculateBackoff(retryCount int) int { backoffs := []int{5, 15, 30, 60, 120} if retryCount <= len(backoffs) { @@ -250,6 +324,7 @@ func (s *CompensationScheduler) calculateBackoff(retryCount int) int { return backoffs[len(backoffs)-1] } +// parseTime 解析时间字段,支持 time.Time 和字符串格式 func (s *CompensationScheduler) parseTime(t interface{}) time.Time { switch v := t.(type) { case time.Time: @@ -262,6 +337,7 @@ func (s *CompensationScheduler) parseTime(t interface{}) time.Time { return time.Now() } +// sendAlert 发送告警通知(当前仅记录错误日志) func (s *CompensationScheduler) sendAlert(task *taskDto.SyncTaskLogItem) { logrus.Errorf("【告警】分页任务 %s 需要人工介入:广告主=%d, 错误=%s", task.TaskID, task.AdvertiserID, task.ErrorMessage) diff --git a/sync/sync_service.go b/sync/sync_service.go index c850ec2..8765b36 100644 --- a/sync/sync_service.go +++ b/sync/sync_service.go @@ -59,8 +59,19 @@ func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportR var responseData *AccountReportResponse if useMock { - logrus.Info("使用 Mock 数据同步快手广告账户报表") + logrus.Infof("使用 Mock 数据同步快手广告账户报表 - page=%d, pageSize=%d", + req.PageInfo.CurrentPage, req.PageInfo.PageSize) responseData = s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize) + + logrus.Infof("✓ Mock 数据生成完成 - TotalCount=%d, Detail数组长度=%d", + responseData.Data.TotalCount, len(responseData.Data.Detail)) + + if len(responseData.Data.Detail) > 0 { + firstItem := responseData.Data.Detail[0] + lastItem := responseData.Data.Detail[len(responseData.Data.Detail)-1] + logrus.Infof(" 第一条: campaignId=%v, creativeId=%v", firstItem.CampaignId, firstItem.CreativeId) + logrus.Infof(" 最后一条: campaignId=%v, creativeId=%v", lastItem.CampaignId, lastItem.CreativeId) + } } else { logrus.Info("从真实 API 同步快手广告账户报表") respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req) @@ -96,6 +107,9 @@ func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportR if len(responseData.Data.Detail) > 0 { detailItems := s.converter.ConvertToDetailItems(responseData.Data.Detail, "account_report", req.PageInfo.CurrentPage) + + logrus.Infof("▶ 准备插入 %d 条明细数据(page=%d)", len(detailItems), req.PageInfo.CurrentPage) + detailResult, saveErr := s.saveDetailData(ctx, detailItems) if saveErr != nil { logrus.Errorf("保存明细数据失败:%v", saveErr) @@ -105,7 +119,7 @@ func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportR result.DetailCount = len(detailItems) result.DetailSuccessCount = detailResult.SuccessCount result.DetailFailCount = detailResult.FailCount - logrus.Infof("成功保存 %d 条明细数据(成功=%d, 失败=%d)", len(detailItems), detailResult.SuccessCount, detailResult.FailCount) + logrus.Infof("✓ 成功保存 %d 条明细数据(成功=%d, 失败=%d)", len(detailItems), detailResult.SuccessCount, detailResult.FailCount) } } @@ -202,7 +216,7 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req * RetryCount: 0, } - result, pageLogID, err := s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage) + result, pageLogID, err := s.SyncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage) pageDuration := time.Since(pageStartTime).Milliseconds() pageResult.DurationMs = pageDuration pageResult.PageTaskLogID = pageLogID @@ -311,7 +325,7 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req * return aggregatedResult, aggregatedResult.Error } -func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, int64, error) { +func (s *SyncService) SyncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, int64, error) { pageStartTime := time.Now() pageLogReq := &taskDto.CreateSyncTaskLogReq{