Files
customer-server/service/product_service.go
2026-03-14 10:02:49 +08:00

892 lines
27 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Package service - 产品服务
// 功能产品的增删改查、ZIP导入/导出、绑定/解绑客服账号、同步到RAGFlow、重试消费者
package service
import (
"archive/zip"
"bytes"
"context"
"customer-server/dao"
"customer-server/model/dto"
"customer-server/model/entity"
"customer-server/util"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"strings"
"unicode/utf8"
"gitea.com/red-future/common/jaeger"
"gitea.com/red-future/common/rabbitmq"
"gitea.com/red-future/common/ragflow"
"gitea.com/red-future/common/utils"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtime"
"github.com/gogf/gf/v2/util/gconv"
"go.mongodb.org/mongo-driver/v2/bson"
)
var (
Product = new(product)
productGrpool = grpool.New(50) // 文档解析协程池最大50并发
)
type product struct{}
// Add 添加产品
// 参数: ctx - 上下文req - 添加产品请求(包含产品名称、描述、价格等)
// 返回: res - 添加成功后的产品ID和RAGFlow文档IDerr - 错误信息
// 功能: 创建产品记录并自动上传到RAGFlow产品知识库独立于话术知识库
func (s *product) Add(ctx context.Context, req *dto.AddProductReq) (res *dto.AddProductRes, err error) {
// 校验产品名称长度
if utf8.RuneCountInString(req.Name) > 64 {
return nil, gerror.New("产品名称必须在64字以内")
}
// 校验产品详情长度
if utf8.RuneCountInString(req.Description) > 8192 {
return nil, gerror.New("产品详情必须在8192字以内")
}
// 去重检查:同一租户下名称唯一
existing, err := dao.Product.FindByName(ctx, req.Name)
if err != nil {
return nil, gerror.Wrap(err, "检查产品重复失败")
}
if existing != nil {
return nil, gerror.Newf("产品名称已存在name=%s, id=%s", req.Name, existing.Id.Hex())
}
data := &entity.Product{}
if err = utils.Struct(req, data); err != nil {
return
}
// 先从token获取租户信息在Insert之前
user, err := util.GetTenantInfo(ctx)
if err != nil {
return nil, gerror.Wrap(err, "获取租户信息失败")
}
tenantId := gconv.String(user.TenantId)
if tenantId == "" {
return nil, gerror.New("租户ID为空")
}
// 确保产品知识库存在(检查话术知识库是否存在,作为租户是否初始化的标志)
speechcraftDatasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil || speechcraftDatasetId == "" {
return nil, gerror.Newf("租户知识库不存在,请先创建客服账号: tenant_id=%s", tenantId)
}
// 设置基础字段
now := gtime.Now().Time
data.CreatedAt = &now // 取地址赋值给指针类型
data.UpdatedAt = &now // 取地址赋值给指针类型
data.IsDeleted = false
// 统一使用string类型存储tenantId到MongoDB
data.TenantId = tenantId
// 插入产品到MongoDB
_, err = dao.Product.Insert(ctx, data)
if err != nil {
return nil, err
}
// 确保租户知识库存在(产品和话术共享租户知识库)
// 使用dataset_service提供的统一方法自动处理创建、查找、保存等逻辑
datasetId, err := EnsureTenantDataset(ctx, tenantId)
if err != nil {
g.Log().Errorf(ctx, "确保租户知识库失败: %v", err)
return nil, gerror.Wrap(err, "获取租户知识库失败")
}
g.Log().Infof(ctx, "租户%s的知识库ID: %s", tenantId, datasetId)
// 同步上传到RAGFlow
ragflowClient := ragflow.GetGlobalClient()
g.Log().Infof(ctx, "准备上传产品到RAGFlow: product_id=%s, dataset_id=%s, name=%s", data.Id.Hex(), datasetId, data.Name)
filename := fmt.Sprintf("%s.txt", data.Name)
documentId, err := ragflowClient.UploadDocumentFromText(ctx, datasetId, data.Description, filename)
if err != nil {
// 回滚:删除刚插入的产品
dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.ProductCollection)
g.Log().Errorf(ctx, "产品上传RAGFlow失败: product_id=%s, dataset_id=%s, error=%v", data.Id.Hex(), datasetId, err)
jaeger.RecordError(ctx, err, "产品上传RAGFlow失败")
return nil, gerror.Wrap(err, "文档上传到知识库失败")
}
// 异步触发解析grpool自动管理goroutine生命周期WithoutCancel保留追踪避免取消
productGrpool.Add(ctx, func(ctx context.Context) {
parseCtx := context.WithoutCancel(ctx)
if err := ragflowClient.ParseDocuments(parseCtx, datasetId, []string{documentId}); err != nil {
g.Log().Errorf(parseCtx, "文档解析失败: document_id=%s, error=%v", documentId, err)
} else {
g.Log().Infof(parseCtx, "文档解析成功: document_id=%s", documentId)
}
})
// 更新MongoDB的RagSyncRecords数组使用空accountName表示租户级文档
syncTime := gtime.Now().Format("Y-m-d H:i:s")
record := entity.RagSyncRecord{
AccountName: "", // 空表示租户级文档
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: syncTime,
RetryCount: 0,
}
filter := bson.M{"_id": data.Id}
update := bson.M{
"$set": bson.M{
"ragSyncRecords": []entity.RagSyncRecord{record},
"ragLastSyncTime": syncTime,
"updatedAt": gtime.Now().Time,
},
}
if _, _, err = dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection); err != nil {
g.Log().Errorf(ctx, "更新产品RagSyncRecords失败: %v", err)
// 不回滚,文档已上传成功
}
g.Log().Infof(ctx, "产品添加成功并上传到知识库: product_id=%s, document_id=%s", data.Id.Hex(), documentId)
res = &dto.AddProductRes{Id: data.Id.Hex()}
return
}
// Update 更新产品
// 参数: ctx - 上下文req - 更新产品请求包含产品ID和待更新字段
// 返回: err - 错误信息
// 功能: 更新产品信息并同步到RAGFlow支持文档删除重建
func (s *product) Update(ctx context.Context, req *dto.UpdateProductReq) (err error) {
// 如果更新了产品名称,校验长度
if req.Name != "" && utf8.RuneCountInString(req.Name) > 64 {
return gerror.New("产品名称必须在64字以内")
}
// 如果更新了产品详情,校验长度
if req.Description != "" && utf8.RuneCountInString(req.Description) > 8192 {
return gerror.New("产品详情必须在8192字以内")
}
// 去重检查:如果修改名称,检查是否与其他产品重复
if req.Name != "" {
existing, err := dao.Product.FindByName(ctx, req.Name)
if err != nil {
return gerror.Wrap(err, "检查产品重复失败")
}
if existing != nil && existing.Id.Hex() != req.Id {
return gerror.Newf("产品名称已存在name=%s, id=%s", req.Name, existing.Id.Hex())
}
}
return dao.Product.Update(ctx, req)
}
// Delete 删除产品
// 参数: ctx - 上下文req - 删除产品请求包含产品ID
// 返回: err - 错误信息
// 功能: 逻辑删除产品记录并从RAGFlow移除对应文档
func (s *product) Delete(ctx context.Context, req *dto.DeleteProductReq) (err error) {
g.Log().Infof(ctx, "[Delete] 开始删除产品 - productId: %s", req.Id)
// 1. 查询产品获取RAGFlow同步记录使用原生查询避免租户过滤
objectId, err := bson.ObjectIDFromHex(req.Id)
if err != nil {
return gerror.Wrap(err, "无效的产品ID")
}
var product entity.Product
filter := bson.M{"_id": objectId, "isDeleted": false}
err = dao.MongoDAO.FindOne(ctx, filter, &product, entity.ProductCollection)
if err != nil {
if err.Error() == "mongo: no documents in result" {
return gerror.New("产品不存在")
}
return gerror.Wrap(err, "查询产品失败")
}
g.Log().Infof(ctx, "[Delete] 查询到产品 - name: %s, ragSyncRecords数量: %d", product.Name, len(product.RagSyncRecords))
// 2. 删除RAGFlow中的文档
if len(product.RagSyncRecords) > 0 {
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient != nil {
tenantId := gconv.String(product.TenantId)
// 查询租户的dataset_id
datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
if err != nil {
g.Log().Warningf(ctx, "查询租户知识库ID失败: %v", err)
} else if datasetId != "" {
// 收集所有需要删除的document_id
var documentIds []string
for _, record := range product.RagSyncRecords {
if record.RagDocumentId != "" {
documentIds = append(documentIds, record.RagDocumentId)
}
}
// 批量删除RAGFlow文档
if len(documentIds) > 0 {
if err := ragflowClient.DeleteDocument(ctx, datasetId, documentIds); err != nil {
g.Log().Errorf(ctx, "删除RAGFlow文档失败: %v, document_ids: %v", err, documentIds)
// 不阻断删除流程,记录错误后继续
} else {
g.Log().Infof(ctx, "成功删除RAGFlow文档: count=%d", len(documentIds))
}
}
}
}
}
// 3. 软删除MongoDB记录
return dao.Product.Delete(ctx, req)
}
// List 获取产品列表
// 参数: ctx - 上下文req - 列表查询请求(支持分页、关键词搜索)
// 返回: res - 产品列表及分页信息err - 错误信息
// 功能: 分页查询产品记录,支持按名称、描述模糊搜索
func (s *product) List(ctx context.Context, req *dto.ListProductReq) (res *dto.ListProductRes, err error) {
list, total, err := dao.Product.List(ctx, req)
if err != nil {
return
}
res = &dto.ListProductRes{
List: list,
Total: int(total),
}
return
}
// Export 导出产品为ZIP文件
// 参数: ctx - 上下文req - 导出请求(包含筛选条件)
// 返回: zipData - ZIP文件字节数组filename - 文件名err - 错误信息
// 功能: 将产品数据导出为ZIP文件包含JSON格式的产品列表
func (s *product) Export(ctx context.Context, req *dto.ExportProductReq) (zipData []byte, filename string, err error) {
// 清理输入参数,防止非法 UTF-8 字符
cleanName := strings.ToValidUTF8(req.Name, "")
// 1. 查询所有符合条件的产品
products, err := dao.Product.FindAllForExport(ctx, cleanName)
if err != nil {
return nil, "", err
}
if len(products) == 0 {
return nil, "", gerror.New("没有可导出的产品")
}
// 清理所有产品数据,确保 UTF-8 有效(防止数据库中的脏数据)
for i := range products {
products[i].Name = strings.ToValidUTF8(products[i].Name, "")
products[i].Description = strings.ToValidUTF8(products[i].Description, "")
// RagDocumentId字段在RagSyncRecords中不在Product主体
}
// 2. 创建 ZIP 文件(内存中)
var buf bytes.Buffer
zipWriter := zip.NewWriter(&buf)
defer zipWriter.Close()
// 3. 为每个产品生成 TXT 文件并添加到 ZIP
for _, product := range products {
// 生成 TXT 内容(产品详情)
txtContent := s.generateTxt(product)
// 文件名就是产品名称(清理特殊字符)
cleanName := strings.ToValidUTF8(product.Name, "未命名")
safeFilename := s.sanitizeFilename(cleanName)
if safeFilename == "" {
safeFilename = "product"
}
txtFilename := safeFilename + ".txt"
// 添加文件到 ZIP
writer, err := zipWriter.Create(txtFilename)
if err != nil {
return nil, "", gerror.Newf("创建ZIP文件失败: %v", err)
}
if _, err := writer.Write([]byte(txtContent)); err != nil {
return nil, "", gerror.Newf("写入ZIP文件失败: %v", err)
}
}
// 5. 生成下载文件名
timestamp := gtime.Now().Format("Ymd_His")
filename = "products_export_" + timestamp + ".zip"
return buf.Bytes(), filename, nil
}
// generateTxt 将产品转换为 TXT 格式
// 新格式:文件名=产品名称,内容=产品详情
func (s *product) generateTxt(product *entity.Product) string {
// 清理产品详情,确保 UTF-8 有效
cleanDescription := strings.ToValidUTF8(product.Description, "")
// 直接返回产品详情
if cleanDescription != "" {
return cleanDescription
}
// 如果没有详情,返回空字符串
return ""
}
// sanitizeFilename 清理文件名中的特殊字符
func (s *product) sanitizeFilename(name string) string {
// 替换不安全的文件名字符
replacer := map[rune]rune{
'/': '_',
'\\': '_',
':': '_',
'*': '_',
'?': '_',
'"': '_',
'<': '_',
'>': '_',
'|': '_',
}
// 预分配容量,避免循环中动态扩容
result := make([]rune, 0, len(name))
for _, char := range name {
if newChar, exists := replacer[char]; exists {
result = append(result, newChar)
} else {
result = append(result, char)
}
}
filename := string(result)
// 限制文件名长度
if utf8.RuneCountInString(filename) > 50 {
runes := []rune(filename)
filename = string(runes[:50])
}
return filename
}
// Import 从ZIP文件导入产品
// 参数: ctx - 上下文file - 上传的ZIP文件
// 返回: res - 导入结果成功和失败数量err - 错误信息
// 功能: 从ZIP文件批量导入产品数据并同步到RAGFlow失败记录加入重试队列
func (s *product) Import(ctx context.Context, file *multipart.FileHeader) (res *dto.ImportProductRes, err error) {
res = &dto.ImportProductRes{
SuccessCount: 0,
FailCount: 0,
FailReasons: []string{},
}
// 1. 获取租户信息
user, err := util.GetTenantInfo(ctx)
if err != nil {
return nil, gerror.Wrap(err, "获取租户信息失败")
}
tenantId := gconv.String(user.TenantId)
if tenantId == "" {
return nil, gerror.New("租户ID为空")
}
// 2. 打开上传的文件
uploadedFile, err := file.Open()
if err != nil {
return nil, gerror.Newf("无法打开上传的文件: %v", err)
}
defer uploadedFile.Close()
// 3. 读取文件内容到内存
fileData, err := io.ReadAll(uploadedFile)
if err != nil {
return nil, gerror.Newf("读取文件失败: %v", err)
}
// 4. 解析 ZIP 文件
zipReader, err := zip.NewReader(bytes.NewReader(fileData), int64(len(fileData)))
if err != nil {
return nil, gerror.Newf("无法解析ZIP文件: %v", err)
}
// 4. 遍历 ZIP 中的所有文件
for _, zipFile := range zipReader.File {
// 只处理 .txt 文件
if !strings.HasSuffix(strings.ToLower(zipFile.Name), ".txt") {
continue
}
// 读取 TXT 文件内容(产品详情)
txtContent, err := s.readZipFile(zipFile)
if err != nil {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 读取失败: "+err.Error())
continue
}
// 从文件名提取产品名称(移除 .txt 后缀)
productName := strings.TrimSuffix(zipFile.Name, ".txt")
productName = strings.TrimSpace(productName)
// 创建产品数据
productData, err := s.parseSimpleTxt(productName, txtContent)
if err != nil {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 解析失败: "+err.Error())
continue
}
// 校验产品名称和详情长度
if utf8.RuneCountInString(productData.Name) > 64 {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品名称超过64字")
continue
}
if utf8.RuneCountInString(productData.Description) > 8192 {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品详情超过8192字")
continue
}
// 设置基础字段
now := gtime.Now().Time
productData.CreatedAt = &now // 取地址赋值给指针类型
productData.UpdatedAt = &now // 取地址赋值给指针类型
productData.IsDeleted = false
// 统一使用string类型存储tenantId到MongoDB
productData.TenantId = tenantId
// 插入数据库
_, err = dao.Product.Insert(ctx, productData)
if err != nil {
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 数据库插入失败: "+err.Error())
continue
}
// 同步上传到RAGFlow产品知识库使用外层已声明的tenantId变量
if tenantId != "" {
datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId)
ragflowClient := ragflow.GetGlobalClient()
if ragflowClient != nil {
filename := fmt.Sprintf("%s.txt", productData.Name)
documentId, uploadErr := ragflowClient.UploadDocumentFromText(ctx, datasetId, productData.Description, filename)
if uploadErr != nil {
// 上传失败回滚删除MongoDB记录
dao.MongoDAO.Delete(ctx, bson.M{"_id": productData.Id}, entity.ProductCollection)
res.FailCount++
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 上传知识库失败: "+uploadErr.Error())
continue
}
// 更新ragDocumentId
filter := bson.M{"_id": productData.Id}
update := bson.M{"$set": bson.M{"ragDocumentId": documentId}}
dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection)
g.Log().Infof(ctx, "ZIP产品上传成功: name=%s, document_id=%s", productData.Name, documentId)
}
}
res.SuccessCount++
}
return res, nil
}
// readZipFile 读取 ZIP 文件中的单个文件内容
func (s *product) readZipFile(file *zip.File) (string, error) {
reader, err := file.Open()
if err != nil {
return "", err
}
defer reader.Close()
content, err := io.ReadAll(reader)
if err != nil {
return "", err
}
return string(content), nil
}
// parseSimpleTxt 解析简化格式的 TXT 文件
// 文件名=产品名称,内容=产品详情
func (s *product) parseSimpleTxt(productName string, description string) (*entity.Product, error) {
product := &entity.Product{}
// 清理并验证产品名称
product.Name = strings.TrimSpace(strings.ToValidUTF8(productName, ""))
if product.Name == "" {
return nil, gerror.New("产品名称不能为空")
}
// 清理产品详情
product.Description = strings.TrimSpace(strings.ToValidUTF8(description, ""))
return product, nil
}
// parseTxt 解析 TXT 文件内容为产品实体(旧格式兼容)
func (s *product) parseTxt(content string) (*entity.Product, error) {
product := &entity.Product{}
lines := strings.Split(content, "\n")
var inDescription bool
var descriptionLines []string
for _, line := range lines {
line = strings.TrimSpace(line)
// 跳过空行
if line == "" {
continue
}
// 解析标题
if strings.HasPrefix(line, "=== ") && strings.HasSuffix(line, " ===") {
product.Name = strings.TrimSpace(line[4 : len(line)-4])
continue
}
// 跳过基本信息标记
if line == "【基本信息】" {
inDescription = false
continue
}
// 产品详情开始
if line == "【产品详情】" {
inDescription = true
continue
}
// 解析基本信息字段
if !inDescription {
// 跳过系统生成的字段产品ID、创建时间等
if strings.HasPrefix(line, "产品ID:") ||
strings.HasPrefix(line, "创建时间:") ||
strings.HasPrefix(line, "更新时间:") ||
strings.HasPrefix(line, "导出时间:") {
continue
}
// 解析 RAGFlow 文档 ID
if strings.HasPrefix(line, "RAGFlow文档ID:") {
// RAGFlow文档ID存储在RagSyncRecords中
continue
}
}
// 收集产品详情
if inDescription {
// 跳过分隔线和导出时间
if strings.Contains(line, "==================") || strings.HasPrefix(line, "导出时间:") {
continue
}
if line != "暂无产品详情" {
descriptionLines = append(descriptionLines, line)
}
}
}
// 拼接产品详情
product.Description = strings.Join(descriptionLines, "\n")
// 校验必填字段
if product.Name == "" {
return nil, gerror.New("产品名称不能为空")
}
return product, nil
}
// BindToCustomerServices 绑定产品到多个客服账号
func (p *product) BindToCustomerServices(ctx context.Context, req *dto.BindProductReq) (res *dto.BindProductRes, err error) {
res = &dto.BindProductRes{}
// 1. 查询产品
product, err := dao.Product.GetById(ctx, req.ProductId)
if err != nil {
return nil, gerror.Wrapf(err, "查询产品失败")
}
if product == nil {
return nil, gerror.New("产品不存在")
}
// 2. 构建已存在的绑定map去重
existingMap := make(map[string]bool)
for _, csId := range product.AccountNames {
existingMap[csId] = true
}
// 3. 过滤并添加新绑定
var newBindings []string
var failedIds []string
for _, csId := range req.AccountNames {
// 检查去重customer_service_id是否已存在
if existingMap[csId] {
failedIds = append(failedIds, csId)
g.Log().Warningf(ctx, "客服账号 %s 已绑定该产品,跳过", csId)
continue
}
// 验证客服账号是否存在
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
failedIds = append(failedIds, csId)
g.Log().Warningf(ctx, "客服账号 %s 不存在或已删除,跳过", csId)
continue
}
newBindings = append(newBindings, csId)
}
// 4. 如果没有新的绑定,直接返回
if len(newBindings) == 0 {
res.SuccessCount = 0
res.FailedIds = failedIds
res.Message = "所有客服账号均已绑定或不存在"
return res, nil
}
// 5. 更新产品绑定
product.AccountNames = append(product.AccountNames, newBindings...)
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
return nil, gerror.Wrapf(err, "更新产品绑定失败")
}
// 6. 同步到RAGFlow自动创建知识库
for _, csId := range newBindings {
// 获取客服账号信息以获取tenant_id
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
if err != nil || csAccount == nil {
g.Log().Errorf(ctx, "获取客服账号信息失败: %s", csId)
continue
}
// 同步到RAGFlow会自动创建知识库
tenantId := gconv.String(csAccount.TenantId)
_, err = p.SyncToRAGFlow(ctx, req.ProductId, csId, tenantId)
if err != nil {
g.Log().Errorf(ctx, "同步到RAGFlow失败: product_id=%s, cs_id=%s, error=%v", req.ProductId, csId, err)
// 不阻断绑定流程,失败会进入重试队列
}
}
res.SuccessCount = len(newBindings)
res.FailedIds = failedIds
res.Message = "绑定成功"
return
}
// UnbindFromCustomerService 从客服账号解绑产品
func (p *product) UnbindFromCustomerService(ctx context.Context, req *dto.UnbindProductReq) (res *dto.UnbindProductRes, err error) {
res = &dto.UnbindProductRes{}
product, err := dao.Product.GetById(ctx, req.ProductId)
if err != nil {
return nil, gerror.Wrapf(err, "查询产品失败")
}
if product == nil {
return nil, gerror.New("产品不存在")
}
// 查找并移除绑定
var newBindings []string
found := false
for _, csId := range product.AccountNames {
if csId == req.AccountName {
found = true
continue
}
newBindings = append(newBindings, csId)
}
if !found {
res.Success = false
res.Message = "未找到该绑定关系"
return res, nil
}
product.AccountNames = newBindings
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
return nil, gerror.Wrapf(err, "解绑失败")
}
res.Success = true
res.Message = "解绑成功"
return
}
// SyncToRAGFlow 同步产品到RAGFlow租户级知识库
func (p *product) SyncToRAGFlow(ctx context.Context, productId, accountName, tenantId string) (documentId string, err error) {
// 1. 查询产品
product, err := dao.Product.GetById(ctx, productId)
if err != nil {
return "", gerror.Wrapf(err, "查询产品失败")
}
if product == nil {
return "", gerror.New("产品不存在")
}
// 2. 获取租户的产品知识库ID
datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId)
// 2.1 确保知识库存在,不存在则自动创建
if err := p.ensureDatasetExists(ctx, datasetId, tenantId, "产品"); err != nil {
return "", gerror.Wrapf(err, "确保知识库存在失败")
}
// 3. 调用RAGFlow上传文档
ragflowClient := ragflow.GetGlobalClient()
filename := fmt.Sprintf("%s_%s.txt", product.Name, accountName)
documentId, err = ragflowClient.UploadDocumentFromText(ctx, datasetId, product.Description, filename)
if err != nil {
jaeger.RecordError(ctx, err, "产品上传RAGFlow失败")
p.sendToRetryQueue(ctx, productId, accountName, tenantId, 0)
return "", err
}
// 4. 更新MongoDB的RagSyncRecord
now := gtime.Now().Format("Y-m-d H:i:s")
updated := false
for i := range product.RagSyncRecords {
record := &product.RagSyncRecords[i]
if record.AccountName == accountName {
record.RagDocumentId = documentId
record.RagSyncStatus = "synced"
record.SyncTime = now
record.RetryCount = 0
updated = true
break
}
}
if !updated {
product.RagSyncRecords = append(product.RagSyncRecords, entity.RagSyncRecord{
AccountName: accountName,
RagDocumentId: documentId,
RagSyncStatus: "synced",
SyncTime: now,
RetryCount: 0,
})
}
product.RagLastSyncTime = now
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
return "", gerror.Wrapf(err, "更新产品同步状态失败")
}
glog.Infof(ctx, "产品同步成功: product_id=%s, account_name=%s, document_id=%s", productId, accountName, documentId)
return documentId, nil
}
// ensureDatasetExists 已废弃,改用公共方法 EnsureTenantDataset
// 保留此方法仅为兼容性,直接调用公共方法
func (p *product) ensureDatasetExists(ctx context.Context, datasetId, tenantId, datasetType string) error {
_, err := EnsureTenantDataset(ctx, tenantId)
return err
}
// sendToRetryQueue 发送到重试队列
func (p *product) sendToRetryQueue(ctx context.Context, productId, accountName, tenantId string, retryCount int) {
msg := dto.RAGFlowSyncRetryMsg{
Type: "product",
Id: productId,
AccountName: accountName,
TenantId: tenantId,
RetryCount: retryCount,
}
var delay int
switch retryCount {
case 0:
delay = 5 * 60
case 1:
delay = 15 * 60
case 2:
delay = 60 * 60
default:
glog.Warningf(ctx, "产品同步重试次数超限,标记为失败: %s", productId)
p.markSyncFailed(ctx, productId, accountName)
return
}
if err := rabbitmq.PublishWithDelay(ctx, "ragflow.sync.retry.product", msg, delay); err != nil {
jaeger.RecordError(ctx, err, "发送RAGFlow重试消息失败")
}
}
// markSyncFailed 标记同步失败
func (p *product) markSyncFailed(ctx context.Context, productId, accountName string) {
product, err := dao.Product.GetById(ctx, productId)
if err != nil {
return
}
for i := range product.RagSyncRecords {
record := &product.RagSyncRecords[i]
if record.AccountName == accountName {
record.RagSyncStatus = "failed"
record.SyncTime = gtime.Now().Format("Y-m-d H:i:s")
break
}
}
dao.Product.UpdateEntity(ctx, product)
}
// HandleRAGFlowSyncRetry RAGFlow同步重试消费者
func (p *product) HandleRAGFlowSyncRetry(ctx context.Context, msg dto.RAGFlowSyncRetryMsg) error {
glog.Infof(ctx, "处理RAGFlow同步重试: type=%s, id=%s, retry=%d", msg.Type, msg.Id, msg.RetryCount)
if msg.Type != "product" {
return nil
}
_, err := p.SyncToRAGFlow(ctx, msg.Id, msg.AccountName, msg.TenantId)
if err != nil {
p.sendToRetryQueue(ctx, msg.Id, msg.AccountName, msg.TenantId, msg.RetryCount+1)
return err
}
return nil
}
// ProductRetryConsumer 产品RAGFlow重试消费者
type ProductRetryConsumer struct {
queueName string
consumer *rabbitmq.Consumer
}
// NewProductRetryConsumer 创建产品RAGFlow重试消费者
func NewProductRetryConsumer(ctx context.Context) *ProductRetryConsumer {
return &ProductRetryConsumer{
queueName: "ragflow.sync.retry.product",
}
}
// Start 启动消费者
func (c *ProductRetryConsumer) Start(ctx context.Context) error {
c.consumer = rabbitmq.NewConsumer(c.queueName, func(ctx context.Context, body []byte) error {
var msg dto.RAGFlowSyncRetryMsg
if err := json.Unmarshal(body, &msg); err != nil {
return err
}
return Product.HandleRAGFlowSyncRetry(ctx, msg)
})
return c.consumer.Start(ctx)
}
// Stop 停止消费者
func (c *ProductRetryConsumer) Stop(ctx context.Context) {
if c.consumer != nil {
c.consumer.Stop(ctx)
}
}