Files
cid/service/dataengine/tencent_content_check_service.go
2026-06-10 15:41:58 +08:00

391 lines
11 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package dataengine
import (
consts "cid/consts/dataengine"
dao "cid/dao/dataengine"
entity "cid/model/entity/dataengine"
yidunService "cid/service/yidun"
"context"
"encoding/json"
"fmt"
"time"
"gitea.redpowerfuture.com/red-future/common/beans"
"github.com/gogf/gf/v2/frame/g"
)
// ContentCheckConfig 送检配置
type ContentCheckConfig struct {
// 每批处理数量
BatchSize int `json:"batch_size"`
// 图片检测启用
ImageEnabled bool `json:"image_enabled"`
// 视频检测启用
VideoEnabled bool `json:"video_enabled"`
// 定时任务间隔(秒)
IntervalSeconds int `json:"interval_seconds"`
}
// DefaultConfig 默认配置
var DefaultConfig = ContentCheckConfig{
BatchSize: 10,
ImageEnabled: true,
VideoEnabled: true,
IntervalSeconds: 30,
}
// TencentContentCheckService 腾讯内容送检服务
type TencentContentCheckService struct {
config ContentCheckConfig
isRunning bool
}
// TencentContentCheck 送检服务单例
var TencentContentCheck = &TencentContentCheckService{
config: DefaultConfig,
}
// SetConfig 设置配置
func (s *TencentContentCheckService) SetConfig(config ContentCheckConfig) {
s.config = config
}
// Start 启动定时任务
func (s *TencentContentCheckService) Start(ctx context.Context) error {
if s.isRunning {
g.Log().Info(ctx, "送检服务已在运行中,跳过启动")
return nil
}
s.isRunning = true
g.Log().Infof(ctx, "启动内容送检服务,配置: batch_size=%d, interval=%ds, image=%v, video=%v",
s.config.BatchSize, s.config.IntervalSeconds, s.config.ImageEnabled, s.config.VideoEnabled)
go s.runScheduler(ctx)
return nil
}
// Stop 停止定时任务
func (s *TencentContentCheckService) Stop(ctx context.Context) {
s.isRunning = false
g.Log().Info(ctx, "停止内容送检服务")
}
// runScheduler 定时调度器
func (s *TencentContentCheckService) runScheduler(ctx context.Context) {
ticker := time.NewTicker(time.Duration(s.config.IntervalSeconds) * time.Second)
defer ticker.Stop()
// 启动时先执行一次
s.processAll(ctx)
for s.isRunning {
select {
case <-ticker.C:
s.processAll(ctx)
case <-ctx.Done():
s.isRunning = false
return
}
}
}
// processAll 处理所有待送检数据
func (s *TencentContentCheckService) processAll(ctx context.Context) {
// 添加系统用户上下文绕过gfdb租户验证
ctx = context.WithValue(ctx, "user", &beans.User{UserName: "system", TenantId: 1})
startTime := time.Now()
g.Log().Info(ctx, "开始处理待送检数据...")
var totalProcessed int
// 处理图片
if s.config.ImageEnabled {
imageCount, _ := dao.TencentImage.CountPending(ctx)
if imageCount > 0 {
count, _ := s.processImages(ctx)
totalProcessed += count
}
}
// 处理视频
if s.config.VideoEnabled {
videoCount, _ := dao.TencentVideo.CountPending(ctx)
if videoCount > 0 {
count, _ := s.processVideos(ctx)
totalProcessed += count
}
}
duration := time.Since(startTime).Milliseconds()
g.Log().Infof(ctx, "处理完成,共处理 %d 条数据,耗时 %dms", totalProcessed, duration)
}
// processImages 处理图片送检
func (s *TencentContentCheckService) processImages(ctx context.Context) (int, error) {
// 获取待送检图片
images, err := dao.TencentImage.GetPendingList(ctx, s.config.BatchSize)
if err != nil {
g.Log().Errorf(ctx, "获取待送检图片失败: %v", err)
return 0, err
}
if len(images) == 0 {
return 0, nil
}
g.Log().Infof(ctx, "开始送检 %d 张图片", len(images))
successCount := 0
failedCount := 0
for _, img := range images {
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentImage, img.Id, img.ImageID, img.PreviewURL)
// 提交送检
err := s.submitImageCheck(ctx, &img, log)
if err != nil {
failedCount++
// 更新日志为失败
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
}
} else {
successCount++
}
// 避免请求过快
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "图片送检完成,成功: %d失败: %d", successCount, failedCount)
return len(images), nil
}
// processVideos 处理视频送检
func (s *TencentContentCheckService) processVideos(ctx context.Context) (int, error) {
// 获取待送检视频
videos, err := dao.TencentVideo.GetPendingList(ctx, s.config.BatchSize)
if err != nil {
g.Log().Errorf(ctx, "获取待送检视频失败: %v", err)
return 0, err
}
if len(videos) == 0 {
return 0, nil
}
g.Log().Infof(ctx, "开始送检 %d 个视频", len(videos))
successCount := 0
failedCount := 0
for _, video := range videos {
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)
// 提交送检
err := s.submitVideoCheck(ctx, &video, log)
if err != nil {
failedCount++
// 更新日志为失败
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
}
} else {
successCount++
}
// 避免请求过快
time.Sleep(100 * time.Millisecond)
}
g.Log().Infof(ctx, "视频送检完成,成功: %d失败: %d", successCount, failedCount)
return len(videos), nil
}
// createCheckLog 创建送检日志
func (s *TencentContentCheckService) createCheckLog(ctx context.Context, sourceTable string, sourceID int64, mediaID string, mediaURL string) *entity.TencentContentCheckLog {
requestParam := map[string]interface{}{
"media_id": mediaID,
"url": mediaURL,
}
requestParamJSON, _ := json.Marshal(requestParam)
log := &entity.TencentContentCheckLog{
SourceTable: sourceTable,
SourceID: sourceID,
RequestURL: "易盾内容安全检测接口",
RequestParam: string(requestParamJSON),
Status: consts.CheckStatusPending,
CheckTime: time.Now().UnixMilli(),
}
id, err := dao.TencentContentCheckLog.Create(ctx, log)
if err != nil {
g.Log().Errorf(ctx, "创建送检日志失败: %v", err)
return nil
}
log.Id = id
g.Log().Debugf(ctx, "创建送检日志成功, id=%d, sourceTable=%s, sourceID=%d", id, sourceTable, sourceID)
return log
}
// submitImageCheck 提交图片送检
func (s *TencentContentCheckService) submitImageCheck(ctx context.Context, image *entity.TencentImage, log *entity.TencentContentCheckLog) error {
startTime := time.Now()
// 更新日志状态为送检中
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
}
// 获取回调地址
callbackURL := g.Cfg().MustGet(ctx, "yidun.image.callback_url").String()
// 调用易盾图片检测
result, err := yidunService.ImageDetection.DetectImage(ctx, image.PreviewURL, image.ImageID, callbackURL)
duration := time.Since(startTime).Milliseconds()
// 更新日志
if log != nil {
if err != nil {
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
g.Log().Errorf(ctx, "图片送检失败, id=%d, url=%s, error=%v", image.Id, image.PreviewURL, err)
return err
}
// 更新日志和图片状态
responseData, _ := json.Marshal(result)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
}
g.Log().Infof(ctx, "图片送检成功, id=%d, imageId=%s, taskId=%s", image.Id, image.ImageID, result.TaskID)
return nil
}
// submitVideoCheck 提交视频送检
func (s *TencentContentCheckService) submitVideoCheck(ctx context.Context, video *entity.TencentVideo, log *entity.TencentContentCheckLog) error {
startTime := time.Now()
// 更新日志状态为送检中
if log != nil {
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSubmitting, "", "")
}
// 获取回调地址
callbackURL := g.Cfg().MustGet(ctx, "yidun.video.callback_url").String()
// 调用易盾视频检测
result, err := yidunService.VideoDetection.DetectVideo(ctx, video.PreviewURL, video.VideoID, callbackURL)
duration := time.Since(startTime).Milliseconds()
// 更新日志
if log != nil {
if err != nil {
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusFailed, "", err.Error())
g.Log().Errorf(ctx, "视频送检失败, id=%d, url=%s, error=%v", video.Id, video.PreviewURL, err)
return err
}
// 更新日志和视频状态
responseData, _ := json.Marshal(result)
dao.TencentContentCheckLog.UpdateStatus(ctx, log.Id, consts.CheckStatusSuccess, string(responseData), "")
dao.TencentContentCheckLog.UpdateTaskID(ctx, log.Id, result.TaskID)
dao.TencentContentCheckLog.UpdateDuration(ctx, log.Id, duration)
}
g.Log().Infof(ctx, "视频送检成功, id=%d, videoId=%s, taskId=%s", video.Id, video.VideoID, result.TaskID)
return nil
}
// SubmitImageByID 根据图片ID手动提交送检
func (s *TencentContentCheckService) SubmitImageByID(ctx context.Context, imageID string) (*yidunService.ImageSubmitResult, error) {
// 根据图片ID获取数据
image, err := dao.TencentImage.GetByImageID(ctx, imageID)
if err != nil {
return nil, fmt.Errorf("查询图片数据失败: %w", err)
}
if image == nil {
return nil, fmt.Errorf("未找到图片数据, imageID=%s", imageID)
}
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentImage, image.Id, image.ImageID, image.PreviewURL)
if log == nil {
return nil, fmt.Errorf("创建送检日志失败")
}
// 提交送检
err = s.submitImageCheck(ctx, image, log)
if err != nil {
return nil, err
}
// 获取送检结果
return dao.TencentContentCheckLog.GetImageSubmitResult(ctx, log.Id)
}
// SubmitVideoByID 根据视频ID手动提交送检
func (s *TencentContentCheckService) SubmitVideoByID(ctx context.Context, videoID string) (*yidunService.VideoSubmitResult, error) {
// 根据视频ID获取数据
video, err := dao.TencentVideo.GetByVideoID(ctx, videoID)
if err != nil {
return nil, fmt.Errorf("查询视频数据失败: %w", err)
}
if video == nil {
return nil, fmt.Errorf("未找到视频数据, videoID=%s", videoID)
}
// 创建送检日志
log := s.createCheckLog(ctx, consts.SourceTableTencentVideo, video.Id, video.VideoID, video.PreviewURL)
if log == nil {
return nil, fmt.Errorf("创建送检日志失败")
}
// 提交送检
err = s.submitVideoCheck(ctx, video, log)
if err != nil {
return nil, err
}
// 获取送检结果
return dao.TencentContentCheckLog.GetVideoSubmitResult(ctx, log.Id)
}
// GetPendingStats 获取待送检统计
func (s *TencentContentCheckService) GetPendingStats(ctx context.Context) map[string]int {
stats := make(map[string]int)
if s.config.ImageEnabled {
count, _ := dao.TencentImage.CountPending(ctx)
stats["image_pending"] = count
}
if s.config.VideoEnabled {
count, _ := dao.TencentVideo.CountPending(ctx)
stats["video_pending"] = count
}
return stats
}
// IsRunning 获取运行状态
func (s *TencentContentCheckService) IsRunning() bool {
return s.isRunning
}
// GetConfig 获取当前配置
func (s *TencentContentCheckService) GetConfig() ContentCheckConfig {
return s.config
}