// Package service - 产品服务 // 功能:产品的增删改查、ZIP导入/导出、绑定/解绑客服账号、同步到RAGFlow、重试消费者 package service import ( "archive/zip" "bytes" "context" "customer-server/dao" "customer-server/model/dto" "customer-server/model/entity" "customer-server/util" "encoding/json" "fmt" "io" "mime/multipart" "strings" "unicode/utf8" "gitea.com/red-future/common/jaeger" "gitea.com/red-future/common/rabbitmq" "gitea.com/red-future/common/ragflow" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/grpool" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" ) var ( Product = new(product) productGrpool = grpool.New(50) // 文档解析协程池,最大50并发 ) type product struct{} // Add 添加产品 // 参数: ctx - 上下文,req - 添加产品请求(包含产品名称、描述、价格等) // 返回: res - 添加成功后的产品ID和RAGFlow文档ID,err - 错误信息 // 功能: 创建产品记录并自动上传到RAGFlow产品知识库(独立于话术知识库) func (s *product) Add(ctx context.Context, req *dto.AddProductReq) (res *dto.AddProductRes, err error) { // 校验产品名称长度 if utf8.RuneCountInString(req.Name) > 64 { return nil, gerror.New("产品名称必须在64字以内") } // 校验产品详情长度 if utf8.RuneCountInString(req.Description) > 8192 { return nil, gerror.New("产品详情必须在8192字以内") } // 去重检查:同一租户下名称唯一 existing, err := dao.Product.FindByName(ctx, req.Name) if err != nil { return nil, gerror.Wrap(err, "检查产品重复失败") } if existing != nil { return nil, gerror.Newf("产品名称已存在:name=%s, id=%s", req.Name, existing.Id.Hex()) } data := &entity.Product{} if err = utils.Struct(req, data); err != nil { return } // 先从token获取租户信息(在Insert之前) user, err := util.GetTenantInfo(ctx) if err != nil { return nil, gerror.Wrap(err, "获取租户信息失败") } tenantId := gconv.String(user.TenantId) if tenantId == "" { return nil, gerror.New("租户ID为空") } // 确保产品知识库存在(检查话术知识库是否存在,作为租户是否初始化的标志) speechcraftDatasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId) if err != nil || speechcraftDatasetId == "" { return nil, gerror.Newf("租户知识库不存在,请先创建客服账号: tenant_id=%s", tenantId) } // 设置基础字段 now := gtime.Now().Time data.CreatedAt = &now // 取地址赋值给指针类型 data.UpdatedAt = &now // 取地址赋值给指针类型 data.IsDeleted = false // 统一使用string类型存储tenantId到MongoDB data.TenantId = tenantId // 插入产品到MongoDB _, err = dao.Product.Insert(ctx, data) if err != nil { return nil, err } // 确保租户知识库存在(产品和话术共享租户知识库) // 使用dataset_service提供的统一方法,自动处理创建、查找、保存等逻辑 datasetId, err := EnsureTenantDataset(ctx, tenantId) if err != nil { g.Log().Errorf(ctx, "确保租户知识库失败: %v", err) return nil, gerror.Wrap(err, "获取租户知识库失败") } g.Log().Infof(ctx, "租户%s的知识库ID: %s", tenantId, datasetId) // 同步上传到RAGFlow ragflowClient := ragflow.GetGlobalClient() g.Log().Infof(ctx, "准备上传产品到RAGFlow: product_id=%s, dataset_id=%s, name=%s", data.Id.Hex(), datasetId, data.Name) filename := fmt.Sprintf("%s.txt", data.Name) documentId, err := ragflowClient.UploadDocumentFromText(ctx, datasetId, data.Description, filename) if err != nil { // 回滚:删除刚插入的产品 dao.MongoDAO.Delete(ctx, bson.M{"_id": data.Id}, entity.ProductCollection) g.Log().Errorf(ctx, "产品上传RAGFlow失败: product_id=%s, dataset_id=%s, error=%v", data.Id.Hex(), datasetId, err) jaeger.RecordError(ctx, err, "产品上传RAGFlow失败") return nil, gerror.Wrap(err, "文档上传到知识库失败") } // 异步触发解析(grpool自动管理goroutine生命周期,WithoutCancel保留追踪避免取消) productGrpool.Add(ctx, func(ctx context.Context) { parseCtx := context.WithoutCancel(ctx) if err := ragflowClient.ParseDocuments(parseCtx, datasetId, []string{documentId}); err != nil { g.Log().Errorf(parseCtx, "文档解析失败: document_id=%s, error=%v", documentId, err) } else { g.Log().Infof(parseCtx, "文档解析成功: document_id=%s", documentId) } }) // 更新MongoDB的RagSyncRecords数组(使用空accountName表示租户级文档) syncTime := gtime.Now().Format("Y-m-d H:i:s") record := entity.RagSyncRecord{ AccountName: "", // 空表示租户级文档 RagDocumentId: documentId, RagSyncStatus: "synced", SyncTime: syncTime, RetryCount: 0, } filter := bson.M{"_id": data.Id} update := bson.M{ "$set": bson.M{ "ragSyncRecords": []entity.RagSyncRecord{record}, "ragLastSyncTime": syncTime, "updatedAt": gtime.Now().Time, }, } if _, _, err = dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection); err != nil { g.Log().Errorf(ctx, "更新产品RagSyncRecords失败: %v", err) // 不回滚,文档已上传成功 } g.Log().Infof(ctx, "产品添加成功并上传到知识库: product_id=%s, document_id=%s", data.Id.Hex(), documentId) res = &dto.AddProductRes{Id: data.Id.Hex()} return } // Update 更新产品 // 参数: ctx - 上下文,req - 更新产品请求(包含产品ID和待更新字段) // 返回: err - 错误信息 // 功能: 更新产品信息并同步到RAGFlow,支持文档删除重建 func (s *product) Update(ctx context.Context, req *dto.UpdateProductReq) (err error) { // 如果更新了产品名称,校验长度 if req.Name != "" && utf8.RuneCountInString(req.Name) > 64 { return gerror.New("产品名称必须在64字以内") } // 如果更新了产品详情,校验长度 if req.Description != "" && utf8.RuneCountInString(req.Description) > 8192 { return gerror.New("产品详情必须在8192字以内") } // 去重检查:如果修改名称,检查是否与其他产品重复 if req.Name != "" { existing, err := dao.Product.FindByName(ctx, req.Name) if err != nil { return gerror.Wrap(err, "检查产品重复失败") } if existing != nil && existing.Id.Hex() != req.Id { return gerror.Newf("产品名称已存在:name=%s, id=%s", req.Name, existing.Id.Hex()) } } return dao.Product.Update(ctx, req) } // Delete 删除产品 // 参数: ctx - 上下文,req - 删除产品请求(包含产品ID) // 返回: err - 错误信息 // 功能: 逻辑删除产品记录并从RAGFlow移除对应文档 func (s *product) Delete(ctx context.Context, req *dto.DeleteProductReq) (err error) { g.Log().Infof(ctx, "[Delete] 开始删除产品 - productId: %s", req.Id) // 1. 查询产品,获取RAGFlow同步记录(使用原生查询,避免租户过滤) objectId, err := bson.ObjectIDFromHex(req.Id) if err != nil { return gerror.Wrap(err, "无效的产品ID") } var product entity.Product filter := bson.M{"_id": objectId, "isDeleted": false} err = dao.MongoDAO.FindOne(ctx, filter, &product, entity.ProductCollection) if err != nil { if err.Error() == "mongo: no documents in result" { return gerror.New("产品不存在") } return gerror.Wrap(err, "查询产品失败") } g.Log().Infof(ctx, "[Delete] 查询到产品 - name: %s, ragSyncRecords数量: %d", product.Name, len(product.RagSyncRecords)) // 2. 删除RAGFlow中的文档 if len(product.RagSyncRecords) > 0 { ragflowClient := ragflow.GetGlobalClient() if ragflowClient != nil { tenantId := gconv.String(product.TenantId) // 查询租户的dataset_id datasetId, err := dao.RAGFlowConfig.FindDatasetIdByTenant(ctx, tenantId) if err != nil { g.Log().Warningf(ctx, "查询租户知识库ID失败: %v", err) } else if datasetId != "" { // 收集所有需要删除的document_id var documentIds []string for _, record := range product.RagSyncRecords { if record.RagDocumentId != "" { documentIds = append(documentIds, record.RagDocumentId) } } // 批量删除RAGFlow文档 if len(documentIds) > 0 { if err := ragflowClient.DeleteDocument(ctx, datasetId, documentIds); err != nil { g.Log().Errorf(ctx, "删除RAGFlow文档失败: %v, document_ids: %v", err, documentIds) // 不阻断删除流程,记录错误后继续 } else { g.Log().Infof(ctx, "成功删除RAGFlow文档: count=%d", len(documentIds)) } } } } } // 3. 软删除MongoDB记录 return dao.Product.Delete(ctx, req) } // List 获取产品列表 // 参数: ctx - 上下文,req - 列表查询请求(支持分页、关键词搜索) // 返回: res - 产品列表及分页信息,err - 错误信息 // 功能: 分页查询产品记录,支持按名称、描述模糊搜索 func (s *product) List(ctx context.Context, req *dto.ListProductReq) (res *dto.ListProductRes, err error) { list, total, err := dao.Product.List(ctx, req) if err != nil { return } res = &dto.ListProductRes{ List: list, Total: int(total), } return } // Export 导出产品为ZIP文件 // 参数: ctx - 上下文,req - 导出请求(包含筛选条件) // 返回: zipData - ZIP文件字节数组,filename - 文件名,err - 错误信息 // 功能: 将产品数据导出为ZIP文件,包含JSON格式的产品列表 func (s *product) Export(ctx context.Context, req *dto.ExportProductReq) (zipData []byte, filename string, err error) { // 清理输入参数,防止非法 UTF-8 字符 cleanName := strings.ToValidUTF8(req.Name, "") // 1. 查询所有符合条件的产品 products, err := dao.Product.FindAllForExport(ctx, cleanName) if err != nil { return nil, "", err } if len(products) == 0 { return nil, "", gerror.New("没有可导出的产品") } // 清理所有产品数据,确保 UTF-8 有效(防止数据库中的脏数据) for i := range products { products[i].Name = strings.ToValidUTF8(products[i].Name, "") products[i].Description = strings.ToValidUTF8(products[i].Description, "") // RagDocumentId字段在RagSyncRecords中,不在Product主体 } // 2. 创建 ZIP 文件(内存中) var buf bytes.Buffer zipWriter := zip.NewWriter(&buf) defer zipWriter.Close() // 3. 为每个产品生成 TXT 文件并添加到 ZIP for _, product := range products { // 生成 TXT 内容(产品详情) txtContent := s.generateTxt(product) // 文件名就是产品名称(清理特殊字符) cleanName := strings.ToValidUTF8(product.Name, "未命名") safeFilename := s.sanitizeFilename(cleanName) if safeFilename == "" { safeFilename = "product" } txtFilename := safeFilename + ".txt" // 添加文件到 ZIP writer, err := zipWriter.Create(txtFilename) if err != nil { return nil, "", gerror.Newf("创建ZIP文件失败: %v", err) } if _, err := writer.Write([]byte(txtContent)); err != nil { return nil, "", gerror.Newf("写入ZIP文件失败: %v", err) } } // 5. 生成下载文件名 timestamp := gtime.Now().Format("Ymd_His") filename = "products_export_" + timestamp + ".zip" return buf.Bytes(), filename, nil } // generateTxt 将产品转换为 TXT 格式 // 新格式:文件名=产品名称,内容=产品详情 func (s *product) generateTxt(product *entity.Product) string { // 清理产品详情,确保 UTF-8 有效 cleanDescription := strings.ToValidUTF8(product.Description, "") // 直接返回产品详情 if cleanDescription != "" { return cleanDescription } // 如果没有详情,返回空字符串 return "" } // sanitizeFilename 清理文件名中的特殊字符 func (s *product) sanitizeFilename(name string) string { // 替换不安全的文件名字符 replacer := map[rune]rune{ '/': '_', '\\': '_', ':': '_', '*': '_', '?': '_', '"': '_', '<': '_', '>': '_', '|': '_', } // 预分配容量,避免循环中动态扩容 result := make([]rune, 0, len(name)) for _, char := range name { if newChar, exists := replacer[char]; exists { result = append(result, newChar) } else { result = append(result, char) } } filename := string(result) // 限制文件名长度 if utf8.RuneCountInString(filename) > 50 { runes := []rune(filename) filename = string(runes[:50]) } return filename } // Import 从ZIP文件导入产品 // 参数: ctx - 上下文,file - 上传的ZIP文件 // 返回: res - 导入结果(成功和失败数量),err - 错误信息 // 功能: 从ZIP文件批量导入产品数据并同步到RAGFlow,失败记录加入重试队列 func (s *product) Import(ctx context.Context, file *multipart.FileHeader) (res *dto.ImportProductRes, err error) { res = &dto.ImportProductRes{ SuccessCount: 0, FailCount: 0, FailReasons: []string{}, } // 1. 获取租户信息 user, err := util.GetTenantInfo(ctx) if err != nil { return nil, gerror.Wrap(err, "获取租户信息失败") } tenantId := gconv.String(user.TenantId) if tenantId == "" { return nil, gerror.New("租户ID为空") } // 2. 打开上传的文件 uploadedFile, err := file.Open() if err != nil { return nil, gerror.Newf("无法打开上传的文件: %v", err) } defer uploadedFile.Close() // 3. 读取文件内容到内存 fileData, err := io.ReadAll(uploadedFile) if err != nil { return nil, gerror.Newf("读取文件失败: %v", err) } // 4. 解析 ZIP 文件 zipReader, err := zip.NewReader(bytes.NewReader(fileData), int64(len(fileData))) if err != nil { return nil, gerror.Newf("无法解析ZIP文件: %v", err) } // 4. 遍历 ZIP 中的所有文件 for _, zipFile := range zipReader.File { // 只处理 .txt 文件 if !strings.HasSuffix(strings.ToLower(zipFile.Name), ".txt") { continue } // 读取 TXT 文件内容(产品详情) txtContent, err := s.readZipFile(zipFile) if err != nil { res.FailCount++ res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 读取失败: "+err.Error()) continue } // 从文件名提取产品名称(移除 .txt 后缀) productName := strings.TrimSuffix(zipFile.Name, ".txt") productName = strings.TrimSpace(productName) // 创建产品数据 productData, err := s.parseSimpleTxt(productName, txtContent) if err != nil { res.FailCount++ res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 解析失败: "+err.Error()) continue } // 校验产品名称和详情长度 if utf8.RuneCountInString(productData.Name) > 64 { res.FailCount++ res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品名称超过64字") continue } if utf8.RuneCountInString(productData.Description) > 8192 { res.FailCount++ res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+": 产品详情超过8192字") continue } // 设置基础字段 now := gtime.Now().Time productData.CreatedAt = &now // 取地址赋值给指针类型 productData.UpdatedAt = &now // 取地址赋值给指针类型 productData.IsDeleted = false // 统一使用string类型存储tenantId到MongoDB productData.TenantId = tenantId // 插入数据库 _, err = dao.Product.Insert(ctx, productData) if err != nil { res.FailCount++ res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 数据库插入失败: "+err.Error()) continue } // 同步上传到RAGFlow产品知识库(使用外层已声明的tenantId变量) if tenantId != "" { datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId) ragflowClient := ragflow.GetGlobalClient() if ragflowClient != nil { filename := fmt.Sprintf("%s.txt", productData.Name) documentId, uploadErr := ragflowClient.UploadDocumentFromText(ctx, datasetId, productData.Description, filename) if uploadErr != nil { // 上传失败:回滚删除MongoDB记录 dao.MongoDAO.Delete(ctx, bson.M{"_id": productData.Id}, entity.ProductCollection) res.FailCount++ res.FailReasons = append(res.FailReasons, "文件 "+zipFile.Name+" 上传知识库失败: "+uploadErr.Error()) continue } // 更新ragDocumentId filter := bson.M{"_id": productData.Id} update := bson.M{"$set": bson.M{"ragDocumentId": documentId}} dao.MongoDAO.UpdateOne(ctx, filter, update, entity.ProductCollection) g.Log().Infof(ctx, "ZIP产品上传成功: name=%s, document_id=%s", productData.Name, documentId) } } res.SuccessCount++ } return res, nil } // readZipFile 读取 ZIP 文件中的单个文件内容 func (s *product) readZipFile(file *zip.File) (string, error) { reader, err := file.Open() if err != nil { return "", err } defer reader.Close() content, err := io.ReadAll(reader) if err != nil { return "", err } return string(content), nil } // parseSimpleTxt 解析简化格式的 TXT 文件 // 文件名=产品名称,内容=产品详情 func (s *product) parseSimpleTxt(productName string, description string) (*entity.Product, error) { product := &entity.Product{} // 清理并验证产品名称 product.Name = strings.TrimSpace(strings.ToValidUTF8(productName, "")) if product.Name == "" { return nil, gerror.New("产品名称不能为空") } // 清理产品详情 product.Description = strings.TrimSpace(strings.ToValidUTF8(description, "")) return product, nil } // parseTxt 解析 TXT 文件内容为产品实体(旧格式兼容) func (s *product) parseTxt(content string) (*entity.Product, error) { product := &entity.Product{} lines := strings.Split(content, "\n") var inDescription bool var descriptionLines []string for _, line := range lines { line = strings.TrimSpace(line) // 跳过空行 if line == "" { continue } // 解析标题 if strings.HasPrefix(line, "=== ") && strings.HasSuffix(line, " ===") { product.Name = strings.TrimSpace(line[4 : len(line)-4]) continue } // 跳过基本信息标记 if line == "【基本信息】" { inDescription = false continue } // 产品详情开始 if line == "【产品详情】" { inDescription = true continue } // 解析基本信息字段 if !inDescription { // 跳过系统生成的字段(产品ID、创建时间等) if strings.HasPrefix(line, "产品ID:") || strings.HasPrefix(line, "创建时间:") || strings.HasPrefix(line, "更新时间:") || strings.HasPrefix(line, "导出时间:") { continue } // 解析 RAGFlow 文档 ID if strings.HasPrefix(line, "RAGFlow文档ID:") { // RAGFlow文档ID存储在RagSyncRecords中 continue } } // 收集产品详情 if inDescription { // 跳过分隔线和导出时间 if strings.Contains(line, "==================") || strings.HasPrefix(line, "导出时间:") { continue } if line != "暂无产品详情" { descriptionLines = append(descriptionLines, line) } } } // 拼接产品详情 product.Description = strings.Join(descriptionLines, "\n") // 校验必填字段 if product.Name == "" { return nil, gerror.New("产品名称不能为空") } return product, nil } // BindToCustomerServices 绑定产品到多个客服账号 func (p *product) BindToCustomerServices(ctx context.Context, req *dto.BindProductReq) (res *dto.BindProductRes, err error) { res = &dto.BindProductRes{} // 1. 查询产品 product, err := dao.Product.GetById(ctx, req.ProductId) if err != nil { return nil, gerror.Wrapf(err, "查询产品失败") } if product == nil { return nil, gerror.New("产品不存在") } // 2. 构建已存在的绑定map(去重) existingMap := make(map[string]bool) for _, csId := range product.AccountNames { existingMap[csId] = true } // 3. 过滤并添加新绑定 var newBindings []string var failedIds []string for _, csId := range req.AccountNames { // 检查去重:customer_service_id是否已存在 if existingMap[csId] { failedIds = append(failedIds, csId) g.Log().Warningf(ctx, "客服账号 %s 已绑定该产品,跳过", csId) continue } // 验证客服账号是否存在 csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId) if err != nil || csAccount == nil { failedIds = append(failedIds, csId) g.Log().Warningf(ctx, "客服账号 %s 不存在或已删除,跳过", csId) continue } newBindings = append(newBindings, csId) } // 4. 如果没有新的绑定,直接返回 if len(newBindings) == 0 { res.SuccessCount = 0 res.FailedIds = failedIds res.Message = "所有客服账号均已绑定或不存在" return res, nil } // 5. 更新产品绑定 product.AccountNames = append(product.AccountNames, newBindings...) if err = dao.Product.UpdateEntity(ctx, product); err != nil { return nil, gerror.Wrapf(err, "更新产品绑定失败") } // 6. 同步到RAGFlow(自动创建知识库) for _, csId := range newBindings { // 获取客服账号信息以获取tenant_id csAccount, err := dao.CustomerServiceAccount.FindByAccountName(ctx, csId) if err != nil || csAccount == nil { g.Log().Errorf(ctx, "获取客服账号信息失败: %s", csId) continue } // 同步到RAGFlow(会自动创建知识库) tenantId := gconv.String(csAccount.TenantId) _, err = p.SyncToRAGFlow(ctx, req.ProductId, csId, tenantId) if err != nil { g.Log().Errorf(ctx, "同步到RAGFlow失败: product_id=%s, cs_id=%s, error=%v", req.ProductId, csId, err) // 不阻断绑定流程,失败会进入重试队列 } } res.SuccessCount = len(newBindings) res.FailedIds = failedIds res.Message = "绑定成功" return } // UnbindFromCustomerService 从客服账号解绑产品 func (p *product) UnbindFromCustomerService(ctx context.Context, req *dto.UnbindProductReq) (res *dto.UnbindProductRes, err error) { res = &dto.UnbindProductRes{} product, err := dao.Product.GetById(ctx, req.ProductId) if err != nil { return nil, gerror.Wrapf(err, "查询产品失败") } if product == nil { return nil, gerror.New("产品不存在") } // 查找并移除绑定 var newBindings []string found := false for _, csId := range product.AccountNames { if csId == req.AccountName { found = true continue } newBindings = append(newBindings, csId) } if !found { res.Success = false res.Message = "未找到该绑定关系" return res, nil } product.AccountNames = newBindings if err = dao.Product.UpdateEntity(ctx, product); err != nil { return nil, gerror.Wrapf(err, "解绑失败") } res.Success = true res.Message = "解绑成功" return } // SyncToRAGFlow 同步产品到RAGFlow(租户级知识库) func (p *product) SyncToRAGFlow(ctx context.Context, productId, accountName, tenantId string) (documentId string, err error) { // 1. 查询产品 product, err := dao.Product.GetById(ctx, productId) if err != nil { return "", gerror.Wrapf(err, "查询产品失败") } if product == nil { return "", gerror.New("产品不存在") } // 2. 获取租户的产品知识库ID datasetId := fmt.Sprintf("dataset_product_tenant_%s", tenantId) // 2.1 确保知识库存在,不存在则自动创建 if err := p.ensureDatasetExists(ctx, datasetId, tenantId, "产品"); err != nil { return "", gerror.Wrapf(err, "确保知识库存在失败") } // 3. 调用RAGFlow上传文档 ragflowClient := ragflow.GetGlobalClient() filename := fmt.Sprintf("%s_%s.txt", product.Name, accountName) documentId, err = ragflowClient.UploadDocumentFromText(ctx, datasetId, product.Description, filename) if err != nil { jaeger.RecordError(ctx, err, "产品上传RAGFlow失败") p.sendToRetryQueue(ctx, productId, accountName, tenantId, 0) return "", err } // 4. 更新MongoDB的RagSyncRecord now := gtime.Now().Format("Y-m-d H:i:s") updated := false for i := range product.RagSyncRecords { record := &product.RagSyncRecords[i] if record.AccountName == accountName { record.RagDocumentId = documentId record.RagSyncStatus = "synced" record.SyncTime = now record.RetryCount = 0 updated = true break } } if !updated { product.RagSyncRecords = append(product.RagSyncRecords, entity.RagSyncRecord{ AccountName: accountName, RagDocumentId: documentId, RagSyncStatus: "synced", SyncTime: now, RetryCount: 0, }) } product.RagLastSyncTime = now if err = dao.Product.UpdateEntity(ctx, product); err != nil { return "", gerror.Wrapf(err, "更新产品同步状态失败") } glog.Infof(ctx, "产品同步成功: product_id=%s, account_name=%s, document_id=%s", productId, accountName, documentId) return documentId, nil } // ensureDatasetExists 已废弃,改用公共方法 EnsureTenantDataset // 保留此方法仅为兼容性,直接调用公共方法 func (p *product) ensureDatasetExists(ctx context.Context, datasetId, tenantId, datasetType string) error { _, err := EnsureTenantDataset(ctx, tenantId) return err } // sendToRetryQueue 发送到重试队列 func (p *product) sendToRetryQueue(ctx context.Context, productId, accountName, tenantId string, retryCount int) { msg := dto.RAGFlowSyncRetryMsg{ Type: "product", Id: productId, AccountName: accountName, TenantId: tenantId, RetryCount: retryCount, } var delay int switch retryCount { case 0: delay = 5 * 60 case 1: delay = 15 * 60 case 2: delay = 60 * 60 default: glog.Warningf(ctx, "产品同步重试次数超限,标记为失败: %s", productId) p.markSyncFailed(ctx, productId, accountName) return } if err := rabbitmq.PublishWithDelay(ctx, "ragflow.sync.retry.product", msg, delay); err != nil { jaeger.RecordError(ctx, err, "发送RAGFlow重试消息失败") } } // markSyncFailed 标记同步失败 func (p *product) markSyncFailed(ctx context.Context, productId, accountName string) { product, err := dao.Product.GetById(ctx, productId) if err != nil { return } for i := range product.RagSyncRecords { record := &product.RagSyncRecords[i] if record.AccountName == accountName { record.RagSyncStatus = "failed" record.SyncTime = gtime.Now().Format("Y-m-d H:i:s") break } } dao.Product.UpdateEntity(ctx, product) } // HandleRAGFlowSyncRetry RAGFlow同步重试消费者 func (p *product) HandleRAGFlowSyncRetry(ctx context.Context, msg dto.RAGFlowSyncRetryMsg) error { glog.Infof(ctx, "处理RAGFlow同步重试: type=%s, id=%s, retry=%d", msg.Type, msg.Id, msg.RetryCount) if msg.Type != "product" { return nil } _, err := p.SyncToRAGFlow(ctx, msg.Id, msg.AccountName, msg.TenantId) if err != nil { p.sendToRetryQueue(ctx, msg.Id, msg.AccountName, msg.TenantId, msg.RetryCount+1) return err } return nil } // ProductRetryConsumer 产品RAGFlow重试消费者 type ProductRetryConsumer struct { queueName string consumer *rabbitmq.Consumer } // NewProductRetryConsumer 创建产品RAGFlow重试消费者 func NewProductRetryConsumer(ctx context.Context) *ProductRetryConsumer { return &ProductRetryConsumer{ queueName: "ragflow.sync.retry.product", } } // Start 启动消费者 func (c *ProductRetryConsumer) Start(ctx context.Context) error { c.consumer = rabbitmq.NewConsumer(c.queueName, func(ctx context.Context, body []byte) error { var msg dto.RAGFlowSyncRetryMsg if err := json.Unmarshal(body, &msg); err != nil { return err } return Product.HandleRAGFlowSyncRetry(ctx, msg) }) return c.consumer.Start(ctx) } // Stop 停止消费者 func (c *ProductRetryConsumer) Stop(ctx context.Context) { if c.consumer != nil { c.consumer.Stop(ctx) } }