同步视频
This commit is contained in:
158
dao/tencent/video_dao.go
Normal file
158
dao/tencent/video_dao.go
Normal file
@@ -0,0 +1,158 @@
|
||||
package tencent
|
||||
|
||||
import (
|
||||
"context"
|
||||
consts "dataengine/consts/public"
|
||||
entity "dataengine/model/entity/tencent"
|
||||
|
||||
"gitea.com/red-future/common/db/gfdb"
|
||||
"github.com/gogf/gf/v2/frame/g"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type videoDao struct{}
|
||||
|
||||
var Video = new(videoDao)
|
||||
|
||||
// BatchUpsert 批量插入或更新(使用 OnConflict 实现 Upsert)
|
||||
func (d *videoDao) BatchUpsert(ctx context.Context, items []*entity.Video) (successCount int, err error) {
|
||||
if len(items) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
logrus.Infof("开始批量Upsert视频素材: %d 条记录", len(items))
|
||||
|
||||
// 分批处理,每批100条
|
||||
batchSize := 100
|
||||
successCount = 0
|
||||
|
||||
for i := 0; i < len(items); i += batchSize {
|
||||
end := i + batchSize
|
||||
if end > len(items) {
|
||||
end = len(items)
|
||||
}
|
||||
|
||||
batch := items[i:end]
|
||||
|
||||
logrus.Infof("处理第 %d-%d 条视频素材记录", i+1, end)
|
||||
|
||||
// 执行批量插入,使用 OnConflict 实现 Upsert
|
||||
result, err := gfdb.DB(ctx).Model(ctx, consts.TencentVideoTable).
|
||||
Data(batch).
|
||||
OnConflict("(video_id, account_id)").
|
||||
Save()
|
||||
|
||||
if err != nil {
|
||||
logrus.Errorf("批量Upsert视频素材失败: %v,尝试逐条处理", err)
|
||||
// 批量失败,逐条处理
|
||||
for _, item := range batch {
|
||||
if upsertErr := d.upsertSingle(ctx, item); upsertErr != nil {
|
||||
logrus.Errorf("逐条Upsert视频素材失败: video_id=%s, account_id=%d, err=%v", item.VideoId, item.AccountId, upsertErr)
|
||||
} else {
|
||||
successCount++
|
||||
}
|
||||
}
|
||||
} else {
|
||||
affected, _ := result.RowsAffected()
|
||||
successCount += int(affected)
|
||||
logrus.Infof("批量Upsert视频素材成功: 影响 %d 条记录", affected)
|
||||
}
|
||||
}
|
||||
|
||||
logrus.Infof("批量Upsert视频素材完成: 成功 %d 条", successCount)
|
||||
return successCount, nil
|
||||
}
|
||||
|
||||
// upsertSingle 单条插入或更新
|
||||
func (d *videoDao) upsertSingle(ctx context.Context, item *entity.Video) error {
|
||||
var existing entity.Video
|
||||
err := gfdb.DB(ctx).Model(ctx, consts.TencentVideoTable).
|
||||
Where(entity.VideoCols.VideoId, item.VideoId).
|
||||
Where(entity.VideoCols.AccountId, item.AccountId).
|
||||
WhereNull("deleted_at").
|
||||
Scan(&existing)
|
||||
|
||||
if err != nil && existing.Id == 0 {
|
||||
// 记录不存在,执行插入
|
||||
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentVideoTable).
|
||||
Data(item).
|
||||
Insert()
|
||||
return err
|
||||
}
|
||||
|
||||
// 记录存在,执行更新
|
||||
_, err = gfdb.DB(ctx).Model(ctx, consts.TencentVideoTable).
|
||||
Where("id", existing.Id).
|
||||
Data(g.Map{
|
||||
entity.VideoCols.Width: item.Width,
|
||||
entity.VideoCols.Height: item.Height,
|
||||
entity.VideoCols.FileSize: item.FileSize,
|
||||
entity.VideoCols.Type: item.Type,
|
||||
entity.VideoCols.Signature: item.Signature,
|
||||
entity.VideoCols.Description: item.Description,
|
||||
entity.VideoCols.PreviewUrl: item.PreviewUrl,
|
||||
entity.VideoCols.Status: item.Status,
|
||||
entity.VideoCols.LastModifiedTime: item.LastModifiedTime,
|
||||
}).
|
||||
Update()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// ListAll 获取所有视频素材
|
||||
func (d *videoDao) ListAll(ctx context.Context) ([]entity.Video, error) {
|
||||
var list []entity.Video
|
||||
err := gfdb.DB(ctx).Model(ctx, consts.TencentVideoTable).
|
||||
WhereNull("deleted_at").
|
||||
OrderAsc(entity.VideoCols.VideoId).
|
||||
Scan(&list)
|
||||
|
||||
return list, err
|
||||
}
|
||||
|
||||
// ListWithPage 分页查询视频素材(支持时间过滤)
|
||||
func (d *videoDao) ListWithPage(ctx context.Context, page, pageSize int, accountId *int64, startTime, endTime *int64, status string) ([]entity.Video, int, error) {
|
||||
model := gfdb.DB(ctx).Model(ctx, consts.TencentVideoTable).
|
||||
WhereNull("deleted_at")
|
||||
|
||||
// 账户ID过滤
|
||||
if accountId != nil && *accountId > 0 {
|
||||
model = model.Where(entity.VideoCols.AccountId, *accountId)
|
||||
}
|
||||
|
||||
// 状态过滤
|
||||
if status != "" {
|
||||
model = model.Where(entity.VideoCols.Status, status)
|
||||
}
|
||||
|
||||
// 时间范围过滤(根据 last_modified_time)
|
||||
if startTime != nil && *startTime > 0 {
|
||||
model = model.WhereGTE(entity.VideoCols.LastModifiedTime, *startTime)
|
||||
}
|
||||
if endTime != nil && *endTime > 0 {
|
||||
model = model.WhereLTE(entity.VideoCols.LastModifiedTime, *endTime)
|
||||
}
|
||||
|
||||
// 设置排序(按最后修改时间降序)
|
||||
model = model.OrderDesc(entity.VideoCols.LastModifiedTime)
|
||||
|
||||
// 获取总数
|
||||
total, err := model.Count()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
// 分页查询
|
||||
var list []entity.Video
|
||||
if page > 0 && pageSize > 0 {
|
||||
err = model.Page(page, pageSize).Scan(&list)
|
||||
} else {
|
||||
err = model.Scan(&list)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return list, total, nil
|
||||
}
|
||||
Reference in New Issue
Block a user