Files
data-engine/dao/copydata/sync_task_log_dao.go

169 lines
4.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package copydata
import (
consts "cid/consts/public"
dto "cid/model/dto/copydata"
entity "cid/model/entity/copydata"
"context"
"time"
"gitea.com/red-future/common/db/gfdb"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/util/gconv"
"github.com/sirupsen/logrus"
)
// SyncTaskLogDao groups the database operations on the sync task log table.
// It has no fields, so a single shared instance is sufficient.
type SyncTaskLogDao struct{}

// SyncTaskLog is the package-level SyncTaskLogDao instance.
var SyncTaskLog = &SyncTaskLogDao{}
// Create inserts a task log row and returns its ID. If a row with the same
// task_id and task_type already exists, nothing is inserted and the ID of the
// existing row is returned instead.
//
// A failure of the existence lookup is returned to the caller instead of being
// ignored: inserting without knowing whether the row already exists could
// produce a duplicate log for the same task.
//
// NOTE(review): the lookup and the insert are two separate statements, so two
// concurrent calls for the same task may both reach the insert. Presumably a
// unique index on (task_id, task_type) guards against that — confirm.
func (d *SyncTaskLogDao) Create(ctx context.Context, req *dto.CreateSyncTaskLogReq) (int64, error) {
	existingTask, err := d.GetByTaskID(ctx, req.TaskID, req.TaskType)
	if err != nil {
		return 0, err
	}
	if existingTask != nil {
		logrus.Debugf("任务日志已存在task_id=%s, task_type=%s, id=%d", req.TaskID, req.TaskType, existingTask.Id)
		return existingTask.Id, nil
	}

	// retry_count and duration_ms always start at zero for a new row.
	data := gdb.Map{
		"task_id":        req.TaskID,
		"task_type":      req.TaskType,
		"advertiser_id":  req.AdvertiserID,
		"start_time":     req.StartTime,
		"end_time":       req.EndTime,
		"status":         req.Status,
		"max_retry":      req.MaxRetry,
		"page_info":      req.PageInfo,
		"request_params": req.RequestParams,
		"retry_count":    0,
		"duration_ms":    0,
	}

	r, err := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Data(data).Insert()
	if err != nil {
		return 0, err
	}
	return r.LastInsertId()
}
// Update applies a partial update to the task log row whose id equals req.ID.
// Only fields that are set on req are written: string fields are skipped when
// empty and pointer fields are skipped when nil. updated_at is always set to
// the current time.
func (d *SyncTaskLogDao) Update(ctx context.Context, req *dto.UpdateSyncTaskLogReq) error {
	fields := gdb.Map{}

	// String-valued columns: an empty value means "leave unchanged".
	if req.Status != "" {
		fields["status"] = req.Status
	}
	if req.ErrorMessage != "" {
		fields["error_message"] = req.ErrorMessage
	}
	if req.ErrorCode != "" {
		fields["error_code"] = req.ErrorCode
	}

	// Pointer-valued columns: nil means "leave unchanged".
	if req.RetryCount != nil {
		fields["retry_count"] = *req.RetryCount
	}
	if req.DurationMs != nil {
		fields["duration_ms"] = *req.DurationMs
	}
	if req.ResultSummary != nil {
		fields["result_summary"] = req.ResultSummary
	}
	if req.NextRetryTime != nil {
		fields["next_retry_time"] = req.NextRetryTime
	}
	if req.CompletedAt != nil {
		fields["completed_at"] = req.CompletedAt
	}

	fields["updated_at"] = time.Now()

	query := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).
		Data(fields).
		Where("id", req.ID)
	if _, err := query.Update(); err != nil {
		return err
	}
	return nil
}
// QueryFailedTasks returns task logs that are candidates for compensation
// (retry).
//
// Filters:
//   - status IN req.Status, when req.Status is non-empty;
//   - task_type = req.TaskType, when req.TaskType is non-empty;
//   - next_retry_time is NULL or not later than the current time.
//
// At most req.Limit rows are returned; a non-positive limit falls back to 100.
// No ORDER BY is applied, so when more rows match than the limit allows, which
// ones are returned is left to the database.
//
// An error is returned if the query fails or if a row cannot be converted to
// its DTO; the conversion error is propagated rather than ignored so callers
// never receive partially populated items.
func (d *SyncTaskLogDao) QueryFailedTasks(ctx context.Context, req *dto.QueryFailedTasksReq) ([]*dto.SyncTaskLogItem, error) {
	model := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Model

	// Status filter.
	if len(req.Status) > 0 {
		model = model.WhereIn("status", req.Status)
	}

	// Task type filter.
	if req.TaskType != "" {
		model = model.Where("task_type", req.TaskType)
	}

	// Only tasks whose retry time has arrived, or that never had one set.
	model = model.Where(
		"(next_retry_time <= ? OR next_retry_time IS NULL)",
		time.Now(),
	)

	// Cap the result size.
	limit := req.Limit
	if limit <= 0 {
		limit = 100
	}
	model = model.Limit(limit)

	var results []*entity.SyncTaskLog
	if err := model.Scan(&results); err != nil {
		return nil, err
	}

	// Kept non-nil even when empty, as before.
	items := make([]*dto.SyncTaskLogItem, 0, len(results))
	for _, r := range results {
		item := &dto.SyncTaskLogItem{}
		if err := gconv.Struct(r, item); err != nil {
			return nil, err
		}
		items = append(items, item)
	}
	return items, nil
}
// GetByTaskID looks up the task log row matching both taskID and taskType and
// returns it, or the error reported by the query.
//
// NOTE(review): presumably the returned pointer is nil with a nil error when no
// row matches (Create checks the result for nil) — confirm against the ORM's
// Scan semantics.
func (d *SyncTaskLogDao) GetByTaskID(ctx context.Context, taskID, taskType string) (*entity.SyncTaskLog, error) {
	query := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).
		Where("task_id", taskID).
		Where("task_type", taskType)

	// Declared as a nil pointer and scanned by address, exactly as before.
	var record *entity.SyncTaskLog
	if err := query.Scan(&record); err != nil {
		return nil, err
	}
	return record, nil
}
// QueryAllPageTasksByParentID returns the page-level task logs that belong to
// the parent task parentTaskID: rows whose task_type is "account_report_page"
// and whose task_id matches the LIKE pattern parentTaskID + "_page_%".
//
// At most limit rows are returned; a non-positive limit falls back to 1000.
//
// An error is returned if the query fails or if a row cannot be converted to
// its DTO; the conversion error is propagated rather than ignored so callers
// never receive partially populated items.
//
// NOTE(review): "_" and "%" are wildcards in SQL LIKE, so the "_" characters in
// the pattern (and any "_" or "%" inside parentTaskID) match more than the
// literal characters. Escaping is dialect-specific and therefore left as-is —
// confirm whether task IDs can collide under this pattern.
func (d *SyncTaskLogDao) QueryAllPageTasksByParentID(ctx context.Context, parentTaskID string, limit int) ([]*dto.SyncTaskLogItem, error) {
	if limit <= 0 {
		limit = 1000
	}

	model := gfdb.DB(ctx).Model(ctx, consts.SyncTaskLogTable).Model
	model = model.Where("task_type", "account_report_page")
	model = model.WhereLike("task_id", parentTaskID+"_page_%")
	model = model.Limit(limit)

	var results []*entity.SyncTaskLog
	if err := model.Scan(&results); err != nil {
		return nil, err
	}

	// Kept non-nil even when empty, as before.
	items := make([]*dto.SyncTaskLogItem, 0, len(results))
	for _, r := range results {
		item := &dto.SyncTaskLogItem{}
		if err := gconv.Struct(r, item); err != nil {
			return nil, err
		}
		items = append(items, item)
	}
	return items, nil
}