Files
data-engine/sync/sync_service.go
2026-04-07 09:51:32 +08:00

269 lines
8.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sync
import (
dto "cid/model/dto/copydata"
"cid/service/copydata"
"context"
"encoding/json"
"fmt"
"time"
"gitea.com/red-future/common/beans"
"github.com/sirupsen/logrus"
)
type SyncService struct {
httpClient *HttpClient
converter *DataConverter
mockGen *MockDataGenerator
}
func NewSyncService() *SyncService {
return &SyncService{
httpClient: NewHttpClient("https://ad.e.kuaishou.com", 0),
converter: NewDataConverter(),
mockGen: NewMockDataGenerator(),
}
}
type SyncResult struct {
SumSuccess bool `json:"sum_success"`
SumID int64 `json:"sum_id"`
DetailSuccess bool `json:"detail_success"`
DetailCount int `json:"detail_count"`
DetailSuccessCount int64 `json:"detail_success_count"`
DetailFailCount int64 `json:"detail_fail_count"`
Error error `json:"error"`
}
func (s *SyncService) SyncCampaignReport(ctx context.Context, req *CampaignReportRequest, useMock bool) (*SyncResult, error) {
result := &SyncResult{}
var responseData *CampaignReportResponse
if useMock {
logrus.Info("使用 Mock 数据同步快手广告计划报表")
responseData = s.mockGen.GenerateCampaignReportResponse()
} else {
logrus.Info("从真实 API 同步快手广告计划报表")
respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/campaignReport", req)
if err != nil {
result.Error = fmt.Errorf("调用 API 失败:%w", err)
return result, result.Error
}
responseData = &CampaignReportResponse{}
if err := json.Unmarshal(respBytes, responseData); err != nil {
result.Error = fmt.Errorf("解析响应失败:%w", err)
return result, result.Error
}
if responseData.Code != 0 {
result.Error = fmt.Errorf("API 返回错误code=%d, message=%s", responseData.Code, responseData.Message)
return result, result.Error
}
}
if responseData.Data.Sum != nil {
sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "campaign_report")
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"})
sumResult, saveErr := s.saveSumData(ctx, sumItem)
if saveErr != nil {
logrus.Errorf("保存汇总数据失败:%v", saveErr)
result.Error = fmt.Errorf("保存汇总数据失败:%w", saveErr)
} else {
result.SumSuccess = true
result.SumID = sumResult.Id
logrus.Infof("成功保存汇总数据ID=%d", sumResult.Id)
}
}
if len(responseData.Data.Detail) > 0 {
detailItems := s.converter.ConvertToDetailItems(responseData.Data.Detail, "campaign_report")
detailResult, saveErr := s.saveDetailData(ctx, detailItems)
if saveErr != nil {
logrus.Errorf("保存明细数据失败:%v", saveErr)
result.Error = fmt.Errorf("保存明细数据失败:%w", saveErr)
} else {
result.DetailSuccess = true
result.DetailCount = len(detailItems)
result.DetailSuccessCount = detailResult.SuccessCount
result.DetailFailCount = detailResult.FailCount
logrus.Infof("成功保存明细数据,成功=%d, 失败=%d", detailResult.SuccessCount, detailResult.FailCount)
}
}
return result, result.Error
}
// SyncCampaignReportWithPagination 带分页处理的同步方法(支持全量数据抽取)
func (s *SyncService) SyncCampaignReportWithPagination(ctx context.Context, req *CampaignReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
aggregatedResult := &SyncResult{
SumSuccess: false,
SumID: 0,
}
allDetailItems := make([]*dto.CidAccountReportDetailItem, 0)
totalCount := 0
currentPage := 1
pageSize := 100
if req.PageInfo == nil {
req.PageInfo = &PageInfo{}
}
for {
logrus.Infof(">>> 正在同步第 %d 页数据...", currentPage)
req.PageInfo.CurrentPage = currentPage
req.PageInfo.PageSize = pageSize
result, err := s.SyncWithRetry(ctx, req, useMock, maxRetries)
if err != nil {
logrus.Errorf("第 %d 页同步失败:%v", currentPage, err)
return aggregatedResult, err
}
if result.SumSuccess && aggregatedResult.SumID == 0 {
aggregatedResult.SumSuccess = true
aggregatedResult.SumID = result.SumID
logrus.Infof("✓ 汇总数据已保存ID=%d", result.SumID)
}
if result.DetailSuccess && result.DetailCount > 0 {
detailItems := s.extractDetailItems(req, useMock)
if len(detailItems) > 0 {
allDetailItems = append(allDetailItems, detailItems...)
totalCount += len(detailItems)
logrus.Infof("✓ 第 %d 页获取到 %d 条明细数据,累计 %d 条", currentPage, len(detailItems), totalCount)
}
}
currentData := s.fetchCurrentData(req, useMock)
if currentData != nil && currentData.TotalCount > 0 {
totalPages := (currentData.TotalCount + pageSize - 1) / pageSize
logrus.Infof("总记录数:%d, 总页数:%d, 当前页:%d/%d",
currentData.TotalCount, totalPages, currentPage, totalPages)
if currentPage >= totalPages {
logrus.Infof("✓ 已同步所有页面数据,共 %d 页,%d 条记录", totalPages, currentData.TotalCount)
break
}
}
if result.DetailCount < pageSize {
logrus.Infof("✓ 当前页数据不足 %d 条,已到达最后一页", pageSize)
break
}
currentPage++
time.Sleep(300 * time.Millisecond)
}
if len(allDetailItems) > 0 {
logrus.Infof("开始批量保存 %d 条明细数据...", len(allDetailItems))
detailResult, saveErr := s.saveDetailData(ctx, allDetailItems)
if saveErr != nil {
logrus.Errorf("批量保存明细数据失败:%v", saveErr)
aggregatedResult.Error = fmt.Errorf("批量保存明细数据失败:%w", saveErr)
} else {
aggregatedResult.DetailSuccess = true
aggregatedResult.DetailCount = len(allDetailItems)
aggregatedResult.DetailSuccessCount = detailResult.SuccessCount
aggregatedResult.DetailFailCount = detailResult.FailCount
logrus.Infof("✓ 批量保存明细数据完成,成功=%d, 失败=%d",
detailResult.SuccessCount, detailResult.FailCount)
}
} else {
logrus.Info("没有明细数据需要保存")
}
return aggregatedResult, aggregatedResult.Error
}
func (s *SyncService) extractDetailItems(req *CampaignReportRequest, useMock bool) []*dto.CidAccountReportDetailItem {
if useMock {
responseData := s.mockGen.GenerateCampaignReportResponse()
if responseData == nil || responseData.Data == nil || len(responseData.Data.Detail) == 0 {
return nil
}
return s.converter.ConvertToDetailItems(responseData.Data.Detail, "campaign_report")
}
respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/campaignReport", req)
if err != nil {
logrus.Errorf("重新获取数据失败:%v", err)
return nil
}
responseData := &CampaignReportResponse{}
if err := json.Unmarshal(respBytes, responseData); err != nil {
logrus.Errorf("解析响应失败:%v", err)
return nil
}
if responseData.Code != 0 || responseData.Data == nil || len(responseData.Data.Detail) == 0 {
return nil
}
return s.converter.ConvertToDetailItems(responseData.Data.Detail, "campaign_report")
}
func (s *SyncService) fetchCurrentData(req *CampaignReportRequest, useMock bool) *CampaignReportData {
if useMock {
responseData := s.mockGen.GenerateCampaignReportResponse()
if responseData != nil && responseData.Data != nil {
return responseData.Data
}
return nil
}
respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/campaignReport", req)
if err != nil {
return nil
}
responseData := &CampaignReportResponse{}
if err := json.Unmarshal(respBytes, responseData); err != nil {
return nil
}
if responseData.Code == 0 && responseData.Data != nil {
return responseData.Data
}
return nil
}
func (s *SyncService) saveSumData(ctx context.Context, item *dto.CidAccountReportSumItem) (*dto.CreateCidAccountReportSumRes, error) {
return copydata.CidAccountReportDetail.CreateSum(ctx, item)
}
func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccountReportDetailItem) (*dto.BatchCreateCidAccountReportDetailRes, error) {
req := &dto.BatchCreateCidAccountReportDetailReq{
Items: items,
}
return copydata.CidAccountReportDetail.BatchCreate(ctx, req)
}
func (s *SyncService) SyncWithRetry(ctx context.Context, req *CampaignReportRequest, useMock bool, maxRetries int) (*SyncResult, error) {
var lastResult *SyncResult
var lastErr error
for attempt := 0; attempt <= maxRetries; attempt++ {
result, err := s.SyncCampaignReport(ctx, req, useMock)
lastResult = result
lastErr = err
if err == nil {
logrus.Infof("同步成功,尝试次数:%d", attempt+1)
return result, nil
}
logrus.Warnf("同步失败,第 %d 次重试,错误:%v", attempt+1, err)
}
return lastResult, lastErr
}