// Package sync synchronizes Kuaishou advertising account reports into local
// storage, page by page, either serially or concurrently, and records the
// progress of each run in the sync task log.
package sync

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	dao "cid/dao/copydata"
	dto "cid/model/dto/copydata"
	taskDto "cid/model/dto/copydata"
	"cid/service/copydata"

	"github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

// SyncService fetches account report pages (from the remote API or from a
// mock generator), converts them, and persists summary and detail rows.
type SyncService struct {
	httpClient *HttpClient
	converter  *DataConverter
	mockGen    *MockDataGenerator
}

// NewSyncService builds a SyncService wired to the Kuaishou ad API host with
// a fresh converter and mock generator.
func NewSyncService() *SyncService {
	return &SyncService{
		// NOTE(review): the second argument is 0; presumably a timeout or
		// retry setting of NewHttpClient — confirm that 0 is intended.
		httpClient: NewHttpClient("https://ad.e.kuaishou.com", 0),
		converter:  NewDataConverter(),
		mockGen:    NewMockDataGenerator(),
	}
}

// SyncResult describes the outcome of a sync.
//
// For a single page (SyncAccountReport) DetailCount is the number of detail
// items sent to storage and DetailSuccessCount/DetailFailCount are the counts
// reported by the batch insert. For the multi-page entry points
// (SyncAccountReportWithPagination, SyncAccountReportConcurrent) DetailCount
// is the total number of detail items across successful pages, while
// DetailSuccessCount/DetailFailCount count PAGES, not records.
type SyncResult struct {
	SumSuccess         bool              `json:"sum_success"`
	SumID              int64             `json:"sum_id"`
	DetailSuccess      bool              `json:"detail_success"`
	DetailCount        int               `json:"detail_count"`
	DetailSuccessCount int64             `json:"detail_success_count"`
	DetailFailCount    int64             `json:"detail_fail_count"`
	Error              error             `json:"error"`
	TaskLogID          int64             `json:"task_log_id"`
	PageResults        []*PageSyncResult `json:"page_results,omitempty"`
}

// PageSyncResult describes the outcome of syncing one page within a
// multi-page run.
type PageSyncResult struct {
	PageNumber    int    `json:"page_number"`
	PageTaskLogID int64  `json:"page_task_log_id"`
	Success       bool   `json:"success"`
	RecordCount   int    `json:"record_count"`
	DurationMs    int64  `json:"duration_ms"`
	ErrorMessage  string `json:"error_message,omitempty"`
	RetryCount    int    `json:"retry_count"`
}

// SyncAccountReport fetches the single page described by req.PageInfo and
// persists it.
//
// When useMock is true the page comes from the mock generator, otherwise from
// the remote API. The summary row is saved only when saveSum is true and the
// response carries one. Detail rows are saved whenever the page has any.
//
// The returned *SyncResult is never nil. The returned error equals
// result.Error and is non-nil if the request is missing page info, the fetch
// or decode failed, the API returned a non-zero code or no data, or either
// save failed. A failed summary save does not prevent the detail save from
// being attempted.
func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool, saveSum bool) (*SyncResult, error) {
	result := &SyncResult{}

	// Everything below dereferences req.PageInfo; fail cleanly instead of
	// panicking on a malformed request.
	if req == nil || req.PageInfo == nil {
		result.Error = fmt.Errorf("请求参数缺少分页信息")
		return result, result.Error
	}

	var responseData *AccountReportResponse
	if useMock {
		logrus.Infof("使用 Mock 数据同步快手广告账户报表 - page=%d, pageSize=%d", req.PageInfo.CurrentPage, req.PageInfo.PageSize)
		responseData = s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
		if responseData == nil || responseData.Data == nil {
			result.Error = fmt.Errorf("API 返回数据为空")
			return result, result.Error
		}
		logrus.Infof("✓ Mock 数据生成完成 - TotalCount=%d, Detail数组长度=%d", responseData.Data.TotalCount, len(responseData.Data.Detail))
		if len(responseData.Data.Detail) > 0 {
			firstItem := responseData.Data.Detail[0]
			lastItem := responseData.Data.Detail[len(responseData.Data.Detail)-1]
			logrus.Infof(" 第一条: campaignId=%v, creativeId=%v", firstItem.CampaignId, firstItem.CreativeId)
			logrus.Infof(" 最后一条: campaignId=%v, creativeId=%v", lastItem.CampaignId, lastItem.CreativeId)
		}
	} else {
		logrus.Info("从真实 API 同步快手广告账户报表")
		respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req)
		if err != nil {
			result.Error = fmt.Errorf("调用 API 失败:%w", err)
			return result, result.Error
		}
		responseData = &AccountReportResponse{}
		if err := json.Unmarshal(respBytes, responseData); err != nil {
			result.Error = fmt.Errorf("解析响应失败:%w", err)
			return result, result.Error
		}
		if responseData.Code != 0 {
			result.Error = fmt.Errorf("API 返回错误:code=%d, message=%s", responseData.Code, responseData.Message)
			return result, result.Error
		}
		// A zero code with no data payload would otherwise be dereferenced
		// below and panic.
		if responseData.Data == nil {
			result.Error = fmt.Errorf("API 返回数据为空")
			return result, result.Error
		}
	}

	if saveSum && responseData.Data.Sum != nil {
		sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "account_report", req.PageInfo.CurrentPage)
		sumResult, saveErr := s.saveSumData(ctx, sumItem)
		if saveErr != nil {
			logrus.Errorf("保存汇总数据失败:%v", saveErr)
			result.Error = fmt.Errorf("保存汇总数据失败:%w", saveErr)
		} else {
			result.SumSuccess = true
			result.SumID = sumResult.Id
			logrus.Infof("成功保存汇总数据,ID=%d", sumResult.Id)
		}
	}

	if len(responseData.Data.Detail) > 0 {
		detailItems := s.converter.ConvertToDetailItems(responseData.Data.Detail, "account_report", req.PageInfo.CurrentPage)
		logrus.Infof("▶ 准备插入 %d 条明细数据(page=%d)", len(detailItems), req.PageInfo.CurrentPage)
		detailResult, saveErr := s.saveDetailData(ctx, detailItems)
		if saveErr != nil {
			logrus.Errorf("保存明细数据失败:%v", saveErr)
			result.Error = fmt.Errorf("保存明细数据失败:%w", saveErr)
		} else {
			result.DetailSuccess = true
			result.DetailCount = len(detailItems)
			result.DetailSuccessCount = detailResult.SuccessCount
			result.DetailFailCount = detailResult.FailCount
			logrus.Infof("✓ 成功保存 %d 条明细数据(成功=%d, 失败=%d)", len(detailItems), detailResult.SuccessCount, detailResult.FailCount)
		}
	}

	return result, result.Error
}

// parentLogUpdater returns a closure that updates the parent task log row
// logID with a status, error details, the elapsed time since startedAt and an
// optional result summary. The closure is a no-op when logID is 0 (the log
// row could not be created). Update failures are logged, not returned.
func (s *SyncService) parentLogUpdater(ctx context.Context, logID int64, startedAt time.Time) func(status, errMsg, errorCode string, summary interface{}) {
	return func(status, errMsg, errorCode string, summary interface{}) {
		if logID == 0 {
			return
		}
		duration := time.Since(startedAt).Milliseconds()
		updateReq := &taskDto.UpdateSyncTaskLogReq{
			ID:           logID,
			Status:       status,
			ErrorMessage: errMsg,
			ErrorCode:    errorCode,
			DurationMs:   &duration,
		}
		// NOTE(review): the page-level updater also stamps CompletedAt for
		// "failed" and "partial_failed"; the parent only does so for these
		// two states. Kept as-is — confirm whether that asymmetry is intended.
		if status == "success" || status == "manual_review" {
			completedAt := time.Now()
			updateReq.CompletedAt = completedAt
		}
		if summary != nil {
			updateReq.ResultSummary = summary
		}
		if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
			logrus.Errorf("更新主任务日志失败:%v", err)
		}
	}
}

// SyncAccountReportWithPagination syncs every page of the report serially
// (full data extraction).
//
// It creates a parent task log, then syncs page 1, 2, ... through
// SyncSinglePageWithTask until the last page is reached (by total count, or
// by a page shorter than the page size) or more than maxRetries pages have
// failed. A failed page is skipped and the loop moves on to the next one.
//
// req.PageInfo is created if nil and its CurrentPage/PageSize are overwritten
// for every page; the page size defaults to 10 when not set.
//
// The returned result aggregates all pages (see SyncResult for how the count
// fields are interpreted). The returned error is always result.Error, which
// this function never sets: page failures are reported through PageResults
// and the "partial_failed" parent log status.
func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
	startTime := time.Now()
	parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime)

	logReq := &taskDto.CreateSyncTaskLogReq{
		TaskID:        parentTaskID,
		TaskType:      "account_report",
		AdvertiserID:  req.AdvertiserID,
		StartTime:     time.UnixMilli(req.StartTime),
		EndTime:       time.UnixMilli(req.EndTime),
		Status:        "pending",
		MaxRetry:      maxRetries,
		RequestParams: req,
	}
	parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq)
	if err != nil {
		// Best effort: the sync still runs, it is just not tracked.
		logrus.Errorf("创建主任务日志失败:%v", err)
	}
	updateParentLog := s.parentLogUpdater(ctx, parentLogID, startTime)
	updateParentLog("running", "", "", nil)

	aggregatedResult := &SyncResult{
		TaskLogID:   parentLogID,
		PageResults: make([]*PageSyncResult, 0),
	}

	if req.PageInfo == nil {
		req.PageInfo = &PageInfo{}
	}
	pageSize := 10
	if req.PageInfo.PageSize > 0 {
		pageSize = req.PageInfo.PageSize
	}

	var (
		totalCount   int
		totalPages   int
		successPages int
		failedPages  int
	)

	for currentPage := 1; ; currentPage++ {
		logrus.Infof(">>> 正在同步第 %d 页数据...", currentPage)
		req.PageInfo.CurrentPage = currentPage
		req.PageInfo.PageSize = pageSize

		pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
		pageStartTime := time.Now()
		pageResult := &PageSyncResult{PageNumber: currentPage}

		result, pageLogID, err := s.SyncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage)
		pageResult.DurationMs = time.Since(pageStartTime).Milliseconds()
		pageResult.PageTaskLogID = pageLogID
		aggregatedResult.PageResults = append(aggregatedResult.PageResults, pageResult)

		// Record the summary row as soon as it exists, even if the page as a
		// whole ended up failing after the summary was stored.
		if result != nil && result.SumSuccess && aggregatedResult.SumID == 0 {
			aggregatedResult.SumSuccess = true
			aggregatedResult.SumID = result.SumID
			logrus.Infof("✓ 汇总数据已保存,ID=%d", result.SumID)
		}

		if err != nil {
			logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
			pageResult.ErrorMessage = err.Error()
			failedPages++
			if failedPages > maxRetries {
				logrus.Warnf("失败页数超过阈值 %d,终止同步", maxRetries)
				break
			}
			time.Sleep(500 * time.Millisecond)
			continue
		}

		if result.DetailSuccess && result.DetailCount > 0 {
			totalCount += result.DetailCount
			pageResult.Success = true
			pageResult.RecordCount = result.DetailCount
			successPages++
			logrus.Infof("✓ 第 %d 页获取到 %d 条明细数据,累计 %d 条", currentPage, result.DetailCount, totalCount)
		}

		// Re-read the page only to learn the total record count, which
		// SyncResult does not carry.
		currentData := s.fetchCurrentDataCtx(ctx, req, useMock)
		if currentData != nil && currentData.TotalCount > 0 {
			totalPages = (currentData.TotalCount + pageSize - 1) / pageSize
			logrus.Infof("总记录数:%d, 总页数:%d, 当前页:%d/%d", currentData.TotalCount, totalPages, currentPage, totalPages)
			if currentPage >= totalPages {
				logrus.Infof("✓ 已同步所有页面数据,共 %d 页,%d 条记录", totalPages, currentData.TotalCount)
				break
			}
		}

		// Fallback end-of-data detection when the total count is unknown.
		if result.DetailCount < pageSize {
			logrus.Infof("✓ 当前页数据不足 %d 条,已到达最后一页", pageSize)
			break
		}

		time.Sleep(300 * time.Millisecond)
	}

	logrus.Infof("分页同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条", successPages, failedPages, totalCount)

	// Aggregate per-page outcomes. A page counts as failed here whenever it
	// is not marked successful, which includes pages that returned no error
	// but also no detail rows.
	totalDetailCount := 0
	var totalSuccessCount, totalFailCount int64
	for _, pr := range aggregatedResult.PageResults {
		if pr.Success {
			totalDetailCount += pr.RecordCount
			totalSuccessCount++
		} else {
			totalFailCount++
		}
	}
	aggregatedResult.DetailCount = totalDetailCount
	aggregatedResult.DetailSuccessCount = totalSuccessCount
	aggregatedResult.DetailFailCount = totalFailCount

	summary := map[string]interface{}{
		"sum_id":        aggregatedResult.SumID,
		"detail_count":  totalDetailCount,
		"total_pages":   totalPages,
		"success_pages": successPages,
		"failed_pages":  failedPages,
		"page_results":  aggregatedResult.PageResults,
	}
	if failedPages > 0 {
		logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", failedPages)
		updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", failedPages), "PAGE_SYNC_FAILED", summary)
	} else {
		logrus.Info("✓ 所有页面同步成功")
		updateParentLog("success", "", "", summary)
	}

	return aggregatedResult, aggregatedResult.Error
}

// syncReportWithRetry calls SyncAccountReport up to maxRetries+1 times and
// stops at the first attempt that returns no error.
//
// A negative maxRetries is treated as 0 so that at least one attempt is
// always made. Retries stop early once ctx is done. When isFirstPage is true
// the summary row is requested on every attempt until one attempt has saved
// it; the saved summary ID is then carried into later results so it is not
// lost when a subsequent attempt is the one that succeeds.
//
// onAttempt is invoked after every attempt with the zero-based attempt index
// and that attempt's error (nil on success). The second return value is the
// index of the last attempt made.
//
// NOTE(review): an attempt that saved detail rows but still returned an
// error (e.g. the summary save failed) is retried as a whole, so detail rows
// are inserted again — confirm the batch insert tolerates that.
func (s *SyncService) syncReportWithRetry(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, isFirstPage bool, onAttempt func(attempt int, err error)) (*SyncResult, int, error) {
	if maxRetries < 0 {
		maxRetries = 0
	}

	var (
		lastResult  *SyncResult
		lastErr     error
		lastAttempt int
		sumSaved    bool
		savedSumID  int64
	)
	for attempt := 0; attempt <= maxRetries; attempt++ {
		// The first attempt always runs so lastErr is never nil on failure.
		if attempt > 0 && ctx.Err() != nil {
			break
		}

		result, err := s.SyncAccountReport(ctx, req, useMock, isFirstPage && !sumSaved)
		if result != nil {
			if result.SumSuccess {
				sumSaved, savedSumID = true, result.SumID
			} else if sumSaved {
				result.SumSuccess, result.SumID = true, savedSumID
			}
		}
		lastResult, lastErr, lastAttempt = result, err, attempt

		if onAttempt != nil {
			onAttempt(attempt, err)
		}
		if err == nil {
			return result, attempt, nil
		}
	}
	return lastResult, lastAttempt, lastErr
}

// SyncSinglePageWithTask syncs one page with retries and tracks it in its own
// task log row identified by pageTaskID.
//
// The summary row is saved only for pageNumber 1. It returns the last
// attempt's result, the ID of the page task log row (0 if it could not be
// created) and the last attempt's error (nil on success).
func (s *SyncService) SyncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, int64, error) {
	return s.syncSinglePageWithTaskForConcurrent(ctx, req, useMock, maxRetries, pageTaskID, pageNumber, pageNumber == 1)
}

// syncSinglePageWithTaskForConcurrent is the shared implementation behind
// SyncSinglePageWithTask. It creates the page task log, marks it running,
// runs the retry loop and finally marks the log "success" or "failed".
// isFirstPage controls whether the summary row is saved for this page.
func (s *SyncService) syncSinglePageWithTaskForConcurrent(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int, isFirstPage bool) (*SyncResult, int64, error) {
	pageStartTime := time.Now()

	// Guarded so that a request without page info fails inside
	// SyncAccountReport with an error rather than panicking here.
	pageSize := 0
	if req.PageInfo != nil {
		pageSize = req.PageInfo.PageSize
	}

	pageLogReq := &taskDto.CreateSyncTaskLogReq{
		TaskID:       pageTaskID,
		TaskType:     "account_report_page",
		AdvertiserID: req.AdvertiserID,
		StartTime:    time.UnixMilli(req.StartTime),
		EndTime:      time.UnixMilli(req.EndTime),
		Status:       "pending",
		MaxRetry:     maxRetries,
		PageInfo:     req.PageInfo,
		RequestParams: map[string]interface{}{
			"page_number": pageNumber,
			"page_size":   pageSize,
		},
	}
	pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq)
	if err != nil {
		// Best effort: the page is still synced, just not tracked.
		logrus.Errorf("创建分页任务日志失败:%v", err)
	}

	updatePageLog := func(status, errMsg, errorCode string, retryCount int) {
		if pageLogID == 0 {
			return
		}
		duration := time.Since(pageStartTime).Milliseconds()
		updateReq := &taskDto.UpdateSyncTaskLogReq{
			ID:           pageLogID,
			Status:       status,
			ErrorMessage: errMsg,
			ErrorCode:    errorCode,
			DurationMs:   &duration,
		}
		if retryCount > 0 {
			updateReq.RetryCount = &retryCount
		}
		switch status {
		case "success", "failed", "partial_failed":
			completedAt := time.Now()
			updateReq.CompletedAt = completedAt
		}
		if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
			logrus.Errorf("更新分页任务日志失败:%v", err)
		}
	}

	updatePageLog("running", "", "", 0)
	logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber)

	result, retries, syncErr := s.syncReportWithRetry(ctx, req, useMock, maxRetries, isFirstPage, func(attempt int, err error) {
		if err == nil {
			logrus.Infof("第 %d 页同步成功,尝试次数:%d", pageNumber, attempt+1)
			return
		}
		logrus.Warnf("第 %d 页同步失败,第 %d 次重试,错误:%v", pageNumber, attempt+1, err)
	})
	if syncErr != nil {
		updatePageLog("failed", syncErr.Error(), "PAGE_SYNC_FAILED", retries)
		return result, pageLogID, syncErr
	}

	updatePageLog("success", "", "", retries)
	return result, pageLogID, nil
}

// SyncWithRetry syncs the page described by req.PageInfo, retrying up to
// maxRetries additional times, without writing any task log. The summary row
// is saved only when the request targets page 1. It returns the last
// attempt's result and error (nil error on success).
func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
	isFirstPage := req != nil && req.PageInfo != nil && req.PageInfo.CurrentPage == 1

	result, _, err := s.syncReportWithRetry(ctx, req, useMock, maxRetries, isFirstPage, func(attempt int, err error) {
		if err == nil {
			logrus.Infof("同步成功,尝试次数:%d", attempt+1)
			return
		}
		logrus.Warnf("同步失败,第 %d 次重试,错误:%v", attempt+1, err)
	})
	return result, err
}

// ConcurrentSyncConfig configures SyncAccountReportConcurrent.
type ConcurrentSyncConfig struct {
	// MaxConcurrency is the maximum number of pages synced at the same time.
	// Values <= 0 are treated as 1.
	MaxConcurrency int
	// UseMock selects the mock generator instead of the remote API.
	UseMock bool
	// MaxRetries is the per-page retry budget and also the number of failed
	// pages tolerated before the whole run is aborted.
	MaxRetries int
}

// SyncAccountReportConcurrent syncs all pages of the report with up to
// config.MaxConcurrency pages in flight.
//
// It first probes page 1 to learn the total record count. If that is
// unavailable it falls back to SyncAccountReportWithPagination. Otherwise it
// creates a parent task log and syncs every page in its own goroutine. When
// more than config.MaxRetries pages fail, or the context is cancelled before
// all pages were started, the run is aborted: the parent log is marked
// "failed" and the error is returned together with the partial result.
// If some pages fail within the tolerated budget the parent log is marked
// "partial_failed" and a nil error is returned.
//
// req.PageInfo is created if nil; the page size defaults to 10 when not set.
func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *AccountReportRequest, config ConcurrentSyncConfig) (*SyncResult, error) {
	startTime := time.Now()

	if req.PageInfo == nil {
		req.PageInfo = &PageInfo{}
	}
	pageSize := 10
	if req.PageInfo.PageSize > 0 {
		pageSize = req.PageInfo.PageSize
	}

	// A weighted semaphore of size 0 can never be acquired, which would
	// block the dispatch loop until the context is cancelled.
	maxConcurrency := config.MaxConcurrency
	if maxConcurrency <= 0 {
		maxConcurrency = 1
	}

	// Probe with a private copy of the page info so the caller's request is
	// not modified through the shared pointer.
	probePageInfo := *req.PageInfo
	probePageInfo.CurrentPage = 1
	probePageInfo.PageSize = pageSize
	firstPageReq := *req
	firstPageReq.PageInfo = &probePageInfo

	currentData := s.fetchCurrentDataCtx(ctx, &firstPageReq, config.UseMock)
	if currentData == nil || currentData.TotalCount <= 0 {
		// The serial path creates and owns its own parent task log, so none
		// is created here before falling back.
		logrus.Warn("未获取到总记录数,降级为串行同步")
		return s.SyncAccountReportWithPagination(ctx, req, config.UseMock, config.MaxRetries)
	}

	parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime)
	logReq := &taskDto.CreateSyncTaskLogReq{
		TaskID:        parentTaskID,
		TaskType:      "account_report",
		AdvertiserID:  req.AdvertiserID,
		StartTime:     time.UnixMilli(req.StartTime),
		EndTime:       time.UnixMilli(req.EndTime),
		Status:        "pending",
		MaxRetry:      config.MaxRetries,
		RequestParams: req,
	}
	parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq)
	if err != nil {
		// Best effort: the sync still runs, it is just not tracked.
		logrus.Errorf("创建主任务日志失败:%v", err)
	}
	updateParentLog := s.parentLogUpdater(ctx, parentLogID, startTime)

	totalPages := (currentData.TotalCount + pageSize - 1) / pageSize
	logrus.Infof("开始并发同步 - 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d", currentData.TotalCount, totalPages, pageSize, maxConcurrency)
	updateParentLog("running", "", "", map[string]interface{}{
		"total_pages":   totalPages,
		"total_records": currentData.TotalCount,
		"page_size":     pageSize,
		"concurrency":   maxConcurrency,
	})

	pageResults := make([]*PageSyncResult, totalPages)
	var (
		pageResultsMu    sync.Mutex
		sumMu            sync.Mutex
		sumSuccess       bool
		sumID            int64
		totalDetailCount int32
		successPages     int64
		failedPages      int64
		acquireErr       error
	)

	sem := semaphore.NewWeighted(int64(maxConcurrency))
	eg, egCtx := errgroup.WithContext(ctx)

	for pageNum := 1; pageNum <= totalPages; pageNum++ {
		if err := sem.Acquire(egCtx, 1); err != nil {
			logrus.Errorf("获取信号量失败:%v", err)
			// Remember why dispatch stopped: remaining pages are skipped and
			// the run must not be reported as complete.
			acquireErr = err
			break
		}

		currentPage := pageNum // per-iteration copy for the closure
		eg.Go(func() error {
			defer sem.Release(1)

			select {
			case <-egCtx.Done():
				return egCtx.Err()
			default:
			}

			pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
			pageStartTime := time.Now()

			// Each worker gets its own request and page info.
			pageReq := *req
			pageReq.PageInfo = &PageInfo{
				CurrentPage: currentPage,
				PageSize:    pageSize,
			}

			result, pageLogID, err := s.syncSinglePageWithTaskForConcurrent(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage, currentPage == 1)
			pageDuration := time.Since(pageStartTime).Milliseconds()

			pageResult := &PageSyncResult{
				PageNumber:    currentPage,
				DurationMs:    pageDuration,
				PageTaskLogID: pageLogID,
			}
			storePageResult := func() {
				pageResultsMu.Lock()
				pageResults[currentPage-1] = pageResult
				pageResultsMu.Unlock()
			}

			// Record the summary row as soon as it exists, even if the page
			// as a whole ended up failing after the summary was stored.
			if result != nil && result.SumSuccess {
				sumMu.Lock()
				if !sumSuccess {
					sumSuccess = true
					sumID = result.SumID
					logrus.Infof("✓ 汇总数据已保存,ID=%d(来自第 %d 页)", result.SumID, currentPage)
				}
				sumMu.Unlock()
			}

			if err != nil {
				logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
				pageResult.ErrorMessage = err.Error()
				storePageResult()

				newFailedCount := atomic.AddInt64(&failedPages, 1)
				if int(newFailedCount) > config.MaxRetries {
					logrus.Warnf("失败页数超过阈值 %d", config.MaxRetries)
					// Returning an error cancels egCtx and aborts the run.
					return fmt.Errorf("失败页数超过阈值")
				}
				return nil
			}

			if result.DetailSuccess && result.DetailCount > 0 {
				pageResult.Success = true
				pageResult.RecordCount = result.DetailCount
				atomic.AddInt32(&totalDetailCount, int32(result.DetailCount))
				atomic.AddInt64(&successPages, 1)
				logrus.Debugf("✓ 第 %d 页完成:%d 条明细,耗时 %dms", currentPage, result.DetailCount, pageDuration)
			}
			storePageResult()
			return nil
		})
	}

	waitErr := eg.Wait()
	if waitErr == nil {
		waitErr = acquireErr
	}

	// Pages that never ran leave nil slots; drop them from the report.
	actualResults := make([]*PageSyncResult, 0, len(pageResults))
	for _, pr := range pageResults {
		if pr != nil {
			actualResults = append(actualResults, pr)
		}
	}

	finalDetailCount := atomic.LoadInt32(&totalDetailCount)
	finalSuccessPages := atomic.LoadInt64(&successPages)
	finalFailedPages := atomic.LoadInt64(&failedPages)

	aggregatedResult := &SyncResult{
		SumSuccess:         sumSuccess,
		SumID:              sumID,
		TaskLogID:          parentLogID,
		DetailCount:        int(finalDetailCount),
		DetailSuccessCount: finalSuccessPages,
		DetailFailCount:    finalFailedPages,
		PageResults:        actualResults,
	}

	if waitErr != nil {
		logrus.Errorf("协程组执行出错:%v", waitErr)
		updateParentLog("failed", waitErr.Error(), "CONCURRENT_SYNC_FAILED", nil)
		aggregatedResult.Error = waitErr
		return aggregatedResult, waitErr
	}

	logrus.Infof("并发同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条", finalSuccessPages, finalFailedPages, finalDetailCount)

	summary := map[string]interface{}{
		"sum_id":        sumID,
		"detail_count":  finalDetailCount,
		"total_pages":   totalPages,
		"success_pages": finalSuccessPages,
		"failed_pages":  finalFailedPages,
		"page_results":  actualResults,
	}
	if finalFailedPages > 0 {
		logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", finalFailedPages)
		updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", finalFailedPages), "PAGE_SYNC_FAILED", summary)
	} else {
		logrus.Info("✓ 所有页面同步成功")
		updateParentLog("success", "", "", summary)
	}

	return aggregatedResult, nil
}

// fetchCurrentData fetches the report data for the page described by
// req.PageInfo without persisting anything. It returns nil on any failure.
// It runs with a background context; code that has a context should call
// fetchCurrentDataCtx instead.
func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData {
	return s.fetchCurrentDataCtx(context.Background(), req, useMock)
}

// fetchCurrentDataCtx is fetchCurrentData bound to ctx, so the remote call is
// made with the caller's context. It returns nil if the request fails, the
// response cannot be decoded, the API code is non-zero, or no data payload is
// present.
func (s *SyncService) fetchCurrentDataCtx(ctx context.Context, req *AccountReportRequest, useMock bool) *AccountReportData {
	if useMock {
		if req == nil || req.PageInfo == nil {
			return nil
		}
		responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
		if responseData != nil && responseData.Data != nil {
			return responseData.Data
		}
		return nil
	}

	respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req)
	if err != nil {
		return nil
	}
	responseData := &AccountReportResponse{}
	if err := json.Unmarshal(respBytes, responseData); err != nil {
		return nil
	}
	if responseData.Code == 0 && responseData.Data != nil {
		return responseData.Data
	}
	return nil
}

// saveSumData persists one summary row through the copydata service.
func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) {
	return copydata.CidAccountReportDetail.CreateSum(ctx, item)
}

// saveDetailData persists a batch of detail rows through the copydata service.
func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) {
	req := &dto.BatchCreateCidAccountReportDetailReq{
		Items: items,
	}
	return copydata.CidAccountReportDetail.BatchCreate(ctx, req)
}