Files
model-asynch/service/cleaner.go
WangLiZhao 23b83cae39 refactor(cleaner): 移除未使用的存储删除接口
清理任务时不再调用 Storage.DeleteByTask 方法,因为当前 OSS 服务暂未提供删除接口。
从 StorageService 接口和 ossStorage 实现中移除 DeleteByTask 方法,并更新清理逻辑。
2026-04-23 15:02:24 +08:00

90 lines
2.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package service
import (
"context"
"time"
"model-asynch/dao"
"github.com/gogf/gf/v2/frame/g"
)
var Cleaner = &cleaner{}
type cleaner struct{}
func (c *cleaner) Start(ctx context.Context) {
if !g.Cfg().MustGet(ctx, "asynch.cleaner.enabled", true).Bool() {
g.Log().Warningf(ctx, "[cleaner] asynch.cleaner.enabled=falsecleaner 未启动")
return
}
intervalStr := g.Cfg().MustGet(ctx, "asynch.cleaner.interval", "10m").String()
interval, _ := time.ParseDuration(intervalStr)
if interval <= 0 {
interval = 10 * time.Minute
}
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
g.Log().Infof(ctx, "[cleaner] started, interval=%s", interval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.runOnce(ctx)
}
}
}()
}
func (c *cleaner) runOnce(ctx context.Context) {
// 1) 清理已下载(state=4)且过期的任务(硬删除 + OSS
expired, err := dao.Task.ListExpiredDownloadedGlobal(ctx, 200)
if err != nil {
g.Log().Errorf(ctx, "[cleaner] list expired(downloaded) error: %v", err)
} else {
for _, t := range expired {
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
}
g.Log().Infof(ctx, "[cleaner] expired(downloaded) cleaned, count=%d", len(expired))
}
// 2) 超时任务标失败
timeoutStr := g.Cfg().MustGet(ctx, "asynch.worker.taskTimeout", "30m").String()
timeout, _ := time.ParseDuration(timeoutStr)
if timeout > 0 {
list, err := dao.Task.ListTimeoutTasksGlobal(ctx, timeout, 200)
if err != nil {
g.Log().Errorf(ctx, "[cleaner] list timeout error: %v", err)
} else {
for _, t := range list {
_ = dao.Task.UpdateFailedGlobal(ctx, t.Id, "任务超时自动失败")
}
g.Log().Infof(ctx, "[cleaner] timeout cleaned, count=%d", len(list))
}
}
// 3) 失败(state=3)的任务按模型配置 retry_times 重新入队(放到队尾)
retryable, err := dao.Task.ListFailedRetryableGlobal(ctx, 200)
if err != nil {
g.Log().Errorf(ctx, "[cleaner] list failed retryable error: %v", err)
} else {
for _, t := range retryable {
_ = dao.Task.RequeueForRetryGlobal(ctx, t.Id)
}
g.Log().Infof(ctx, "[cleaner] failed retryable cleaned, count=%d", len(retryable))
}
// 4) 超过重试次数仍失败(state=3)的任务:硬删除
exhausted, err := dao.Task.ListFailedExhaustedGlobal(ctx, 200)
if err != nil {
g.Log().Errorf(ctx, "[cleaner] list failed exhausted error: %v", err)
} else {
for _, t := range exhausted {
_ = dao.Task.HardDeleteByIDGlobal(ctx, t.Id)
}
g.Log().Infof(ctx, "[cleaner] failed exhausted cleaned, count=%d", len(exhausted))
}
}