package service

import (
	"context"
	"time"

	"github.com/gogf/gf/v2/frame/g"

	"model-asynch/dao"
)

const (
	// cleanerBatchLimit is the limit argument handed to every dao listing
	// call made during one sweep.
	cleanerBatchLimit = 200

	// defaultCleanerInterval is the sweep period used when
	// asynch.cleaner.interval is unparsable or not positive.
	defaultCleanerInterval = 10 * time.Minute
)

// Cleaner is the package-level cleaner instance.
var Cleaner = &cleaner{}

// cleaner periodically sweeps tasks through the dao.Task helpers.
type cleaner struct{}

// Start launches the periodic sweep in a background goroutine and returns
// immediately. Nothing is started when asynch.cleaner.enabled is false. The
// goroutine exits once ctx is cancelled.
func (c *cleaner) Start(ctx context.Context) {
	if !g.Cfg().MustGet(ctx, "asynch.cleaner.enabled", true).Bool() {
		g.Log().Warningf(ctx, "[cleaner] asynch.cleaner.enabled=false,cleaner 未启动")
		return
	}

	raw := g.Cfg().MustGet(ctx, "asynch.cleaner.interval", "10m").String()
	interval, err := time.ParseDuration(raw)
	if err != nil || interval <= 0 {
		// time.NewTicker panics on a non-positive duration, so fall back to
		// the default and say so instead of hiding the misconfiguration.
		g.Log().Warningf(ctx, "[cleaner] invalid asynch.cleaner.interval=%q (err=%v), using %s", raw, err, defaultCleanerInterval)
		interval = defaultCleanerInterval
	}

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		g.Log().Infof(ctx, "[cleaner] started, interval=%s", interval)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				c.runOnce(ctx)
			}
		}
	}()
}

// runOnce performs a single sweep: it runs the four cleanup steps in order
// and stops early if ctx is cancelled between steps. A failure inside one
// step is logged by that step and does not prevent the following steps.
func (c *cleaner) runOnce(ctx context.Context) {
	steps := []func(context.Context){
		c.purgeExpiredDownloaded,
		c.failTimedOut,
		c.requeueRetryable,
		c.purgeExhausted,
	}
	for _, step := range steps {
		if ctx.Err() != nil {
			return
		}
		step(ctx)
	}
}

// purgeExpiredDownloaded is step 1: hard-delete tasks that were downloaded
// (state=4) and have expired. The original note on this step says the hard
// delete also covers OSS; that happens inside the dao layer, not here.
func (c *cleaner) purgeExpiredDownloaded(ctx context.Context) {
	expired, err := dao.Task.ListExpiredDownloadedGlobal(ctx, cleanerBatchLimit)
	if err != nil {
		g.Log().Errorf(ctx, "[cleaner] list expired(downloaded) error: %v", err)
		return
	}

	cleaned := 0
	for _, t := range expired {
		if ctx.Err() != nil {
			break
		}
		if err := dao.Task.HardDeleteByIDGlobal(ctx, t.Id); err != nil {
			g.Log().Errorf(ctx, "[cleaner] hard delete expired(downloaded) task %v error: %v", t.Id, err)
			continue
		}
		cleaned++
	}
	// count is the number of tasks actually deleted, not merely listed.
	g.Log().Infof(ctx, "[cleaner] expired(downloaded) cleaned, count=%d", cleaned)
}

// failTimedOut is step 2: mark tasks that exceeded asynch.worker.taskTimeout
// as failed. The step is skipped when the configured timeout is not positive
// or cannot be parsed; the latter is logged as a warning.
func (c *cleaner) failTimedOut(ctx context.Context) {
	raw := g.Cfg().MustGet(ctx, "asynch.worker.taskTimeout", "30m").String()
	timeout, err := time.ParseDuration(raw)
	if err != nil {
		g.Log().Warningf(ctx, "[cleaner] invalid asynch.worker.taskTimeout=%q, timeout step skipped: %v", raw, err)
		return
	}
	if timeout <= 0 {
		return
	}

	list, err := dao.Task.ListTimeoutTasksGlobal(ctx, timeout, cleanerBatchLimit)
	if err != nil {
		g.Log().Errorf(ctx, "[cleaner] list timeout error: %v", err)
		return
	}

	cleaned := 0
	for _, t := range list {
		if ctx.Err() != nil {
			break
		}
		if err := dao.Task.UpdateFailedGlobal(ctx, t.Id, "任务超时自动失败"); err != nil {
			g.Log().Errorf(ctx, "[cleaner] mark timeout task %v failed error: %v", t.Id, err)
			continue
		}
		cleaned++
	}
	g.Log().Infof(ctx, "[cleaner] timeout cleaned, count=%d", cleaned)
}

// requeueRetryable is step 3: failed (state=3) tasks that are still retryable
// are put back in the queue. Per the original note, eligibility follows the
// model's retry_times setting and requeued tasks go to the tail of the queue;
// both are decided by the dao layer.
func (c *cleaner) requeueRetryable(ctx context.Context) {
	retryable, err := dao.Task.ListFailedRetryableGlobal(ctx, cleanerBatchLimit)
	if err != nil {
		g.Log().Errorf(ctx, "[cleaner] list failed retryable error: %v", err)
		return
	}

	cleaned := 0
	for _, t := range retryable {
		if ctx.Err() != nil {
			break
		}
		if err := dao.Task.RequeueForRetryGlobal(ctx, t.Id); err != nil {
			g.Log().Errorf(ctx, "[cleaner] requeue failed task %v error: %v", t.Id, err)
			continue
		}
		cleaned++
	}
	g.Log().Infof(ctx, "[cleaner] failed retryable cleaned, count=%d", cleaned)
}

// purgeExhausted is step 4: hard-delete failed (state=3) tasks that have used
// up their retries.
func (c *cleaner) purgeExhausted(ctx context.Context) {
	exhausted, err := dao.Task.ListFailedExhaustedGlobal(ctx, cleanerBatchLimit)
	if err != nil {
		g.Log().Errorf(ctx, "[cleaner] list failed exhausted error: %v", err)
		return
	}

	cleaned := 0
	for _, t := range exhausted {
		if ctx.Err() != nil {
			break
		}
		if err := dao.Task.HardDeleteByIDGlobal(ctx, t.Id); err != nil {
			g.Log().Errorf(ctx, "[cleaner] hard delete exhausted task %v error: %v", t.Id, err)
			continue
		}
		cleaned++
	}
	g.Log().Infof(ctx, "[cleaner] failed exhausted cleaned, count=%d", cleaned)
}