package service import ( "context" "rag/common/eino" "rag/consts/document" "rag/consts/public" "rag/dao" "rag/model/dto" "rag/model/entity" gmq "github.com/bjang03/gmq/core/gmq" "github.com/bjang03/gmq/mq" "github.com/bjang03/gmq/types" "github.com/cloudwego/eino/components/indexer" "github.com/cloudwego/eino/schema" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/util/gconv" ) var DocumentChunk = new(documentChunkService) type documentChunkService struct{} const ( DatasetIndexStatusReady = "ready" ) // Update 更新文件块 func (s *documentChunkService) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (err error) { _, err = dao.DocumentChunk.Update(ctx, req) return } // List 获取文件块列表 func (s *documentChunkService) List(ctx context.Context, req *dto.ListDocumentChunkReq) (res *dto.ListDocumentChunkRes, err error) { list, total, err := dao.DocumentChunk.List(ctx, req) if err != nil { return } res = &dto.ListDocumentChunkRes{ Total: total, } err = gconv.Struct(list, &res.List) return } func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err error) { var docs = make([]*schema.Document, 0) msgMap := gconv.Map(msg) if err = gconv.Structs(msgMap["data"], &docs); err != nil { g.Log().Error(ctx, "DocsChunkMsg err:", err) return } if len(docs) == 0 { g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty") return } //ctx = context.WithValue(ctx, "user", &beans.User{ // TenantId: req[0].TenantId, // UserName: req[0].Creator, //}) // 调用eino接口获取向量 //var vectorDocsStr = make([]string, 0, len(req)) //for _, t := range req { // vectorDocsStr = append(vectorDocsStr, t.Content) //} //embeddings, err := eino.EmbedStrings(ctx, vectorDocsStr) //if err != nil { // g.Log().Error(ctx, "DocsChunkMsg err:", err) // err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code()) // return //} // 获取向量维度 //dimension := 0 //if len(embeddings) > 0 { // dimension = len(embeddings[0]) //} // 创建或更新DatasetIndex //err = s.createOrUpdateDatasetIndex(ctx, req[0].DatasetId, dimension, int64(len(req))) //if err != nil { // g.Log().Error(ctx, "CreateOrUpdateDatasetIndex err:", err) // err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code()) // return //} // 更新向量文档 //for i, embedding := range embeddings { // req[i].Vector = pgvector.NewVector(gconv.Float32s(embedding)) // req[i].VectorStatus = document.VectorStatusCompleted.Code() // req[i].Status = document.StatusEnable.Code() //} //_, err = dao.DocumentChunk.BatchInsert(ctx, req) //if err != nil { // g.Log().Error(ctx, "DocsChunkMsg err:", err) // err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code()) // return //} idx := eino.NewPGVectorIndexer(&eino.PGVectorIndexerOptions{ BatchSize: 10, }) rows, err := idx.Store(ctx, docs, indexer.WithEmbedding(eino.EmbedderDashscope)) if err != nil || rows == 0 { g.Log().Error(ctx, "DocsChunkMsg rows: , err:", rows, err) return } tenantId := docs[0].MetaData[entity.DocumentChunkCol.TenantId].(uint64) creator := docs[0].MetaData[entity.DocumentChunkCol.Creator].(string) documentId := docs[0].MetaData[entity.DocumentChunkCol.DocumentId].(int64) err = s.publishKnowledgeDocumentMsg(ctx, tenantId, creator, documentId, document.VectorStatusCompleted.Code()) return } //// createOrUpdateDatasetIndex 创建或更新数据集索引 //func (s *documentChunkService) createOrUpdateDatasetIndex(ctx context.Context, datasetId int64, dimension int, vectorCount int64) (err error) { // // 查询数据集是否已有索引 // existIndex, err := dao.DatasetIndex.GetByDatasetId(ctx, datasetId) // if err != nil && !errors.Is(err, sql.ErrNoRows) { // return err // } // // // 已有索引 → 只更新数量 // if existIndex != nil { // _ = dao.DatasetIndex.IncVectorCount(ctx, existIndex.Id, vectorCount) // return nil // } // // // ====================== 创建新索引 ====================== // indexName := fmt.Sprintf("idx_dataset_%d_vector", datasetId) // 真实PG索引名 // // 1. 插入索引配置 // index := &entity.DatasetIndex{ // DatasetId: datasetId, // Name: indexName, // Dimension: dimension, // FieldType: "float", // MetricType: "COSINE", // Status: gconv.PtrInt8(1), // VectorCount: vectorCount, // Description: fmt.Sprintf("数据集%d向量索引", datasetId), // } // _, err = dao.DatasetIndex.Insert(ctx, index) // if err != nil { // return err // } // // // 2. 真正创建 PGVector 索引(唯一真实索引!) // err = s.createRealPGVectorIndex(ctx, indexName) // return err //} // //// createRealPGVectorIndex 真正在PostgreSQL创建向量索引(真实可用) //func (s *documentChunkService) createRealPGVectorIndex(ctx context.Context, indexName string) error { // // 执行真实建索引语句 // err := dao.DatasetIndex.InsertIndex(ctx, indexName) // if err != nil { // g.Log().Error(ctx, "创建向量索引失败:", err) // return err // } // g.Log().Info(ctx, "PGVector真实索引创建成功:"+indexName) // return nil //} // publishKnowledgeDocumentMsg 发布消息 func (s *documentChunkService) publishKnowledgeDocumentMsg(ctx context.Context, tenantId uint64, creator string, documentId int64, vectorStatus document.VectorStatus) (err error) { knowledgeDocumentMsg := dto.KnowledgeDocumentMsg{ TenantId: tenantId, Creator: creator, Id: documentId, VectorStatus: vectorStatus, } err = gmq.GetGmq("primary").GmqPublish(ctx, &mq.RedisPubMessage{ PubMessage: types.PubMessage{ Topic: public.KnowledgeDocumentVectorStatusTopic, Data: knowledgeDocumentMsg, }, }) return }