抽取数据添加协程

This commit is contained in:
2026-04-08 16:00:54 +08:00
parent 073a098317
commit 000ea03420
3 changed files with 331 additions and 11 deletions

View File

@@ -30,12 +30,24 @@ func main() {
SelectColumns: []string{"impression", "click", "cost", "t0GMV"},
GroupType: 1,
QueryVersion: 1,
PageInfo: &sync.PageInfo{
CurrentPage: 1,
PageSize: 10,
},
}
logrus.Infof("=== 开始执行定时同步任务 ===")
logrus.Infof("时间区间:%s ~ %s", lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05"))
config := sync.ConcurrentSyncConfig{
MaxConcurrency: 5,
UseMock: true,
MaxRetries: 3,
}
result, err := syncService.SyncAccountReportWithPagination(ctx, req, true, 3)
logrus.Infof("=== 开始执行定时同步任务(并发模式)===")
logrus.Infof("时间区间:%s ~ %s", lastHourStart.Format("2006-01-02 15:04:05"), lastHourEnd.Format("2006-01-02 15:04:05"))
logrus.Infof("分页配置:每页大小=%d", req.PageInfo.PageSize)
logrus.Infof("并发配置:最大并发数=%d, 最大重试次数=%d", config.MaxConcurrency, config.MaxRetries)
result, err := syncService.SyncAccountReportConcurrent(ctx, req, config)
if err != nil {
logrus.Errorf("定时同步任务执行完成,存在失败的页面")
logrus.Infof("主任务日志ID%d", result.TaskLogID)

View File

@@ -28,14 +28,20 @@ func (m *MockDataGenerator) GenerateAccountReportRequest() *AccountReportRequest
},
PageInfo: &PageInfo{
CurrentPage: 1,
PageSize: 20,
PageSize: 10,
},
}
}
func (m *MockDataGenerator) GenerateAccountReportResponse() *AccountReportResponse {
return m.GenerateAccountReportResponseWithPage(1, 10)
}
func (m *MockDataGenerator) GenerateAccountReportResponseWithPage(page int, pageSize int) *AccountReportResponse {
sumData := m.generateSumData()
detailData := m.generateDetailData(5)
detailData := m.generateDetailDataByPage(page, pageSize)
totalCount := 23
return &AccountReportResponse{
Code: 0,
@@ -43,11 +49,58 @@ func (m *MockDataGenerator) GenerateAccountReportResponse() *AccountReportRespon
Data: &AccountReportData{
Sum: sumData,
Detail: detailData,
TotalCount: len(detailData),
TotalCount: totalCount,
},
}
}
func (m *MockDataGenerator) generateDetailDataByPage(page int, pageSize int) []*AccountReportItem {
totalCount := 23
if page < 1 || pageSize <= 0 {
return []*AccountReportItem{}
}
startIndex := (page - 1) * pageSize
if startIndex >= totalCount {
return []*AccountReportItem{}
}
endIndex := startIndex + pageSize
if endIndex > totalCount {
endIndex = totalCount
}
actualCount := endIndex - startIndex
items := make([]*AccountReportItem, actualCount)
for i := 0; i < actualCount; i++ {
itemIndex := startIndex + i + 1
campaignId := int64(itemIndex)
unitId := int64(itemIndex * 10)
creativeId := int64(itemIndex * 100)
campaignName := "测试计划_" + string(rune('A'+itemIndex-1))
unitName := "测试单元_" + string(rune('A'+itemIndex-1))
creativeName := "测试创意_" + string(rune('A'+itemIndex-1))
item := m.generateSumData()
item.CampaignId = &campaignId
item.UnitId = &unitId
item.CreativeId = &creativeId
item.CampaignName = campaignName
item.UnitName = unitName
item.CreativeName = creativeName
items[i] = (*AccountReportItem)(item)
}
return items
}
func (m *MockDataGenerator) generateSumData() *AccountReportSum {
cost := m.randomFloat(1000, 10000)
impression := m.randomInt64(10000, 100000)

View File

@@ -8,9 +8,13 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
type SyncService struct {
@@ -56,7 +60,7 @@ func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportR
if useMock {
logrus.Info("使用 Mock 数据同步快手广告账户报表")
responseData = s.mockGen.GenerateAccountReportResponse()
responseData = s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
} else {
logrus.Info("从真实 API 同步快手广告账户报表")
respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req)
@@ -167,14 +171,19 @@ func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req *
totalCount := 0
currentPage := 1
pageSize := 100
successPages := 0
failedPages := 0
pageSize := 10
if req.PageInfo == nil {
req.PageInfo = &PageInfo{}
}
if req.PageInfo.PageSize > 0 {
pageSize = req.PageInfo.PageSize
}
successPages := 0
failedPages := 0
var totalPages int
for {
@@ -380,7 +389,7 @@ func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountRe
func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData {
if useMock {
responseData := s.mockGen.GenerateAccountReportResponse()
responseData := s.mockGen.GenerateAccountReportResponseWithPage(req.PageInfo.CurrentPage, req.PageInfo.PageSize)
if responseData != nil && responseData.Data != nil {
return responseData.Data
}
@@ -434,3 +443,249 @@ func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportReque
return lastResult, lastErr
}
type ConcurrentSyncConfig struct {
MaxConcurrency int
UseMock bool
MaxRetries int
}
func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *AccountReportRequest, config ConcurrentSyncConfig) (*SyncResult, error) {
startTime := time.Now()
parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime)
logReq := &taskDto.CreateSyncTaskLogReq{
TaskID: parentTaskID,
TaskType: "account_report",
AdvertiserID: req.AdvertiserID,
StartTime: time.UnixMilli(req.StartTime),
EndTime: time.UnixMilli(req.EndTime),
Status: "pending",
MaxRetry: config.MaxRetries,
RequestParams: req,
}
parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq)
if err != nil {
logrus.Errorf("创建主任务日志失败:%v", err)
}
updateParentLog := func(status, errMsg, errorCode string, summary interface{}) {
if parentLogID == 0 {
return
}
duration := time.Since(startTime).Milliseconds()
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: parentLogID,
Status: status,
ErrorMessage: errMsg,
ErrorCode: errorCode,
DurationMs: &duration,
}
if status == "success" || status == "manual_review" {
completedAt := time.Now()
updateReq.CompletedAt = completedAt
}
if summary != nil {
updateReq.ResultSummary = summary
}
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
logrus.Errorf("更新主任务日志失败:%v", err)
}
}
updateParentLog("running", "", "", nil)
if req.PageInfo == nil {
req.PageInfo = &PageInfo{}
}
pageSize := 10
if req.PageInfo.PageSize > 0 {
pageSize = req.PageInfo.PageSize
}
firstPageReq := *req
firstPageReq.PageInfo.CurrentPage = 1
firstPageReq.PageInfo.PageSize = pageSize
currentData := s.fetchCurrentData(&firstPageReq, config.UseMock)
if currentData == nil || currentData.TotalCount == 0 {
logrus.Warn("未获取到总记录数,降级为串行同步")
return s.SyncAccountReportWithPagination(ctx, req, config.UseMock, config.MaxRetries)
}
totalPages := (currentData.TotalCount + pageSize - 1) / pageSize
logrus.Infof("开始并发同步 - 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d",
currentData.TotalCount, totalPages, pageSize, config.MaxConcurrency)
updateParentLog("running", "", "", map[string]interface{}{
"total_pages": totalPages,
"total_records": currentData.TotalCount,
"page_size": pageSize,
"concurrency": config.MaxConcurrency,
})
pageResults := make([]*PageSyncResult, totalPages)
var pageResultsMu sync.Mutex
var sumSuccess bool
var sumID int64
var sumMu sync.Mutex
var totalDetailCount int32
var successPages int64
var failedPages int64
sem := semaphore.NewWeighted(int64(config.MaxConcurrency))
eg, egCtx := errgroup.WithContext(ctx)
for pageNum := 1; pageNum <= totalPages; pageNum++ {
if err := sem.Acquire(egCtx, 1); err != nil {
logrus.Errorf("获取信号量失败:%v", err)
break
}
currentPage := pageNum
eg.Go(func() error {
defer sem.Release(1)
select {
case <-egCtx.Done():
return egCtx.Err()
default:
}
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
pageStartTime := time.Now()
pageReq := *req
pageReq.PageInfo = &PageInfo{
CurrentPage: currentPage,
PageSize: pageSize,
}
result, err := s.syncSinglePageWithTask(egCtx, &pageReq, config.UseMock, config.MaxRetries, pageTaskID, currentPage)
pageDuration := time.Since(pageStartTime).Milliseconds()
pageResult := &PageSyncResult{
PageNumber: currentPage,
Success: false,
RecordCount: 0,
DurationMs: pageDuration,
ErrorMessage: "",
RetryCount: 0,
PageTaskLogID: 0,
}
if err != nil {
logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
pageResult.ErrorMessage = err.Error()
pageResultsMu.Lock()
pageResults[currentPage-1] = pageResult
pageResultsMu.Unlock()
newFailedCount := atomic.AddInt64(&failedPages, 1)
if int(newFailedCount) > config.MaxRetries {
logrus.Warnf("失败页数超过阈值 %d", config.MaxRetries)
return fmt.Errorf("失败页数超过阈值")
}
return nil
}
if result.SumSuccess {
sumMu.Lock()
if !sumSuccess {
sumSuccess = true
sumID = result.SumID
logrus.Infof("✓ 汇总数据已保存ID=%d来自第 %d 页)", result.SumID, currentPage)
}
sumMu.Unlock()
}
if result.DetailSuccess && result.DetailCount > 0 {
pageResult.Success = true
pageResult.RecordCount = result.DetailCount
atomic.AddInt32(&totalDetailCount, int32(result.DetailCount))
atomic.AddInt64(&successPages, 1)
logrus.Debugf("✓ 第 %d 页完成:%d 条明细,耗时 %dms", currentPage, result.DetailCount, pageDuration)
}
pageResultsMu.Lock()
pageResults[currentPage-1] = pageResult
pageResultsMu.Unlock()
return nil
})
}
if err := eg.Wait(); err != nil {
logrus.Errorf("协程组执行出错:%v", err)
updateParentLog("failed", err.Error(), "CONCURRENT_SYNC_FAILED", nil)
return &SyncResult{
SumSuccess: sumSuccess,
SumID: sumID,
TaskLogID: parentLogID,
PageResults: pageResults,
Error: err,
}, err
}
actualResults := make([]*PageSyncResult, 0, len(pageResults))
for _, pr := range pageResults {
if pr != nil {
actualResults = append(actualResults, pr)
}
}
finalDetailCount := atomic.LoadInt32(&totalDetailCount)
finalSuccessPages := atomic.LoadInt64(&successPages)
finalFailedPages := atomic.LoadInt64(&failedPages)
logrus.Infof("并发同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条",
finalSuccessPages, finalFailedPages, finalDetailCount)
aggregatedResult := &SyncResult{
SumSuccess: sumSuccess,
SumID: sumID,
TaskLogID: parentLogID,
DetailCount: int(finalDetailCount),
DetailSuccessCount: finalSuccessPages,
DetailFailCount: finalFailedPages,
PageResults: actualResults,
}
if finalFailedPages > 0 {
logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", finalFailedPages)
summary := map[string]interface{}{
"sum_id": sumID,
"detail_count": finalDetailCount,
"total_pages": totalPages,
"success_pages": finalSuccessPages,
"failed_pages": finalFailedPages,
"page_results": actualResults,
}
updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", finalFailedPages), "PAGE_SYNC_FAILED", summary)
} else {
logrus.Info("✓ 所有页面同步成功")
summary := map[string]interface{}{
"sum_id": sumID,
"detail_count": finalDetailCount,
"total_pages": totalPages,
"success_pages": finalSuccessPages,
"failed_pages": 0,
"page_results": actualResults,
}
updateParentLog("success", "", "", summary)
}
return aggregatedResult, nil
}