892 lines
27 KiB
Go
892 lines
27 KiB
Go
// Package service - 产品服务
|
||
// 功能:产品的增删改查、ZIP导入/导出、绑定/解绑客服账号、同步到RAGFlow、重试消费者
|
||
package service
|
||
|
||
import (
|
||
"archive/zip"
|
||
"bytes"
|
||
"context"
|
||
"customer-server/dao"
|
||
"customer-server/model/dto"
|
||
"customer-server/model/entity"
|
||
"customer-server/util"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"mime/multipart"
|
||
"strings"
|
||
"unicode/utf8"
|
||
|
||
"gitea.com/red-future/common/jaeger"
|
||
"gitea.com/red-future/common/rabbitmq"
|
||
"gitea.com/red-future/common/ragflow"
|
||
"gitea.com/red-future/common/utils"
|
||
"github.com/gogf/gf/v2/errors/gerror"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/glog"
|
||
"github.com/gogf/gf/v2/os/grpool"
|
||
"github.com/gogf/gf/v2/os/gtime"
|
||
"github.com/gogf/gf/v2/util/gconv"
|
||
"go.mongodb.org/mongo-driver/v2/bson"
|
||
)
|
||
|
||
var (
|
||
Product = new(product)
|
||
productGrpool = grpool.New(50) // 文档解析协程池,最大50并发
|
||
)
|
||
|
||
type product struct{}
|
||
|
||
// Add 添加产品
|
||
// 参数: ctx - 上下文,req - 添加产品请求(包含产品名称、描述、价格等)
|
||
// 返回: res - 添加成功后的产品ID和RAGFlow文档ID,err - 错误信息
|
||
// 功能: 创建产品记录并自动上传到RAGFlow产品知识库(独立于话术知识库)
|
||
func (s *product) Add(ctx context.Context, req *dto.AddProductReq) (res *dto.AddProductRes, err error) {
|
||
// 校验产品名称长度
|
||
if utf8.RuneCountInString(req.Name) > 64 {
|
||
return nil, gerror.New("产品名称必须在64字以内")
|
||
}
|
||
|
||
// 校验产品详情长度
|
||
if utf8.RuneCountInString(req.Description) > 8192 {
|
||
return nil, gerror.New("产品详情必须在8192字以内")
|
||
}
|
||
|
||
// 去重检查:同一租户下名称唯一
|
||
existing, err := dao.Product.FindByName(ctx, req.Name)
|
||
if err != nil {
|
||
return nil, gerror.Wrap(err, "检查产品重复失败")
|
||
}
|
||
if existing != nil {
|
||
return nil, gerror.Newf("产品名称已存在:name=%s, id=%s", req.Name, existing.Id.Hex())
|
||
}
|
||
|
||
data := &entity.Product{}
|
||
if err = utils.Struct(req, data); err != nil {
|
||
return
|
||
}
|
||
// 先从token获取租户信息(在Insert之前)
|
||
user, err := util.GetTenantInfo(ctx)
|
||
if err != nil {
|
||
return nil, gerror.Wrap(err, "获取租户信息失败")
|
||
}
|
||
tenantId := gconv.String(user.TenantId)
|
||
if tenantId == "" {
|
||
return nil, gerror.New("租户ID为空")
|
||
}
|
||
|
||
// 确保产品知识库存在(检查话术知识库是否存在,作为租户是否初始化的标志)
|
||
speechcraftDatasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
|
||
if err != nil || speechcraftDatasetId == "" {
|
||
return nil, gerror.Newf("租户知识库不存在,请先创建客服账号: tenant_id=%s", tenantId)
|
||
}
|
||
|
||
// 设置基础字段
|
||
now := gtime.Now().Time
|
||
data.CreatedAt = &now // 取地址赋值给指针类型
|
||
data.UpdatedAt = &now // 取地址赋值给指针类型
|
||
data.IsDeleted = false
|
||
// 统一使用string类型存储tenantId到MongoDB
|
||
data.TenantId = tenantId
|
||
|
||
// 插入产品到MongoDB
|
||
_, err = dao.Product.Insert(ctx, data)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 确保租户知识库存在(产品和话术共享租户知识库)
|
||
// 使用dataset_service提供的统一方法,自动处理创建、查找、保存等逻辑
|
||
datasetId, err := EnsureTenantDataset(ctx, tenantId)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "确保租户知识库失败: %v", err)
|
||
return nil, gerror.Wrap(err, "获取租户知识库失败")
|
||
}
|
||
g.Log().Infof(ctx, "租户%s的知识库ID: %s", tenantId, datasetId)
|
||
|
||
// 同步上传到RAGFlow
|
||
ragflowClient := ragflow.GetGlobalClient()
|
||
g.Log().Infof(ctx, "准备上传产品到RAGFlow: product_id=%s, dataset_id=%s, name=%s", data.Id.Hex(), datasetId, data.Name)
|
||
filename := fmt.Sprintf("%s.txt", data.Name)
|
||
documentId, err := ragflowClient.UploadDocumentFromText(ctx, datasetId, data.Description, filename)
|
||
if err != nil {
|
||
// 回滚:删除刚插入的产品
|
||
dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.ProductCollection)
|
||
g.Log().Errorf(ctx, "产品上传RAGFlow失败: product_id=%s, dataset_id=%s, error=%v", data.Id.Hex(), datasetId, err)
|
||
jaeger.RecordError(ctx, err, "产品上传RAGFlow失败")
|
||
return nil, gerror.Wrap(err, "文档上传到知识库失败")
|
||
}
|
||
|
||
// 异步触发解析(grpool自动管理goroutine生命周期,WithoutCancel保留追踪避免取消)
|
||
productGrpool.Add(ctx, func(ctx context.Context) {
|
||
parseCtx := context.WithoutCancel(ctx)
|
||
if err := ragflowClient.ParseDocuments(parseCtx, datasetId, []string{documentId}); err != nil {
|
||
g.Log().Errorf(parseCtx, "文档解析失败: document_id=%s, error=%v", documentId, err)
|
||
} else {
|
||
g.Log().Infof(parseCtx, "文档解析成功: document_id=%s", documentId)
|
||
}
|
||
})
|
||
|
||
// 更新MongoDB的RagSyncRecords数组(使用空accountName表示租户级文档)
|
||
syncTime := gtime.Now().Format("Y-m-d H:i:s")
|
||
record := entity.RagSyncRecord{
|
||
AccountName: "", // 空表示租户级文档
|
||
RagDocumentId: documentId,
|
||
RagSyncStatus: "synced",
|
||
SyncTime: syncTime,
|
||
RetryCount: 0,
|
||
}
|
||
filter := bson.M{"_id": data.Id}
|
||
update := bson.M{
|
||
"$set": bson.M{
|
||
"ragSyncRecords": []entity.RagSyncRecord{record},
|
||
"ragLastSyncTime": syncTime,
|
||
"updatedAt": gtime.Now().Time,
|
||
},
|
||
}
|
||
if _, _, err = dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection); err != nil {
|
||
g.Log().Errorf(ctx, "更新产品RagSyncRecords失败: %v", err)
|
||
// 不回滚,文档已上传成功
|
||
}
|
||
|
||
g.Log().Infof(ctx, "产品添加成功并上传到知识库: product_id=%s, document_id=%s", data.Id.Hex(), documentId)
|
||
res = &dto.AddProductRes{Id: data.Id.Hex()}
|
||
return
|
||
}
|
||
|
||
// Update 更新产品
|
||
// 参数: ctx - 上下文,req - 更新产品请求(包含产品ID和待更新字段)
|
||
// 返回: err - 错误信息
|
||
// 功能: 更新产品信息并同步到RAGFlow,支持文档删除重建
|
||
func (s *product) Update(ctx context.Context, req *dto.UpdateProductReq) (err error) {
|
||
// 如果更新了产品名称,校验长度
|
||
if req.Name != "" && utf8.RuneCountInString(req.Name) > 64 {
|
||
return gerror.New("产品名称必须在64字以内")
|
||
}
|
||
|
||
// 如果更新了产品详情,校验长度
|
||
if req.Description != "" && utf8.RuneCountInString(req.Description) > 8192 {
|
||
return gerror.New("产品详情必须在8192字以内")
|
||
}
|
||
|
||
// 去重检查:如果修改名称,检查是否与其他产品重复
|
||
if req.Name != "" {
|
||
existing, err := dao.Product.FindByName(ctx, req.Name)
|
||
if err != nil {
|
||
return gerror.Wrap(err, "检查产品重复失败")
|
||
}
|
||
if existing != nil && existing.Id.Hex() != req.Id {
|
||
return gerror.Newf("产品名称已存在:name=%s, id=%s", req.Name, existing.Id.Hex())
|
||
}
|
||
}
|
||
|
||
return dao.Product.Update(ctx, req)
|
||
}
|
||
|
||
// Delete 删除产品
|
||
// 参数: ctx - 上下文,req - 删除产品请求(包含产品ID)
|
||
// 返回: err - 错误信息
|
||
// 功能: 逻辑删除产品记录并从RAGFlow移除对应文档
|
||
func (s *product) Delete(ctx context.Context, req *dto.DeleteProductReq) (err error) {
|
||
g.Log().Infof(ctx, "[Delete] 开始删除产品 - productId: %s", req.Id)
|
||
|
||
// 1. 查询产品,获取RAGFlow同步记录(使用原生查询,避免租户过滤)
|
||
objectId, err := bson.ObjectIDFromHex(req.Id)
|
||
if err != nil {
|
||
return gerror.Wrap(err, "无效的产品ID")
|
||
}
|
||
|
||
var product entity.Product
|
||
filter := bson.M{"_id": objectId, "isDeleted": false}
|
||
err = dao.MongoDAO.FindOne(ctx, filter, &product, entity.ProductCollection)
|
||
if err != nil {
|
||
if err.Error() == "mongo: no documents in result" {
|
||
return gerror.New("产品不存在")
|
||
}
|
||
return gerror.Wrap(err, "查询产品失败")
|
||
}
|
||
|
||
g.Log().Infof(ctx, "[Delete] 查询到产品 - name: %s, ragSyncRecords数量: %d", product.Name, len(product.RagSyncRecords))
|
||
|
||
// 2. 删除RAGFlow中的文档
|
||
if len(product.RagSyncRecords) > 0 {
|
||
ragflowClient := ragflow.GetGlobalClient()
|
||
if ragflowClient != nil {
|
||
tenantId := gconv.String(product.TenantId)
|
||
|
||
// 查询租户的dataset_id
|
||
datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId)
|
||
if err != nil {
|
||
g.Log().Warningf(ctx, "查询租户知识库ID失败: %v", err)
|
||
} else if datasetId != "" {
|
||
// 收集所有需要删除的document_id
|
||
var documentIds []string
|
||
for _, record := range product.RagSyncRecords {
|
||
if record.RagDocumentId != "" {
|
||
documentIds = append(documentIds, record.RagDocumentId)
|
||
}
|
||
}
|
||
|
||
// 批量删除RAGFlow文档
|
||
if len(documentIds) > 0 {
|
||
if err := ragflowClient.DeleteDocument(ctx, datasetId, documentIds); err != nil {
|
||
g.Log().Errorf(ctx, "删除RAGFlow文档失败: %v, document_ids: %v", err, documentIds)
|
||
// 不阻断删除流程,记录错误后继续
|
||
} else {
|
||
g.Log().Infof(ctx, "成功删除RAGFlow文档: count=%d", len(documentIds))
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 3. 软删除MongoDB记录
|
||
return dao.Product.Delete(ctx, req)
|
||
}
|
||
|
||
// List 获取产品列表
|
||
// 参数: ctx - 上下文,req - 列表查询请求(支持分页、关键词搜索)
|
||
// 返回: res - 产品列表及分页信息,err - 错误信息
|
||
// 功能: 分页查询产品记录,支持按名称、描述模糊搜索
|
||
func (s *product) List(ctx context.Context, req *dto.ListProductReq) (res *dto.ListProductRes, err error) {
|
||
list, total, err := dao.Product.List(ctx, req)
|
||
if err != nil {
|
||
return
|
||
}
|
||
res = &dto.ListProductRes{
|
||
List: list,
|
||
Total: int(total),
|
||
}
|
||
return
|
||
}
|
||
|
||
// Export 导出产品为ZIP文件
|
||
// 参数: ctx - 上下文,req - 导出请求(包含筛选条件)
|
||
// 返回: zipData - ZIP文件字节数组,filename - 文件名,err - 错误信息
|
||
// 功能: 将产品数据导出为ZIP文件,包含JSON格式的产品列表
|
||
func (s *product) Export(ctx context.Context, req *dto.ExportProductReq) (zipData []byte, filename string, err error) {
|
||
// 清理输入参数,防止非法 UTF-8 字符
|
||
cleanName := strings.ToValidUTF8(req.Name, "")
|
||
|
||
// 1. 查询所有符合条件的产品
|
||
products, err := dao.Product.FindAllForExport(ctx, cleanName)
|
||
if err != nil {
|
||
return nil, "", err
|
||
}
|
||
|
||
if len(products) == 0 {
|
||
return nil, "", gerror.New("没有可导出的产品")
|
||
}
|
||
|
||
// 清理所有产品数据,确保 UTF-8 有效(防止数据库中的脏数据)
|
||
for i := range products {
|
||
products[i].Name = strings.ToValidUTF8(products[i].Name, "")
|
||
products[i].Description = strings.ToValidUTF8(products[i].Description, "")
|
||
// RagDocumentId字段在RagSyncRecords中,不在Product主体
|
||
}
|
||
|
||
// 2. 创建 ZIP 文件(内存中)
|
||
var buf bytes.Buffer
|
||
zipWriter := zip.NewWriter(&buf)
|
||
defer zipWriter.Close()
|
||
|
||
// 3. 为每个产品生成 TXT 文件并添加到 ZIP
|
||
for _, product := range products {
|
||
// 生成 TXT 内容(产品详情)
|
||
txtContent := s.generateTxt(product)
|
||
|
||
// 文件名就是产品名称(清理特殊字符)
|
||
cleanName := strings.ToValidUTF8(product.Name, "未命名")
|
||
safeFilename := s.sanitizeFilename(cleanName)
|
||
if safeFilename == "" {
|
||
safeFilename = "product"
|
||
}
|
||
txtFilename := safeFilename + ".txt"
|
||
|
||
// 添加文件到 ZIP
|
||
writer, err := zipWriter.Create(txtFilename)
|
||
if err != nil {
|
||
return nil, "", gerror.Newf("创建ZIP文件失败: %v", err)
|
||
}
|
||
|
||
if _, err := writer.Write([]byte(txtContent)); err != nil {
|
||
return nil, "", gerror.Newf("写入ZIP文件失败: %v", err)
|
||
}
|
||
}
|
||
|
||
// 5. 生成下载文件名
|
||
timestamp := gtime.Now().Format("Ymd_His")
|
||
filename = "products_export_" + timestamp + ".zip"
|
||
|
||
return buf.Bytes(), filename, nil
|
||
}
|
||
|
||
// generateTxt 将产品转换为 TXT 格式
|
||
// 新格式:文件名=产品名称,内容=产品详情
|
||
func (s *product) generateTxt(product *entity.Product) string {
|
||
// 清理产品详情,确保 UTF-8 有效
|
||
cleanDescription := strings.ToValidUTF8(product.Description, "")
|
||
|
||
// 直接返回产品详情
|
||
if cleanDescription != "" {
|
||
return cleanDescription
|
||
}
|
||
|
||
// 如果没有详情,返回空字符串
|
||
return ""
|
||
}
|
||
|
||
// sanitizeFilename 清理文件名中的特殊字符
|
||
func (s *product) sanitizeFilename(name string) string {
|
||
// 替换不安全的文件名字符
|
||
replacer := map[rune]rune{
|
||
'/': '_',
|
||
'\\': '_',
|
||
':': '_',
|
||
'*': '_',
|
||
'?': '_',
|
||
'"': '_',
|
||
'<': '_',
|
||
'>': '_',
|
||
'|': '_',
|
||
}
|
||
|
||
// 预分配容量,避免循环中动态扩容
|
||
result := make([]rune, 0, len(name))
|
||
for _, char := range name {
|
||
if newChar, exists := replacer[char]; exists {
|
||
result = append(result, newChar)
|
||
} else {
|
||
result = append(result, char)
|
||
}
|
||
}
|
||
|
||
filename := string(result)
|
||
// 限制文件名长度
|
||
if utf8.RuneCountInString(filename) > 50 {
|
||
runes := []rune(filename)
|
||
filename = string(runes[:50])
|
||
}
|
||
|
||
return filename
|
||
}
|
||
|
||
// Import 从ZIP文件导入产品
|
||
// 参数: ctx - 上下文,file - 上传的ZIP文件
|
||
// 返回: res - 导入结果(成功和失败数量),err - 错误信息
|
||
// 功能: 从ZIP文件批量导入产品数据并同步到RAGFlow,失败记录加入重试队列
|
||
func (s *product) Import(ctx context.Context, file *multipart.FileHeader) (res *dto.ImportProductRes, err error) {
|
||
res = &dto.ImportProductRes{
|
||
SuccessCount: 0,
|
||
FailCount: 0,
|
||
FailReasons: []string{},
|
||
}
|
||
|
||
// 1. 获取租户信息
|
||
user, err := util.GetTenantInfo(ctx)
|
||
if err != nil {
|
||
return nil, gerror.Wrap(err, "获取租户信息失败")
|
||
}
|
||
tenantId := gconv.String(user.TenantId)
|
||
if tenantId == "" {
|
||
return nil, gerror.New("租户ID为空")
|
||
}
|
||
|
||
// 2. 打开上传的文件
|
||
uploadedFile, err := file.Open()
|
||
if err != nil {
|
||
return nil, gerror.Newf("无法打开上传的文件: %v", err)
|
||
}
|
||
defer uploadedFile.Close()
|
||
|
||
// 3. 读取文件内容到内存
|
||
fileData, err := io.ReadAll(uploadedFile)
|
||
if err != nil {
|
||
return nil, gerror.Newf("读取文件失败: %v", err)
|
||
}
|
||
|
||
// 4. 解析 ZIP 文件
|
||
zipReader, err := zip.NewReader(bytes.NewReader(fileData), int64(len(fileData)))
|
||
if err != nil {
|
||
return nil, gerror.Newf("无法解析ZIP文件: %v", err)
|
||
}
|
||
|
||
// 4. 遍历 ZIP 中的所有文件
|
||
for _, zipFile := range zipReader.File {
|
||
// 只处理 .txt 文件
|
||
if !strings.HasSuffix(strings.ToLower(zipFile.Name), ".txt") {
|
||
continue
|
||
}
|
||
|
||
// 读取 TXT 文件内容(产品详情)
|
||
txtContent, err := s.readZipFile(zipFile)
|
||
if err != nil {
|
||
res.FailCount++
|
||
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 读取失败: "+err.Error())
|
||
continue
|
||
}
|
||
|
||
// 从文件名提取产品名称(移除 .txt 后缀)
|
||
productName := strings.TrimSuffix(zipFile.Name, ".txt")
|
||
productName = strings.TrimSpace(productName)
|
||
|
||
// 创建产品数据
|
||
productData, err := s.parseSimpleTxt(productName, txtContent)
|
||
if err != nil {
|
||
res.FailCount++
|
||
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 解析失败: "+err.Error())
|
||
continue
|
||
}
|
||
|
||
// 校验产品名称和详情长度
|
||
if utf8.RuneCountInString(productData.Name) > 64 {
|
||
res.FailCount++
|
||
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品名称超过64字")
|
||
continue
|
||
}
|
||
if utf8.RuneCountInString(productData.Description) > 8192 {
|
||
res.FailCount++
|
||
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品详情超过8192字")
|
||
continue
|
||
}
|
||
|
||
// 设置基础字段
|
||
now := gtime.Now().Time
|
||
productData.CreatedAt = &now // 取地址赋值给指针类型
|
||
productData.UpdatedAt = &now // 取地址赋值给指针类型
|
||
productData.IsDeleted = false
|
||
// 统一使用string类型存储tenantId到MongoDB
|
||
productData.TenantId = tenantId
|
||
|
||
// 插入数据库
|
||
_, err = dao.Product.Insert(ctx, productData)
|
||
if err != nil {
|
||
res.FailCount++
|
||
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 数据库插入失败: "+err.Error())
|
||
continue
|
||
}
|
||
|
||
// 同步上传到RAGFlow产品知识库(使用外层已声明的tenantId变量)
|
||
if tenantId != "" {
|
||
datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId)
|
||
ragflowClient := ragflow.GetGlobalClient()
|
||
if ragflowClient != nil {
|
||
filename := fmt.Sprintf("%s.txt", productData.Name)
|
||
documentId, uploadErr := ragflowClient.UploadDocumentFromText(ctx, datasetId, productData.Description, filename)
|
||
if uploadErr != nil {
|
||
// 上传失败:回滚删除MongoDB记录
|
||
dao.MongoDAO.Delete(ctx, bson.M{"_id": productData.Id}, entity.ProductCollection)
|
||
res.FailCount++
|
||
res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 上传知识库失败: "+uploadErr.Error())
|
||
continue
|
||
}
|
||
|
||
// 更新ragDocumentId
|
||
filter := bson.M{"_id": productData.Id}
|
||
update := bson.M{"$set": bson.M{"ragDocumentId": documentId}}
|
||
dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection)
|
||
|
||
g.Log().Infof(ctx, "ZIP产品上传成功: name=%s, document_id=%s", productData.Name, documentId)
|
||
}
|
||
}
|
||
|
||
res.SuccessCount++
|
||
}
|
||
|
||
return res, nil
|
||
}
|
||
|
||
// readZipFile 读取 ZIP 文件中的单个文件内容
|
||
func (s *product) readZipFile(file *zip.File) (string, error) {
|
||
reader, err := file.Open()
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
defer reader.Close()
|
||
|
||
content, err := io.ReadAll(reader)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
return string(content), nil
|
||
}
|
||
|
||
// parseSimpleTxt 解析简化格式的 TXT 文件
|
||
// 文件名=产品名称,内容=产品详情
|
||
func (s *product) parseSimpleTxt(productName string, description string) (*entity.Product, error) {
|
||
product := &entity.Product{}
|
||
|
||
// 清理并验证产品名称
|
||
product.Name = strings.TrimSpace(strings.ToValidUTF8(productName, ""))
|
||
if product.Name == "" {
|
||
return nil, gerror.New("产品名称不能为空")
|
||
}
|
||
|
||
// 清理产品详情
|
||
product.Description = strings.TrimSpace(strings.ToValidUTF8(description, ""))
|
||
|
||
return product, nil
|
||
}
|
||
|
||
// parseTxt 解析 TXT 文件内容为产品实体(旧格式兼容)
|
||
func (s *product) parseTxt(content string) (*entity.Product, error) {
|
||
product := &entity.Product{}
|
||
lines := strings.Split(content, "\n")
|
||
|
||
var inDescription bool
|
||
var descriptionLines []string
|
||
|
||
for _, line := range lines {
|
||
line = strings.TrimSpace(line)
|
||
|
||
// 跳过空行
|
||
if line == "" {
|
||
continue
|
||
}
|
||
|
||
// 解析标题
|
||
if strings.HasPrefix(line, "=== ") && strings.HasSuffix(line, " ===") {
|
||
product.Name = strings.TrimSpace(line[4 : len(line)-4])
|
||
continue
|
||
}
|
||
|
||
// 跳过基本信息标记
|
||
if line == "【基本信息】" {
|
||
inDescription = false
|
||
continue
|
||
}
|
||
|
||
// 产品详情开始
|
||
if line == "【产品详情】" {
|
||
inDescription = true
|
||
continue
|
||
}
|
||
|
||
// 解析基本信息字段
|
||
if !inDescription {
|
||
// 跳过系统生成的字段(产品ID、创建时间等)
|
||
if strings.HasPrefix(line, "产品ID:") ||
|
||
strings.HasPrefix(line, "创建时间:") ||
|
||
strings.HasPrefix(line, "更新时间:") ||
|
||
strings.HasPrefix(line, "导出时间:") {
|
||
continue
|
||
}
|
||
|
||
// 解析 RAGFlow 文档 ID
|
||
if strings.HasPrefix(line, "RAGFlow文档ID:") {
|
||
// RAGFlow文档ID存储在RagSyncRecords中
|
||
continue
|
||
}
|
||
}
|
||
|
||
// 收集产品详情
|
||
if inDescription {
|
||
// 跳过分隔线和导出时间
|
||
if strings.Contains(line, "==================") || strings.HasPrefix(line, "导出时间:") {
|
||
continue
|
||
}
|
||
if line != "暂无产品详情" {
|
||
descriptionLines = append(descriptionLines, line)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 拼接产品详情
|
||
product.Description = strings.Join(descriptionLines, "\n")
|
||
|
||
// 校验必填字段
|
||
if product.Name == "" {
|
||
return nil, gerror.New("产品名称不能为空")
|
||
}
|
||
|
||
return product, nil
|
||
}
|
||
|
||
// BindToCustomerServices 绑定产品到多个客服账号
|
||
func (p *product) BindToCustomerServices(ctx context.Context, req *dto.BindProductReq) (res *dto.BindProductRes, err error) {
|
||
res = &dto.BindProductRes{}
|
||
|
||
// 1. 查询产品
|
||
product, err := dao.Product.GetById(ctx, req.ProductId)
|
||
if err != nil {
|
||
return nil, gerror.Wrapf(err, "查询产品失败")
|
||
}
|
||
if product == nil {
|
||
return nil, gerror.New("产品不存在")
|
||
}
|
||
|
||
// 2. 构建已存在的绑定map(去重)
|
||
existingMap := make(map[string]bool)
|
||
for _, csId := range product.AccountNames {
|
||
existingMap[csId] = true
|
||
}
|
||
|
||
// 3. 过滤并添加新绑定
|
||
var newBindings []string
|
||
var failedIds []string
|
||
|
||
for _, csId := range req.AccountNames {
|
||
// 检查去重:customer_service_id是否已存在
|
||
if existingMap[csId] {
|
||
failedIds = append(failedIds, csId)
|
||
g.Log().Warningf(ctx, "客服账号 %s 已绑定该产品,跳过", csId)
|
||
continue
|
||
}
|
||
|
||
// 验证客服账号是否存在
|
||
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
|
||
if err != nil || csAccount == nil {
|
||
failedIds = append(failedIds, csId)
|
||
g.Log().Warningf(ctx, "客服账号 %s 不存在或已删除,跳过", csId)
|
||
continue
|
||
}
|
||
|
||
newBindings = append(newBindings, csId)
|
||
}
|
||
|
||
// 4. 如果没有新的绑定,直接返回
|
||
if len(newBindings) == 0 {
|
||
res.SuccessCount = 0
|
||
res.FailedIds = failedIds
|
||
res.Message = "所有客服账号均已绑定或不存在"
|
||
return res, nil
|
||
}
|
||
|
||
// 5. 更新产品绑定
|
||
product.AccountNames = append(product.AccountNames, newBindings...)
|
||
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
|
||
return nil, gerror.Wrapf(err, "更新产品绑定失败")
|
||
}
|
||
|
||
// 6. 同步到RAGFlow(自动创建知识库)
|
||
for _, csId := range newBindings {
|
||
// 获取客服账号信息以获取tenant_id
|
||
csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId)
|
||
if err != nil || csAccount == nil {
|
||
g.Log().Errorf(ctx, "获取客服账号信息失败: %s", csId)
|
||
continue
|
||
}
|
||
|
||
// 同步到RAGFlow(会自动创建知识库)
|
||
tenantId := gconv.String(csAccount.TenantId)
|
||
_, err = p.SyncToRAGFlow(ctx, req.ProductId, csId, tenantId)
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "同步到RAGFlow失败: product_id=%s, cs_id=%s, error=%v", req.ProductId, csId, err)
|
||
// 不阻断绑定流程,失败会进入重试队列
|
||
}
|
||
}
|
||
|
||
res.SuccessCount = len(newBindings)
|
||
res.FailedIds = failedIds
|
||
res.Message = "绑定成功"
|
||
return
|
||
}
|
||
|
||
// UnbindFromCustomerService 从客服账号解绑产品
|
||
func (p *product) UnbindFromCustomerService(ctx context.Context, req *dto.UnbindProductReq) (res *dto.UnbindProductRes, err error) {
|
||
res = &dto.UnbindProductRes{}
|
||
|
||
product, err := dao.Product.GetById(ctx, req.ProductId)
|
||
if err != nil {
|
||
return nil, gerror.Wrapf(err, "查询产品失败")
|
||
}
|
||
if product == nil {
|
||
return nil, gerror.New("产品不存在")
|
||
}
|
||
|
||
// 查找并移除绑定
|
||
var newBindings []string
|
||
found := false
|
||
|
||
for _, csId := range product.AccountNames {
|
||
if csId == req.AccountName {
|
||
found = true
|
||
continue
|
||
}
|
||
newBindings = append(newBindings, csId)
|
||
}
|
||
|
||
if !found {
|
||
res.Success = false
|
||
res.Message = "未找到该绑定关系"
|
||
return res, nil
|
||
}
|
||
|
||
product.AccountNames = newBindings
|
||
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
|
||
return nil, gerror.Wrapf(err, "解绑失败")
|
||
}
|
||
|
||
res.Success = true
|
||
res.Message = "解绑成功"
|
||
return
|
||
}
|
||
|
||
// SyncToRAGFlow 同步产品到RAGFlow(租户级知识库)
|
||
func (p *product) SyncToRAGFlow(ctx context.Context, productId, accountName, tenantId string) (documentId string, err error) {
|
||
// 1. 查询产品
|
||
product, err := dao.Product.GetById(ctx, productId)
|
||
if err != nil {
|
||
return "", gerror.Wrapf(err, "查询产品失败")
|
||
}
|
||
if product == nil {
|
||
return "", gerror.New("产品不存在")
|
||
}
|
||
|
||
// 2. 获取租户的产品知识库ID
|
||
datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId)
|
||
|
||
// 2.1 确保知识库存在,不存在则自动创建
|
||
if err := p.ensureDatasetExists(ctx, datasetId, tenantId, "产品"); err != nil {
|
||
return "", gerror.Wrapf(err, "确保知识库存在失败")
|
||
}
|
||
|
||
// 3. 调用RAGFlow上传文档
|
||
ragflowClient := ragflow.GetGlobalClient()
|
||
filename := fmt.Sprintf("%s_%s.txt", product.Name, accountName)
|
||
documentId, err = ragflowClient.UploadDocumentFromText(ctx, datasetId, product.Description, filename)
|
||
if err != nil {
|
||
jaeger.RecordError(ctx, err, "产品上传RAGFlow失败")
|
||
p.sendToRetryQueue(ctx, productId, accountName, tenantId, 0)
|
||
return "", err
|
||
}
|
||
|
||
// 4. 更新MongoDB的RagSyncRecord
|
||
now := gtime.Now().Format("Y-m-d H:i:s")
|
||
updated := false
|
||
for i := range product.RagSyncRecords {
|
||
record := &product.RagSyncRecords[i]
|
||
if record.AccountName == accountName {
|
||
record.RagDocumentId = documentId
|
||
record.RagSyncStatus = "synced"
|
||
record.SyncTime = now
|
||
record.RetryCount = 0
|
||
updated = true
|
||
break
|
||
}
|
||
}
|
||
|
||
if !updated {
|
||
product.RagSyncRecords = append(product.RagSyncRecords, entity.RagSyncRecord{
|
||
AccountName: accountName,
|
||
RagDocumentId: documentId,
|
||
RagSyncStatus: "synced",
|
||
SyncTime: now,
|
||
RetryCount: 0,
|
||
})
|
||
}
|
||
|
||
product.RagLastSyncTime = now
|
||
if err = dao.Product.UpdateEntity(ctx, product); err != nil {
|
||
return "", gerror.Wrapf(err, "更新产品同步状态失败")
|
||
}
|
||
|
||
glog.Infof(ctx, "产品同步成功: product_id=%s, account_name=%s, document_id=%s", productId, accountName, documentId)
|
||
return documentId, nil
|
||
}
|
||
|
||
// ensureDatasetExists 已废弃,改用公共方法 EnsureTenantDataset
|
||
// 保留此方法仅为兼容性,直接调用公共方法
|
||
func (p *product) ensureDatasetExists(ctx context.Context, datasetId, tenantId, datasetType string) error {
|
||
_, err := EnsureTenantDataset(ctx, tenantId)
|
||
return err
|
||
}
|
||
|
||
// sendToRetryQueue 发送到重试队列
|
||
func (p *product) sendToRetryQueue(ctx context.Context, productId, accountName, tenantId string, retryCount int) {
|
||
msg := dto.RAGFlowSyncRetryMsg{
|
||
Type: "product",
|
||
Id: productId,
|
||
AccountName: accountName,
|
||
TenantId: tenantId,
|
||
RetryCount: retryCount,
|
||
}
|
||
|
||
var delay int
|
||
switch retryCount {
|
||
case 0:
|
||
delay = 5 * 60
|
||
case 1:
|
||
delay = 15 * 60
|
||
case 2:
|
||
delay = 60 * 60
|
||
default:
|
||
glog.Warningf(ctx, "产品同步重试次数超限,标记为失败: %s", productId)
|
||
p.markSyncFailed(ctx, productId, accountName)
|
||
return
|
||
}
|
||
|
||
if err := rabbitmq.PublishWithDelay(ctx, "ragflow.sync.retry.product", msg, delay); err != nil {
|
||
jaeger.RecordError(ctx, err, "发送RAGFlow重试消息失败")
|
||
}
|
||
}
|
||
|
||
// markSyncFailed 标记同步失败
|
||
func (p *product) markSyncFailed(ctx context.Context, productId, accountName string) {
|
||
product, err := dao.Product.GetById(ctx, productId)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
for i := range product.RagSyncRecords {
|
||
record := &product.RagSyncRecords[i]
|
||
if record.AccountName == accountName {
|
||
record.RagSyncStatus = "failed"
|
||
record.SyncTime = gtime.Now().Format("Y-m-d H:i:s")
|
||
break
|
||
}
|
||
}
|
||
|
||
dao.Product.UpdateEntity(ctx, product)
|
||
}
|
||
|
||
// HandleRAGFlowSyncRetry RAGFlow同步重试消费者
|
||
func (p *product) HandleRAGFlowSyncRetry(ctx context.Context, msg dto.RAGFlowSyncRetryMsg) error {
|
||
glog.Infof(ctx, "处理RAGFlow同步重试: type=%s, id=%s, retry=%d", msg.Type, msg.Id, msg.RetryCount)
|
||
|
||
if msg.Type != "product" {
|
||
return nil
|
||
}
|
||
|
||
_, err := p.SyncToRAGFlow(ctx, msg.Id, msg.AccountName, msg.TenantId)
|
||
if err != nil {
|
||
p.sendToRetryQueue(ctx, msg.Id, msg.AccountName, msg.TenantId, msg.RetryCount+1)
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ProductRetryConsumer 产品RAGFlow重试消费者
|
||
type ProductRetryConsumer struct {
|
||
queueName string
|
||
consumer *rabbitmq.Consumer
|
||
}
|
||
|
||
// NewProductRetryConsumer 创建产品RAGFlow重试消费者
|
||
func NewProductRetryConsumer(ctx context.Context) *ProductRetryConsumer {
|
||
return &ProductRetryConsumer{
|
||
queueName: "ragflow.sync.retry.product",
|
||
}
|
||
}
|
||
|
||
// Start 启动消费者
|
||
func (c *ProductRetryConsumer) Start(ctx context.Context) error {
|
||
c.consumer = rabbitmq.NewConsumer(c.queueName, func(ctx context.Context, body []byte) error {
|
||
var msg dto.RAGFlowSyncRetryMsg
|
||
if err := json.Unmarshal(body, &msg); err != nil {
|
||
return err
|
||
}
|
||
return Product.HandleRAGFlowSyncRetry(ctx, msg)
|
||
})
|
||
return c.consumer.Start(ctx)
|
||
}
|
||
|
||
// Stop 停止消费者
|
||
func (c *ProductRetryConsumer) Stop(ctx context.Context) {
|
||
if c.consumer != nil {
|
||
c.consumer.Stop(ctx)
|
||
}
|
||
}
|