391 lines
11 KiB
Go
391 lines
11 KiB
Go
package dataengine
|
||
|
||
import (
|
||
consts "cid/consts/dataengine"
|
||
dao "cid/dao/dataengine"
|
||
entity "cid/model/entity/dataengine"
|
||
yidunService "cid/service/yidun"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"time"
|
||
|
||
"gitea.redpowerfuture.com/red-future/common/beans"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
)
|
||
|
||
// ContentCheckConfig holds the tunable settings of the content-check service.
type ContentCheckConfig struct {
	BatchSize       int  `json:"batch_size"`       // number of records handled per batch
	ImageEnabled    bool `json:"image_enabled"`    // whether image checking is enabled
	VideoEnabled    bool `json:"video_enabled"`    // whether video checking is enabled
	IntervalSeconds int  `json:"interval_seconds"` // scheduler interval, in seconds
}
|
||
|
||
// DefaultConfig 默认配置
|
||
var DefaultConfig = ContentCheckConfig{
|
||
BatchSize: 10,
|
||
ImageEnabled: true,
|
||
VideoEnabled: true,
|
||
IntervalSeconds: 30,
|
||
}
|
||
|
||
// TencentContentCheckService 腾讯内容送检服务
|
||
type TencentContentCheckService struct {
|
||
config ContentCheckConfig
|
||
isRunning bool
|
||
}
|
||
|
||
// TencentContentCheck 送检服务单例
|
||
var TencentContentCheck = &TencentContentCheckService{
|
||
config: DefaultConfig,
|
||
}
|
||
|
||
// SetConfig 设置配置
|
||
func (s *TencentContentCheckService) SetConfig(config ContentCheckConfig) {
|
||
s.config = config
|
||
}
|
||
|
||
// Start 启动定时任务
|
||
func (s *TencentContentCheckService) Start(ctx context.Context) error {
|
||
if s.isRunning {
|
||
g.Log().Info(ctx, "送检服务已在运行中,跳过启动")
|
||
return nil
|
||
}
|
||
|
||
s.isRunning = true
|
||
g.Log().Infof(ctx, "启动内容送检服务,配置: batch_size=%d, interval=%ds, image=%v, video=%v",
|
||
s.config.BatchSize, s.config.IntervalSeconds, s.config.ImageEnabled, s.config.VideoEnabled)
|
||
|
||
go s.runScheduler(ctx)
|
||
return nil
|
||
}
|
||
|
||
// Stop 停止定时任务
|
||
func (s *TencentContentCheckService) Stop(ctx context.Context) {
|
||
s.isRunning = false
|
||
g.Log().Info(ctx, "停止内容送检服务")
|
||
}
|
||
|
||
// runScheduler 定时调度器
|
||
func (s *TencentContentCheckService) runScheduler(ctx context.Context) {
|
||
ticker := time.NewTicker(time.Duration(s.config.IntervalSeconds) * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
// 启动时先执行一次
|
||
s.processAll(ctx)
|
||
|
||
for s.isRunning {
|
||
select {
|
||
case <-ticker.C:
|
||
s.processAll(ctx)
|
||
case <-ctx.Done():
|
||
s.isRunning = false
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// processAll 处理所有待送检数据
|
||
func (s *TencentContentCheckService) processAll(ctx context.Context) {
|
||
// 添加系统用户上下文,绕过gfdb租户验证
|
||
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "system", TenantId: 1})
|
||
|
||
startTime := time.Now()
|
||
g.Log().Info(ctx, "开始处理待送检数据...")
|
||
|
||
var totalProcessed int
|
||
|
||
// 处理图片
|
||
if s.config.ImageEnabled {
|
||
imageCount, _ := dao.TencentImage.CountPending(ctx)
|
||
if imageCount > 0 {
|
||
count, _ := s.processImages(ctx)
|
||
totalProcessed += count
|
||
}
|
||
}
|
||
|
||
// 处理视频
|
||
if s.config.VideoEnabled {
|
||
videoCount, _ := dao.TencentVideo.CountPending(ctx)
|
||
if videoCount > 0 {
|
||
count, _ := s.processVideos(ctx)
|
||
totalProcessed += count
|
||
}
|
||
}
|
||
|
||
duration := time.Since(startTime).Milliseconds()
|
||
g.Log().Infof(ctx, "处理完成,共处理 %d 条数据,耗时 %dms", totalProcessed, duration)
|
||
}
|
||
|
||
// processImages 处理图片送检
|
||
func (s *TencentContentCheckService) processImages(ctx context.Context) (int, error) {
|
||
// 获取待送检图片
|
||
images, err := dao.TencentImage.GetPendingList(ctx, s.config.BatchSize)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "获取待送检图片失败: %v", err)
|
||
return 0, err
|
||
}
|
||
|
||
if len(images) == 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
g.Log().Infof(ctx, "开始送检 %d 张图片", len(images))
|
||
|
||
successCount := 0
|
||
failedCount := 0
|
||
|
||
for _, img := range images {
|
||
// 创建送检日志
|
||
log := s.createCheckLog(ctx, consts.SourceTableTencentImage, img.Id, img.ImageID, img.PreviewURL)
|
||
|
||
// 提交送检
|
||
err := s.submitImageCheck(ctx, &img, log)
|
||
if err != nil {
|
||
failedCount++
|
||
// 更新日志为失败
|
||
if log != nil {
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
|
||
}
|
||
} else {
|
||
successCount++
|
||
}
|
||
|
||
// 避免请求过快
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "图片送检完成,成功: %d,失败: %d", successCount, failedCount)
|
||
return len(images), nil
|
||
}
|
||
|
||
// processVideos 处理视频送检
|
||
func (s *TencentContentCheckService) processVideos(ctx context.Context) (int, error) {
|
||
// 获取待送检视频
|
||
videos, err := dao.TencentVideo.GetPendingList(ctx, s.config.BatchSize)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "获取待送检视频失败: %v", err)
|
||
return 0, err
|
||
}
|
||
|
||
if len(videos) == 0 {
|
||
return 0, nil
|
||
}
|
||
|
||
g.Log().Infof(ctx, "开始送检 %d 个视频", len(videos))
|
||
|
||
successCount := 0
|
||
failedCount := 0
|
||
|
||
for _, video := range videos {
|
||
// 创建送检日志
|
||
log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)
|
||
|
||
// 提交送检
|
||
err := s.submitVideoCheck(ctx, &video, log)
|
||
if err != nil {
|
||
failedCount++
|
||
// 更新日志为失败
|
||
if log != nil {
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
|
||
}
|
||
} else {
|
||
successCount++
|
||
}
|
||
|
||
// 避免请求过快
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "视频送检完成,成功: %d,失败: %d", successCount, failedCount)
|
||
return len(videos), nil
|
||
}
|
||
|
||
// createCheckLog 创建送检日志
|
||
func (s *TencentContentCheckService) createCheckLog(ctx context.Context, sourceTable string, sourceID int64, mediaID string, mediaURL string) *entity.TencentContentCheckLog {
|
||
requestParam := map[string]interface{}{
|
||
"media_id": mediaID,
|
||
"url": mediaURL,
|
||
}
|
||
requestParamJSON, _ := json.Marshal(requestParam)
|
||
|
||
log := &entity.TencentContentCheckLog{
|
||
SourceTable: sourceTable,
|
||
SourceID: sourceID,
|
||
RequestURL: "易盾内容安全检测接口",
|
||
RequestParam: string(requestParamJSON),
|
||
Status: consts.CheckStatusPending,
|
||
CheckTime: time.Now().UnixMilli(),
|
||
}
|
||
|
||
id, err := dao.TencentContentCheckLog.Create(ctx, log)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "创建送检日志失败: %v", err)
|
||
return nil
|
||
}
|
||
log.Id = id
|
||
|
||
g.Log().Debugf(ctx, "创建送检日志成功, id=%d, sourceTable=%s, sourceID=%d", id, sourceTable, sourceID)
|
||
return log
|
||
}
|
||
|
||
// submitImageCheck 提交图片送检
|
||
func (s *TencentContentCheckService) submitImageCheck(ctx context.Context, image *entity.TencentImage, log *entity.TencentContentCheckLog) error {
|
||
startTime := time.Now()
|
||
|
||
// 更新日志状态为送检中
|
||
if log != nil {
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
|
||
}
|
||
|
||
// 获取回调地址
|
||
callbackURL := g.Cfg().MustGet(ctx, "yidun.image.callback_url").String()
|
||
|
||
// 调用易盾图片检测
|
||
result, err := yidunService.ImageDetection.DetectImage(ctx, image.PreviewURL, image.ImageID, callbackURL)
|
||
|
||
duration := time.Since(startTime).Milliseconds()
|
||
|
||
// 更新日志
|
||
if log != nil {
|
||
if err != nil {
|
||
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
|
||
g.Log().Errorf(ctx, "图片送检失败, id=%d, url=%s, error=%v", image.Id, image.PreviewURL, err)
|
||
return err
|
||
}
|
||
|
||
// 更新日志和图片状态
|
||
responseData, _ := json.Marshal(result)
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
|
||
dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
|
||
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "图片送检成功, id=%d, imageId=%s, taskId=%s", image.Id, image.ImageID, result.TaskID)
|
||
return nil
|
||
}
|
||
|
||
// submitVideoCheck 提交视频送检
|
||
func (s *TencentContentCheckService) submitVideoCheck(ctx context.Context, video *entity.TencentVideo, log *entity.TencentContentCheckLog) error {
|
||
startTime := time.Now()
|
||
|
||
// 更新日志状态为送检中
|
||
if log != nil {
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
|
||
}
|
||
|
||
// 获取回调地址
|
||
callbackURL := g.Cfg().MustGet(ctx, "yidun.video.callback_url").String()
|
||
|
||
// 调用易盾视频检测
|
||
result, err := yidunService.VideoDetection.DetectVideo(ctx, video.PreviewURL, video.VideoID, callbackURL)
|
||
|
||
duration := time.Since(startTime).Milliseconds()
|
||
|
||
// 更新日志
|
||
if log != nil {
|
||
if err != nil {
|
||
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
|
||
g.Log().Errorf(ctx, "视频送检失败, id=%d, url=%s, error=%v", video.Id, video.PreviewURL, err)
|
||
return err
|
||
}
|
||
|
||
// 更新日志和视频状态
|
||
responseData, _ := json.Marshal(result)
|
||
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
|
||
dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
|
||
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
|
||
}
|
||
|
||
g.Log().Infof(ctx, "视频送检成功, id=%d, videoId=%s, taskId=%s", video.Id, video.VideoID, result.TaskID)
|
||
return nil
|
||
}
|
||
|
||
// SubmitImageByID 根据图片ID手动提交送检
|
||
func (s *TencentContentCheckService) SubmitImageByID(ctx context.Context, imageID string) (*yidunService.ImageSubmitResult, error) {
|
||
// 根据图片ID获取数据
|
||
image, err := dao.TencentImage.GetByImageID(ctx, imageID)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询图片数据失败: %w", err)
|
||
}
|
||
if image == nil {
|
||
return nil, fmt.Errorf("未找到图片数据, imageID=%s", imageID)
|
||
}
|
||
|
||
// 创建送检日志
|
||
log := s.createCheckLog(ctx, consts.SourceTableTencentImage, image.Id, image.ImageID, image.PreviewURL)
|
||
if log == nil {
|
||
return nil, fmt.Errorf("创建送检日志失败")
|
||
}
|
||
|
||
// 提交送检
|
||
err = s.submitImageCheck(ctx, image, log)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 获取送检结果
|
||
return dao.TencentContentCheckLog.GetImageSubmitResult(ctx, log.Id)
|
||
}
|
||
|
||
// SubmitVideoByID 根据视频ID手动提交送检
|
||
func (s *TencentContentCheckService) SubmitVideoByID(ctx context.Context, videoID string) (*yidunService.VideoSubmitResult, error) {
|
||
// 根据视频ID获取数据
|
||
video, err := dao.TencentVideo.GetByVideoID(ctx, videoID)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("查询视频数据失败: %w", err)
|
||
}
|
||
if video == nil {
|
||
return nil, fmt.Errorf("未找到视频数据, videoID=%s", videoID)
|
||
}
|
||
|
||
// 创建送检日志
|
||
log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)
|
||
if log == nil {
|
||
return nil, fmt.Errorf("创建送检日志失败")
|
||
}
|
||
|
||
// 提交送检
|
||
err = s.submitVideoCheck(ctx, video, log)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 获取送检结果
|
||
return dao.TencentContentCheckLog.GetVideoSubmitResult(ctx, log.Id)
|
||
}
|
||
|
||
// GetPendingStats 获取待送检统计
|
||
func (s *TencentContentCheckService) GetPendingStats(ctx context.Context) map[string]int {
|
||
stats := make(map[string]int)
|
||
|
||
if s.config.ImageEnabled {
|
||
count, _ := dao.TencentImage.CountPending(ctx)
|
||
stats["image_pending"] = count
|
||
}
|
||
|
||
if s.config.VideoEnabled {
|
||
count, _ := dao.TencentVideo.CountPending(ctx)
|
||
stats["video_pending"] = count
|
||
}
|
||
|
||
return stats
|
||
}
|
||
|
||
// IsRunning 获取运行状态
|
||
func (s *TencentContentCheckService) IsRunning() bool {
|
||
return s.isRunning
|
||
}
|
||
|
||
// GetConfig 获取当前配置
|
||
func (s *TencentContentCheckService) GetConfig() ContentCheckConfig {
|
||
return s.config
|
||
}
|