diff --git a/consts/public/collections.go b/consts/public/collections.go index 699e804..e33f1a2 100644 --- a/consts/public/collections.go +++ b/consts/public/collections.go @@ -18,4 +18,5 @@ const ( UnitReportDetailTable = "unit_report_detail" // 广告单元数据detail表 CampaignReportSumTable = "campaign_report_sum" // 广告计划数据detail表 CampaignReportDetailTable = "campaign_report_detail" // 广告计划数据detail表 + SyncTaskLogTable = "sync_task_log" // 广告计划数据detail表 ) diff --git a/dao/copydata/api_account_report_detail_dao.go b/dao/copydata/api_account_report_detail_dao.go index d4e3687..ed78303 100644 --- a/dao/copydata/api_account_report_detail_dao.go +++ b/dao/copydata/api_account_report_detail_dao.go @@ -9,6 +9,7 @@ import ( "gitea.com/red-future/common/db/gfdb" "github.com/gogf/gf/v2/util/gconv" + "github.com/sirupsen/logrus" ) var CidAccountReportDetail = new(cidAccountReportDetailDao) @@ -29,7 +30,7 @@ func (d *cidAccountReportDetailDao) Insert(ctx context.Context, req *dto.CidAcco return r.LastInsertId() } -// BatchInsert 批量插入广告数据报表详情 +// BatchInsert 批量插入广告数据报表详情(使用 OnConflict 实现幂等性) func (d *cidAccountReportDetailDao) BatchInsert(ctx context.Context, reqs []*dto.CidAccountReportDetailItem) (successCount int64, failCount int64, failedIndexes []int64, err error) { if len(reqs) == 0 { return 0, 0, nil, errors.New("批量插入数据不能为空") @@ -64,9 +65,19 @@ func (d *cidAccountReportDetailDao) BatchInsert(ctx context.Context, reqs []*dto continue } - // 执行批量插入 - _, err = gfdb.DB(ctx).Model(ctx, consts.CidAccountReportDetailTable).Data(entityList).Insert() + // 执行批量插入,使用 OnConflict 实现幂等性 + _, err = gfdb.DB(ctx).Model(ctx, consts.CidAccountReportDetailTable). + Data(entityList). + OnConflict( + "report_date_str", + "page_number", + "campaign_id", + "creative_id", + ). + Save() + if err != nil { + logrus.Warnf("批量插入失败,尝试逐条插入: %v", err) // 批量插入失败,尝试逐条插入 for k := range batch { _, singleErr := d.Insert(ctx, batch[k]) @@ -84,3 +95,66 @@ func (d *cidAccountReportDetailDao) BatchInsert(ctx context.Context, reqs []*dto return successCount, failCount, failedIndexes, nil } + +// DeleteByDateRange 按日期范围删除数据(用于补偿前去重) +func (d *cidAccountReportDetailDao) DeleteByDateRange(ctx context.Context, advertiserID int64, startDateStr, endDateStr string) (int64, error) { + cols := (&entity.CidAccountReportDetail{}).GetCols() + + result, err := gfdb.DB(ctx).Model(ctx, consts.CidAccountReportDetailTable). + Where(cols.ReportDateStr+" >= ? AND "+cols.ReportDateStr+" <= ?", startDateStr, endDateStr). + Delete() + + if err != nil { + return 0, err + } + + affected, _ := result.RowsAffected() + return affected, nil +} + +// BatchInsertInTx 在事务中批量插入 +func (d *cidAccountReportDetailDao) BatchInsertInTx(ctx context.Context, tx interface{}, reqs []*dto.CidAccountReportDetailItem) (successCount int64, failCount int64, err error) { + if len(reqs) == 0 { + return 0, 0, errors.New("批量插入数据不能为空") + } + + batchSize := 100 + successCount = 0 + failCount = 0 + + for i := 0; i < len(reqs); i += batchSize { + end := i + batchSize + if end > len(reqs) { + end = len(reqs) + } + + batch := reqs[i:end] + entityList := make([]*entity.CidAccountReportDetail, 0, len(batch)) + + for _, req := range batch { + var entityData entity.CidAccountReportDetail + if err = gconv.Struct(req, &entityData); err != nil { + failCount++ + logrus.Errorf("数据转换失败: %v", err) + continue + } + entityList = append(entityList, &entityData) + } + + if len(entityList) == 0 { + continue + } + + _, txErr := gfdb.DB(ctx).Model(ctx, consts.CidAccountReportDetailTable).Data(entityList).Insert() + if txErr != nil { + logrus.Errorf("批量插入失败 batch[%d:%d]: %v", i, end, txErr) + failCount += int64(len(entityList)) + err = txErr + continue + } + + successCount += int64(len(entityList)) + } + + return successCount, failCount, err +} diff --git a/dao/copydata/api_account_report_sum_dao.go b/dao/copydata/api_account_report_sum_dao.go index 5cb0035..da336f5 100644 --- a/dao/copydata/api_account_report_sum_dao.go +++ b/dao/copydata/api_account_report_sum_dao.go @@ -9,13 +9,14 @@ import ( "gitea.com/red-future/common/db/gfdb" "github.com/gogf/gf/v2/util/gconv" + "github.com/sirupsen/logrus" ) var CidAccountReportSum = new(CidAccountReportSumDao) type CidAccountReportSumDao struct{} -// Insert 插入广告数据报表详情 +// Insert 插入广告数据报表汇总 func (d *CidAccountReportSumDao) Insert(ctx context.Context, req *dto.CidAccountReportSumItem) (id int64, err error) { var entityData *entity.CidAccountReportSum if err = gconv.Struct(req, &entityData); err != nil { @@ -29,7 +30,7 @@ func (d *CidAccountReportSumDao) Insert(ctx context.Context, req *dto.CidAccount return r.LastInsertId() } -// BatchInsert 批量插入广告数据报表详情 +// BatchInsert 批量插入广告数据报表汇总(使用 OnConflict 实现幂等性) func (d *CidAccountReportSumDao) BatchInsert(ctx context.Context, reqs []*dto.CidAccountReportSumItem) (successCount int64, failCount int64, failedIndexes []int64, err error) { if len(reqs) == 0 { return 0, 0, nil, errors.New("批量插入数据不能为空") @@ -64,9 +65,17 @@ func (d *CidAccountReportSumDao) BatchInsert(ctx context.Context, reqs []*dto.Ci continue } - // 执行批量插入 - _, err = gfdb.DB(ctx).Model(ctx, consts.CidAccountReportSumTable).Data(entityList).Insert() + // 执行批量插入,使用 OnConflict 实现幂等性 + _, err = gfdb.DB(ctx).Model(ctx, consts.CidAccountReportSumTable). + Data(entityList). + OnConflict( + "report_date_str", + "page_number", + ). + Save() + if err != nil { + logrus.Warnf("批量插入失败,尝试逐条插入: %v", err) // 批量插入失败,尝试逐条插入 for k := range batch { _, singleErr := d.Insert(ctx, batch[k]) @@ -84,3 +93,19 @@ func (d *CidAccountReportSumDao) BatchInsert(ctx context.Context, reqs []*dto.Ci return successCount, failCount, failedIndexes, nil } + +// DeleteByDateRange 按日期范围删除数据(用于补偿前去重) +func (d *CidAccountReportSumDao) DeleteByDateRange(ctx context.Context, advertiserID int64, startDateStr, endDateStr string) (int64, error) { + cols := (&entity.CidAccountReportSum{}).GetCols() + + result, err := gfdb.DB(ctx).Model(ctx, consts.CidAccountReportSumTable). + Where(cols.ReportDateStr+" >= ? AND "+cols.ReportDateStr+" <= ?", startDateStr, endDateStr). + Delete() + + if err != nil { + return 0, err + } + + affected, _ := result.RowsAffected() + return affected, nil +} diff --git a/dao/copydata/sync_task_log_dao.go b/dao/copydata/sync_task_log_dao.go new file mode 100644 index 0000000..5613454 --- /dev/null +++ b/dao/copydata/sync_task_log_dao.go @@ -0,0 +1,126 @@ +package copydata + +import ( + consts "cid/consts/public" + dto "cid/model/dto/copydata" + entity "cid/model/entity/copydata" + "context" + "time" + + "gitea.com/red-future/common/db/gfdb" + "github.com/gogf/gf/v2/database/gdb" + "github.com/gogf/gf/v2/util/gconv" +) + +var SyncTaskLog = new(SyncTaskLogDao) + +type SyncTaskLogDao struct{} + +// Create 创建任务日志 +func (d *SyncTaskLogDao) Create(ctx context.Context, req *dto.CreateSyncTaskLogReq) (int64, error) { + var entityData entity.SyncTaskLog + if err := gconv.Struct(req, &entityData); err != nil { + return 0, err + } + + r, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Data(&entityData).Insert() + if err != nil { + return 0, err + } + return r.LastInsertId() +} + +// Update 更新任务日志 +func (d *SyncTaskLogDao) Update(ctx context.Context, req *dto.UpdateSyncTaskLogReq) error { + data := make(gdb.Map) + + if req.Status != "" { + data["status"] = req.Status + } + if req.RetryCount != nil { + data["retry_count"] = *req.RetryCount + } + if req.ErrorMessage != "" { + data["error_message"] = req.ErrorMessage + } + if req.ErrorCode != "" { + data["error_code"] = req.ErrorCode + } + if req.ResultSummary != nil { + data["result_summary"] = req.ResultSummary + } + if req.NextRetryTime != nil { + data["next_retry_time"] = req.NextRetryTime + } + if req.CompletedAt != nil { + data["completed_at"] = req.CompletedAt + } + if req.DurationMs != nil { + data["duration_ms"] = *req.DurationMs + } + + data["updated_at"] = time.Now() + + _, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable). + Data(data). + Where("id", req.ID). + Update() + + return err +} + +// QueryFailedTasks 查询需要补偿的失败任务 +func (d *SyncTaskLogDao) QueryFailedTasks(ctx context.Context, req *dto.QueryFailedTasksReq) ([]*dto.SyncTaskLogItem, error) { + model := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Model + + // 状态过滤 + if len(req.Status) > 0 { + model = model.WhereIn("status", req.Status) + } + + // 任务类型过滤 + if req.TaskType != "" { + model = model.Where("task_type", req.TaskType) + } + + // 只查询到达重试时间的任务(或从未设置过重试时间) + model = model.Where( + "(next_retry_time <= ? OR next_retry_time IS NULL)", + time.Now(), + ) + + // 限制数量 + limit := req.Limit + if limit <= 0 { + limit = 100 + } + model = model.Limit(limit) + + var results []*entity.SyncTaskLog + if err := model.Scan(&results); err != nil { + return nil, err + } + + items := make([]*dto.SyncTaskLogItem, len(results)) + for i, r := range results { + item := &dto.SyncTaskLogItem{} + gconv.Struct(r, item) + items[i] = item + } + + return items, nil +} + +// GetByTaskID 根据任务ID获取日志 +func (d *SyncTaskLogDao) GetByTaskID(ctx context.Context, taskID, taskType string) (*entity.SyncTaskLog, error) { + var result *entity.SyncTaskLog + err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable). + Where("task_id", taskID). + Where("task_type", taskType). + Scan(&result) + + if err != nil { + return nil, err + } + return result, nil +} diff --git a/model/dto/copydata/api_account_report_detail_dto.go b/model/dto/copydata/api_account_report_detail_dto.go index 2e94bf9..afb4f45 100644 --- a/model/dto/copydata/api_account_report_detail_dto.go +++ b/model/dto/copydata/api_account_report_detail_dto.go @@ -282,6 +282,9 @@ type CidAccountReportDetailItem struct { // 时间字段 ReportDateStr string `json:"reportDateStr" v:"required" dc:"时间"` + // 分页字段 + PageNumber int `json:"pageNumber" dc:"页码"` + // 广告结构字段 CampaignId *int64 `json:"campaignId" dc:"计划ID"` CampaignName string `json:"campaignName" dc:"计划名称"` diff --git a/model/dto/copydata/api_account_report_sum_dto.go b/model/dto/copydata/api_account_report_sum_dto.go index 8367f5a..f1b194e 100644 --- a/model/dto/copydata/api_account_report_sum_dto.go +++ b/model/dto/copydata/api_account_report_sum_dto.go @@ -282,6 +282,9 @@ type CidAccountReportSumItem struct { // 时间字段 ReportDateStr string `json:"reportDateStr" v:"required" dc:"时间"` + // 分页字段 + PageNumber int `json:"pageNumber" dc:"页码"` + // 广告结构字段 CampaignId *int64 `json:"campaignId" dc:"计划ID"` CampaignName string `json:"campaignName" dc:"计划名称"` diff --git a/model/dto/copydata/sync_task_log_dto.go b/model/dto/copydata/sync_task_log_dto.go new file mode 100644 index 0000000..02bd878 --- /dev/null +++ b/model/dto/copydata/sync_task_log_dto.go @@ -0,0 +1,56 @@ +package copydata + +// CreateSyncTaskLogReq 创建同步任务日志请求 +type CreateSyncTaskLogReq struct { + TaskID string `json:"taskId"` + TaskType string `json:"taskType"` + AdvertiserID int64 `json:"advertiserId"` + StartTime interface{} `json:"startTime"` + EndTime interface{} `json:"endTime"` + Status string `json:"status"` + MaxRetry int `json:"maxRetry"` + PageInfo interface{} `json:"pageInfo,omitempty"` + RequestParams interface{} `json:"requestParams,omitempty"` +} + +// UpdateSyncTaskLogReq 更新同步任务日志请求 +type UpdateSyncTaskLogReq struct { + ID int64 `json:"id"` + Status string `json:"status,omitempty"` + RetryCount *int `json:"retryCount,omitempty"` + ErrorMessage string `json:"errorMessage,omitempty"` + ErrorCode string `json:"errorCode,omitempty"` + ResultSummary interface{} `json:"resultSummary,omitempty"` + NextRetryTime interface{} `json:"nextRetryTime,omitempty"` + CompletedAt interface{} `json:"completedAt,omitempty"` + DurationMs *int64 `json:"durationMs,omitempty"` +} + +// QueryFailedTasksReq 查询失败任务请求 +type QueryFailedTasksReq struct { + Status []string `json:"status,omitempty"` // 查询的状态列表 + TaskType string `json:"taskType,omitempty"` // 任务类型 + MaxRetries *int `json:"maxRetries,omitempty"` // 最大重试次数过滤 + Limit int `json:"limit,omitempty"` // 限制数量 +} + +// SyncTaskLogItem 同步任务日志项 +type SyncTaskLogItem struct { + ID int64 `json:"id"` + TaskID string `json:"taskId"` + TaskType string `json:"taskType"` + AdvertiserID int64 `json:"advertiserId"` + StartTime interface{} `json:"startTime"` + EndTime interface{} `json:"endTime"` + Status string `json:"status"` + RetryCount int `json:"retryCount"` + MaxRetry int `json:"maxRetry"` + ErrorMessage string `json:"errorMessage,omitempty"` + ErrorCode string `json:"errorCode,omitempty"` + ResultSummary interface{} `json:"resultSummary,omitempty"` + NextRetryTime interface{} `json:"nextRetryTime,omitempty"` + CompletedAt interface{} `json:"completedAt,omitempty"` + DurationMs int64 `json:"durationMs"` + CreatedAt interface{} `json:"createdAt"` + UpdatedAt interface{} `json:"updatedAt"` +} diff --git a/model/entity/copydata/api_account_report_detail.go b/model/entity/copydata/api_account_report_detail.go index 3ca016b..d0ffb59 100644 --- a/model/entity/copydata/api_account_report_detail.go +++ b/model/entity/copydata/api_account_report_detail.go @@ -255,21 +255,24 @@ type CidAccountReportDetail struct { ShowCnt *int64 `orm:"show_cnt" json:"showCnt" description:"全站曝光"` // 时间字段 - ReportDateStr string `orm:"report_date_str" json:"reportDateStr" description:"时间"` + ReportDateStr string + + // 分页字段 + PageNumber string // 广告结构字段 - CampaignId *int64 `orm:"campaign_id" json:"campaignId" description:"计划ID"` - CampaignName string `orm:"campaign_name" json:"campaignName" description:"计划名称"` - UnitId *int64 `orm:"unit_id" json:"unitId" description:"单元ID"` - UnitName string `orm:"unit_name" json:"unitName" description:"单元名称"` - CreativeId *int64 `orm:"creative_id" json:"creativeId" description:"创意ID"` - CreativeName string `orm:"creative_name" json:"creativeName" description:"创意名称"` + CampaignId string + CampaignName string + UnitId string + UnitName string + CreativeId string + CreativeName string // 补贴相关字段 - CidActualRoiAfterSubsidy *float64 `orm:"cid_actual_roi_after_subsidy" json:"cidActualRoiAfterSubsidy" description:"补贴后实际ROI"` - CidCouponAmount *int64 `orm:"cid_coupon_amount" json:"cidCouponAmount" description:"核销券金额"` - CidCouponCallbackPaidRefundAmount *int64 `orm:"cid_coupon_callback_paid_refund_amount" json:"cidCouponCallbackPaidRefundAmount" description:"退单有回传_核销券金额"` - CidVoucherCost *float64 `orm:"cid_voucher_cost" json:"cidVoucherCost" description:"券成本"` + CidActualRoiAfterSubsidy string + CidCouponAmount string + CidCouponCallbackPaidRefundAmount string + CidVoucherCost string } // CidAccountReportDetailCol 广告数据报表详情表字段定义 diff --git a/model/entity/copydata/api_account_report_sum.go b/model/entity/copydata/api_account_report_sum.go index 46a3efd..459b4a2 100644 --- a/model/entity/copydata/api_account_report_sum.go +++ b/model/entity/copydata/api_account_report_sum.go @@ -257,6 +257,9 @@ type CidAccountReportSum struct { // 时间字段 ReportDateStr string `orm:"report_date_str" json:"reportDateStr" description:"时间"` + // 分页字段 + PageNumber int `orm:"page_number" json:"pageNumber" description:"页码"` + // 广告结构字段 CampaignId *int64 `orm:"campaign_id" json:"campaignId" description:"计划ID"` CampaignName string `orm:"campaign_name" json:"campaignName" description:"计划名称"` @@ -451,6 +454,7 @@ type CidAccountReportSumCol struct { ItemEntranceClkCnt string ShowCnt string ReportDateStr string + PageNumber string CampaignId string CampaignName string UnitId string @@ -651,6 +655,7 @@ func (e *CidAccountReportSum) GetCols() *CidAccountReportSumCol { ItemEntranceClkCnt: "item_entrance_clk_cnt", ShowCnt: "show_cnt", ReportDateStr: "report_date_str", + PageNumber: "page_number", CampaignId: "campaign_id", CampaignName: "campaign_name", UnitId: "unit_id", diff --git a/model/entity/copydata/sync_task_log.go b/model/entity/copydata/sync_task_log.go new file mode 100644 index 0000000..4ea952d --- /dev/null +++ b/model/entity/copydata/sync_task_log.go @@ -0,0 +1,30 @@ +package copydata + +import "gitea.com/red-future/common/beans" + +// SyncTaskLog 同步任务日志实体 +type SyncTaskLog struct { + beans.SQLBaseDO `orm:",inherit"` + + TaskID string `orm:"task_id" json:"taskId" description:"任务唯一标识"` + TaskType string `orm:"task_type" json:"taskType" description:"任务类型"` + AdvertiserID int64 `orm:"advertiser_id" json:"advertiserId" description:"广告主ID"` + StartTime interface{} `orm:"start_time" json:"startTime" description:"数据开始时间"` + EndTime interface{} `orm:"end_time" json:"endTime" description:"数据结束时间"` + Status string `orm:"status" json:"status" description:"任务状态"` + RetryCount int `orm:"retry_count" json:"retryCount" description:"已重试次数"` + MaxRetry int `orm:"max_retry" json:"maxRetry" description:"最大重试次数"` + PageInfo interface{} `orm:"page_info" json:"pageInfo" description:"分页信息"` + RequestParams interface{} `orm:"request_params" json:"requestParams" description:"请求参数快照"` + ErrorMessage string `orm:"error_message" json:"errorMessage" description:"错误信息"` + ErrorCode string `orm:"error_code" json:"errorCode" description:"错误码"` + ResultSummary interface{} `orm:"result_summary" json:"resultSummary" description:"结果摘要"` + NextRetryTime interface{} `orm:"next_retry_time" json:"nextRetryTime" description:"下次重试时间"` + CompletedAt interface{} `orm:"completed_at" json:"completedAt" description:"完成时间"` + DurationMs int64 `orm:"duration_ms" json:"durationMs" description:"执行耗时毫秒"` +} + +// TableName 返回表名 +func (e *SyncTaskLog) TableName() string { + return "sync_task_log" +} diff --git a/scheduler/run_account_report_task.go b/scheduler/run_account_report_task.go index f264515..337f95a 100644 --- a/scheduler/run_account_report_task.go +++ b/scheduler/run_account_report_task.go @@ -1,11 +1,13 @@ package main import ( + "context" "fmt" "time" "cid/sync" + "gitea.com/red-future/common/beans" _ "github.com/gogf/gf/contrib/drivers/pgsql/v2" "github.com/gogf/gf/v2/os/gctx" @@ -15,8 +17,8 @@ import ( func main() { ctx := gctx.New() syncService := sync.NewSyncService() - - req := &sync.CampaignReportRequest{ + ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"}) + req := &sync.AccountReportRequest{ AdvertiserID: 10001, StartTime: time.Now().AddDate(0, 0, -30).UnixNano() / 1e6, EndTime: time.Now().UnixNano() / 1e6, @@ -26,14 +28,16 @@ func main() { } logrus.Info("=== 开始执行定时同步任务 ===") - result, err := syncService.SyncCampaignReportWithPagination(ctx, req, true, 3) + result, err := syncService.SyncAccountReportWithPagination(ctx, req, true, 3) if err != nil { logrus.Errorf("定时同步任务失败:%v", err) + logrus.Infof("任务已记录到日志表,等待补偿调度器自动重试") return } fmt.Printf("✓ 定时同步完成:\n") fmt.Printf(" 汇总数据:成功=%v, ID=%d\n", result.SumSuccess, result.SumID) - fmt.Printf(" 明细数据:总数=%d, 成功=%d, 失败=%d\n", + fmt.Printf(" 明细数据:总记录数=%d, 成功页数=%d, 失败页数=%d\n", result.DetailCount, result.DetailSuccessCount, result.DetailFailCount) + fmt.Printf(" 任务日志ID:%d\n", result.TaskLogID) } diff --git a/scheduler/run_sync_task_log_task.go b/scheduler/run_sync_task_log_task.go new file mode 100644 index 0000000..ffb8a0e --- /dev/null +++ b/scheduler/run_sync_task_log_task.go @@ -0,0 +1,384 @@ +package main + +import ( + dao "cid/dao/copydata" + taskDto "cid/model/dto/copydata" + "cid/sync" + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "gitea.com/red-future/common/beans" + _ "github.com/gogf/gf/contrib/drivers/pgsql/v2" + "github.com/gogf/gf/v2/os/gctx" + "github.com/sirupsen/logrus" +) + +type CompensationScheduler struct { + syncService *sync.SyncService +} + +func NewCompensationScheduler() *CompensationScheduler { + return &CompensationScheduler{ + syncService: sync.NewSyncService(), + } +} + +func (s *CompensationScheduler) RunCompensationOnce() { + ctx := gctx.New() + ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"}) + + logrus.Info("=== 开始执行数据同步补偿任务 ===") + s.processCompensation(ctx) + logrus.Info("=== 补偿任务执行完毕 ===") +} + +func (s *CompensationScheduler) processCompensation(ctx context.Context) { + logrus.Info(">>> 开始检查需要同步补偿的任务...") + + queryReq := &taskDto.QueryFailedTasksReq{ + Status: []string{"failed", "retrying", "partial_failed"}, + MaxRetries: nil, + Limit: 50, + } + + failedTasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, queryReq) + if err != nil { + logrus.Errorf("查询失败任务异常:%v", err) + return + } + + if len(failedTasks) == 0 { + logrus.Info("✓ 没有需要补偿的任务") + return + } + + logrus.Infof("发现 %d 个需要补偿的任务", len(failedTasks)) + + successCount := 0 + failCount := 0 + partialCount := 0 + + for _, task := range failedTasks { + if task.RetryCount >= task.MaxRetry { + logrus.Warnf("任务 %s 已达到最大重试次数 %d,标记为需人工处理", task.TaskID, task.MaxRetry) + + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: task.ID, + Status: "manual_review", + ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", task.MaxRetry), + ErrorCode: "MAX_RETRY_EXCEEDED", + } + dao.SyncTaskLog.Update(ctx, updateReq) + + s.sendAlert(task) + failCount++ + continue + } + + logrus.Infof(">>> 开始补偿任务:%s (类型=%s, 第 %d/%d 次重试)", + task.TaskID, task.TaskType, task.RetryCount+1, task.MaxRetry) + + if s.compensateTask(ctx, task) { + successCount++ + } else { + failCount++ + } + + time.Sleep(1 * time.Second) + } + + logrus.Infof("✓ 补偿任务完成:成功=%d, 部分成功=%d, 失败=%d", + successCount, partialCount, failCount) +} + +func (s *CompensationScheduler) compensateTask(ctx context.Context, task *taskDto.SyncTaskLogItem) bool { + retryCount := task.RetryCount + 1 + + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: task.ID, + Status: "retrying", + RetryCount: &retryCount, + } + dao.SyncTaskLog.Update(ctx, updateReq) + + startTime := s.parseTime(task.StartTime) + endTime := s.parseTime(task.EndTime) + + logrus.Infof(">>> 开始补偿任务: %s (advertiser=%d, time=[%s, %s], 第 %d/%d 次重试)", + task.TaskID, task.AdvertiserID, + startTime.Format("2006-01-02"), endTime.Format("2006-01-02"), + retryCount, task.MaxRetry) + + if task.TaskType == "account_report_page" { + return s.compensatePageTask(ctx, task, retryCount) + } + + if task.TaskType == "account_report" && task.Status == "partial_failed" { + return s.compensatePartialFailedTask(ctx, task, startTime, endTime, retryCount) + } + + return s.compensateMainTask(ctx, task, startTime, endTime, retryCount) +} + +func (s *CompensationScheduler) compensatePartialFailedTask(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, retryCount int) bool { + logrus.Infof(">>> 检测到部分失败任务 %s,开始智能补偿(只重试失败的页)", task.TaskID) + + failedPages := s.extractFailedPages(task) + if len(failedPages) == 0 { + logrus.Warnf("任务 %s 标记为部分失败,但未找到失败的页信息,将重新同步所有页", task.TaskID) + return s.compensateMainTask(ctx, task, startTime, endTime, retryCount) + } + + logrus.Infof("任务 %s 共有 %d 个失败的页需要补偿: %v", task.TaskID, len(failedPages), failedPages) + + allSuccess := true + compensatedPages := 0 + + for _, pageNumber := range failedPages { + logrus.Infof(">>> 开始补偿第 %d 页...", pageNumber) + + pageSuccess := s.compensateSinglePage(ctx, task, startTime, endTime, pageNumber, retryCount) + if pageSuccess { + compensatedPages++ + } else { + allSuccess = false + } + + time.Sleep(500 * time.Millisecond) + } + + if allSuccess { + logrus.Infof("✓ 部分失败任务 %s 补偿成功 - 共补偿 %d 个页", task.TaskID, compensatedPages) + + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: task.ID, + Status: "success", + } + dao.SyncTaskLog.Update(ctx, updateReq) + return true + } else { + logrus.Warnf("⚠ 部分失败任务 %s 补偿后仍有失败的页 - 成功补偿 %d/%d 个页", + task.TaskID, compensatedPages, len(failedPages)) + + backoffMinutes := s.calculateBackoff(retryCount) + nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) + + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: task.ID, + Status: "partial_failed", + NextRetryTime: nextRetry, + } + dao.SyncTaskLog.Update(ctx, updateReq) + return false + } +} + +func (s *CompensationScheduler) compensateSinglePage(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, pageNumber int, retryCount int) bool { + req := &sync.AccountReportRequest{ + AdvertiserID: task.AdvertiserID, + StartTime: startTime.UnixMilli(), + EndTime: endTime.UnixMilli(), + SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, + GroupType: 1, + QueryVersion: 1, + PageInfo: &sync.PageInfo{ + CurrentPage: pageNumber, + PageSize: 100, + }, + } + + maxRetries := 3 + pageTaskID := fmt.Sprintf("%s_page_%d", task.TaskID, pageNumber) + result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber) + + if err != nil { + logrus.Errorf("补偿第 %d 页失败:%v", pageNumber, err) + return false + } + + logrus.Infof("✓ 补偿第 %d 页成功 - 记录数=%d", pageNumber, result.DetailCount) + return true +} + +func (s *CompensationScheduler) extractFailedPages(task *taskDto.SyncTaskLogItem) []int { + if task.ResultSummary == nil { + return nil + } + + summaryMap, ok := task.ResultSummary.(map[string]interface{}) + if !ok { + return nil + } + + pageResultsRaw, exists := summaryMap["page_results"] + if !exists { + return nil + } + + pageResultsJSON, err := json.Marshal(pageResultsRaw) + if err != nil { + logrus.Errorf("序列化 page_results 失败:%v", err) + return nil + } + + var pageResults []map[string]interface{} + if err := json.Unmarshal(pageResultsJSON, &pageResults); err != nil { + logrus.Errorf("反序列化 page_results 失败:%v", err) + return nil + } + + failedPages := make([]int, 0) + for _, pageResult := range pageResults { + success, _ := pageResult["success"].(bool) + pageNumberFloat, _ := pageResult["page_number"].(float64) + pageNumber := int(pageNumberFloat) + + if !success && pageNumber > 0 { + failedPages = append(failedPages, pageNumber) + } + } + + return failedPages +} + +func (s *CompensationScheduler) compensateMainTask(ctx context.Context, task *taskDto.SyncTaskLogItem, startTime, endTime time.Time, retryCount int) bool { + req := &sync.AccountReportRequest{ + AdvertiserID: task.AdvertiserID, + StartTime: startTime.UnixMilli(), + EndTime: endTime.UnixMilli(), + SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, + GroupType: 1, + QueryVersion: 1, + } + + maxRetries := 3 + result, err := s.syncService.SyncAccountReportWithPagination(ctx, req, true, maxRetries) + + if err != nil { + logrus.Errorf("补偿主任务 %s 失败(第 %d 次):%v", task.TaskID, retryCount, err) + + backoffMinutes := s.calculateBackoff(retryCount) + nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) + + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: task.ID, + Status: "failed", + ErrorMessage: err.Error(), + ErrorCode: "COMPENSATION_FAILED", + NextRetryTime: nextRetry, + } + dao.SyncTaskLog.Update(ctx, updateReq) + + return false + } + + logrus.Infof("✓ 补偿主任务 %s 成功 - 汇总ID=%d, 明细成功=%d, 失败=%d, 页数=%d", + task.TaskID, result.SumID, result.DetailSuccessCount, result.DetailFailCount, len(result.PageResults)) + return true +} + +func (s *CompensationScheduler) compensatePageTask(ctx context.Context, task *taskDto.SyncTaskLogItem, retryCount int) bool { + logrus.Infof(">>> 补偿分页任务: %s (重试第 %d 次)", task.TaskID, retryCount) + + parentTaskID := s.extractParentTaskID(task.TaskID) + pageNumber := s.extractPageNumber(task.TaskID) + + if parentTaskID == "" || pageNumber == 0 { + logrus.Errorf("无法解析分页任务ID: %s", task.TaskID) + return false + } + + startTime := s.parseTime(task.StartTime) + endTime := s.parseTime(task.EndTime) + + req := &sync.AccountReportRequest{ + AdvertiserID: task.AdvertiserID, + StartTime: startTime.UnixMilli(), + EndTime: endTime.UnixMilli(), + SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, + GroupType: 1, + QueryVersion: 1, + PageInfo: &sync.PageInfo{ + CurrentPage: pageNumber, + PageSize: 100, + }, + } + + maxRetries := 3 + pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, pageNumber) + result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber) + + if err != nil { + logrus.Errorf("补偿分页任务 %s 失败(第 %d 次):%v", task.TaskID, retryCount, err) + + backoffMinutes := s.calculateBackoff(retryCount) + nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute) + + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: task.ID, + Status: "failed", + ErrorMessage: err.Error(), + ErrorCode: "PAGE_COMPENSATION_FAILED", + NextRetryTime: nextRetry, + } + dao.SyncTaskLog.Update(ctx, updateReq) + + return false + } + + logrus.Infof("✓ 补偿分页任务 %s 成功 - 记录数=%d", task.TaskID, result.DetailCount) + return true +} + +func (s *CompensationScheduler) extractParentTaskID(taskID string) string { + if idx := strings.LastIndex(taskID, "_page_"); idx > 0 { + return taskID[:idx] + } + return "" +} + +func (s *CompensationScheduler) extractPageNumber(taskID string) int { + if idx := strings.LastIndex(taskID, "_page_"); idx > 0 { + var pageNum int + fmt.Sscanf(taskID[idx+6:], "%d", &pageNum) + return pageNum + } + return 0 +} + +func (s *CompensationScheduler) calculateBackoff(retryCount int) int { + backoffs := []int{5, 15, 30, 60, 120} + if retryCount <= len(backoffs) { + return backoffs[retryCount-1] + } + return backoffs[len(backoffs)-1] +} + +func (s *CompensationScheduler) parseTime(t interface{}) time.Time { + switch v := t.(type) { + case time.Time: + return v + case string: + if parsed, err := time.Parse("2006-01-02 15:04:05", v); err == nil { + return parsed + } + } + return time.Now() +} + +func (s *CompensationScheduler) sendAlert(task *taskDto.SyncTaskLogItem) { + logrus.Errorf("【告警】任务 %s 需要人工介入:广告主=%d, 类型=%s, 错误=%s", + task.TaskID, task.AdvertiserID, task.TaskType, task.ErrorMessage) + + // TODO: 集成钉钉/企业微信/邮件告警 + // s.sendDingTalkAlert(task) + // s.sendEmailAlert(task) +} + +func main() { + scheduler := NewCompensationScheduler() + scheduler.RunCompensationOnce() +} diff --git a/sync/campaign_report_sync.go b/sync/account_report_sync.go similarity index 66% rename from sync/campaign_report_sync.go rename to sync/account_report_sync.go index 69051bb..879bde4 100644 --- a/sync/campaign_report_sync.go +++ b/sync/account_report_sync.go @@ -10,39 +10,39 @@ import ( "github.com/sirupsen/logrus" ) -type CampaignReportSync struct { +type AccountReportSync struct { *BaseReportSync converter *DataConverter mockGen *MockDataGenerator } -func NewCampaignReportSync() *CampaignReportSync { - return &CampaignReportSync{ +func NewAccountReportSync() *AccountReportSync { + return &AccountReportSync{ BaseReportSync: NewBaseReportSync(), converter: NewDataConverter(), mockGen: NewMockDataGenerator(), } } -func (c *CampaignReportSync) FetchReport(ctx context.Context, params interface{}) (interface{}, error) { - req, ok := params.(*CampaignReportRequest) +func (c *AccountReportSync) FetchReport(ctx context.Context, params interface{}) (interface{}, error) { + req, ok := params.(*AccountReportRequest) if !ok { - return nil, fmt.Errorf("参数类型错误,期望 CampaignReportRequest 类型") + return nil, fmt.Errorf("参数类型错误,期望 AccountReportRequest 类型") } useMock := false if useMock { logrus.Info("使用 Mock 数据") - return c.mockGen.GenerateCampaignReportResponse(), nil + return c.mockGen.GenerateAccountReportResponse(), nil } - respBytes, err := NewHttpClient("https://ad.e.kuaishou.com", 0).Post(ctx, "/rest/openapi/gw/esp/report/campaignReport", req) + respBytes, err := NewHttpClient("https://ad.e.kuaishou.com", 0).Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req) if err != nil { return nil, fmt.Errorf("调用 API 失败:%w", err) } - var response CampaignReportResponse + var response AccountReportResponse if err := json.Unmarshal(respBytes, &response); err != nil { return nil, fmt.Errorf("解析响应失败:%w", err) } @@ -54,22 +54,22 @@ func (c *CampaignReportSync) FetchReport(ctx context.Context, params interface{} return &response, nil } -func (c *CampaignReportSync) ConvertToSum(apiData interface{}, dataType string) interface{} { - response, ok := apiData.(*CampaignReportResponse) +func (c *AccountReportSync) ConvertToSum(apiData interface{}, dataType string) interface{} { + response, ok := apiData.(*AccountReportResponse) if !ok || response.Data == nil || response.Data.Sum == nil { return nil } - return c.converter.ConvertToSumItem(response.Data.Sum, dataType) + return c.converter.ConvertToSumItem(response.Data.Sum, dataType, 0) } -func (c *CampaignReportSync) ConvertToDetails(apiData interface{}, dataType string) []interface{} { - response, ok := apiData.(*CampaignReportResponse) +func (c *AccountReportSync) ConvertToDetails(apiData interface{}, dataType string) []interface{} { + response, ok := apiData.(*AccountReportResponse) if !ok || response.Data == nil || len(response.Data.Detail) == 0 { return nil } - detailItems := c.converter.ConvertToDetailItems(response.Data.Detail, dataType) + detailItems := c.converter.ConvertToDetailItems(response.Data.Detail, dataType, 0) result := make([]interface{}, len(detailItems)) for i, item := range detailItems { @@ -78,7 +78,7 @@ func (c *CampaignReportSync) ConvertToDetails(apiData interface{}, dataType stri return result } -func (c *CampaignReportSync) SaveSum(ctx context.Context, data interface{}) (int64, error) { +func (c *AccountReportSync) SaveSum(ctx context.Context, data interface{}) (int64, error) { sumItem, ok := data.(*dto.CidAccountReportSumItem) if !ok { return 0, fmt.Errorf("数据类型错误,期望 CidAccountReportSumItem 类型") @@ -92,7 +92,7 @@ func (c *CampaignReportSync) SaveSum(ctx context.Context, data interface{}) (int return res.Id, nil } -func (c *CampaignReportSync) SaveDetails(ctx context.Context, data []interface{}) (int64, int64, error) { +func (c *AccountReportSync) SaveDetails(ctx context.Context, data []interface{}) (int64, int64, error) { detailItems := make([]*dto.CidAccountReportDetailItem, len(data)) for i, item := range data { detailItem, ok := item.(*dto.CidAccountReportDetailItem) diff --git a/sync/campaign_report_types.go b/sync/account_report_types.go similarity index 93% rename from sync/campaign_report_types.go rename to sync/account_report_types.go index d8acb2b..68b87f1 100644 --- a/sync/campaign_report_types.go +++ b/sync/account_report_types.go @@ -1,17 +1,17 @@ package sync -type CampaignReportRequest struct { - AdvertiserID int64 `json:"advertiser_id"` - StartTime int64 `json:"start_time"` - EndTime int64 `json:"end_time"` - SelectColumns []string `json:"select_columns"` - GroupType int `json:"group_type"` - QueryVersion int `json:"query_version"` - SelectParam *CampaignSelectParam `json:"select_param,omitempty"` - PageInfo *PageInfo `json:"page_info,omitempty"` +type AccountReportRequest struct { + AdvertiserID int64 `json:"advertiser_id"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + SelectColumns []string `json:"select_columns"` + GroupType int `json:"group_type"` + QueryVersion int `json:"query_version"` + SelectParam *AccountSelectParam `json:"select_param,omitempty"` + PageInfo *PageInfo `json:"page_info,omitempty"` } -type CampaignSelectParam struct { +type AccountSelectParam struct { CampaignIDs []int64 `json:"campaign_ids,omitempty"` AuthorID int64 `json:"author_id,omitempty"` AdTypeStr string `json:"ad_type_str,omitempty"` @@ -33,19 +33,19 @@ type PageInfo struct { TotalCount int `json:"total_count"` } -type CampaignReportResponse struct { - Code int `json:"code"` - Message string `json:"message"` - Data *CampaignReportData `json:"data"` +type AccountReportResponse struct { + Code int `json:"code"` + Message string `json:"message"` + Data *AccountReportData `json:"data"` } -type CampaignReportData struct { - Sum *CampaignReportSum `json:"sum"` - Detail []*CampaignReportItem `json:"detail"` - TotalCount int `json:"total_count"` +type AccountReportData struct { + Sum *AccountReportSum `json:"sum"` + Detail []*AccountReportItem `json:"detail"` + TotalCount int `json:"total_count"` } -type CampaignReportSum struct { +type AccountReportSum struct { T0OrderPaymentAmt string `json:"t0_order_payment_amt"` CreativeMaterialType string `json:"creative_material_type"` LiveName string `json:"live_name"` @@ -232,4 +232,4 @@ type CampaignReportSum struct { CidVoucherCost *float64 `json:"cid_voucher_cost"` } -type CampaignReportItem CampaignReportSum +type AccountReportItem AccountReportSum diff --git a/sync/data_converter.go b/sync/data_converter.go index 0cfce1a..1d06cd8 100644 --- a/sync/data_converter.go +++ b/sync/data_converter.go @@ -10,7 +10,7 @@ func NewDataConverter() *DataConverter { return &DataConverter{} } -func (c *DataConverter) ConvertToSumItem(apiData *CampaignReportSum, dataType string) *copydata.CidAccountReportSumItem { +func (c *DataConverter) ConvertToSumItem(apiData *AccountReportSum, dataType string, pageNumber int) *copydata.CidAccountReportSumItem { if apiData == nil { return nil } @@ -191,6 +191,7 @@ func (c *DataConverter) ConvertToSumItem(apiData *CampaignReportSum, dataType st ItemEntranceClkCnt: apiData.ItemEntranceClkCnt, ShowCnt: apiData.ShowCnt, ReportDateStr: apiData.ReportDateStr, + PageNumber: pageNumber, CampaignId: apiData.CampaignId, CampaignName: apiData.CampaignName, UnitId: apiData.UnitId, @@ -204,26 +205,26 @@ func (c *DataConverter) ConvertToSumItem(apiData *CampaignReportSum, dataType st } } -func (c *DataConverter) ConvertToDetailItems(apiItems []*CampaignReportItem, dataType string) []*copydata.CidAccountReportDetailItem { +func (c *DataConverter) ConvertToDetailItems(apiItems []*AccountReportItem, dataType string, pageNumber int) []*copydata.CidAccountReportDetailItem { if len(apiItems) == 0 { return nil } result := make([]*copydata.CidAccountReportDetailItem, 0, len(apiItems)) for _, item := range apiItems { - detailItem := c.convertItemToDetail(item, dataType) + detailItem := c.convertItemToDetail(item, dataType, pageNumber) result = append(result, detailItem) } return result } -func (c *DataConverter) convertItemToDetail(apiItem *CampaignReportItem, dataType string) *copydata.CidAccountReportDetailItem { +func (c *DataConverter) convertItemToDetail(apiItem *AccountReportItem, dataType string, pageNumber int) *copydata.CidAccountReportDetailItem { if apiItem == nil { return nil } - item := (*CampaignReportSum)(apiItem) - sumItem := c.ConvertToSumItem(item, dataType) + item := (*AccountReportSum)(apiItem) + sumItem := c.ConvertToSumItem(item, dataType, pageNumber) return ©data.CidAccountReportDetailItem{ DataType: sumItem.DataType, @@ -401,6 +402,7 @@ func (c *DataConverter) convertItemToDetail(apiItem *CampaignReportItem, dataTyp ItemEntranceClkCnt: sumItem.ItemEntranceClkCnt, ShowCnt: sumItem.ShowCnt, ReportDateStr: sumItem.ReportDateStr, + PageNumber: pageNumber, CampaignId: sumItem.CampaignId, CampaignName: sumItem.CampaignName, UnitId: sumItem.UnitId, diff --git a/sync/mock_generator.go b/sync/mock_generator.go index 8dab89e..0236f5f 100644 --- a/sync/mock_generator.go +++ b/sync/mock_generator.go @@ -15,15 +15,15 @@ func NewMockDataGenerator() *MockDataGenerator { } } -func (m *MockDataGenerator) GenerateCampaignReportRequest() *CampaignReportRequest { - return &CampaignReportRequest{ +func (m *MockDataGenerator) GenerateAccountReportRequest() *AccountReportRequest { + return &AccountReportRequest{ AdvertiserID: 10001, StartTime: time.Now().AddDate(0, 0, -30).UnixNano() / 1e6, EndTime: time.Now().UnixNano() / 1e6, SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, GroupType: 1, QueryVersion: 1, - SelectParam: &CampaignSelectParam{ + SelectParam: &AccountSelectParam{ CampaignIDs: []int64{1, 2, 3}, }, PageInfo: &PageInfo{ @@ -33,14 +33,14 @@ func (m *MockDataGenerator) GenerateCampaignReportRequest() *CampaignReportReque } } -func (m *MockDataGenerator) GenerateCampaignReportResponse() *CampaignReportResponse { +func (m *MockDataGenerator) GenerateAccountReportResponse() *AccountReportResponse { sumData := m.generateSumData() detailData := m.generateDetailData(5) - return &CampaignReportResponse{ + return &AccountReportResponse{ Code: 0, Message: "success", - Data: &CampaignReportData{ + Data: &AccountReportData{ Sum: sumData, Detail: detailData, TotalCount: len(detailData), @@ -48,12 +48,12 @@ func (m *MockDataGenerator) GenerateCampaignReportResponse() *CampaignReportResp } } -func (m *MockDataGenerator) generateSumData() *CampaignReportSum { +func (m *MockDataGenerator) generateSumData() *AccountReportSum { cost := m.randomFloat(1000, 10000) impression := m.randomInt64(10000, 100000) click := m.randomInt64(100, 1000) - return &CampaignReportSum{ + return &AccountReportSum{ T0OrderPaymentAmt: "888.99", CreativeMaterialType: "视频素材类型", LiveName: "测试直播间", @@ -241,10 +241,10 @@ func (m *MockDataGenerator) generateSumData() *CampaignReportSum { } } -func (m *MockDataGenerator) generateDetailData(count int) []*CampaignReportItem { - items := make([]*CampaignReportItem, count) +func (m *MockDataGenerator) generateDetailData(count int) []*AccountReportItem { + items := make([]*AccountReportItem, count) for i := 0; i < count; i++ { - items[i] = (*CampaignReportItem)(m.generateSumData()) + items[i] = (*AccountReportItem)(m.generateSumData()) } return items } diff --git a/sync/quick_sync.go b/sync/quick_sync.go index 060d143..62730a2 100644 --- a/sync/quick_sync.go +++ b/sync/quick_sync.go @@ -7,17 +7,17 @@ import ( "github.com/sirupsen/logrus" ) -func SyncCampaignReportWithMock(ctx context.Context) error { +func SyncAccountReportWithMock(ctx context.Context) error { syncService := NewSyncService() - req := &CampaignReportRequest{ + req := &AccountReportRequest{ AdvertiserID: 10001, StartTime: time.Now().AddDate(0, 0, -30).UnixNano() / 1e6, EndTime: time.Now().UnixNano() / 1e6, SelectColumns: []string{"impression", "click", "cost", "t0GMV"}, GroupType: 1, QueryVersion: 1, - SelectParam: &CampaignSelectParam{ + SelectParam: &AccountSelectParam{ CampaignIDs: []int64{1, 2, 3}, }, PageInfo: &PageInfo{ @@ -26,7 +26,7 @@ func SyncCampaignReportWithMock(ctx context.Context) error { }, } - result, err := syncService.SyncCampaignReport(ctx, req, true) + result, err := syncService.SyncAccountReport(ctx, req, true) if err != nil { logrus.Errorf("同步失败:%v", err) return err @@ -36,10 +36,10 @@ func SyncCampaignReportWithMock(ctx context.Context) error { return nil } -func SyncCampaignReportWithRealAPI(ctx context.Context, req *CampaignReportRequest) error { +func SyncAccountReportWithRealAPI(ctx context.Context, req *AccountReportRequest) error { syncService := NewSyncService() - result, err := syncService.SyncCampaignReport(ctx, req, false) + result, err := syncService.SyncAccountReport(ctx, req, false) if err != nil { logrus.Errorf("同步失败:%v", err) return err diff --git a/sync/sync_service.go b/sync/sync_service.go index 9c6c35f..e44d54e 100644 --- a/sync/sync_service.go +++ b/sync/sync_service.go @@ -1,14 +1,15 @@ package sync import ( + dao "cid/dao/copydata" dto "cid/model/dto/copydata" + taskDto "cid/model/dto/copydata" "cid/service/copydata" "context" "encoding/json" "fmt" "time" - "gitea.com/red-future/common/beans" "github.com/sirupsen/logrus" ) @@ -27,32 +28,44 @@ func NewSyncService() *SyncService { } type SyncResult struct { - SumSuccess bool `json:"sum_success"` - SumID int64 `json:"sum_id"` - DetailSuccess bool `json:"detail_success"` - DetailCount int `json:"detail_count"` - DetailSuccessCount int64 `json:"detail_success_count"` - DetailFailCount int64 `json:"detail_fail_count"` - Error error `json:"error"` + SumSuccess bool `json:"sum_success"` + SumID int64 `json:"sum_id"` + DetailSuccess bool `json:"detail_success"` + DetailCount int `json:"detail_count"` + DetailSuccessCount int64 `json:"detail_success_count"` + DetailFailCount int64 `json:"detail_fail_count"` + Error error `json:"error"` + TaskLogID int64 `json:"task_log_id"` + PageResults []*PageSyncResult `json:"page_results,omitempty"` } -func (s *SyncService) SyncCampaignReport(ctx context.Context, req *CampaignReportRequest, useMock bool) (*SyncResult, error) { +type PageSyncResult struct { + PageNumber int `json:"page_number"` + PageTaskLogID int64 `json:"page_task_log_id"` + Success bool `json:"success"` + RecordCount int `json:"record_count"` + DurationMs int64 `json:"duration_ms"` + ErrorMessage string `json:"error_message,omitempty"` + RetryCount int `json:"retry_count"` +} + +func (s *SyncService) SyncAccountReport(ctx context.Context, req *AccountReportRequest, useMock bool) (*SyncResult, error) { result := &SyncResult{} - var responseData *CampaignReportResponse + var responseData *AccountReportResponse if useMock { - logrus.Info("使用 Mock 数据同步快手广告计划报表") - responseData = s.mockGen.GenerateCampaignReportResponse() + logrus.Info("使用 Mock 数据同步快手广告账户报表") + responseData = s.mockGen.GenerateAccountReportResponse() } else { - logrus.Info("从真实 API 同步快手广告计划报表") - respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/campaignReport", req) + logrus.Info("从真实 API 同步快手广告账户报表") + respBytes, err := s.httpClient.Post(ctx, "/rest/openapi/gw/esp/report/accountReport", req) if err != nil { result.Error = fmt.Errorf("调用 API 失败:%w", err) return result, result.Error } - responseData = &CampaignReportResponse{} + responseData = &AccountReportResponse{} if err := json.Unmarshal(respBytes, responseData); err != nil { result.Error = fmt.Errorf("解析响应失败:%w", err) return result, result.Error @@ -65,9 +78,7 @@ func (s *SyncService) SyncCampaignReport(ctx context.Context, req *CampaignRepor } if responseData.Data.Sum != nil { - sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "campaign_report") - ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"}) - + sumItem := s.converter.ConvertToSumItem(responseData.Data.Sum, "account_report", req.PageInfo.CurrentPage) sumResult, saveErr := s.saveSumData(ctx, sumItem) if saveErr != nil { logrus.Errorf("保存汇总数据失败:%v", saveErr) @@ -80,7 +91,7 @@ func (s *SyncService) SyncCampaignReport(ctx context.Context, req *CampaignRepor } if len(responseData.Data.Detail) > 0 { - detailItems := s.converter.ConvertToDetailItems(responseData.Data.Detail, "campaign_report") + detailItems := s.converter.ConvertToDetailItems(responseData.Data.Detail, "account_report", req.PageInfo.CurrentPage) detailResult, saveErr := s.saveDetailData(ctx, detailItems) if saveErr != nil { logrus.Errorf("保存明细数据失败:%v", saveErr) @@ -90,39 +101,117 @@ func (s *SyncService) SyncCampaignReport(ctx context.Context, req *CampaignRepor result.DetailCount = len(detailItems) result.DetailSuccessCount = detailResult.SuccessCount result.DetailFailCount = detailResult.FailCount - logrus.Infof("成功保存明细数据,成功=%d, 失败=%d", detailResult.SuccessCount, detailResult.FailCount) + logrus.Infof("成功保存 %d 条明细数据(成功=%d, 失败=%d)", len(detailItems), detailResult.SuccessCount, detailResult.FailCount) } } return result, result.Error } -// SyncCampaignReportWithPagination 带分页处理的同步方法(支持全量数据抽取) -func (s *SyncService) SyncCampaignReportWithPagination(ctx context.Context, req *CampaignReportRequest, useMock bool, maxRetries int) (*SyncResult, error) { - aggregatedResult := &SyncResult{ - SumSuccess: false, - SumID: 0, +// SyncAccountReportWithPagination 带分页处理的同步方法(支持全量数据抽取) +func (s *SyncService) SyncAccountReportWithPagination(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) { + startTime := time.Now() + parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime) + + logReq := &taskDto.CreateSyncTaskLogReq{ + TaskID: parentTaskID, + TaskType: "account_report", + AdvertiserID: req.AdvertiserID, + StartTime: time.UnixMilli(req.StartTime), + EndTime: time.UnixMilli(req.EndTime), + Status: "pending", + MaxRetry: maxRetries, + RequestParams: req, + } + + parentLogID, err := dao.SyncTaskLog.Create(ctx, logReq) + if err != nil { + logrus.Errorf("创建主任务日志失败:%v", err) + } + + updateParentLog := func(status, errMsg, errorCode string, summary interface{}) { + if parentLogID == 0 { + return + } + duration := time.Since(startTime).Milliseconds() + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: parentLogID, + Status: status, + ErrorMessage: errMsg, + ErrorCode: errorCode, + DurationMs: &duration, + } + + if status == "success" || status == "manual_review" { + completedAt := time.Now() + updateReq.CompletedAt = completedAt + } + + if summary != nil { + updateReq.ResultSummary = summary + } + + if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil { + logrus.Errorf("更新主任务日志失败:%v", err) + } + } + + updateParentLog("running", "", "", nil) + + aggregatedResult := &SyncResult{ + SumSuccess: false, + SumID: 0, + TaskLogID: parentLogID, + PageResults: make([]*PageSyncResult, 0), } - allDetailItems := make([]*dto.CidAccountReportDetailItem, 0) totalCount := 0 currentPage := 1 pageSize := 100 + successPages := 0 + failedPages := 0 if req.PageInfo == nil { req.PageInfo = &PageInfo{} } + var totalPages int + for { logrus.Infof(">>> 正在同步第 %d 页数据...", currentPage) req.PageInfo.CurrentPage = currentPage req.PageInfo.PageSize = pageSize - result, err := s.SyncWithRetry(ctx, req, useMock, maxRetries) + pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage) + pageStartTime := time.Now() + + pageResult := &PageSyncResult{ + PageNumber: currentPage, + Success: false, + RecordCount: 0, + RetryCount: 0, + } + + result, err := s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, currentPage) + pageDuration := time.Since(pageStartTime).Milliseconds() + pageResult.DurationMs = pageDuration + if err != nil { logrus.Errorf("第 %d 页同步失败:%v", currentPage, err) - return aggregatedResult, err + pageResult.ErrorMessage = err.Error() + failedPages++ + + aggregatedResult.PageResults = append(aggregatedResult.PageResults, pageResult) + + if failedPages > maxRetries { + logrus.Warnf("失败页数超过阈值 %d,终止同步", maxRetries) + break + } + + currentPage++ + time.Sleep(500 * time.Millisecond) + continue } if result.SumSuccess && aggregatedResult.SumID == 0 { @@ -132,17 +221,18 @@ func (s *SyncService) SyncCampaignReportWithPagination(ctx context.Context, req } if result.DetailSuccess && result.DetailCount > 0 { - detailItems := s.extractDetailItems(req, useMock) - if len(detailItems) > 0 { - allDetailItems = append(allDetailItems, detailItems...) - totalCount += len(detailItems) - logrus.Infof("✓ 第 %d 页获取到 %d 条明细数据,累计 %d 条", currentPage, len(detailItems), totalCount) - } + totalCount += result.DetailCount + pageResult.Success = true + pageResult.RecordCount = result.DetailCount + successPages++ + logrus.Infof("✓ 第 %d 页获取到 %d 条明细数据,累计 %d 条", currentPage, result.DetailCount, totalCount) } + aggregatedResult.PageResults = append(aggregatedResult.PageResults, pageResult) + currentData := s.fetchCurrentData(req, useMock) if currentData != nil && currentData.TotalCount > 0 { - totalPages := (currentData.TotalCount + pageSize - 1) / pageSize + totalPages = (currentData.TotalCount + pageSize - 1) / pageSize logrus.Infof("总记录数:%d, 总页数:%d, 当前页:%d/%d", currentData.TotalCount, totalPages, currentPage, totalPages) @@ -161,70 +251,148 @@ func (s *SyncService) SyncCampaignReportWithPagination(ctx context.Context, req time.Sleep(300 * time.Millisecond) } - if len(allDetailItems) > 0 { - logrus.Infof("开始批量保存 %d 条明细数据...", len(allDetailItems)) - detailResult, saveErr := s.saveDetailData(ctx, allDetailItems) - if saveErr != nil { - logrus.Errorf("批量保存明细数据失败:%v", saveErr) - aggregatedResult.Error = fmt.Errorf("批量保存明细数据失败:%w", saveErr) + logrus.Infof("分页同步完成 - 成功:%d页, 失败:%d页, 总明细:%d条", + successPages, failedPages, totalCount) + + // 统计所有子任务的结果 + totalDetailCount := 0 + var totalSuccessCount int64 + var totalFailCount int64 + + for _, pageResult := range aggregatedResult.PageResults { + if pageResult.Success { + totalDetailCount += pageResult.RecordCount + totalSuccessCount++ } else { - aggregatedResult.DetailSuccess = true - aggregatedResult.DetailCount = len(allDetailItems) - aggregatedResult.DetailSuccessCount = detailResult.SuccessCount - aggregatedResult.DetailFailCount = detailResult.FailCount - logrus.Infof("✓ 批量保存明细数据完成,成功=%d, 失败=%d", - detailResult.SuccessCount, detailResult.FailCount) + totalFailCount++ } + } + + aggregatedResult.DetailCount = totalDetailCount + aggregatedResult.DetailSuccessCount = totalSuccessCount + aggregatedResult.DetailFailCount = totalFailCount + + if failedPages > 0 { + logrus.Warnf("存在 %d 个失败的页面,主任务标记为部分失败", failedPages) + + summary := map[string]interface{}{ + "sum_id": aggregatedResult.SumID, + "detail_count": totalDetailCount, + "total_pages": totalPages, + "success_pages": successPages, + "failed_pages": failedPages, + "page_results": aggregatedResult.PageResults, + } + updateParentLog("partial_failed", fmt.Sprintf("%d 个页面同步失败", failedPages), "PAGE_SYNC_FAILED", summary) } else { - logrus.Info("没有明细数据需要保存") + logrus.Info("✓ 所有页面同步成功") + + summary := map[string]interface{}{ + "sum_id": aggregatedResult.SumID, + "detail_count": totalDetailCount, + "total_pages": totalPages, + "success_pages": successPages, + "failed_pages": 0, + "page_results": aggregatedResult.PageResults, + } + updateParentLog("success", "", "", summary) } return aggregatedResult, aggregatedResult.Error } -func (s *SyncService) extractDetailItems(req *CampaignReportRequest, useMock bool) []*dto.CidAccountReportDetailItem { - if useMock { - responseData := s.mockGen.GenerateCampaignReportResponse() - if responseData == nil || responseData.Data == nil || len(responseData.Data.Detail) == 0 { - return nil - } - return s.converter.ConvertToDetailItems(responseData.Data.Detail, "campaign_report") - } - - respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/campaignReport", req) - if err != nil { - logrus.Errorf("重新获取数据失败:%v", err) - return nil - } - - responseData := &CampaignReportResponse{} - if err := json.Unmarshal(respBytes, responseData); err != nil { - logrus.Errorf("解析响应失败:%v", err) - return nil - } - - if responseData.Code != 0 || responseData.Data == nil || len(responseData.Data.Detail) == 0 { - return nil - } - - return s.converter.ConvertToDetailItems(responseData.Data.Detail, "campaign_report") +func (s *SyncService) SyncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) { + return s.syncSinglePageWithTask(ctx, req, useMock, maxRetries, pageTaskID, pageNumber) } -func (s *SyncService) fetchCurrentData(req *CampaignReportRequest, useMock bool) *CampaignReportData { +func (s *SyncService) syncSinglePageWithTask(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int, pageTaskID string, pageNumber int) (*SyncResult, error) { + pageStartTime := time.Now() + + pageLogReq := &taskDto.CreateSyncTaskLogReq{ + TaskID: pageTaskID, + TaskType: "account_report_page", + AdvertiserID: req.AdvertiserID, + StartTime: time.UnixMilli(req.StartTime), + EndTime: time.UnixMilli(req.EndTime), + Status: "pending", + MaxRetry: maxRetries, + PageInfo: req.PageInfo, + RequestParams: map[string]interface{}{ + "page_number": pageNumber, + "page_size": req.PageInfo.PageSize, + }, + } + + pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq) + if err != nil { + logrus.Errorf("创建分页任务日志失败:%v", err) + } + + updatePageLog := func(status, errMsg, errorCode string, retryCount int) { + if pageLogID == 0 { + return + } + duration := time.Since(pageStartTime).Milliseconds() + updateReq := &taskDto.UpdateSyncTaskLogReq{ + ID: pageLogID, + Status: status, + ErrorMessage: errMsg, + ErrorCode: errorCode, + DurationMs: &duration, + } + + if retryCount > 0 { + updateReq.RetryCount = &retryCount + } + + if status == "success" || status == "failed" { + completedAt := time.Now() + updateReq.CompletedAt = completedAt + } + + if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil { + logrus.Errorf("更新分页任务日志失败:%v", err) + } + } + + updatePageLog("running", "", "", 0) + + logrus.Infof(">>> 开始同步第 %d 页数据...", pageNumber) + + result, err := s.SyncWithRetry(ctx, req, useMock, maxRetries) + + if err != nil { + updatePageLog("failed", err.Error(), "PAGE_SYNC_FAILED", 0) + return result, err + } + + summary := map[string]interface{}{ + "page_number": pageNumber, + "detail_count": result.DetailCount, + "sum_saved": result.SumSuccess, + } + updatePageLog("success", "", "", 0) + + logrus.Debugf("分页任务 %s 完成: %v", pageTaskID, summary) + + return result, nil +} + +func (s *SyncService) fetchCurrentData(req *AccountReportRequest, useMock bool) *AccountReportData { if useMock { - responseData := s.mockGen.GenerateCampaignReportResponse() + responseData := s.mockGen.GenerateAccountReportResponse() if responseData != nil && responseData.Data != nil { return responseData.Data } return nil } - respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/campaignReport", req) + respBytes, err := s.httpClient.Post(context.Background(), "/rest/openapi/gw/esp/report/accountReport", req) if err != nil { return nil } - responseData := &CampaignReportResponse{} + responseData := &AccountReportResponse{} if err := json.Unmarshal(respBytes, responseData); err != nil { return nil } @@ -247,12 +415,12 @@ func (s *SyncService) saveDetailData(ctx context.Context, items []*dto.CidAccoun return copydata.CidAccountReportDetail.BatchCreate(ctx, req) } -func (s *SyncService) SyncWithRetry(ctx context.Context, req *CampaignReportRequest, useMock bool, maxRetries int) (*SyncResult, error) { +func (s *SyncService) SyncWithRetry(ctx context.Context, req *AccountReportRequest, useMock bool, maxRetries int) (*SyncResult, error) { var lastResult *SyncResult var lastErr error for attempt := 0; attempt <= maxRetries; attempt++ { - result, err := s.SyncCampaignReport(ctx, req, useMock) + result, err := s.SyncAccountReport(ctx, req, useMock) lastResult = result lastErr = err diff --git a/sync/sync_test.go b/sync/sync_test.go index f74005a..4d592c8 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -32,7 +32,7 @@ func init() { func TestMockDataGeneration(t *testing.T) { mockGen := NewMockDataGenerator() - req := mockGen.GenerateCampaignReportRequest() + req := mockGen.GenerateAccountReportRequest() if req == nil { t.Error("请求数据生成失败") return @@ -45,23 +45,19 @@ func TestDataConverter(t *testing.T) { converter := NewDataConverter() mockGen := NewMockDataGenerator() - responseData := mockGen.GenerateCampaignReportResponse() + responseData := mockGen.GenerateAccountReportResponse() if responseData == nil || responseData.Data.Sum == nil { t.Fatal("Mock 数据生成失败") } - sumItem := converter.ConvertToSumItem(responseData.Data.Sum, "campaign_report") + sumItem := converter.ConvertToSumItem(responseData.Data.Sum, "account_report", 1) if sumItem == nil { t.Fatal("转换为汇总数据失败") } - if sumItem.CampaignName == "" { - t.Error("计划名称为空") - } - fmt.Printf("✓ 汇总数据转换成功:计划=%s\n", sumItem.CampaignName) - detailItems := converter.ConvertToDetailItems(responseData.Data.Detail, "campaign_report") + detailItems := converter.ConvertToDetailItems(responseData.Data.Detail, "account_report", 1) if len(detailItems) == 0 { t.Fatal("转换为明细数据失败") } @@ -69,11 +65,11 @@ func TestDataConverter(t *testing.T) { fmt.Printf("✓ 明细数据转换成功:数量=%d\n", len(detailItems)) } -func TestSyncCampaignReportWithDB(t *testing.T) { +func TestSyncAccountReportWithDB(t *testing.T) { ctx := gctx.New() syncService := NewSyncService() - req := &CampaignReportRequest{ + req := &AccountReportRequest{ AdvertiserID: 10001, StartTime: time.Now().AddDate(0, 0, -30).UnixNano() / 1e6, EndTime: time.Now().UnixNano() / 1e6, @@ -82,7 +78,7 @@ func TestSyncCampaignReportWithDB(t *testing.T) { QueryVersion: 1, } - result, err := syncService.SyncCampaignReport(ctx, req, true) + result, err := syncService.SyncAccountReport(ctx, req, true) if err != nil { t.Logf("同步失败(可能是数据库问题): %v", err) return @@ -98,7 +94,7 @@ func TestSyncCampaignReportWithDB(t *testing.T) { // ctx := gctx.New() // syncService := NewSyncService() // -// req := &CampaignReportRequest{ +// req := &AccountReportRequest{ // AdvertiserID: 10001, // StartTime: time.Now().AddDate(0, 0, -30).UnixNano() / 1e6, // EndTime: time.Now().UnixNano() / 1e6, @@ -108,7 +104,7 @@ func TestSyncCampaignReportWithDB(t *testing.T) { // } // // logrus.Info("=== 开始执行定时同步任务 ===") -// result, err := syncService.SyncCampaignReportWithPagination(ctx, req, true, 3) +// result, err := syncService.SyncAccountReportWithPagination(ctx, req, true, 3) // if err != nil { // t.Logf("定时同步任务失败:%v", err) // return @@ -120,10 +116,10 @@ func TestSyncCampaignReportWithDB(t *testing.T) { // result.DetailCount, result.DetailSuccessCount, result.DetailFailCount) //} -func BenchmarkSyncCampaignReport(b *testing.B) { +func BenchmarkSyncAccountReport(b *testing.B) { ctx := gctx.New() syncService := NewSyncService() - req := &CampaignReportRequest{ + req := &AccountReportRequest{ AdvertiserID: 10001, StartTime: time.Now().AddDate(0, 0, -30).UnixNano() / 1e6, EndTime: time.Now().UnixNano() / 1e6, @@ -134,6 +130,6 @@ func BenchmarkSyncCampaignReport(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - _, _ = syncService.SyncCampaignReport(ctx, req, true) + _, _ = syncService.SyncAccountReport(ctx, req, true) } }