diff --git a/common/eino/a.go b/common/eino/indexer.go similarity index 98% rename from common/eino/a.go rename to common/eino/indexer.go index e9bd1ea..f6f160e 100644 --- a/common/eino/a.go +++ b/common/eino/indexer.go @@ -44,13 +44,13 @@ func (i *PGVectorIndexer) Store(ctx context.Context, docs []*schema.Document, op // 回调 ctx = callbacks.OnStart(ctx, &indexer.CallbackInput{Docs: docs}) - ids, err := i.bulkStore(ctx, docs, commonOpts) + rows, err = i.bulkStore(ctx, docs, commonOpts) if err != nil { callbacks.OnError(ctx, err) return } - callbacks.OnEnd(ctx, &indexer.CallbackOutput{IDs: gconv.Strings(ids)}) + callbacks.OnEnd(ctx, &indexer.CallbackOutput{IDs: gconv.Strings(rows)}) return } diff --git a/common/eino/retriever.go b/common/eino/retriever.go new file mode 100644 index 0000000..a74e313 --- /dev/null +++ b/common/eino/retriever.go @@ -0,0 +1,117 @@ +package eino + +import ( + "context" + "errors" + + "github.com/cloudwego/eino/callbacks" + "github.com/cloudwego/eino/components/embedding" + "github.com/cloudwego/eino/components/retriever" + "github.com/cloudwego/eino/schema" + "github.com/gogf/gf/v2/util/gconv" + "github.com/pgvector/pgvector-go" +) + +type PGVectorRetrieverConfig struct { + Embedder embedding.Embedder + DefaultTopK int + DefaultIndex string +} + +type PGVectorRetriever struct { + embedder embedding.Embedder + topK int + index string +} + +func NewPGVectorRetriever(config *PGVectorRetrieverConfig) (*PGVectorRetriever, error) { + if config.Embedder == nil { + return nil, errors.New("embedder is required") + } + if config.DefaultTopK <= 0 { + config.DefaultTopK = 5 + } + + return &PGVectorRetriever{ + embedder: config.Embedder, + topK: config.DefaultTopK, + index: config.DefaultIndex, + }, nil +} + +func (r *PGVectorRetriever) Retrieve(ctx context.Context, query string, opts ...retriever.Option) ([]*schema.Document, error) { + + // 1. 处理公共 Option(官方标准写法) + options := &retriever.Options{ + Index: &r.index, + TopK: &r.topK, + Embedding: r.embedder, + } + options = retriever.GetCommonOptions(options, opts...) + + // 2. 回调(官方标准) + ctx = callbacks.OnStart(ctx, &retriever.CallbackInput{ + Query: query, + TopK: *options.TopK, + }) + + // 3. 执行检索 + docs, err := r.doRetrieve(ctx, query, options) + if err != nil { + callbacks.OnError(ctx, err) + return nil, err + } + + // 4. 完成回调 + callbacks.OnEnd(ctx, &retriever.CallbackOutput{ + Docs: docs, + }) + + return docs, nil +} + +func (r *PGVectorRetriever) doRetrieve(ctx context.Context, query string, opts *retriever.Options) ([]*schema.Document, error) { + + // 1. 生成向量 + vectors, err := opts.Embedding.EmbedStrings(ctx, []string{query}) + if err != nil { + return nil, err + } + if len(vectors) == 0 { + return nil, errors.New("empty query vector") + } + + queryVec := pgvector.NewVector(vectors[0]) + topK := *opts.TopK + + // 2. PG 向量相似度检索 SQL + sql := ` + SELECT id, content, dataset_id, document_id, + vector <-> ? AS distance + FROM document_chunk + ORDER BY distance ASC + LIMIT ? +` + + // 3. 查询 + rows, err := dao.DocumentChunk.GetDB().GetAll(ctx, sql, queryVec, topK) + if err != nil { + return nil, err + } + + // 4. 转为 Eino Document + docs := make([]*schema.Document, 0, len(rows)) + for _, row := range rows { + docs = append(docs, &schema.Document{ + ID: gconv.String(row["id"]), + Content: gconv.String(row["content"]), + Metadata: map[string]any{ + "dataset_id": row["dataset_id"], + "document_id": row["document_id"], + "distance": row["distance"], + }, + }) + } + + return docs, nil +} diff --git a/config.yml b/config.yml index 3996fb8..93fddcb 100644 --- a/config.yml +++ b/config.yml @@ -12,8 +12,9 @@ database: user: "postgres" pass: "Bjang09@686^*^" name: "rag" + prefix: "rag_knowledge_" # (可选)表名前缀 role: "master" # (可选)数据库主从角色(master/slave),默认为master。如果不使用应用主从机制请不配置或留空即可。 - debug: false # (可选)开启调试模式 + debug: true # (可选)开启调试模式 dryRun: false # (可选)ORM空跑(只读不写) charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local @@ -30,7 +31,8 @@ database: port: "15432" user: "postgres" pass: "Bjang09@686^*^" - name: "rag" + name: "tenant-1" + prefix: "rag_knowledge_" # (可选)表名前缀 role: "slave" # (可选)数据库主从角色(master/slave),默认为master。如果不使用应用主从机制请不配置或留空即可。 debug: false # (可选)开启调试模式 dryRun: false # (可选)ORM空跑(只读不写) @@ -44,15 +46,36 @@ database: updatedAt: "updated_at" # (可选)自动更新时间字段名称 deletedAt: "deleted_at" # (可选)软删除时间字段名称 timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 - tenant-1: + rag_knowledge: - type: "pgsql" host: "localhost" port: "5432" user: "postgres" pass: "123456" - name: "tenant" + name: "tenant-1" + prefix: "rag_knowledge_" # (可选)表名前缀 + role: "master" + debug: true # (可选)开启调试模式 + dryRun: false # (可选)ORM空跑(只读不写) + charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 + timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local + maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10) + maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制) + maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒) + maxIdleConnTime: "30s" # (可选,v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置,避免长时间空闲连接占用资源。 + createdAt: "created_at" # (可选)自动创建时间字段名称 + updatedAt: "updated_at" # (可选)自动更新时间字段名称 + deletedAt: "deleted_at" # (可选)软删除时间字段名称 + timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 + rag_vector: + - type: "pgsql" + host: "localhost" + port: "5432" + user: "postgres" + pass: "123456" + name: "tenant-1" + prefix: "rag_vector_" # (可选)表名前缀 role: "master" - prefix: "rag_" # (可选)表名前缀 debug: true # (可选)开启调试模式 dryRun: false # (可选)ORM空跑(只读不写) charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 diff --git a/consts/public/table_name.go b/consts/public/table_name.go index 55e4859..e9a6ef3 100644 --- a/consts/public/table_name.go +++ b/consts/public/table_name.go @@ -1,5 +1,10 @@ package public +const ( + DbNameKnowledge = "rag_knowledge" + DbNameVector = "rag_vector" +) + // sql 数据库表名 const ( TableNameDocument = "document" diff --git a/dao/dataset.go b/dao/dataset.go index 35c0f68..dcf80d8 100644 --- a/dao/dataset.go +++ b/dao/dataset.go @@ -22,7 +22,7 @@ func (d *datasetDao) Insert(ctx context.Context, req *dto.CreateDatasetReq) (id if err = gconv.Struct(req, &res); err != nil { return } - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Data(&res).Insert() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Data(&res).Insert() if err != nil { return } @@ -31,7 +31,7 @@ func (d *datasetDao) Insert(ctx context.Context, req *dto.CreateDatasetReq) (id // Update 更新数据集 func (d *datasetDao) Update(ctx context.Context, req *dto.UpdateDatasetReq) (rows int64, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).OmitEmpty() + model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).OmitEmpty() if !g.IsEmpty(req.DocumentCount) { model.Data(entity.DatasetCol.DocumentCount, &gdb.Counter{ Field: entity.DatasetCol.DocumentCount, @@ -53,7 +53,7 @@ func (d *datasetDao) Update(ctx context.Context, req *dto.UpdateDatasetReq) (row // Delete 删除数据集 func (d *datasetDao) Delete(ctx context.Context, req *dto.DeleteDatasetReq) (rows int64, err error) { - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Delete() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Delete() if err != nil { return } @@ -61,7 +61,7 @@ func (d *datasetDao) Delete(ctx context.Context, req *dto.DeleteDatasetReq) (row } func (d *datasetDao) GetByID(ctx context.Context, req *dto.GetDatasetReq, fields ...string) (res *entity.Dataset, err error) { - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Fields(fields).One() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Where(entity.DatasetCol.Id, req.Id).Fields(fields).One() if err != nil { return } @@ -71,7 +71,7 @@ func (d *datasetDao) GetByID(ctx context.Context, req *dto.GetDatasetReq, fields // List 获取数据集列表 func (d *datasetDao) List(ctx context.Context, req *dto.ListDatasetReq, fields ...string) (res []*entity.Dataset, total int, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameDataset).Fields(fields).OmitEmpty() + model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDataset).Fields(fields).OmitEmpty() if !g.IsEmpty(req.Keyword) { model.WhereLike(entity.DatasetCol.Name, "%"+req.Keyword+"%") } diff --git a/dao/dataset_index.go b/dao/dataset_index.go index 0df04b7..fa9c078 100644 --- a/dao/dataset_index.go +++ b/dao/dataset_index.go @@ -16,7 +16,7 @@ type datasetIndexDao struct{} // Insert 插入数据集索引 func (d *datasetIndexDao) Insert(ctx context.Context, index *entity.DatasetIndex) (id int64, err error) { - _, err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex).Data(index).Insert() + _, err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).Data(index).Insert() if err != nil { return } @@ -25,7 +25,7 @@ func (d *datasetIndexDao) Insert(ctx context.Context, index *entity.DatasetIndex // GetByDatasetId 根据数据集ID获取索引 func (d *datasetIndexDao) GetByDatasetId(ctx context.Context, datasetId int64) (result *entity.DatasetIndex, err error) { - err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex).Where(entity.DatasetIndexCol.DatasetId, datasetId).Scan(&result) + err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex).Where(entity.DatasetIndexCol.DatasetId, datasetId).Scan(&result) if err != nil { if err == sql.ErrNoRows { return nil, nil @@ -37,23 +37,20 @@ func (d *datasetIndexDao) GetByDatasetId(ctx context.Context, datasetId int64) ( // IncVectorCount 增加或减少向量数量 func (d *datasetIndexDao) IncVectorCount(ctx context.Context, id int64, delta int64) (err error) { - _, err = gfdb.DB(ctx).Model(ctx, public.TableNameDatasetIndex). + _, err = gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDatasetIndex). Where(entity.DatasetIndexCol.Id, id). Increment(entity.DatasetIndexCol.VectorCount, delta) return } func (d *datasetIndexDao) InsertIndex(ctx context.Context, indexName string) (err error) { - prefix, err := gfdb.GetTablePrefix(ctx) - if err != nil { - return - } + db := gfdb.DB(ctx, public.DbNameVector) sqlStr := fmt.Sprintf(` CREATE INDEX IF NOT EXISTS %s ON %s USING ivfflat (vector vector_cosine_ops) WHERE vector IS NOT NULL; - `, indexName, prefix+public.TableNameDocumentChunk) - _, err = gfdb.DB(ctx).Exec(ctx, sqlStr) + `, indexName, gfdb.TablePrefix+public.TableNameDocumentChunk) + _, err = db.Exec(ctx, sqlStr) return } diff --git a/dao/document.go b/dao/document.go index b2f2a21..5f943fc 100644 --- a/dao/document.go +++ b/dao/document.go @@ -22,7 +22,7 @@ func (d *documentDao) Insert(ctx context.Context, req *dto.CreateDocumentReq) (i if err = gconv.Struct(req, &res); err != nil { return } - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Data(&res).Insert() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Data(&res).Insert() if err != nil { return } @@ -31,7 +31,7 @@ func (d *documentDao) Insert(ctx context.Context, req *dto.CreateDocumentReq) (i // Update 更新文件 func (d *documentDao) Update(ctx context.Context, req *dto.UpdateDocumentReq) (rows int64, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).OmitEmpty() + model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).OmitEmpty() if !g.IsEmpty(req.ChunkCount) { model.Data(entity.DocumentCol.ChunkCount, &gdb.Counter{ Field: entity.DocumentCol.ChunkCount, @@ -48,7 +48,7 @@ func (d *documentDao) Update(ctx context.Context, req *dto.UpdateDocumentReq) (r // Delete 删除文件 func (d *documentDao) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (rows int64, err error) { - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Delete() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Delete() if err != nil { return } @@ -57,7 +57,7 @@ func (d *documentDao) Delete(ctx context.Context, req *dto.DeleteDocumentReq) (r // GetByID 根据ID获取文件 func (d *documentDao) GetByID(ctx context.Context, req *dto.GetDocumentReq, fields ...string) (res *entity.Document, err error) { - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Fields(fields).One() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).Where(entity.DocumentCol.Id, req.Id).Fields(fields).One() if err != nil { return } @@ -67,7 +67,7 @@ func (d *documentDao) GetByID(ctx context.Context, req *dto.GetDocumentReq, fiel // List 获取文件列表 func (d *documentDao) List(ctx context.Context, req *dto.ListDocumentReq, fields ...string) (res []*entity.Document, total int, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameDocument).OmitEmpty() + model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameDocument).OmitEmpty() if !g.IsEmpty(req.Keyword) { model.WhereLike(entity.DocumentCol.Title, "%"+req.Keyword+"%") } diff --git a/dao/document_chunk.go b/dao/document_chunk.go index f99f7a1..c39501f 100644 --- a/dao/document_chunk.go +++ b/dao/document_chunk.go @@ -20,7 +20,7 @@ func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDoc if err = gconv.Structs(req, &res); err != nil { return } - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk).Data(&res).Insert() + r, err := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Data(&res).Insert() if err != nil { return } @@ -29,7 +29,7 @@ func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDoc // Update 更新文件块 func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (rows int64, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk) + model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk) r, err := model.Data(&req).Where(entity.DocumentChunkCol.Id, req.Id).Update() if err != nil { return @@ -39,7 +39,7 @@ func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentCh // List 文件块列表 func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkReq, fields ...string) (res []*entity.DocumentChunk, total int, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameDocumentChunk).Fields(fields).OmitEmpty(). + model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Fields(fields).OmitEmpty(). Where(entity.DocumentChunkCol.DatasetId, req.DatasetId). Where(entity.DocumentChunkCol.DocumentId, req.DocumentId). Where(entity.DocumentChunkCol.Status, req.Status). @@ -55,50 +55,3 @@ func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkR err = r.Structs(&res) return } - -//// Insert 插入向量文档 -//func (d *vectorDocumentDao) Insert(ctx context.Context, docs []*entity.DocumentChunk) (ids []interface{}, err error) { -// if len(docs) == 0 { -// return -// } -// interfaces := make([]interface{}, len(docs)) -// for i := range docs { -// interfaces[i] = docs[i] -// } -// return mongoDB.Insert(ctx, interfaces, CollectionVectorDoc) -//} -// -//// DeleteByIDs 根据ID删除向量文档 -//func (d *vectorDocumentDao) DeleteByIDs(ctx context.Context, ids []string) (err error) { -// if len(ids) == 0 { -// return -// } -// objectIDs := make([]bson.ObjectID, len(ids)) -// for i, id := range ids { -// objectIDs[i], err = bson.ObjectIDFromHex(id) -// if err != nil { -// return err -// } -// } -// filter := bson.M{"_id": bson.M{"$in": objectIDs}} -// _, err = mongoDB.Delete(ctx, filter, CollectionVectorDoc) -// return -//} -// -//// GetByIndexID 根据索引ID获取向量文档 -//func (d *vectorDocumentDao) GetByIndexID(ctx context.Context, indexID string, limit int) (result []*entity.DocumentChunk, err error) { -// filter := bson.M{"indexId": indexID} -// page := &beans.Page{PageNum: 1, PageSize: int64(limit)} -// _, err = mongoDB.Find(ctx, filter, &result, CollectionVectorDoc, page, nil) -// return -//} -// -//// GetByVectorIDs 根据向量ID获取向量文档 -//func (d *vectorDocumentDao) GetByVectorIDs(ctx context.Context, vectorIDs []string) (result []*entity.DocumentChunk, err error) { -// if len(vectorIDs) == 0 { -// return -// } -// filter := bson.M{"vectorId": bson.M{"$in": vectorIDs}} -// _, err = mongoDB.Find(ctx, filter, &result, CollectionVectorDoc, &beans.Page{PageSize: -1}, nil) -// return -//} diff --git a/dao/keyword.go b/dao/keyword.go index f5544a3..156648c 100644 --- a/dao/keyword.go +++ b/dao/keyword.go @@ -20,7 +20,7 @@ func (d *keywordDao) Insert(ctx context.Context, req *dto.CreateKeywordReq) (id if err = gconv.Struct(req, &res); err != nil { return } - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Data(&res).Insert() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Data(&res).Insert() if err != nil { return } @@ -32,7 +32,7 @@ func (d *keywordDao) BatchSaveOrUpdate(ctx context.Context, req []*dto.CreateKey if err = gconv.Structs(req, &res); err != nil { return } - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Data(&res).OnConflict( + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Data(&res).OnConflict( entity.KeywordCol.TenantId, entity.KeywordCol.DatasetId, entity.KeywordCol.DocumentId, @@ -44,7 +44,7 @@ func (d *keywordDao) BatchSaveOrUpdate(ctx context.Context, req []*dto.CreateKey } func (d *keywordDao) Update(ctx context.Context, req *dto.UpdateKeywordReq) (rows int64, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword) + model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword) r, err := model.Data(&req).Where(entity.KeywordCol.Id, req.Id).Update() if err != nil { return @@ -53,7 +53,7 @@ func (d *keywordDao) Update(ctx context.Context, req *dto.UpdateKeywordReq) (row } func (d *keywordDao) Delete(ctx context.Context, req *dto.DeleteKeywordReq) (rows int64, err error) { - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Delete() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Delete() if err != nil { return } @@ -61,7 +61,7 @@ func (d *keywordDao) Delete(ctx context.Context, req *dto.DeleteKeywordReq) (row } func (d *keywordDao) Count(ctx context.Context, req *dto.ListKeywordReq) (count int, err error) { - count, err = gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).OmitEmpty(). + count, err = gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).OmitEmpty(). Where(entity.KeywordCol.DatasetId, req.DatasetId). Where(entity.KeywordCol.DocumentId, req.DocumentId). Where(entity.KeywordCol.Word, req.Word).Count() @@ -69,7 +69,7 @@ func (d *keywordDao) Count(ctx context.Context, req *dto.ListKeywordReq) (count } func (d *keywordDao) GetByID(ctx context.Context, req *dto.GetKeywordReq, fields ...string) (res *entity.Document, err error) { - r, err := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Fields(fields).One() + r, err := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Where(entity.KeywordCol.Id, req.Id).Fields(fields).One() if err != nil { return } @@ -78,7 +78,7 @@ func (d *keywordDao) GetByID(ctx context.Context, req *dto.GetKeywordReq, fields } func (d *keywordDao) List(ctx context.Context, req *dto.ListKeywordReq, fields ...string) (res []*entity.Keyword, total int, err error) { - model := gfdb.DB(ctx).Model(ctx, public.TableNameKeyword).Fields(fields).OmitEmpty() + model := gfdb.DB(ctx, public.DbNameKnowledge).Model(ctx, public.TableNameKeyword).Fields(fields).OmitEmpty() if !g.IsEmpty(req.Keyword) { model.WhereLike(entity.KeywordCol.Word, "%"+req.Keyword+"%") } diff --git a/service/document.go b/service/document.go index 47af42f..12c9964 100644 --- a/service/document.go +++ b/service/document.go @@ -2,6 +2,7 @@ package service import ( "context" + "errors" "fmt" "rag/common/eino" "rag/common/gse" @@ -123,6 +124,9 @@ func (s *documentService) Process(ctx context.Context, req *dto.ProcessDocumentR if err != nil { return nil, err } + if g.IsEmpty(doc) { + return nil, errors.New("document not found") + } // 2. 使用eino框架进行文件切分(并发执行) var vectorDocsCount, chunks int64 diff --git a/service/document_chunk.go b/service/document_chunk.go index 103308d..d60303a 100644 --- a/service/document_chunk.go +++ b/service/document_chunk.go @@ -57,49 +57,6 @@ func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err e return } - //ctx = context.WithValue(ctx, "user", &beans.User{ - // TenantId: req[0].TenantId, - // UserName: req[0].Creator, - //}) - - // 调用eino接口获取向量 - //var vectorDocsStr = make([]string, 0, len(req)) - //for _, t := range req { - // vectorDocsStr = append(vectorDocsStr, t.Content) - //} - //embeddings, err := eino.EmbedStrings(ctx, vectorDocsStr) - //if err != nil { - // g.Log().Error(ctx, "DocsChunkMsg err:", err) - // err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code()) - // return - //} - - // 获取向量维度 - //dimension := 0 - //if len(embeddings) > 0 { - // dimension = len(embeddings[0]) - //} - - // 创建或更新DatasetIndex - //err = s.createOrUpdateDatasetIndex(ctx, req[0].DatasetId, dimension, int64(len(req))) - //if err != nil { - // g.Log().Error(ctx, "CreateOrUpdateDatasetIndex err:", err) - // err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code()) - // return - //} - - // 更新向量文档 - //for i, embedding := range embeddings { - // req[i].Vector = pgvector.NewVector(gconv.Float32s(embedding)) - // req[i].VectorStatus = document.VectorStatusCompleted.Code() - // req[i].Status = document.StatusEnable.Code() - //} - //_, err = dao.DocumentChunk.BatchInsert(ctx, req) - //if err != nil { - // g.Log().Error(ctx, "DocsChunkMsg err:", err) - // err = s.publishKnowledgeDocumentMsg(ctx, req[0].TenantId, req[0].Creator, req[0].DocumentId, document.VectorStatusFailed.Code()) - // return - //} idx := eino.NewPGVectorIndexer(&eino.PGVectorIndexerOptions{ BatchSize: 10, }) @@ -108,63 +65,14 @@ func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err e g.Log().Error(ctx, "DocsChunkMsg rows: , err:", rows, err) return } - tenantId := docs[0].MetaData[entity.DocumentChunkCol.TenantId].(uint64) - creator := docs[0].MetaData[entity.DocumentChunkCol.Creator].(string) - documentId := docs[0].MetaData[entity.DocumentChunkCol.DocumentId].(int64) + tenantId := gconv.Uint64(docs[0].MetaData[entity.DocumentChunkCol.TenantId]) + creator := gconv.String(docs[0].MetaData[entity.DocumentChunkCol.Creator]) + documentId := gconv.Int64(docs[0].MetaData[entity.DocumentChunkCol.DocumentId]) err = s.publishKnowledgeDocumentMsg(ctx, tenantId, creator, documentId, document.VectorStatusCompleted.Code()) return } -//// createOrUpdateDatasetIndex 创建或更新数据集索引 -//func (s *documentChunkService) createOrUpdateDatasetIndex(ctx context.Context, datasetId int64, dimension int, vectorCount int64) (err error) { -// // 查询数据集是否已有索引 -// existIndex, err := dao.DatasetIndex.GetByDatasetId(ctx, datasetId) -// if err != nil && !errors.Is(err, sql.ErrNoRows) { -// return err -// } -// -// // 已有索引 → 只更新数量 -// if existIndex != nil { -// _ = dao.DatasetIndex.IncVectorCount(ctx, existIndex.Id, vectorCount) -// return nil -// } -// -// // ====================== 创建新索引 ====================== -// indexName := fmt.Sprintf("idx_dataset_%d_vector", datasetId) // 真实PG索引名 -// // 1. 插入索引配置 -// index := &entity.DatasetIndex{ -// DatasetId: datasetId, -// Name: indexName, -// Dimension: dimension, -// FieldType: "float", -// MetricType: "COSINE", -// Status: gconv.PtrInt8(1), -// VectorCount: vectorCount, -// Description: fmt.Sprintf("数据集%d向量索引", datasetId), -// } -// _, err = dao.DatasetIndex.Insert(ctx, index) -// if err != nil { -// return err -// } -// -// // 2. 真正创建 PGVector 索引(唯一真实索引!) -// err = s.createRealPGVectorIndex(ctx, indexName) -// return err -//} -// -//// createRealPGVectorIndex 真正在PostgreSQL创建向量索引(真实可用) -//func (s *documentChunkService) createRealPGVectorIndex(ctx context.Context, indexName string) error { -// // 执行真实建索引语句 -// err := dao.DatasetIndex.InsertIndex(ctx, indexName) -// if err != nil { -// g.Log().Error(ctx, "创建向量索引失败:", err) -// return err -// } -// g.Log().Info(ctx, "PGVector真实索引创建成功:"+indexName) -// return nil -//} - // publishKnowledgeDocumentMsg 发布消息 func (s *documentChunkService) publishKnowledgeDocumentMsg(ctx context.Context, tenantId uint64, creator string, documentId int64, vectorStatus document.VectorStatus) (err error) { knowledgeDocumentMsg := dto.KnowledgeDocumentMsg{ diff --git a/update.sql b/update.sql index 4da8bca..aebb904 100644 --- a/update.sql +++ b/update.sql @@ -134,9 +134,9 @@ CREATE TABLE IF NOT EXISTS rag_knowledge_keyword ( ); -- 唯一索引:保证 租户 + 数据集 + 文档 + 关键词 全局唯一 -CREATE UNIQUE INDEX uk_rag_knowledge_keyword_tenant_dataset_doc_word - ON rag_knowledge_keyword(tenant_id, dataset_id, document_id, word) - WHERE deleted_at IS NULL; +-- CREATE UNIQUE INDEX uk_rag_knowledge_keyword_tenant_dataset_doc_word +-- ON rag_knowledge_keyword(tenant_id, dataset_id, document_id, word) +-- WHERE deleted_at IS NULL; -- 索引(按业务高频查询) CREATE INDEX idx_keyword_tenant_id ON rag_knowledge_keyword(tenant_id); @@ -159,4 +159,119 @@ COMMENT ON COLUMN rag_knowledge_keyword.document_id IS '文档ID'; COMMENT ON COLUMN rag_knowledge_keyword.word IS '关键词'; COMMENT ON COLUMN rag_knowledge_keyword.weight IS '权重'; ---------------------pgsql创建rag_knowledge_keyword表语句--------------------------- \ No newline at end of file +--------------------pgsql创建rag_knowledge_keyword表语句--------------------------- + + + +--------------------pgsql创建rag_vector_dataset_index表语句--------------------------- +-- 向量数据集索引表 +CREATE TABLE IF NOT EXISTS rag_vector_dataset_index ( + -- 基础字段 + id BIGINT PRIMARY KEY, -- 主键ID(非自增) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8 + creator VARCHAR(64) NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updater VARCHAR(64) NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at timestamp(6), + + -- 核心字段 + dataset_id INT8 NOT NULL, + name VARCHAR(255) NOT NULL, + collection VARCHAR(255) NOT NULL, + dimension INT NOT NULL, + field_type VARCHAR(50) NOT NULL, + metric_type VARCHAR(50) NOT NULL, + status SMALLINT NOT NULL DEFAULT 1, -- 状态:1启用/0停用 + vector_count INT8 NOT NULL DEFAULT 0, + description TEXT + ); + +-- 唯一约束 +ALTER TABLE rag_vector_dataset_index ADD CONSTRAINT uk_dataset_id_name UNIQUE (dataset_id, name); + +-- 索引 +CREATE INDEX idx_dataset_index_tenant_id ON rag_vector_dataset_index(tenant_id); +CREATE INDEX idx_dataset_index_dataset_id ON rag_vector_dataset_index(dataset_id); +CREATE INDEX idx_dataset_index_status ON rag_vector_dataset_index(status); + +-- 注释 +COMMENT ON TABLE rag_vector_dataset_index IS '向量数据集索引表'; +COMMENT ON COLUMN rag_vector_dataset_index.id IS '主键ID(非自增)'; +COMMENT ON COLUMN rag_vector_dataset_index.tenant_id IS '租户ID'; +COMMENT ON COLUMN rag_vector_dataset_index.creator IS '创建人'; +COMMENT ON COLUMN rag_vector_dataset_index.created_at IS '创建时间'; +COMMENT ON COLUMN rag_vector_dataset_index.updater IS '更新人'; +COMMENT ON COLUMN rag_vector_dataset_index.updated_at IS '更新时间'; +COMMENT ON COLUMN rag_vector_dataset_index.deleted_at IS '删除时间(软删)'; +COMMENT ON COLUMN rag_vector_dataset_index.dataset_id IS '数据集ID'; +COMMENT ON COLUMN rag_vector_dataset_index.name IS '索引名称'; +COMMENT ON COLUMN rag_vector_dataset_index.collection IS '向量集合名称'; +COMMENT ON COLUMN rag_vector_dataset_index.dimension IS '向量维度'; +COMMENT ON COLUMN rag_vector_dataset_index.field_type IS '字段类型'; +COMMENT ON COLUMN rag_vector_dataset_index.metric_type IS '度量类型'; +COMMENT ON COLUMN rag_vector_dataset_index.status IS '状态'; +COMMENT ON COLUMN rag_vector_dataset_index.vector_count IS '向量数量'; +COMMENT ON COLUMN rag_vector_dataset_index.description IS '描述'; + +--------------------pgsql创建rag_vector_dataset_index表语句--------------------------- + +--------------------pgsql创建rag_vector_document_chunk表语句--------------------------- + +CREATE EXTENSION IF NOT EXISTS vector; + +-- 文档分块向量表 +CREATE TABLE IF NOT EXISTS rag_vector_document_chunk ( + -- 基础字段 + id BIGINT PRIMARY KEY, -- 主键ID(非自增) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8 + creator VARCHAR(64) NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updater VARCHAR(64) NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at timestamp(6), + + -- 核心字段 + status SMALLINT NOT NULL DEFAULT 1, -- 状态:1启用/0停用 + vector_status SMALLINT NOT NULL DEFAULT 1, -- 向量化状态: 1pending, 2processing, 3completed, 4failed,5partCompleted + dataset_id INT8 NOT NULL, + document_id INT8 NOT NULL, + content TEXT NOT NULL, + content_hash VARCHAR(128) NOT NULL, + chunk_index INT8 NOT NULL, + + -- 向量字段(pgvector) + vector vector(1024) NOT NULL, + + -- 扩展信息 + metadata JSONB + ); + +-- 索引 +CREATE INDEX idx_chunk_tenant_id ON rag_vector_document_chunk(tenant_id); +CREATE INDEX idx_chunk_dataset_id ON rag_vector_document_chunk(dataset_id); +CREATE INDEX idx_chunk_document_id ON rag_vector_document_chunk(document_id); +CREATE INDEX idx_chunk_content_hash ON rag_vector_document_chunk(content_hash); +CREATE INDEX idx_chunk_status ON rag_vector_document_chunk(status); +CREATE INDEX idx_chunk_vector_status ON rag_vector_document_chunk(vector_status); + +-- 注释 +COMMENT ON TABLE rag_vector_document_chunk IS '文档分块向量表'; +COMMENT ON COLUMN rag_vector_document_chunk.id IS '主键ID(非自增)'; +COMMENT ON COLUMN rag_vector_document_chunk.tenant_id IS '租户ID'; +COMMENT ON COLUMN rag_vector_document_chunk.creator IS '创建人'; +COMMENT ON COLUMN rag_vector_document_chunk.created_at IS '创建时间'; +COMMENT ON COLUMN rag_vector_document_chunk.updater IS '更新人'; +COMMENT ON COLUMN rag_vector_document_chunk.updated_at IS '更新时间'; +COMMENT ON COLUMN rag_vector_document_chunk.deleted_at IS '删除时间(软删)'; +COMMENT ON COLUMN rag_vector_document_chunk.status IS '状态'; +COMMENT ON COLUMN rag_vector_document_chunk.vector_status IS '向量生成状态'; +COMMENT ON COLUMN rag_vector_document_chunk.dataset_id IS '数据集ID'; +COMMENT ON COLUMN rag_vector_document_chunk.document_id IS '文档ID'; +COMMENT ON COLUMN rag_vector_document_chunk.content IS '分块内容'; +COMMENT ON COLUMN rag_vector_document_chunk.content_hash IS '内容哈希'; +COMMENT ON COLUMN rag_vector_document_chunk.chunk_index IS '分块序号'; +COMMENT ON COLUMN rag_vector_document_chunk.vector IS '向量数据'; +COMMENT ON COLUMN rag_vector_document_chunk.metadata IS '扩展元数据'; + +--------------------pgsql创建rag_vector_document_chunk表语句--------------------------- \ No newline at end of file