Files
data-engine/scheduler/run_sync_task_log_task.go
2026-04-08 14:30:09 +08:00

274 lines
8.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
dao "cid/dao/copydata"
taskDto "cid/model/dto/copydata"
"cid/sync"
"context"
"fmt"
"strings"
"time"
"gitea.com/red-future/common/beans"
_ "github.com/gogf/gf/contrib/drivers/pgsql/v2"
"github.com/gogf/gf/v2/os/gctx"
"github.com/sirupsen/logrus"
)
type CompensationScheduler struct {
syncService *sync.SyncService
}
func NewCompensationScheduler() *CompensationScheduler {
return &CompensationScheduler{
syncService: sync.NewSyncService(),
}
}
func (s *CompensationScheduler) RunCompensationOnce() {
ctx := gctx.New()
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "admin"})
logrus.Info("=== 开始执行数据同步补偿任务 ===")
s.processCompensation(ctx)
logrus.Info("=== 补偿任务执行完毕 ===")
}
func (s *CompensationScheduler) processCompensation(ctx context.Context) {
logrus.Info(">>> 开始扫描需要补偿的失败分页任务...")
queryReq := &taskDto.QueryFailedTasksReq{
Status: []string{"failed"},
TaskType: "account_report_page",
Limit: 50,
}
failedPageTasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, queryReq)
if err != nil {
logrus.Errorf("查询失败的分页任务异常:%v", err)
return
}
if len(failedPageTasks) == 0 {
logrus.Info("✓ 当前没有需要补偿的失败分页任务")
return
}
logrus.Infof("发现 %d 个需要补偿的失败分页任务,开始逐个处理...", len(failedPageTasks))
successCount := 0
failCount := 0
for _, pageTask := range failedPageTasks {
if pageTask.RetryCount >= pageTask.MaxRetry {
logrus.Warnf("⚠ 分页任务 %s 已达到最大重试次数 %d标记为需人工处理", pageTask.TaskID, pageTask.MaxRetry)
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: pageTask.Id,
Status: "manual_review",
ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", pageTask.MaxRetry),
ErrorCode: "MAX_RETRY_EXCEEDED",
}
dao.SyncTaskLog.Update(ctx, updateReq)
s.sendAlert(pageTask)
failCount++
continue
}
logrus.Infof("▶ 开始补偿分页任务:%s (广告主=%d, 第 %d/%d 次重试)",
pageTask.TaskID, pageTask.AdvertiserID, pageTask.RetryCount+1, pageTask.MaxRetry)
if s.compensatePageTask(ctx, pageTask) {
successCount++
logrus.Infof("✓ 分页任务 %s 补偿成功", pageTask.TaskID)
parentTaskID := s.extractParentTaskID(pageTask.TaskID)
if parentTaskID != "" {
s.checkAndUpdateParentTaskStatus(ctx, parentTaskID)
}
} else {
failCount++
logrus.Warnf("✗ 分页任务 %s 补偿失败", pageTask.TaskID)
}
time.Sleep(1 * time.Second)
}
logrus.Infof("=== 补偿任务执行完成:总计=%d, 成功=%d, 失败=%d ===",
len(failedPageTasks), successCount, failCount)
}
func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask *taskDto.SyncTaskLogItem) bool {
retryCount := pageTask.RetryCount + 1
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: pageTask.Id,
Status: "retrying",
RetryCount: &retryCount,
}
dao.SyncTaskLog.Update(ctx, updateReq)
startTime := s.parseTime(pageTask.StartTime)
endTime := s.parseTime(pageTask.EndTime)
pageNumber := s.extractPageNumber(pageTask.TaskID)
if pageNumber == 0 {
logrus.Errorf("无法从任务ID %s 解析页码", pageTask.TaskID)
s.markPageTaskFailed(ctx, pageTask.Id, retryCount, "无法解析页码", "PARSE_PAGE_NUMBER_FAILED")
return false
}
req := &sync.AccountReportRequest{
AdvertiserID: pageTask.AdvertiserID,
StartTime: startTime.UnixMilli(),
EndTime: endTime.UnixMilli(),
SelectColumns: []string{"impression", "click", "cost", "t0GMV"},
GroupType: 1,
QueryVersion: 1,
PageInfo: &sync.PageInfo{
CurrentPage: pageNumber,
PageSize: 100,
},
}
maxRetries := 3
parentTaskID := s.extractParentTaskID(pageTask.TaskID)
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, pageNumber)
result, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTaskID, pageNumber)
if err != nil {
logrus.Errorf("补偿分页任务 %s 失败(第 %d 次):%v", pageTask.TaskID, retryCount, err)
s.markPageTaskFailed(ctx, pageTask.Id, retryCount, err.Error(), "PAGE_COMPENSATION_FAILED")
return false
}
logrus.Infof("✓ 补偿分页任务 %s 成功 - 记录数=%d", pageTask.TaskID, result.DetailCount)
return true
}
func (s *CompensationScheduler) markPageTaskFailed(ctx context.Context, taskID int64, retryCount int, errMsg, errCode string) {
backoffMinutes := s.calculateBackoff(retryCount)
nextRetry := time.Now().Add(time.Duration(backoffMinutes) * time.Minute)
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: taskID,
Status: "failed",
ErrorMessage: errMsg,
ErrorCode: errCode,
NextRetryTime: nextRetry,
}
dao.SyncTaskLog.Update(ctx, updateReq)
}
func (s *CompensationScheduler) checkAndUpdateParentTaskStatus(ctx context.Context, parentTaskID string) {
logrus.Infof(">>> 检查主任务 %s 的所有分页任务状态...", parentTaskID)
parentTask, err := dao.SyncTaskLog.GetByTaskID(ctx, parentTaskID, "account_report")
if err != nil || parentTask == nil {
logrus.Warnf("未找到主任务 %s跳过状态更新", parentTaskID)
return
}
if parentTask.Status == "success" {
logrus.Infof("主任务 %s 已经是成功状态,无需更新", parentTaskID)
return
}
allPageTasks, err := dao.SyncTaskLog.QueryAllPageTasksByParentID(ctx, parentTaskID, 1000)
if err != nil {
logrus.Errorf("查询主任务 %s 的分页任务失败:%v", parentTaskID, err)
return
}
if len(allPageTasks) == 0 {
logrus.Warnf("主任务 %s 没有找到任何分页任务", parentTaskID)
return
}
failedPages := make([]int, 0)
successPages := make([]int, 0)
for _, pageTask := range allPageTasks {
pageNumber := s.extractPageNumber(pageTask.TaskID)
if pageTask.Status == "success" {
successPages = append(successPages, pageNumber)
} else if pageTask.Status == "failed" || pageTask.Status == "manual_review" {
failedPages = append(failedPages, pageNumber)
}
}
logrus.Infof("主任务 %s 分页状态:总数=%d, 成功=%d, 失败=%d",
parentTaskID, len(allPageTasks), len(successPages), len(failedPages))
if len(failedPages) == 0 {
logrus.Infof("✓ 主任务 %s 的所有分页任务都已成功,更新主任务状态为 success", parentTaskID)
summary := map[string]interface{}{
"total_pages": len(allPageTasks),
"success_pages": len(successPages),
"failed_pages": 0,
"compensated": true,
}
updateReq := &taskDto.UpdateSyncTaskLogReq{
ID: parentTask.Id,
Status: "success",
ResultSummary: summary,
}
if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
logrus.Errorf("更新主任务 %s 状态失败:%v", parentTaskID, err)
}
} else {
logrus.Infof("⚠ 主任务 %s 仍有 %d 个失败的分页任务:%v保持部分失败状态",
parentTaskID, len(failedPages), failedPages)
}
}
func (s *CompensationScheduler) extractParentTaskID(taskID string) string {
if idx := strings.LastIndex(taskID, "_page_"); idx > 0 {
return taskID[:idx]
}
return ""
}
func (s *CompensationScheduler) extractPageNumber(taskID string) int {
if idx := strings.LastIndex(taskID, "_page_"); idx > 0 {
var pageNum int
fmt.Sscanf(taskID[idx+6:], "%d", &pageNum)
return pageNum
}
return 0
}
func (s *CompensationScheduler) calculateBackoff(retryCount int) int {
backoffs := []int{5, 15, 30, 60, 120}
if retryCount <= len(backoffs) {
return backoffs[retryCount-1]
}
return backoffs[len(backoffs)-1]
}
func (s *CompensationScheduler) parseTime(t interface{}) time.Time {
switch v := t.(type) {
case time.Time:
return v
case string:
if parsed, err := time.Parse("2006-01-02 15:04:05", v); err == nil {
return parsed
}
}
return time.Now()
}
func (s *CompensationScheduler) sendAlert(task *taskDto.SyncTaskLogItem) {
logrus.Errorf("【告警】分页任务 %s 需要人工介入:广告主=%d, 错误=%s",
task.TaskID, task.AdvertiserID, task.ErrorMessage)
}
func main() {
scheduler := NewCompensationScheduler()
scheduler.RunCompensationOnce()
}