From eb5e0af30803bf4b9343afa20e6fd4f46f8ceb30 Mon Sep 17 00:00:00 2001 From: lmk <1095689763@qq.com> Date: Thu, 9 Apr 2026 13:43:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=A1=A5=E5=81=BF=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=E7=9A=84=E6=97=A5=E5=BF=97=E8=A1=A8=E7=BC=BA=E5=A4=B1?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dao/copydata/sync_task_log_dao.go | 17 +++++++++++++---- syncdata/sync_service.go | 20 ++++++++------------ 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/dao/copydata/sync_task_log_dao.go b/dao/copydata/sync_task_log_dao.go index da05ca0..baf52da 100644 --- a/dao/copydata/sync_task_log_dao.go +++ b/dao/copydata/sync_task_log_dao.go @@ -25,12 +25,21 @@ func (d *SyncTaskLogDao) Create(ctx context.Context, req *dto.CreateSyncTaskLogR return existingTask.Id, nil } - var entityData entity.SyncTaskLog - if err := gconv.Struct(req, &entityData); err != nil { - return 0, err + data := map[string]interface{}{ + "task_id": req.TaskID, + "task_type": req.TaskType, + "advertiser_id": req.AdvertiserID, + "start_time": req.StartTime, + "end_time": req.EndTime, + "status": req.Status, + "max_retry": req.MaxRetry, + "page_info": req.PageInfo, + "request_params": req.RequestParams, + "retry_count": 0, + "duration_ms": 0, } - r, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Data(&entityData).Insert() + r, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Data(data).Insert() if err != nil { return 0, err } diff --git a/syncdata/sync_service.go b/syncdata/sync_service.go index b1832b5..902380f 100644 --- a/syncdata/sync_service.go +++ b/syncdata/sync_service.go @@ -423,7 +423,8 @@ func (s *SyncService) syncSinglePageWithTaskForConcurrent(ctx context.Context, r pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq) if err != nil { - logrus.Errorf("创建分页任务日志失败:%v", err) + logrus.Errorf("创建分页任务日志失败(page=%d):%v", pageNumber, err) + return nil, 0, fmt.Errorf("创建分页任务日志失败:%w", err) } updatePageLog := func(status, errMsg, errorCode string, retryCount int) { @@ -510,7 +511,8 @@ type ConcurrentSyncConfig struct { func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *AccountReportRequest, config ConcurrentSyncConfig) (*SyncResult, error) { startTime := time.Now() - parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime) + batchID := startTime.UnixNano() + parentTaskID := fmt.Sprintf("%d_%d_%d_account", req.AdvertiserID, req.StartTime, batchID) logReq := &taskDto.CreateSyncTaskLogReq{ TaskID: parentTaskID, @@ -577,14 +579,15 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco } totalPages := (currentData.TotalCount + pageSize - 1) / pageSize - logrus.Infof("开始并发同步 - 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d", - currentData.TotalCount, totalPages, pageSize, config.MaxConcurrency) + logrus.Infof("开始并发同步 - 批次ID:%d, 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d", + batchID, currentData.TotalCount, totalPages, pageSize, config.MaxConcurrency) updateParentLog("running", "", "", map[string]interface{}{ "total_pages": totalPages, "total_records": currentData.TotalCount, "page_size": pageSize, "concurrency": config.MaxConcurrency, + "batch_id": batchID, }) pageResults := make([]*PageSyncResult, totalPages) @@ -602,7 +605,7 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco eg, egCtx := errgroup.WithContext(ctx) for pageNum := 1; pageNum <= totalPages; pageNum++ { - if err := sem.Acquire(egCtx, 1); err != nil { + if err := sem.Acquire(ctx, 1); err != nil { logrus.Errorf("获取信号量失败:%v", err) break } @@ -611,12 +614,6 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco eg.Go(func() error { defer sem.Release(1) - select { - case <-egCtx.Done(): - return egCtx.Err() - default: - } - pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage) pageStartTime := time.Now() @@ -650,7 +647,6 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco newFailedCount := atomic.AddInt64(&failedPages, 1) if int(newFailedCount) > config.MaxRetries { logrus.Warnf("失败页数超过阈值 %d", config.MaxRetries) - return fmt.Errorf("失败页数超过阈值") } return nil }