Files
data-engine/scheduler/run_sync_task_log_task.go
2026-06-10 15:56:02 +08:00

374 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
dao "dataengine/dao/copydata"
taskDto "dataengine/model/dto/copydata"
"dataengine/syncdata"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"gitea.redpowerfuture.com/red-future/common/beans"
_ "github.com/gogf/gf/contrib/drivers/pgsql/v2"
"github.com/gogf/gf/v2/os/gctx"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
// CompensationScheduler finds paginated sync tasks that ended in failure and
// re-runs them. The actual page re-sync is delegated to syncService.
type CompensationScheduler struct {
	syncService *syncdata.SyncService // performs the page re-sync requests
}
// NewCompensationScheduler builds a scheduler backed by a freshly created
// syncdata.SyncService.
func NewCompensationScheduler() *CompensationScheduler {
	scheduler := new(CompensationScheduler)
	scheduler.syncService = syncdata.NewSyncService()
	return scheduler
}
// RunCompensationOnce performs one complete compensation pass. It is meant
// for manual triggering or for being called by an external scheduler.
//
// The pass runs under a fresh gctx context that carries an "admin"
// *beans.User under the plain string key "user".
// NOTE(review): staticcheck SA1029 flags built-in string context keys. The
// key is kept as-is because downstream code presumably reads
// ctx.Value("user"), so verify that before changing it.
func (s *CompensationScheduler) RunCompensationOnce() {
	adminUser := &beans.User{UserName: "admin"}
	runCtx := context.WithValue(gctx.New(), "user", adminUser)

	logrus.Info("=== 开始执行数据同步补偿任务 ===")
	s.processCompensation(runCtx)
	logrus.Info("=== 补偿任务执行完毕 ===")
}
// processCompensation scans for failed paginated sync tasks and compensates
// them concurrently, with at most 5 tasks in flight at a time. The scan covers
// task type "account_report_page" with status "failed", up to 50 per pass.
//
// Each task is handled as follows:
//   - RetryCount >= MaxRetry: the task is moved to "manual_review" and an
//     alert is raised instead of retrying it again.
//   - Otherwise compensatePageTask re-syncs the page.
//
// Parent tasks of successfully compensated pages are re-evaluated once per
// parent, and only after every worker has finished. Doing it inside the
// workers raced with sibling pages of the same parent that were still in
// flight in this pass. Those siblings are in status "retrying", which
// checkAndUpdateParentTaskStatus does not count as failed, so a parent could
// be marked "success" just before a sibling page failed again.
func (s *CompensationScheduler) processCompensation(ctx context.Context) {
	logrus.Info(">>> 开始扫描需要补偿的失败分页任务...")
	queryReq := &taskDto.QueryFailedTasksReq{
		Status:   []string{"failed"},
		TaskType: "account_report_page",
		Limit:    50,
	}
	failedPageTasks, err := dao.SyncTaskLog.QueryFailedTasks(ctx, queryReq)
	if err != nil {
		logrus.Errorf("查询失败的分页任务异常:%v", err)
		return
	}
	if len(failedPageTasks) == 0 {
		logrus.Info("✓ 当前没有需要补偿的失败分页任务")
		return
	}
	logrus.Infof("发现 %d 个需要补偿的失败分页任务,开始并发处理...", len(failedPageTasks))

	const maxConcurrency = 5
	var (
		successCount      int64
		failCount         int64
		manualReviewCount int64
		wg                sync.WaitGroup

		// parentMu guards parentIDs and seenParents. Together they collect, in
		// first-seen order, the distinct parents that had a page compensated
		// successfully in this pass.
		parentMu    sync.Mutex
		parentIDs   []string
		seenParents = make(map[string]struct{})
	)
	sem := semaphore.NewWeighted(maxConcurrency)

	for _, pageTask := range failedPageTasks {
		wg.Add(1)
		go func(task *taskDto.SyncTaskLogItem) {
			defer wg.Done()
			// Acquire only fails when ctx is done. The task then counts as a failure.
			if err := sem.Acquire(ctx, 1); err != nil {
				logrus.Errorf("获取信号量失败:%v", err)
				atomic.AddInt64(&failCount, 1)
				return
			}
			defer sem.Release(1)

			if task.RetryCount >= task.MaxRetry {
				logrus.Warnf("⚠ 分页任务 %s 已达到最大重试次数 %d标记为需人工处理", task.TaskID, task.MaxRetry)
				updateReq := &taskDto.UpdateSyncTaskLogReq{
					ID:           task.Id,
					Status:       "manual_review",
					ErrorMessage: fmt.Sprintf("已达到最大重试次数 %d 次", task.MaxRetry),
					ErrorCode:    "MAX_RETRY_EXCEEDED",
				}
				// Surface a failed status write. The alert below is still sent so
				// the task is not lost silently.
				if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
					logrus.Errorf("更新分页任务 %s 状态为 manual_review 失败:%v", task.TaskID, err)
				}
				s.sendAlert(task)
				atomic.AddInt64(&manualReviewCount, 1)
				return
			}

			logrus.Infof("▶ 开始补偿分页任务:%s (广告主=%d, 第 %d/%d 次重试)",
				task.TaskID, task.AdvertiserID, task.RetryCount+1, task.MaxRetry)
			if !s.compensatePageTask(ctx, task) {
				atomic.AddInt64(&failCount, 1)
				logrus.Warnf("✗ 分页任务 %s 补偿失败", task.TaskID)
				return
			}
			atomic.AddInt64(&successCount, 1)
			logrus.Infof("✓ 分页任务 %s 补偿成功", task.TaskID)

			// Defer the parent check until all workers are done (see func doc).
			if parentTaskID := s.extractParentTaskID(task.TaskID); parentTaskID != "" {
				parentMu.Lock()
				if _, seen := seenParents[parentTaskID]; !seen {
					seenParents[parentTaskID] = struct{}{}
					parentIDs = append(parentIDs, parentTaskID)
				}
				parentMu.Unlock()
			}
		}(pageTask)
	}
	wg.Wait()

	// Every worker has finished, so no page handled in this pass is still
	// "retrying" from this run.
	for _, parentTaskID := range parentIDs {
		s.checkAndUpdateParentTaskStatus(ctx, parentTaskID)
	}

	logrus.Infof("=== 补偿任务执行完成:总计=%d, 成功=%d, 失败=%d, 需人工处理=%d ===",
		len(failedPageTasks),
		atomic.LoadInt64(&successCount),
		atomic.LoadInt64(&failCount),
		atomic.LoadInt64(&manualReviewCount))
}
// compensatePageTask re-runs a single failed page task. It works in three steps:
//  1. Bump the retry counter and mark the log row "retrying".
//  2. Rebuild the account-report request for the task's page.
//  3. Re-sync the page through the sync service.
//
// It returns true when the page was re-synced successfully and false otherwise.
// On failure the row is marked "failed", with a backoff-based NextRetryTime, via
// markPageTaskFailed. The exception is when the initial "retrying" update
// itself fails: then the row is not touched again.
func (s *CompensationScheduler) compensatePageTask(ctx context.Context, pageTask *taskDto.SyncTaskLogItem) bool {
	retryCount := pageTask.RetryCount + 1
	updateReq := &taskDto.UpdateSyncTaskLogReq{
		ID:         pageTask.Id,
		Status:     "retrying",
		RetryCount: &retryCount,
	}
	if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
		logrus.Errorf("更新任务状态为 retrying 失败:%v", err)
		return false
	}

	// NOTE(review): parseTime falls back to time.Now() for values it cannot
	// interpret, which would query an (almost) empty window instead of failing.
	startTime := s.parseTime(pageTask.StartTime)
	endTime := s.parseTime(pageTask.EndTime)

	pageNumber := s.extractPageNumber(pageTask.TaskID)
	if pageNumber == 0 {
		logrus.Errorf("无法从任务ID %s 解析页码", pageTask.TaskID)
		s.markPageTaskFailed(ctx, pageTask.Id, retryCount, "无法解析页码", "PARSE_PAGE_NUMBER_FAILED")
		return false
	}
	pageSize := s.extractPageSize(pageTask)

	req := &syncdata.AccountReportRequest{
		AdvertiserID:  pageTask.AdvertiserID,
		StartTime:     startTime.UnixMilli(),
		EndTime:       endTime.UnixMilli(),
		SelectColumns: []string{"impression", "click", "cost", "t0GMV"},
		GroupType:     1,
		QueryVersion:  1,
		PageInfo: &syncdata.PageInfo{
			CurrentPage: pageNumber,
			PageSize:    pageSize,
		},
	}

	maxRetries := 3
	// Re-sync under the task's own stored ID. The ID used to be rebuilt as
	// "<parent>_page_<n>", which differs from the stored ID whenever the page
	// suffix is not in canonical form (e.g. "_page_02"). SyncSinglePageWithTask
	// presumably keys its task-log bookkeeping on this ID, so confirm that.
	result, _, err := s.syncService.SyncSinglePageWithTask(ctx, req, true, maxRetries, pageTask.TaskID, pageNumber)
	if err != nil {
		logrus.Errorf("补偿分页任务 %s 失败(第 %d 次):%v", pageTask.TaskID, retryCount, err)
		s.markPageTaskFailed(ctx, pageTask.Id, retryCount, err.Error(), "PAGE_COMPENSATION_FAILED")
		return false
	}
	logrus.Infof("✓ 补偿分页任务 %s 成功 - 记录数=%d", pageTask.TaskID, result.DetailCount)
	return true
}
// markPageTaskFailed puts the page-task row taskID back into "failed" and
// records errMsg/errCode on it. It schedules the next retry
// calculateBackoff(retryCount) minutes from now.
//
// A failed status write is logged rather than dropped. If the write fails,
// the row stays "retrying", and the "failed"-status scan in processCompensation
// will not pick it up again, so the failure must at least be visible.
func (s *CompensationScheduler) markPageTaskFailed(ctx context.Context, taskID int64, retryCount int, errMsg, errCode string) {
	backoff := time.Duration(s.calculateBackoff(retryCount)) * time.Minute
	updateReq := &taskDto.UpdateSyncTaskLogReq{
		ID:            taskID,
		Status:        "failed",
		ErrorMessage:  errMsg,
		ErrorCode:     errCode,
		NextRetryTime: time.Now().Add(backoff),
	}
	if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
		logrus.Errorf("标记分页任务 %d 为 failed 失败:%v", taskID, err)
	}
}
// checkAndUpdateParentTaskStatus re-evaluates the parent ("account_report")
// task parentTaskID after one of its pages was compensated.
//
// The parent is updated to "success", with a result summary, when none of its
// page tasks is in status "failed" or "manual_review". Otherwise it is left
// unchanged.
//
// NOTE(review): pages in any other status (e.g. "retrying" or still running)
// are not counted as failed, so they do not block the success update. They are
// logged so this is visible; confirm whether that is intended.
func (s *CompensationScheduler) checkAndUpdateParentTaskStatus(ctx context.Context, parentTaskID string) {
	// Maximum number of page tasks fetched for one parent.
	const pageQueryLimit = 1000

	logrus.Infof(">>> 检查主任务 %s 的所有分页任务状态...", parentTaskID)
	parentTask, err := dao.SyncTaskLog.GetByTaskID(ctx, parentTaskID, "account_report")
	if err != nil {
		// Report the lookup error instead of disguising it as "not found".
		logrus.Errorf("查询主任务 %s 失败:%v", parentTaskID, err)
		return
	}
	if parentTask == nil {
		logrus.Warnf("未找到主任务 %s跳过状态更新", parentTaskID)
		return
	}
	if parentTask.Status == "success" {
		logrus.Infof("主任务 %s 已经是成功状态,无需更新", parentTaskID)
		return
	}

	allPageTasks, err := dao.SyncTaskLog.QueryAllPageTasksByParentID(ctx, parentTaskID, pageQueryLimit)
	if err != nil {
		logrus.Errorf("查询主任务 %s 的分页任务失败:%v", parentTaskID, err)
		return
	}
	if len(allPageTasks) == 0 {
		logrus.Warnf("主任务 %s 没有找到任何分页任务", parentTaskID)
		return
	}
	if len(allPageTasks) >= pageQueryLimit {
		// The page list may be truncated, so the verdict below only covers the
		// pages actually returned.
		logrus.Warnf("主任务 %s 的分页任务数量达到查询上限 %d,结果可能不完整", parentTaskID, pageQueryLimit)
	}

	failedPages := make([]int, 0)
	successPages := make([]int, 0)
	var otherPages []int
	for _, pageTask := range allPageTasks {
		pageNumber := s.extractPageNumber(pageTask.TaskID)
		switch pageTask.Status {
		case "success":
			successPages = append(successPages, pageNumber)
		case "failed", "manual_review":
			failedPages = append(failedPages, pageNumber)
		default:
			otherPages = append(otherPages, pageNumber)
		}
	}
	logrus.Infof("主任务 %s 分页状态:总数=%d, 成功=%d, 失败=%d",
		parentTaskID, len(allPageTasks), len(successPages), len(failedPages))
	if len(otherPages) > 0 {
		logrus.Warnf("主任务 %s 有 %d 个分页任务处于非终态(未计为失败):%v",
			parentTaskID, len(otherPages), otherPages)
	}

	if len(failedPages) > 0 {
		logrus.Infof("⚠ 主任务 %s 仍有 %d 个失败的分页任务:%v保持部分失败状态",
			parentTaskID, len(failedPages), failedPages)
		return
	}

	logrus.Infof("✓ 主任务 %s 的所有分页任务都已成功,更新主任务状态为 success", parentTaskID)
	summary := map[string]interface{}{
		"total_pages":   len(allPageTasks),
		"success_pages": len(successPages),
		"failed_pages":  0,
		"compensated":   true,
	}
	updateReq := &taskDto.UpdateSyncTaskLogReq{
		ID:            parentTask.Id,
		Status:        "success",
		ResultSummary: summary,
	}
	if err := dao.SyncTaskLog.Update(ctx, updateReq); err != nil {
		logrus.Errorf("更新主任务 %s 状态失败:%v", parentTaskID, err)
	}
}
// extractParentTaskID returns the parent (main) task ID of a paginated task
// ID, i.e. everything before the LAST "_page_" marker.
// Example: "12345_1234567890_account_page_2" -> "12345_1234567890_account".
// It returns "" when the marker is absent or sits at the very start of taskID.
func (s *CompensationScheduler) extractParentTaskID(taskID string) string {
	const marker = "_page_"
	cut := strings.LastIndex(taskID, marker)
	if cut <= 0 {
		return ""
	}
	return taskID[:cut]
}
// extractPageNumber extracts the page number that follows the LAST "_page_"
// marker of a paginated task ID.
// Example: "12345_1234567890_account_page_2" -> 2.
//
// Leading decimal digits are read, and any trailing text is ignored. It
// returns 0 (which callers treat as "cannot parse") in these cases:
//   - the marker is missing or at position 0;
//   - the suffix is not a number;
//   - the number is negative, since a negative page must never be sent as
//     CurrentPage.
func (s *CompensationScheduler) extractPageNumber(taskID string) int {
	const marker = "_page_"
	idx := strings.LastIndex(taskID, marker)
	if idx <= 0 {
		return 0
	}
	var pageNum int
	if _, err := fmt.Sscanf(taskID[idx+len(marker):], "%d", &pageNum); err != nil || pageNum < 0 {
		return 0
	}
	return pageNum
}
// extractPageSize returns the page size to request for pageTask. It reads the
// "page_size" entry of the task's PageInfo first and of its RequestParams
// second. It falls back to 10 when neither yields a positive value.
//
// Both sources are parsed the same way (see pageSizeFromParams):
//   - Non-positive values are ignored. A float64 0 or negative value used to
//     be returned as-is, producing an invalid PageSize.
//   - map[string]string is accepted for RequestParams too, as it already was
//     for PageInfo.
func (s *CompensationScheduler) extractPageSize(pageTask *taskDto.SyncTaskLogItem) int {
	// Leftover diagnostics. They are kept but logged at debug level so that
	// every compensation no longer dumps the raw parameters at info level.
	logrus.Debugf("DEBUG - PageInfo 类型: %T, 值: %+v", pageTask.PageInfo, pageTask.PageInfo)
	logrus.Debugf("DEBUG - RequestParams 类型: %T, 值: %+v", pageTask.RequestParams, pageTask.RequestParams)

	if size, ok := s.pageSizeFromParams(pageTask.PageInfo); ok {
		return size
	}
	if size, ok := s.pageSizeFromParams(pageTask.RequestParams); ok {
		return size
	}
	return 10
}

// pageSizeFromParams reads a positive "page_size" from a decoded parameter map.
//
// Supported containers are map[string]interface{} and map[string]string.
// Supported values are float64 (the default JSON number type), int, int64 and
// decimal strings. It reports false for anything else, for a missing key, or
// for a non-positive size.
func (s *CompensationScheduler) pageSizeFromParams(params interface{}) (int, bool) {
	var raw interface{}
	switch m := params.(type) {
	case map[string]interface{}:
		raw = m["page_size"]
	case map[string]string:
		str, ok := m["page_size"]
		if !ok {
			return 0, false
		}
		raw = str
	default:
		return 0, false
	}

	var size int
	switch v := raw.(type) {
	case float64:
		size = int(v)
	case int:
		size = v
	case int64:
		size = int(v)
	case string:
		// Leading digits are accepted ("20abc" -> 20), as before. An unparsable
		// string yields no size.
		if _, err := fmt.Sscanf(v, "%d", &size); err != nil {
			return 0, false
		}
	default:
		return 0, false
	}
	return size, size > 0
}
// calculateBackoff returns the delay, in minutes, before retrying a task that
// has just failed its retryCount-th attempt:
// 1->5, 2->15, 3->30, 4->60, 5 and above->120.
//
// Values below 1 are treated as the first attempt (5 minutes). Previously they
// indexed backoffs[-1] and panicked.
func (s *CompensationScheduler) calculateBackoff(retryCount int) int {
	backoffs := [...]int{5, 15, 30, 60, 120}
	switch {
	case retryCount < 1:
		return backoffs[0]
	case retryCount > len(backoffs):
		return backoffs[len(backoffs)-1]
	default:
		return backoffs[retryCount-1]
	}
}
// parseTime converts a task-log time field into a time.Time. It accepts:
//   - a time.Time value;
//   - a non-nil *time.Time;
//   - a string in "2006-01-02 15:04:05" form (interpreted as UTC, since
//     time.Parse is given no zone);
//   - an RFC 3339 string (fractional seconds allowed).
//
// Anything else, including unparsable strings and a nil *time.Time, falls
// back to time.Now().
// NOTE(review): if stored "2006-01-02 15:04:05" strings are in local time
// rather than UTC, the resulting window is shifted. Confirm the storage
// convention.
func (s *CompensationScheduler) parseTime(t interface{}) time.Time {
	switch v := t.(type) {
	case time.Time:
		return v
	case *time.Time:
		if v != nil {
			return *v
		}
	case string:
		for _, layout := range []string{"2006-01-02 15:04:05", time.RFC3339} {
			if parsed, err := time.Parse(layout, v); err == nil {
				return parsed
			}
		}
	}
	return time.Now()
}
// sendAlert raises an alert for a page task that needs manual intervention.
// For now the alert is only an error-level log line with the task ID, the
// advertiser ID and the task's last recorded error message.
func (s *CompensationScheduler) sendAlert(task *taskDto.SyncTaskLogItem) {
	const alertFormat = "【告警】分页任务 %s 需要人工介入:广告主=%d, 错误=%s"
	logrus.Errorf(alertFormat, task.TaskID, task.AdvertiserID, task.ErrorMessage)
}
// main runs a single compensation pass and exits.
func main() {
	NewCompensationScheduler().RunCompensationOnce()
}