修复补偿机制的日志表缺失的问题
This commit is contained in:
@@ -25,12 +25,21 @@ func (d *SyncTaskLogDao) Create(ctx context.Context, req *dto.CreateSyncTaskLogR
|
|||||||
return existingTask.Id, nil
|
return existingTask.Id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var entityData entity.SyncTaskLog
|
data := map[string]interface{}{
|
||||||
if err := gconv.Struct(req, &entityData); err != nil {
|
"task_id": req.TaskID,
|
||||||
return 0, err
|
"task_type": req.TaskType,
|
||||||
|
"advertiser_id": req.AdvertiserID,
|
||||||
|
"start_time": req.StartTime,
|
||||||
|
"end_time": req.EndTime,
|
||||||
|
"status": req.Status,
|
||||||
|
"max_retry": req.MaxRetry,
|
||||||
|
"page_info": req.PageInfo,
|
||||||
|
"request_params": req.RequestParams,
|
||||||
|
"retry_count": 0,
|
||||||
|
"duration_ms": 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
r, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Data(&entityData).Insert()
|
r, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Data(data).Insert()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -423,7 +423,8 @@ func (s *SyncService) syncSinglePageWithTaskForConcurrent(ctx context.Context, r
|
|||||||
|
|
||||||
pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq)
|
pageLogID, err := dao.SyncTaskLog.Create(ctx, pageLogReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.Errorf("创建分页任务日志失败:%v", err)
|
logrus.Errorf("创建分页任务日志失败(page=%d):%v", pageNumber, err)
|
||||||
|
return nil, 0, fmt.Errorf("创建分页任务日志失败:%w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
updatePageLog := func(status, errMsg, errorCode string, retryCount int) {
|
updatePageLog := func(status, errMsg, errorCode string, retryCount int) {
|
||||||
@@ -510,7 +511,8 @@ type ConcurrentSyncConfig struct {
|
|||||||
|
|
||||||
func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *AccountReportRequest, config ConcurrentSyncConfig) (*SyncResult, error) {
|
func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *AccountReportRequest, config ConcurrentSyncConfig) (*SyncResult, error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
parentTaskID := fmt.Sprintf("%d_%d_account", req.AdvertiserID, req.StartTime)
|
batchID := startTime.UnixNano()
|
||||||
|
parentTaskID := fmt.Sprintf("%d_%d_%d_account", req.AdvertiserID, req.StartTime, batchID)
|
||||||
|
|
||||||
logReq := &taskDto.CreateSyncTaskLogReq{
|
logReq := &taskDto.CreateSyncTaskLogReq{
|
||||||
TaskID: parentTaskID,
|
TaskID: parentTaskID,
|
||||||
@@ -577,14 +579,15 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco
|
|||||||
}
|
}
|
||||||
|
|
||||||
totalPages := (currentData.TotalCount + pageSize - 1) / pageSize
|
totalPages := (currentData.TotalCount + pageSize - 1) / pageSize
|
||||||
logrus.Infof("开始并发同步 - 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d",
|
logrus.Infof("开始并发同步 - 批次ID:%d, 总记录数:%d, 总页数:%d, 每页大小:%d, 并发数:%d",
|
||||||
currentData.TotalCount, totalPages, pageSize, config.MaxConcurrency)
|
batchID, currentData.TotalCount, totalPages, pageSize, config.MaxConcurrency)
|
||||||
|
|
||||||
updateParentLog("running", "", "", map[string]interface{}{
|
updateParentLog("running", "", "", map[string]interface{}{
|
||||||
"total_pages": totalPages,
|
"total_pages": totalPages,
|
||||||
"total_records": currentData.TotalCount,
|
"total_records": currentData.TotalCount,
|
||||||
"page_size": pageSize,
|
"page_size": pageSize,
|
||||||
"concurrency": config.MaxConcurrency,
|
"concurrency": config.MaxConcurrency,
|
||||||
|
"batch_id": batchID,
|
||||||
})
|
})
|
||||||
|
|
||||||
pageResults := make([]*PageSyncResult, totalPages)
|
pageResults := make([]*PageSyncResult, totalPages)
|
||||||
@@ -602,7 +605,7 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco
|
|||||||
eg, egCtx := errgroup.WithContext(ctx)
|
eg, egCtx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
for pageNum := 1; pageNum <= totalPages; pageNum++ {
|
for pageNum := 1; pageNum <= totalPages; pageNum++ {
|
||||||
if err := sem.Acquire(egCtx, 1); err != nil {
|
if err := sem.Acquire(ctx, 1); err != nil {
|
||||||
logrus.Errorf("获取信号量失败:%v", err)
|
logrus.Errorf("获取信号量失败:%v", err)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -611,12 +614,6 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco
|
|||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
defer sem.Release(1)
|
defer sem.Release(1)
|
||||||
|
|
||||||
select {
|
|
||||||
case <-egCtx.Done():
|
|
||||||
return egCtx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
|
pageTaskID := fmt.Sprintf("%s_page_%d", parentTaskID, currentPage)
|
||||||
pageStartTime := time.Now()
|
pageStartTime := time.Now()
|
||||||
|
|
||||||
@@ -650,7 +647,6 @@ func (s *SyncService) SyncAccountReportConcurrent(ctx context.Context, req *Acco
|
|||||||
newFailedCount := atomic.AddInt64(&failedPages, 1)
|
newFailedCount := atomic.AddInt64(&failedPages, 1)
|
||||||
if int(newFailedCount) > config.MaxRetries {
|
if int(newFailedCount) > config.MaxRetries {
|
||||||
logrus.Warnf("失败页数超过阈值 %d", config.MaxRetries)
|
logrus.Warnf("失败页数超过阈值 %d", config.MaxRetries)
|
||||||
return fmt.Errorf("失败页数超过阈值")
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user