diff --git a/scheduler/run_account_report_task.go b/scheduler/run_account_report_task.go index 94dcddd..8351a1b 100644 --- a/scheduler/run_account_report_task.go +++ b/scheduler/run_account_report_task.go @@ -30,12 +30,24 @@ func main() { SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, GroupType: 1, QueryVersion: 1, + PageInfo: &sync.PageInfo{ + CurrentPage: 1, + PageSize: 10, + }, } - logrus.Infof("=== 开始执行定时同步任务 ===") - logrus.Infof("时间区间:%s ~ %s", lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05")) + config := sync.ConcurrentSyncConfig{ + MaxConcurrency: 5, + UseMock: true, + MaxRetries: 3, + } - result, err := syncService.SyncAccountReportWithPagination(ctx, req, true, 3) + logrus.Infof("=== 开始执行定时同步任务(并发模式)===") + logrus.Infof("时间区间:%s ~ %s", lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05")) + logrus.Infof("分页配置:每页大小=%d", req.PageInfo.PageSize) + logrus.Infof("并发配置:最大并发数=%d, 最大重试次数=%d", config.MaxConcurrency, config.MaxRetries) + + result, err := syncService.SyncAccountReportConcurrent(ctx, req, config) if err != nil { logrus.Errorf("定时同步任务执行完成,存在失败的页面") logrus.Infof("主任务日志ID:%d", result.TaskLogID) diff --git a/sync/mock_generator.go b/sync/mock_generator.go index 0236f5f..88718b6 100644 --- a/sync/mock_generator.go +++ b/sync/mock_generator.go @@ -28,14 +28,20 @@ func (m *MockDataGenerator) GenerateAccountReportRequest() *AccountReportRequest }, PageInfo: &PageInfo{ CurrentPage: 1, - PageSize: 20, + PageSize: 10, }, } } func (m *MockDataGenerator) GenerateAccountReportResponse() *AccountReportResponse { + return m.GenerateAccountReportResponseWithPage(1, 10) +} + +func (m *MockDataGenerator) GenerateAccountReportResponseWithPage(page int, pageSize int) *AccountReportResponse { sumData := m.generateSumData() - detailData := m.generateDetailData(5) + + detailData := m.generateDetailDataByPage(page, pageSize) + totalCount := 23 return &AccountReportResponse{ Code: 0, @@ -43,11 +49,58 @@ func (m *MockDataGenerator) GenerateAccountReportResponse() *AccountReportRespon Data: &AccountReportData{ Sum: sumData, Detail: detailData, - TotalCount: len(detailData), + TotalCount: totalCount, }, } } +func (m *MockDataGenerator) generateDetailDataByPage(page int, pageSize int) []*AccountReportItem { + totalCount := 23 + + if page < 1 || pageSize <= 0 { + return []*AccountReportItem{} + } + + startIndex := (page - 1) * pageSize + + if startIndex >= totalCount { + return []*AccountReportItem{} + } + + endIndex := startIndex + pageSize + if endIndex > totalCount { + endIndex = totalCount + } + + actualCount := endIndex - startIndex + items := make([]*AccountReportItem, actualCount) + + for i := 0; i < actualCount; i++ { + itemIndex := startIndex + i + 1 + + campaignId := int64(itemIndex) + unitId := int64(itemIndex * 10) + creativeId := int64(itemIndex * 100) + + campaignName := "测试计划_" + string(rune('A'+itemIndex-1)) + unitName := "测试单元_" + string(rune('A'+itemIndex-1)) + creativeName := "测试创意_" + string(rune('A'+itemIndex-1)) + + item := m.generateSumData() + + item.CampaignId = &campaignId + item.UnitId = &unitId + item.CreativeId = &creativeId + item.CampaignName = campaignName + item.UnitName = unitName + item.CreativeName = creativeName + + items[i] = (*AccountReportItem)(item) + } + + return items +} + func (m *MockDataGenerator) generateSumData() *AccountReportSum { cost := m.randomFloat(1000, 10000) impression := m.randomInt64(10000, 100000) diff --git a/sync/sync_service.go b/sync/sync_service.go index e44d54e..2ec580d 100644 --- a/sync/sync_service.go +++ b/sync/sync_service.go @@ -8,9 +8,13 @@ import ( "context" "encoding/json" "fmt" + "sync" + "sync/atomic" "time" "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" ) type SyncService struct { @@ -56,7 +60,7 @@ func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportR if useMock { logrus.Info("使用 Mock 数据同步快手广告账户报表") - responseData = s.mockGen.GenerateAccountReportResponse() + responseData = s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize) } else { logrus.Info("从真实 API 同步快手广告账户报表") respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req) @@ -167,14 +171,19 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req * totalCount := 0 currentPage := 1 - pageSize := 100 - successPages := 0 - failedPages := 0 + pageSize := 10 if req.PageInfo == nil { req.PageInfo = &PageInfo{} } + if req.PageInfo.PageSize > 0 { + pageSize = req.PageInfo.PageSize + } + + successPages := 0 + failedPages := 0 + var totalPages int for { @@ -380,7 +389,7 @@ func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountRe func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData { if useMock { - responseData := s.mockGen.GenerateAccountReportResponse() + responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize) if responseData != nil && responseData.Data != nil { return responseData.Data } @@ -434,3 +443,249 @@ func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportReque return lastResult, lastErr } + +type ConcurrentSyncConfig struct { + MaxConcurrency int + UseMock bool + MaxRetries int +} + +func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *AccountReportRequest, config ConcurrentSyncConfig) (*SyncResult, error) { + startTime := time.Now() + parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime) + + logReq := &taskDto.CreateSyncTaskLogReq{ + TaskID: parentTaskID, + TaskType: "account_report", + AdvertiserID: req.AdvertiserID, + StartTime: time.UnixMilli(req.StartTime), + EndTime: time.UnixMilli(req.EndTime), + Status: "pending", + MaxRetry: config.MaxRetries, + RequestParams: req, + } + + parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq) + if err != nil { + logrus.Errorf("创建主任务日志失败:%v", err) + } + + updateParentLog := func(status, errMsg, errorCode string, summary interface{}) { + if parentLogID == 0 { + return + } + duration := time.Since(startTime).Milliseconds() + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: parentLogID, + Status: status, + ErrorMessage: errMsg, + ErrorCode: errorCode, + DurationMs: &duration, + } + + if status == "success" || status == "manual_review" { + completedAt := time.Now() + updateReq.CompletedAt = completedAt + } + + if summary != nil { + updateReq.ResultSummary = summary + } + + if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil { + logrus.Errorf("更新主任务日志失败:%v", err) + } + } + + updateParentLog("running", "", "", nil) + + if req.PageInfo == nil { + req.PageInfo = &PageInfo{} + } + + pageSize := 10 + if req.PageInfo.PageSize > 0 { + pageSize = req.PageInfo.PageSize + } + + firstPageReq := *req + firstPageReq.PageInfo.CurrentPage = 1 + firstPageReq.PageInfo.PageSize = pageSize + + currentData := s.fetchCurrentData(&firstPageReq, config.UseMock) + if currentData == nil || currentData.TotalCount == 0 { + logrus.Warn("未获取到总记录数,降级为串行同步") + return s.SyncAccountReportWithPagination(ctx, req, config.UseMock, config.MaxRetries) + } + + totalPages := (currentData.TotalCount + pageSize - 1) / pageSize + logrus.Infof("开始并发同步 - 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d", + currentData.TotalCount, totalPages, pageSize, config.MaxConcurrency) + + updateParentLog("running", "", "", map[string]interface{}{ + "total_pages": totalPages, + "total_records": currentData.TotalCount, + "page_size": pageSize, + "concurrency": config.MaxConcurrency, + }) + + pageResults := make([]*PageSyncResult, totalPages) + var pageResultsMu sync.Mutex + + var sumSuccess bool + var sumID int64 + var sumMu sync.Mutex + + var totalDetailCount int32 + var successPages int64 + var failedPages int64 + + sem := semaphore.NewWeighted(int64(config.MaxConcurrency)) + eg, egCtx := errgroup.WithContext(ctx) + + for pageNum := 1; pageNum <= totalPages; pageNum++ { + if err := sem.Acquire(egCtx, 1); err != nil { + logrus.Errorf("获取信号量失败:%v", err) + break + } + + currentPage := pageNum + eg.Go(func() error { + defer sem.Release(1) + + select { + case <-egCtx.Done(): + return egCtx.Err() + default: + } + + pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage) + pageStartTime := time.Now() + + pageReq := *req + pageReq.PageInfo = &PageInfo{ + CurrentPage: currentPage, + PageSize: pageSize, + } + + result, err := s.syncSinglePageWithTask(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage) + pageDuration := time.Since(pageStartTime).Milliseconds() + + pageResult := &PageSyncResult{ + PageNumber: currentPage, + Success: false, + RecordCount: 0, + DurationMs: pageDuration, + ErrorMessage: "", + RetryCount: 0, + PageTaskLogID: 0, + } + + if err != nil { + logrus.Errorf("第 %d 页同步失败:%v", currentPage, err) + pageResult.ErrorMessage = err.Error() + + pageResultsMu.Lock() + pageResults[currentPage-1] = pageResult + pageResultsMu.Unlock() + + newFailedCount := atomic.AddInt64(&failedPages, 1) + if int(newFailedCount) > config.MaxRetries { + logrus.Warnf("失败页数超过阈值 %d", config.MaxRetries) + return fmt.Errorf("失败页数超过阈值") + } + return nil + } + + if result.SumSuccess { + sumMu.Lock() + if !sumSuccess { + sumSuccess = true + sumID = result.SumID + logrus.Infof("✓ 汇总数据已保存,ID=%d(来自第 %d 页)", result.SumID, currentPage) + } + sumMu.Unlock() + } + + if result.DetailSuccess && result.DetailCount > 0 { + pageResult.Success = true + pageResult.RecordCount = result.DetailCount + + atomic.AddInt32(&totalDetailCount, int32(result.DetailCount)) + atomic.AddInt64(&successPages, 1) + + logrus.Debugf("✓ 第 %d 页完成:%d 条明细,耗时 %dms", currentPage, result.DetailCount, pageDuration) + } + + pageResultsMu.Lock() + pageResults[currentPage-1] = pageResult + pageResultsMu.Unlock() + + return nil + }) + } + + if err := eg.Wait(); err != nil { + logrus.Errorf("协程组执行出错:%v", err) + updateParentLog("failed", err.Error(), "CONCURRENT_SYNC_FAILED", nil) + return &SyncResult{ + SumSuccess: sumSuccess, + SumID: sumID, + TaskLogID: parentLogID, + PageResults: pageResults, + Error: err, + }, err + } + + actualResults := make([]*PageSyncResult, 0, len(pageResults)) + for _, pr := range pageResults { + if pr != nil { + actualResults = append(actualResults, pr) + } + } + + finalDetailCount := atomic.LoadInt32(&totalDetailCount) + finalSuccessPages := atomic.LoadInt64(&successPages) + finalFailedPages := atomic.LoadInt64(&failedPages) + + logrus.Infof("并发同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条", + finalSuccessPages, finalFailedPages, finalDetailCount) + + aggregatedResult := &SyncResult{ + SumSuccess: sumSuccess, + SumID: sumID, + TaskLogID: parentLogID, + DetailCount: int(finalDetailCount), + DetailSuccessCount: finalSuccessPages, + DetailFailCount: finalFailedPages, + PageResults: actualResults, + } + + if finalFailedPages > 0 { + logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", finalFailedPages) + + summary := map[string]interface{}{ + "sum_id": sumID, + "detail_count": finalDetailCount, + "total_pages": totalPages, + "success_pages": finalSuccessPages, + "failed_pages": finalFailedPages, + "page_results": actualResults, + } + updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", finalFailedPages), "PAGE_SYNC_FAILED", summary) + } else { + logrus.Info("✓ 所有页面同步成功") + + summary := map[string]interface{}{ + "sum_id": sumID, + "detail_count": finalDetailCount, + "total_pages": totalPages, + "success_pages": finalSuccessPages, + "failed_pages": 0, + "page_results": actualResults, + } + updateParentLog("success", "", "", summary) + } + + return aggregatedResult, nil +}