diff --git a/common/eino/indexer.go b/common/eino/indexer.go index f6f160e..76bee6f 100644 --- a/common/eino/indexer.go +++ b/common/eino/indexer.go @@ -100,9 +100,9 @@ func (i *PGVectorIndexer) doStore(ctx context.Context, docs []*schema.Document, } // 转成业务实体 - var chunks []*dto.VectorDocumentChunkMsg + var chunks []*dto.VectorDocumentVectorMsg for idx, doc := range docs { - ck := new(dto.VectorDocumentChunkMsg) + ck := new(dto.VectorDocumentVectorMsg) err = gconv.Struct(doc.MetaData, ck) if err != nil { glog.Errorf(ctx, "doStore err: %v", err) @@ -126,7 +126,7 @@ func (i *PGVectorIndexer) doStore(ctx context.Context, docs []*schema.Document, return } // 入库 - rows, err = dao.DocumentChunk.BatchInsert(ctx, chunks) + rows, err = dao.DocumentVector.BatchInsert(ctx, chunks) return } diff --git a/common/eino/retriever.go b/common/eino/retriever.go index f7750ae..ac2529e 100644 --- a/common/eino/retriever.go +++ b/common/eino/retriever.go @@ -210,7 +210,7 @@ func (r *PGVectorRetriever) doRetrieveVector(ctx context.Context, query string, } datasetIds := gconv.Int64s(opts.DSLInfo["dataset_ids"]) - rows, err := dao.DocumentChunk.GetAllByVector(ctx, datasetIds, queryVec, topK) + rows, err := dao.DocumentVector.GetAllByVector(ctx, datasetIds, queryVec, topK) if err != nil { return nil, err } @@ -239,7 +239,7 @@ func (r *PGVectorRetriever) doRetrieveMeilisearch(ctx context.Context, query str datasetIds := gconv.Int64s(opts.DSLInfo["dataset_ids"]) // 调用你已有的 Meilisearch DAO - rows, err := dao.DocumentChunk.SearchByKeywords(ctx, query, datasetIds, topK) + rows, err := dao.DocumentVector.SearchByKeywords(ctx, query, datasetIds, topK) if err != nil { return nil, err } diff --git a/common/task/base_task.go b/common/task/base_task.go deleted file mode 100644 index 8e07d5c..0000000 --- a/common/task/base_task.go +++ /dev/null @@ -1,69 +0,0 @@ -package task - -import ( - "time" - - "gitea.com/red-future/common/beans" -) - -type baseTaskCol struct { - beans.SQLBaseCol - TaskType string - Status string - Priority string - ParentTaskID string - TotalItems string - ProcessedItems string - Progress string - StartTime string - EndTime string - Duration string - SuccessCount string - FailCount string - Executor string - DocumentID string - Remark string -} - -var BaseTaskCol = baseTaskCol{ - SQLBaseCol: beans.DefSQLBaseCol, - TaskType: "task_type", - Status: "status", - Priority: "task_priority", - ParentTaskID: "parent_task_id", - TotalItems: "total_items", - ProcessedItems: "processed_items", - Progress: "progress", - StartTime: "start_time", - EndTime: "end_time", - Duration: "duration", - SuccessCount: "success_count", - FailCount: "fail_count", - Executor: "executor", - DocumentID: "document_id", - Remark: "remark", -} - -// SQLBaseTask 任务基类 - SQL版本 -type SQLBaseTask struct { - beans.SQLBaseDO `orm:",inline"` - // 任务核心信息 - TaskType TaskType `orm:"task_type" json:"taskType" dc:"任务类型"` - Status TaskStatus `orm:"status" json:"status" dc:"任务状态"` - Priority TaskPriority `orm:"task_priority" json:"priority,omitempty" dc:"任务优先级"` - ParentTaskID int64 `orm:"parent_task_id" json:"parentTaskId,omitempty" dc:"父任务ID"` - // 任务进度 - TotalItems int64 `orm:"total_items" json:"totalItems" dc:"总数"` - ProcessedItems int64 `orm:"processed_items" json:"processedItems" dc:"已处理数"` - Progress float64 `orm:"progress" json:"progress" dc:"进度"` // 0~100 百分比 - // 任务结果 - StartTime *time.Time `orm:"start_time" json:"startTime" dc:"开始时间"` - EndTime *time.Time `orm:"end_time" json:"endTime,omitempty" dc:"结束时间"` - Duration int64 `orm:"duration" json:"duration,omitempty" dc:"耗时(毫秒)"` - SuccessCount int64 `orm:"success_count" json:"successCount" dc:"成功数"` - FailCount int64 `orm:"fail_count" json:"failCount" dc:"失败数"` - // 其他 - Executor string `orm:"executor" json:"executor,omitempty" dc:"执行器标识"` - DocumentID int64 `orm:"document_id" json:"documentId,omitempty" dc:"文档ID"` - Remark string `orm:"remark" json:"remark,omitempty" dc:"备注/错误信息"` -} diff --git a/config.yml b/config.yml index f15927b..9c1aef1 100644 --- a/config.yml +++ b/config.yml @@ -5,47 +5,6 @@ server: # Database. database: - default: - - type: "pgsql" - host: "116.204.74.41" - port: "15432" - user: "postgres" - pass: "Bjang09@686^*^" - name: "rag" - prefix: "rag_knowledge_" # (可选)表名前缀 - role: "master" # (可选)数据库主从角色(master/slave),默认为master。如果不使用应用主从机制请不配置或留空即可。 - debug: true # (可选)开启调试模式 - dryRun: false # (可选)ORM空跑(只读不写) - charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 - timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local - maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10) - maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制) - maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒) - maxIdleConnTime: "30s" # (可选,v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置,避免长时间空闲连接占用资源。 - createdAt: "created_at" # (可选)自动创建时间字段名称 - updatedAt: "updated_at" # (可选)自动更新时间字段名称 - deletedAt: "deleted_at" # (可选)软删除时间字段名称 - timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 - - type: "pgsql" - host: "116.204.74.41" - port: "15432" - user: "postgres" - pass: "Bjang09@686^*^" - name: "tenant-1" - prefix: "rag_knowledge_" # (可选)表名前缀 - role: "slave" # (可选)数据库主从角色(master/slave),默认为master。如果不使用应用主从机制请不配置或留空即可。 - debug: false # (可选)开启调试模式 - dryRun: false # (可选)ORM空跑(只读不写) - charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 - timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local - maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10) - maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制) - maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒) - maxIdleConnTime: "30s" # (可选,v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置,避免长时间空闲连接占用资源。 - createdAt: "created_at" # (可选)自动创建时间字段名称 - updatedAt: "updated_at" # (可选)自动更新时间字段名称 - deletedAt: "deleted_at" # (可选)软删除时间字段名称 - timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 rag_knowledge: - type: "pgsql" host: "116.204.74.41" @@ -123,19 +82,6 @@ eino: # 文件上传服务地址,与oss模块minio中的endpoint一致 filePrefix: "http://116.204.74.41:9000" -gmq: - redis: - primary: - addr: "116.204.74.41" - port: "6379" - db: 0 - username: "" - password: "" - poolSize: 10 - minIdleConn: 5 - maxActiveConn: 10 - maxRetries: 30 - # Meilisearch 全文检索配置 meilisearch: default: diff --git a/consts/public/redis_key.go b/consts/public/msg_key.go similarity index 54% rename from consts/public/redis_key.go rename to consts/public/msg_key.go index 696f283..059cafc 100644 --- a/consts/public/redis_key.go +++ b/consts/public/msg_key.go @@ -1,5 +1,7 @@ package public +const GmqMsgPluginsName = "gmq_msg" + const KnowledgeLockEsKey = "rag_binary:knowledge:lock:knowledgeIdEs-%v" const KnowledgeLockSqlKey = "rag_binary:knowledge:lock:knowledgeIdSql-%v" const KnowledgeContentHashEsKey = "rag_binary:knowledge:knowledgeId:contentHashEs-%v" @@ -13,8 +15,8 @@ const ( ) const ( - KnowledgeDocumentChunkTopic = "knowledge:document:chunk:stream" // 请求 Stream 键名(与发消息的key一致) - KnowledgeDocumentChunkConsumer = "knowledge-document-chunk-consumer" // 消费者名称(唯一标识) - KnowledgeDocumentChunkBatchSize = 1 // 批处理大小(每次读取1条) - KnowledgeDocumentChunkAutoAck = false // ACK是否自动确认(true自动确认,false不确认) + KnowledgeDocumentVectorTopic = "knowledge:document:vector:stream" // 请求 Stream 键名(与发消息的key一致) + KnowledgeDocumentVectorConsumer = "knowledge-document-vector-consumer" // 消费者名称(唯一标识) + KnowledgeDocumentVectorCount = 1 // 批处理大小(每次读取1条) + KnowledgeDocumentVectorAutoAck = false // ACK是否自动确认(true自动确认,false不确认) ) diff --git a/consts/public/table_name.go b/consts/public/table_name.go index e31d1e0..85e2535 100644 --- a/consts/public/table_name.go +++ b/consts/public/table_name.go @@ -8,12 +8,12 @@ const ( // sql 数据库表名 const ( - TableNameDocument = "document" - TableNameDataset = "dataset" - TableNameKeyword = "keyword" - TableNameTask = "task" - TableNameDatasetIndex = "dataset_index" - TableNameDocumentChunk = "document_chunk" + TableNameDocument = "document" + TableNameDataset = "dataset" + TableNameKeyword = "keyword" + TableNameTask = "task" + TableNameDatasetIndex = "dataset_index" + TableNameDocumentVector = "document_vector" ) // es 索引名称 diff --git a/common/task/consts.go b/consts/task/consts.go similarity index 100% rename from common/task/consts.go rename to consts/task/consts.go diff --git a/controller/dataset.go b/controller/dataset.go index c67e58f..785970b 100644 --- a/controller/dataset.go +++ b/controller/dataset.go @@ -40,9 +40,3 @@ func (c *dataset) List(ctx context.Context, req *dto.ListDatasetReq) (res *dto.L res, err = service.Dataset.List(ctx, req) return } - -// Search 搜索 -//func (c *dataset) Search(ctx context.Context, req *dto.SearchReq) (res *dto.SearchRes, err error) { -// res, err = service.Dataset.Search(ctx, req) -// return -//} diff --git a/controller/document.go b/controller/document.go index e5bb376..9da0aea 100644 --- a/controller/document.go +++ b/controller/document.go @@ -2,7 +2,6 @@ package controller import ( "context" - "rag/model/dto" "rag/service" @@ -47,8 +46,8 @@ func (c *document) List(ctx context.Context, req *dto.ListDocumentReq) (res *dto return } -// Process 处理文件(向量化) -func (c *document) Process(ctx context.Context, req *dto.ProcessDocumentReq) (res *beans.ResponseEmpty, err error) { - err = service.Document.Process(ctx, req) +// DocumentVector 处理文件(向量化) +func (c *document) DocumentVector(ctx context.Context, req *dto.DocumentVectorReq) (res *beans.ResponseEmpty, err error) { + err = service.Document.Vector(ctx, req) return } diff --git a/controller/document_chunk.go b/controller/document_chunk.go deleted file mode 100644 index 12869a5..0000000 --- a/controller/document_chunk.go +++ /dev/null @@ -1,29 +0,0 @@ -package controller - -import ( - "context" - "rag/model/dto" - "rag/service" - - "gitea.com/red-future/common/beans" - "github.com/gogf/gf/v2/frame/g" -) - -type documentChunk struct{} - -var DocumentChunk = new(documentChunk) - -// Update 更新文件片段 -func (c *documentChunk) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (res *beans.ResponseEmpty, err error) { - err = service.DocumentChunk.Update(ctx, req) - return -} - -// List 文件片段列表 -func (c *documentChunk) List(ctx context.Context, req *dto.ListDocumentChunkReq) (res *dto.ListDocumentChunkRes, err error) { - if !g.IsEmpty(req.Page) { - req.Page = &beans.Page{PageNum: 1, PageSize: 20} - } - res, err = service.DocumentChunk.List(ctx, req) - return -} diff --git a/controller/document_vector.go b/controller/document_vector.go new file mode 100644 index 0000000..1acdce5 --- /dev/null +++ b/controller/document_vector.go @@ -0,0 +1,35 @@ +package controller + +import ( + "context" + "rag/model/dto" + "rag/service" + + "gitea.com/red-future/common/beans" + "github.com/gogf/gf/v2/frame/g" +) + +type documentVector struct{} + +var DocumentVector = new(documentVector) + +// Query 执行RAG查询 +func (c *documentVector) Query(ctx context.Context, req *dto.RAGQueryReq) (res *dto.RAGQueryRes, err error) { + res, err = service.DocumentVector.Query(ctx, req) + return +} + +// Update 更新文件片段 +func (c *documentVector) Update(ctx context.Context, req *dto.UpdateDocumentVectorReq) (res *beans.ResponseEmpty, err error) { + err = service.DocumentVector.Update(ctx, req) + return +} + +// List 文件片段列表 +func (c *documentVector) List(ctx context.Context, req *dto.ListDocumentVectorReq) (res *dto.ListDocumentVectorRes, err error) { + if !g.IsEmpty(req.Page) { + req.Page = &beans.Page{PageNum: 1, PageSize: 20} + } + res, err = service.DocumentVector.List(ctx, req) + return +} diff --git a/controller/keyword.go b/controller/keyword.go index fcbd34f..07e1944 100644 --- a/controller/keyword.go +++ b/controller/keyword.go @@ -2,7 +2,6 @@ package controller import ( "context" - "rag/model/dto" "rag/service" diff --git a/controller/rag_query.go b/controller/rag_query.go deleted file mode 100644 index a480566..0000000 --- a/controller/rag_query.go +++ /dev/null @@ -1,17 +0,0 @@ -package controller - -import ( - "context" - "rag/model/dto" - "rag/service" -) - -type ragQuery struct{} - -var RAGQuery = new(ragQuery) - -// Query 执行RAG查询 -func (c *ragQuery) Query(ctx context.Context, req *dto.RAGQueryReq) (res *dto.RAGQueryRes, err error) { - res, err = service.RAGQuery.Query(ctx, req) - return -} diff --git a/dao/dataset_index.go b/dao/dataset_index.go index 96f336c..06d4c8d 100644 --- a/dao/dataset_index.go +++ b/dao/dataset_index.go @@ -51,7 +51,7 @@ func (d *datasetIndexDao) InsertIndex(ctx context.Context, indexName string) (er USING ivfflat (vector vector_cosine_ops) WITH (lists = 100) WHERE vector IS NOT NULL; - `, indexName, gfdb.TablePrefix+public.TableNameDocumentChunk) + `, indexName, gfdb.TablePrefix+public.TableNameDocumentVector) _, err = db.Exec(ctx, sqlStr) return } diff --git a/dao/document_chunk.go b/dao/document_vector.go similarity index 55% rename from dao/document_chunk.go rename to dao/document_vector.go index 577c8f3..e51370e 100644 --- a/dao/document_chunk.go +++ b/dao/document_vector.go @@ -15,17 +15,17 @@ import ( "github.com/pgvector/pgvector-go" ) -var DocumentChunk = new(documentChunkDao) +var DocumentVector = new(documentVectorDao) -type documentChunkDao struct{} +type documentVectorDao struct{} // BatchInsert 批量插入文件块 -func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDocumentChunkMsg) (rows int64, err error) { - var res []*entity.DocumentChunk +func (d *documentVectorDao) BatchInsert(ctx context.Context, req []*dto.VectorDocumentVectorMsg) (rows int64, err error) { + var res []*entity.DocumentVector if err = gconv.Structs(req, &res); err != nil { return } - r, err := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Data(&res).Insert() + r, err := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentVector).Data(&res).Insert() if err != nil { return } @@ -33,9 +33,9 @@ func (d *documentChunkDao) BatchInsert(ctx context.Context, req []*dto.VectorDoc } // Update 更新文件块 -func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (rows int64, err error) { - model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk) - r, err := model.Data(&req).Where(entity.DocumentChunkCol.Id, req.Id).Update() +func (d *documentVectorDao) Update(ctx context.Context, req *dto.UpdateDocumentVectorReq) (rows int64, err error) { + model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentVector) + r, err := model.Data(&req).Where(entity.DocumentVectorCol.Id, req.Id).Update() if err != nil { return } @@ -43,13 +43,13 @@ func (d *documentChunkDao) Update(ctx context.Context, req *dto.UpdateDocumentCh } // List 文件块列表 -func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkReq, fields ...string) (res []*entity.DocumentChunk, total int, err error) { - model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentChunk).Fields(fields).OmitEmpty(). - Where(entity.DocumentChunkCol.DatasetId, req.DatasetId). - Where(entity.DocumentChunkCol.DocumentId, req.DocumentId). - Where(entity.DocumentChunkCol.Status, req.Status). - Where(entity.DocumentChunkCol.VectorStatus, req.VectorStatus). - OrderDesc(entity.DocumentChunkCol.CreatedAt) +func (d *documentVectorDao) List(ctx context.Context, req *dto.ListDocumentVectorReq, fields ...string) (res []*entity.DocumentVector, total int, err error) { + model := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentVector).Fields(fields).OmitEmpty(). + Where(entity.DocumentVectorCol.DatasetId, req.DatasetId). + Where(entity.DocumentVectorCol.DocumentId, req.DocumentId). + Where(entity.DocumentVectorCol.Status, req.Status). + Where(entity.DocumentVectorCol.VectorStatus, req.VectorStatus). + OrderDesc(entity.DocumentVectorCol.CreatedAt) if req.Page != nil { model.Page(int(req.Page.PageNum), int(req.Page.PageSize)) } @@ -61,27 +61,22 @@ func (d *documentChunkDao) List(ctx context.Context, req *dto.ListDocumentChunkR return } -func (d *documentChunkDao) GetAllByVector(ctx context.Context, datasetId []int64, queryVec pgvector.Vector, topK int) (list gdb.List, err error) { - sql := ` - SELECT id, content, dataset_id, document_id, - vector <=> ? AS distance - FROM rag_vector_document_chunk - WHERE dataset_id IN (?) - AND vector IS NOT NULL - ORDER BY distance ASC - LIMIT ? -` - // 顺序:vector, dataset_id, topK - result, err := gfdb.DB(ctx, public.DbNameVector).GetAll(ctx, sql, queryVec, datasetId, topK) +func (d *documentVectorDao) GetAllByVector(ctx context.Context, datasetIds []int64, vector pgvector.Vector, topK int) (list gdb.List, err error) { + result, err := gfdb.DB(ctx, public.DbNameVector).Model(ctx, public.TableNameDocumentVector). + Fields("id, content, dataset_id, document_id, vector <=> ? AS distance", vector). + WhereIn(entity.DocumentVectorCol.DatasetId, datasetIds). + WhereNotNull(entity.DocumentVectorCol.Vector). + OrderAsc("distance"). + Limit(topK). + All() if err != nil { return nil, err } - return result.List(), nil } // SearchByKeywords 通过关键词全文检索文档块 -func (d *documentChunkDao) SearchByKeywords(ctx context.Context, query string, datasetIds []int64, topK int) (list gdb.List, err error) { +func (d *documentVectorDao) SearchByKeywords(ctx context.Context, query string, datasetIds []int64, topK int) (list gdb.List, err error) { // 构建 meilisearch 查询参数 searchParams := &meilisearch.SearchParams{ Query: query, diff --git a/main.go b/main.go index 501248a..9d793bf 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "rag/consts/public" "rag/controller" "rag/service" + "strings" "syscall" _ "gitea.com/red-future/common/config" @@ -28,9 +29,8 @@ func main() { http.RouteRegister([]interface{}{ controller.Dataset, controller.Document, - controller.DocumentChunk, + controller.DocumentVector, controller.Keyword, - controller.RAGQuery, }) err := utils.InitGseTool(ctx) @@ -38,15 +38,21 @@ func main() { g.Log().Error(ctx, "gse 分词工具初始化失败:", err) } - gmq.Init("config.yml") - - if err := gmq.GetGmq("primary").GmqSubscribe(ctx, &mq.RedisSubMessage{ + redisAddress := g.Cfg().MustGet(ctx, "redis.default.address").String() + redisAddressList := strings.Split(redisAddress, ":") + gmq.GmqRegister(public.GmqMsgPluginsName, &mq.RedisConn{ + RedisConfig: mq.RedisConfig{ + Addr: redisAddressList[0], + Port: redisAddressList[1], + }, + }) + if err = gmq.GetGmq(public.GmqMsgPluginsName).GmqSubscribe(ctx, &mq.RedisSubMessage{ SubMessage: types.SubMessage{ - Topic: public.KnowledgeDocumentChunkTopic, - ConsumerName: public.KnowledgeDocumentChunkConsumer, - AutoAck: public.KnowledgeDocumentChunkAutoAck, - FetchCount: public.KnowledgeDocumentChunkBatchSize, - HandleFunc: service.DocumentChunk.DocsChunkMsg, + Topic: public.KnowledgeDocumentVectorTopic, + ConsumerName: public.KnowledgeDocumentVectorConsumer, + AutoAck: public.KnowledgeDocumentVectorAutoAck, + FetchCount: public.KnowledgeDocumentVectorCount, + HandleFunc: service.DocumentVector.DocsChunkMsg, }, }); err != nil { return diff --git a/model/dto/dataset.go b/model/dto/dataset.go index 804044d..a2aa297 100644 --- a/model/dto/dataset.go +++ b/model/dto/dataset.go @@ -8,7 +8,7 @@ import ( // CreateDatasetReq 创建数据集请求 type CreateDatasetReq struct { - g.Meta `path:"/createDataset" method:"post" tags:"知识库(数据集)管理" summary:"创建知识库(数据集)" dc:"创建知识库(数据集)"` + g.Meta `path:"/create" method:"post" tags:"知识库(数据集)管理" summary:"创建知识库(数据集)" dc:"创建知识库(数据集)"` Name string `json:"name" v:"required#名称不能为空"` Description string `json:"description"` @@ -21,7 +21,7 @@ type CreateDatasetRes struct { // UpdateDatasetReq 更新数据集请求 type UpdateDatasetReq struct { - g.Meta `path:"/updateDataset" method:"put" tags:"知识库(数据集)管理" summary:"更新知识库(数据集)" dc:"更新知识库(数据集)"` + g.Meta `path:"/update" method:"put" tags:"知识库(数据集)管理" summary:"更新知识库(数据集)" dc:"更新知识库(数据集)"` Id int64 `json:"id" v:"required#ID不能为空"` Name string `json:"name"` @@ -32,21 +32,21 @@ type UpdateDatasetReq struct { // DeleteDatasetReq 删除数据集请求 type DeleteDatasetReq struct { - g.Meta `path:"/deleteDataset" method:"delete" tags:"知识库(数据集)管理" summary:"删除知识库(数据集)" dc:"删除知识库(数据集)"` + g.Meta `path:"/delete" method:"delete" tags:"知识库(数据集)管理" summary:"删除知识库(数据集)" dc:"删除知识库(数据集)"` Id int64 `json:"id" v:"required#ID不能为空"` } // GetDatasetReq 获取数据集请求 type GetDatasetReq struct { - g.Meta `path:"/getDataset" method:"get" tags:"知识库(数据集)管理" summary:"获取知识库(数据集)详情" dc:"获取知识库(数据集)详情"` + g.Meta `path:"/get" method:"get" tags:"知识库(数据集)管理" summary:"获取知识库(数据集)详情" dc:"获取知识库(数据集)详情"` Id int64 `json:"id" v:"required#ID不能为空"` } // ListDatasetReq 数据集列表请求 type ListDatasetReq struct { - g.Meta `path:"/listDataset" method:"get" tags:"知识库(数据集)管理" summary:"获取知识库(数据集)列表" dc:"分页查询知识库(数据集)列表,支持多条件筛选"` + g.Meta `path:"/list" method:"get" tags:"知识库(数据集)管理" summary:"获取知识库(数据集)列表" dc:"分页查询知识库(数据集)列表,支持多条件筛选"` Page *beans.Page `json:"page"` Keyword string `json:"keyword" dc:"关键词搜索"` diff --git a/model/dto/document.go b/model/dto/document.go index 89d0069..d01f348 100644 --- a/model/dto/document.go +++ b/model/dto/document.go @@ -10,7 +10,7 @@ import ( // CreateDocumentReq 创建文件请求 type CreateDocumentReq struct { - g.Meta `path:"/createDocument" method:"post" tags:"文件管理" summary:"创建文件" dc:"创建文件"` + g.Meta `path:"/create" method:"post" tags:"文件管理" summary:"创建文件" dc:"创建文件"` DatasetId int64 `json:"datasetId" v:"required#数据集ID不能为空"` Title string `json:"title" v:"required#标题不能为空"` @@ -26,7 +26,7 @@ type CreateDocumentRes struct { // UpdateDocumentReq 更新文件请求 type UpdateDocumentReq struct { - g.Meta `path:"/updateDocument" method:"put" tags:"文件管理" summary:"更新文件" dc:"更新文件"` + g.Meta `path:"/update" method:"put" tags:"文件管理" summary:"更新文件" dc:"更新文件"` Id int64 `json:"id" v:"required#ID不能为空"` Status document.Status `json:"status"` @@ -36,21 +36,21 @@ type UpdateDocumentReq struct { // DeleteDocumentReq 删除文件请求 type DeleteDocumentReq struct { - g.Meta `path:"/deleteDocument" method:"delete" tags:"文件管理" summary:"删除文件" dc:"删除文件"` + g.Meta `path:"/delete" method:"delete" tags:"文件管理" summary:"删除文件" dc:"删除文件"` Id int64 `json:"id" v:"required#ID不能为空"` } // GetDocumentReq 获取文件请求 type GetDocumentReq struct { - g.Meta `path:"/getDocument" method:"get" tags:"文件管理" summary:"获取文件详情" dc:"获取文件详情"` + g.Meta `path:"/get" method:"get" tags:"文件管理" summary:"获取文件详情" dc:"获取文件详情"` Id int64 `json:"id" v:"required#ID不能为空"` } // ListDocumentReq 文件列表请求 type ListDocumentReq struct { - g.Meta `path:"/listDocument" method:"get" tags:"文件管理" summary:"获取文件列表" dc:"分页查询文件列表,支持多条件筛选"` + g.Meta `path:"/list" method:"get" tags:"文件管理" summary:"获取文件列表" dc:"分页查询文件列表,支持多条件筛选"` Page *beans.Page `json:"page"` DatasetId int64 `json:"datasetId"` @@ -76,27 +76,16 @@ type DocumentVO struct { UpdatedAt *gtime.Time `json:"updatedAt" dc:"更新时间"` } -// ProcessDocumentReq 处理文件请求(向量化) -type ProcessDocumentReq struct { - g.Meta `path:"/getProcess" method:"get" tags:"文件管理" summary:"文件向量化处理" dc:"文件向量化处理"` +// DocumentVectorReq 处理文件请求(向量化) +type DocumentVectorReq struct { + g.Meta `path:"/vectorization" method:"post" tags:"文件管理" summary:"文件向量化处理" dc:"文件向量化处理"` Id int64 `json:"id" v:"required#ID不能为空"` DatasetId int64 `json:"datasetId" v:"required#数据集ID不能为空"` } -type ListDocumentChunkRPC struct { - List []*DocumentChunkRPC `json:"list"` -} - -type DocumentChunkRPC struct { +type DocumentVectorRPC struct { Id int64 `json:"id" dc:"id"` DatasetId int64 `json:"datasetId" dc:"所属数据集ID"` ContentHash string `json:"contentHash" dc:"内容hash"` } - -type KnowledgeDocumentMsg struct { - TenantId uint64 `json:"tenantId"` - Creator string `json:"creator"` - Id int64 `json:"id"` - VectorStatus document.VectorStatus `json:"vectorStatus"` -} diff --git a/model/dto/document_chunk.go b/model/dto/document_vector.go similarity index 61% rename from model/dto/document_chunk.go rename to model/dto/document_vector.go index 6ff3f96..27724ee 100644 --- a/model/dto/document_chunk.go +++ b/model/dto/document_vector.go @@ -9,17 +9,37 @@ import ( "github.com/pgvector/pgvector-go" ) -// UpdateDocumentChunkReq 更新文件块向量请求 -type UpdateDocumentChunkReq struct { - g.Meta `path:"/updateDocumentChunk" method:"put" tags:"文件块向量管理" summary:"更新文件块" dc:"更新文件块"` +// RAGQueryReq RAG查询请求 +type RAGQueryReq struct { + g.Meta `path:"/ragQuery" method:"post" tags:"RAG查询" summary:"执行RAG查询" dc:"执行RAG查询"` + + Content string `json:"content" v:"required#查询内容不能为空" dc:"用户问题"` + DatasetIds []int64 `json:"datasetIds" dc:"数据集ID"` + History []*Message `json:"history" dc:"历史对话"` + TopK int `json:"topK" d:"5" dc:"检索topK,默认5"` +} + +type Message struct { + Role string `json:"role"` + Content string `json:"content"` +} + +// RAGQueryRes RAG查询响应 +type RAGQueryRes struct { + Answer string `json:"answer" dc:"生成的答案"` +} + +// UpdateDocumentVectorReq 更新文件块向量请求 +type UpdateDocumentVectorReq struct { + g.Meta `path:"/update" method:"put" tags:"文件块向量管理" summary:"更新文件块" dc:"更新文件块"` Id int64 `json:"id" v:"required#ID不能为空"` Status document.Status `json:"status"` } -// ListDocumentChunkReq 文件块向量列表请求 -type ListDocumentChunkReq struct { - g.Meta `path:"/listDocumentChunk" method:"get" tags:"文件块向量管理" summary:"获取文件块向量列表" dc:"分页查询文件块向量列表,支持多条件筛选"` +// ListDocumentVectorReq 文件块向量列表请求 +type ListDocumentVectorReq struct { + g.Meta `path:"/list" method:"get" tags:"文件块向量管理" summary:"获取文件块向量列表" dc:"分页查询文件块向量列表,支持多条件筛选"` Page *beans.Page `json:"page"` DatasetId int64 `json:"datasetId"` @@ -28,13 +48,13 @@ type ListDocumentChunkReq struct { VectorStatus document.VectorStatus `json:"vectorStatus"` } -// ListDocumentChunkRes 文件块向量列表响应 -type ListDocumentChunkRes struct { - List []*DocumentChunkItem `json:"list"` - Total int `json:"total"` +// ListDocumentVectorRes 文件块向量列表响应 +type ListDocumentVectorRes struct { + List []*DocumentVectorVO `json:"list"` + Total int `json:"total"` } -type DocumentChunkItem struct { +type DocumentVectorVO struct { Id int64 `json:"id,string" dc:"id"` Status document.Status `json:"status" dc:"状态"` VectorStatus document.VectorStatus `json:"vectorStatus" dc:"向量状态"` @@ -49,7 +69,7 @@ type DocumentChunkItem struct { UpdatedAt *gtime.Time `json:"updatedAt" dc:"更新时间"` } -type VectorDocumentChunkMsg struct { +type VectorDocumentVectorMsg struct { TenantId uint64 `json:"tenantId"` Creator string `json:"creator"` DatasetId int64 `json:"datasetId"` // 数据集ID diff --git a/model/dto/keyword.go b/model/dto/keyword.go index b6ca6dc..5fd2a72 100644 --- a/model/dto/keyword.go +++ b/model/dto/keyword.go @@ -8,7 +8,7 @@ import ( // CreateKeywordReq 创建关键词请求 type CreateKeywordReq struct { - g.Meta `path:"/createKeyword" method:"post" tags:"关键词管理" summary:"创建关键词" dc:"创建关键词"` + g.Meta `path:"/create" method:"post" tags:"关键词管理" summary:"创建关键词" dc:"创建关键词"` DatasetId int64 `json:"datasetId" v:"required#数据集ID不能为空"` DocumentId int64 `json:"documentId" v:"required#文档ID不能为空"` @@ -23,7 +23,7 @@ type CreateKeywordRes struct { // UpdateKeywordReq 更新关键词请求 type UpdateKeywordReq struct { - g.Meta `path:"/updateKeyword" method:"put" tags:"关键词管理" summary:"更新关键词" dc:"更新关键词"` + g.Meta `path:"/update" method:"put" tags:"关键词管理" summary:"更新关键词" dc:"更新关键词"` Id int64 `json:"id" v:"required#ID不能为空"` Word string `json:"word"` @@ -32,21 +32,21 @@ type UpdateKeywordReq struct { // DeleteKeywordReq 删除关键词请求 type DeleteKeywordReq struct { - g.Meta `path:"/deleteKeyword" method:"delete" tags:"关键词管理" summary:"删除关键词" dc:"删除关键词"` + g.Meta `path:"/delete" method:"delete" tags:"关键词管理" summary:"删除关键词" dc:"删除关键词"` Id int64 `json:"id" v:"required#ID不能为空"` } // GetKeywordReq 获取关键词请求 type GetKeywordReq struct { - g.Meta `path:"/getKeyword" method:"get" tags:"关键词管理" summary:"获取关键词详情" dc:"获取关键词详情"` + g.Meta `path:"/get" method:"get" tags:"关键词管理" summary:"获取关键词详情" dc:"获取关键词详情"` Id int64 `json:"id" v:"required#ID不能为空"` } // ListKeywordReq 关键词列表请求 type ListKeywordReq struct { - g.Meta `path:"/listKeyword" method:"get" tags:"关键词管理" summary:"获取关键词列表" dc:"分页查询关键词列表,支持多条件筛选"` + g.Meta `path:"/list" method:"get" tags:"关键词管理" summary:"获取关键词列表" dc:"分页查询关键词列表,支持多条件筛选"` Page *beans.Page `json:"page"` DatasetId int64 `json:"datasetId"` diff --git a/model/dto/rag_query.go b/model/dto/rag_query.go deleted file mode 100644 index 9682aad..0000000 --- a/model/dto/rag_query.go +++ /dev/null @@ -1,25 +0,0 @@ -package dto - -import ( - "github.com/gogf/gf/v2/frame/g" -) - -// RAGQueryReq RAG查询请求 -type RAGQueryReq struct { - g.Meta `path:"/ragQuery" method:"post" tags:"RAG查询" summary:"执行RAG查询" dc:"执行RAG查询"` - - Content string `json:"content" v:"required#查询内容不能为空" dc:"用户问题"` - DatasetIds []int64 `json:"datasetIds" dc:"数据集ID"` - History []*Message `json:"history" dc:"历史对话"` - TopK int `json:"topK" d:"5" dc:"检索topK,默认5"` -} - -type Message struct { - Role string `json:"role"` - Content string `json:"content"` -} - -// RAGQueryRes RAG查询响应 -type RAGQueryRes struct { - Answer string `json:"answer" dc:"生成的答案"` -} diff --git a/model/dto/task.go b/model/dto/task.go index 9e52e4c..c94a476 100644 --- a/model/dto/task.go +++ b/model/dto/task.go @@ -1,7 +1,7 @@ package dto import ( - "rag/common/task" + "rag/consts/task" ) // WriteTaskProgressReq 写入任务进度请求 diff --git a/model/entity/document_chunk.go b/model/entity/document_vector.go similarity index 91% rename from model/entity/document_chunk.go rename to model/entity/document_vector.go index 16f8e24..942ce87 100644 --- a/model/entity/document_chunk.go +++ b/model/entity/document_vector.go @@ -7,7 +7,7 @@ import ( "github.com/pgvector/pgvector-go" ) -type documentChunkCol struct { +type documentVectorCol struct { beans.SQLBaseCol Status string VectorStatus string @@ -20,7 +20,7 @@ type documentChunkCol struct { Metadata string } -var DocumentChunkCol = documentChunkCol{ +var DocumentVectorCol = documentVectorCol{ SQLBaseCol: beans.DefSQLBaseCol, Status: "status", VectorStatus: "vector_status", @@ -33,8 +33,8 @@ var DocumentChunkCol = documentChunkCol{ Metadata: "metadata", } -// DocumentChunk 文档切分块实体 -type DocumentChunk struct { +// DocumentVector 文档切分块实体 +type DocumentVector struct { beans.SQLBaseDO `orm:",inline"` Status document.Status `orm:"status" json:"status" dc:"状态"` diff --git a/model/entity/task.go b/model/entity/task.go index c06d791..0a372ca 100644 --- a/model/entity/task.go +++ b/model/entity/task.go @@ -1,7 +1,7 @@ package entity import ( - "rag/common/task" + "rag/consts/task" "gitea.com/red-future/common/beans" ) diff --git a/service/document.go b/service/document.go index a00fd06..6caac1f 100644 --- a/service/document.go +++ b/service/document.go @@ -5,9 +5,9 @@ import ( "errors" "fmt" "rag/common/eino" - "rag/common/task" "rag/consts/document" "rag/consts/public" + "rag/consts/task" "rag/dao" "rag/model/dto" "rag/model/entity" @@ -123,8 +123,8 @@ func (s *documentService) List(ctx context.Context, req *dto.ListDocumentReq) (r return } -// Process 处理文件(使用eino框架切分和向量化) -func (s *documentService) Process(ctx context.Context, req *dto.ProcessDocumentReq) (err error) { +// Vector 处理文件(使用eino框架切分和向量化) +func (s *documentService) Vector(ctx context.Context, req *dto.DocumentVectorReq) (err error) { // 1. 查询文件信息 documentReq := dto.GetDocumentReq{Id: req.Id} doc, err := dao.Document.GetByID(ctx, &documentReq) @@ -403,9 +403,9 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu metaData[entity.DocumentCol.TenantId] = doc.TenantId metaData[entity.DocumentCol.Creator] = doc.Creator metaData[entity.DocumentCol.DatasetId] = doc.DatasetId - metaData[entity.DocumentChunkCol.DocumentId] = doc.Id - metaData[entity.DocumentChunkCol.ContentHash] = contentHash - metaData[entity.DocumentChunkCol.ChunkIndex] = gconv.Int64(i) + metaData[entity.DocumentVectorCol.DocumentId] = doc.Id + metaData[entity.DocumentVectorCol.ContentHash] = contentHash + metaData[entity.DocumentVectorCol.ChunkIndex] = gconv.Int64(i) t.MetaData = metaData docsChunk = append(docsChunk, t) } @@ -423,9 +423,9 @@ func (s *documentService) sqlSplitDocument(ctx context.Context, doc *entity.Docu // 4. 发送消息到队列 if len(docsChunk) > 0 { - err = gmq.GetGmq("primary").GmqPublish(ctx, &mq.RedisPubMessage{ + err = gmq.GetGmq(public.GmqMsgPluginsName).GmqPublish(ctx, &mq.RedisPubMessage{ PubMessage: types.PubMessage{ - Topic: public.KnowledgeDocumentChunkTopic, + Topic: public.KnowledgeDocumentVectorTopic, Data: docsChunk, }, }) @@ -541,12 +541,12 @@ func (s *documentService) esSplitDocument(ctx context.Context, doc *entity.Docum continue } meiliDocs = append(meiliDocs, map[string]interface{}{ - entity.DocumentChunkCol.Id: contentHash, - entity.DocumentChunkCol.DatasetId: doc.DatasetId, - entity.DocumentChunkCol.DocumentId: doc.Id, - entity.DocumentChunkCol.Content: t.Content, - entity.DocumentChunkCol.ContentHash: contentHash, - entity.DocumentChunkCol.ChunkIndex: i, + entity.DocumentVectorCol.Id: contentHash, + entity.DocumentVectorCol.DatasetId: doc.DatasetId, + entity.DocumentVectorCol.DocumentId: doc.Id, + entity.DocumentVectorCol.Content: t.Content, + entity.DocumentVectorCol.ContentHash: contentHash, + entity.DocumentVectorCol.ChunkIndex: i, }) } @@ -621,7 +621,7 @@ func (s *documentService) getHistoryData(ctx context.Context, doc *entity.Docume } // 3. Redis 无数据:根据 contentKey 类型选择查询方式 - var dictData = make([]*dto.DocumentChunkRPC, 0) + var dictData = make([]*dto.DocumentVectorRPC, 0) if public.KnowledgeContentHashSqlKey == contentKey { // SQL 方式:调用 HTTP 接口查询 dictData, err = s.getHistoryDataFromHttp(ctx, doc) @@ -658,9 +658,9 @@ func (s *documentService) getHistoryData(ctx context.Context, doc *entity.Docume } // getHistoryDataFromHttp 通过 HTTP 接口查询历史数据 -func (s *documentService) getHistoryDataFromHttp(ctx context.Context, doc *entity.Document) (dictData []*dto.DocumentChunkRPC, err error) { +func (s *documentService) getHistoryDataFromHttp(ctx context.Context, doc *entity.Document) (dictData []*dto.DocumentVectorRPC, err error) { // 调用接口获取数据 - res, _, err := dao.DocumentChunk.List(ctx, &dto.ListDocumentChunkReq{ + res, _, err := dao.DocumentVector.List(ctx, &dto.ListDocumentVectorReq{ DatasetId: doc.DatasetId, Status: gconv.PtrInt8(1), }) @@ -669,7 +669,7 @@ func (s *documentService) getHistoryDataFromHttp(ctx context.Context, doc *entit } // getHistoryDataFromMeilisearch 通过 meilisearch 查询历史数据 -func (s *documentService) getHistoryDataFromMeilisearch(ctx context.Context, doc *entity.Document) (dictData []*dto.DocumentChunkRPC, err error) { +func (s *documentService) getHistoryDataFromMeilisearch(ctx context.Context, doc *entity.Document) (dictData []*dto.DocumentVectorRPC, err error) { // 构建 meilisearch 查询参数 searchParams := &meilisearch.SearchParams{ Filter: fmt.Sprintf("datasetId = %d", doc.DatasetId), @@ -684,9 +684,9 @@ func (s *documentService) getHistoryDataFromMeilisearch(ctx context.Context, doc } // 转换查询结果 - dictData = make([]*dto.DocumentChunkRPC, 0) + dictData = make([]*dto.DocumentVectorRPC, 0) for _, hit := range hits { - item := &dto.DocumentChunkRPC{} + item := &dto.DocumentVectorRPC{} if err = gconv.Struct(hit, item); err != nil { return } diff --git a/service/document_chunk.go b/service/document_chunk.go deleted file mode 100644 index d9d4187..0000000 --- a/service/document_chunk.go +++ /dev/null @@ -1,84 +0,0 @@ -package service - -import ( - "context" - "rag/common/eino" - "rag/common/task" - "rag/dao" - "rag/model/dto" - "rag/model/entity" - - "gitea.com/red-future/common/beans" - "github.com/cloudwego/eino/components/indexer" - "github.com/cloudwego/eino/schema" - "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/util/gconv" -) - -var DocumentChunk = new(documentChunkService) - -type documentChunkService struct{} - -// Update 更新文件块 -func (s *documentChunkService) Update(ctx context.Context, req *dto.UpdateDocumentChunkReq) (err error) { - _, err = dao.DocumentChunk.Update(ctx, req) - return -} - -// List 获取文件块列表 -func (s *documentChunkService) List(ctx context.Context, req *dto.ListDocumentChunkReq) (res *dto.ListDocumentChunkRes, err error) { - list, total, err := dao.DocumentChunk.List(ctx, req) - if err != nil { - return - } - res = &dto.ListDocumentChunkRes{ - Total: total, - } - err = gconv.Struct(list, &res.List) - return -} - -func (s *documentChunkService) DocsChunkMsg(ctx context.Context, msg any) (err error) { - var docs = make([]*schema.Document, 0) - msgMap := gconv.Map(msg) - if err = gconv.Structs(msgMap["data"], &docs); err != nil { - g.Log().Error(ctx, "DocsChunkMsg err:", err) - return - } - if len(docs) == 0 { - g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty") - return - } - ctx = context.WithValue(ctx, "user", &beans.User{ - TenantId: gconv.Uint64(docs[0].MetaData[entity.DocumentChunkCol.TenantId]), - UserName: gconv.String(docs[0].MetaData[entity.DocumentChunkCol.Creator]), - }) - idx := eino.NewPGVectorIndexer(&eino.PGVectorIndexerOptions{ - BatchSize: 10, - }) - documentId := gconv.Int64(docs[0].MetaData[entity.DocumentChunkCol.DocumentId]) - rows, err := idx.Store(ctx, docs, indexer.WithEmbedding(eino.EmbedderDashscope)) - if err != nil || rows == 0 { - g.Log().Error(ctx, "DocsChunkMsg rows: , err:", rows, err) - // 写入任务进度失败 任务类型为sql存储 - remark := " 向量存储数量: " + gconv.String(rows) - if err != nil { - remark = "向量存储失败: " + err.Error() - } - err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{ - TaskId: documentId, - TaskType: task.TaskTypeGenerateVector, - Status: task.TaskStatusFailed, - Remark: remark, - }) - return - } - // 写入任务进度成功 任务类型为sql存储 - err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{ - TaskId: documentId, - TaskType: task.TaskTypeGenerateVector, - Status: task.TaskStatusCompleted, - Remark: "向量生成完成", - }) - return -} diff --git a/service/document_vector.go b/service/document_vector.go new file mode 100644 index 0000000..2a4bb91 --- /dev/null +++ b/service/document_vector.go @@ -0,0 +1,129 @@ +package service + +import ( + "context" + "fmt" + "rag/common/eino" + "rag/consts/task" + "rag/dao" + "rag/model/dto" + "rag/model/entity" + + "gitea.com/red-future/common/beans" + "github.com/cloudwego/eino/components/indexer" + "github.com/cloudwego/eino/components/retriever" + "github.com/cloudwego/eino/schema" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" +) + +var DocumentVector = new(documentVectorService) + +type documentVectorService struct{} + +// Query 执行RAG查询 +func (s *documentVectorService) Query(ctx context.Context, req *dto.RAGQueryReq) (*dto.RAGQueryRes, error) { + if req.TopK <= 0 { + req.TopK = 5 + } + + // 4. 使用向量检索器进行查询 + r, err := eino.NewPGVectorRetriever(&eino.PGVectorRetrieverConfig{ + Embedder: eino.EmbedderDashscope, + DefaultTopK: req.TopK, + }) + if err != nil { + g.Log().Errorf(ctx, "初始化向量检索器失败: %v", err) + return nil, fmt.Errorf("初始化向量检索器失败: %w", err) + } + + // 5. 执行向量检索 + docs, err := r.Retrieve(ctx, req.Content, retriever.WithEmbedding(eino.EmbedderDashscope), retriever.WithDSLInfo(map[string]any{ + "dataset_ids": req.DatasetIds, + })) + if err != nil { + g.Log().Errorf(ctx, "向量检索失败: %v", err) + return nil, fmt.Errorf("向量检索失败: %w", err) + } + + messages := make([]*schema.Message, 0) + err = gconv.Struct(req.History, &messages) + if err != nil { + g.Log().Errorf(ctx, "转换历史消息失败: %v", err) + return nil, fmt.Errorf("转换历史消息失败: %w", err) + } + + replyMsg, err := eino.NewChatModel(ctx, req.Content, docs, messages) + if err != nil { + g.Log().Errorf(ctx, "向量检索失败: %v", err) + return nil, fmt.Errorf("向量检索失败: %w", err) + } + + return &dto.RAGQueryRes{ + Answer: replyMsg.Content, + }, nil +} + +// Update 更新文件块 +func (s *documentVectorService) Update(ctx context.Context, req *dto.UpdateDocumentVectorReq) (err error) { + _, err = dao.DocumentVector.Update(ctx, req) + return +} + +// List 获取文件块列表 +func (s *documentVectorService) List(ctx context.Context, req *dto.ListDocumentVectorReq) (res *dto.ListDocumentVectorRes, err error) { + list, total, err := dao.DocumentVector.List(ctx, req) + if err != nil { + return + } + res = &dto.ListDocumentVectorRes{ + Total: total, + } + err = gconv.Struct(list, &res.List) + return +} + +func (s *documentVectorService) DocsChunkMsg(ctx context.Context, msg any) (err error) { + var docs = make([]*schema.Document, 0) + msgMap := gconv.Map(msg) + if err = gconv.Structs(msgMap["data"], &docs); err != nil { + g.Log().Error(ctx, "DocsChunkMsg err:", err) + return + } + if len(docs) == 0 { + g.Log().Error(ctx, "DocsChunkMsg err:", "msg is empty") + return + } + ctx = context.WithValue(ctx, "user", &beans.User{ + TenantId: gconv.Uint64(docs[0].MetaData[entity.DocumentVectorCol.TenantId]), + UserName: gconv.String(docs[0].MetaData[entity.DocumentVectorCol.Creator]), + }) + idx := eino.NewPGVectorIndexer(&eino.PGVectorIndexerOptions{ + BatchSize: 10, + }) + documentId := gconv.Int64(docs[0].MetaData[entity.DocumentVectorCol.DocumentId]) + rows, err := idx.Store(ctx, docs, indexer.WithEmbedding(eino.EmbedderDashscope)) + if err != nil || rows == 0 { + g.Log().Error(ctx, "DocsChunkMsg rows: , err:", rows, err) + // 写入任务进度失败 任务类型为sql存储 + remark := " 向量存储数量: " + gconv.String(rows) + if err != nil { + remark = "向量存储失败: " + err.Error() + } + err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{ + TaskId: documentId, + TaskType: task.TaskTypeGenerateVector, + Status: task.TaskStatusFailed, + Remark: remark, + }) + return + } + // 写入任务进度成功 任务类型为sql存储 + err = Task.WriteTaskProgress(ctx, &dto.WriteTaskProgressReq{ + TaskId: documentId, + TaskType: task.TaskTypeGenerateVector, + Status: task.TaskStatusCompleted, + Remark: "向量生成完成", + }) + return +} diff --git a/service/rag_query.go b/service/rag_query.go deleted file mode 100644 index 92ffe7b..0000000 --- a/service/rag_query.go +++ /dev/null @@ -1,60 +0,0 @@ -package service - -import ( - "context" - "fmt" - "rag/common/eino" - "rag/model/dto" - - "github.com/cloudwego/eino/components/retriever" - "github.com/cloudwego/eino/schema" - "github.com/gogf/gf/v2/os/glog" - "github.com/gogf/gf/v2/util/gconv" -) - -var RAGQuery = new(ragQueryService) - -type ragQueryService struct{} - -// Query 执行RAG查询 -func (s *ragQueryService) Query(ctx context.Context, req *dto.RAGQueryReq) (*dto.RAGQueryRes, error) { - if req.TopK <= 0 { - req.TopK = 5 - } - - // 4. 使用向量检索器进行查询 - r, err := eino.NewPGVectorRetriever(&eino.PGVectorRetrieverConfig{ - Embedder: eino.EmbedderDashscope, - DefaultTopK: req.TopK, - }) - if err != nil { - glog.Errorf(ctx, "初始化向量检索器失败: %v", err) - return nil, fmt.Errorf("初始化向量检索器失败: %w", err) - } - - // 5. 执行向量检索 - docs, err := r.Retrieve(ctx, req.Content, retriever.WithEmbedding(eino.EmbedderDashscope), retriever.WithDSLInfo(map[string]any{ - "dataset_ids": req.DatasetIds, - })) - if err != nil { - glog.Errorf(ctx, "向量检索失败: %v", err) - return nil, fmt.Errorf("向量检索失败: %w", err) - } - - messages := make([]*schema.Message, 0) - err = gconv.Struct(req.History, &messages) - if err != nil { - glog.Errorf(ctx, "转换历史消息失败: %v", err) - return nil, fmt.Errorf("转换历史消息失败: %w", err) - } - - replyMsg, err := eino.NewChatModel(ctx, req.Content, docs, messages) - if err != nil { - glog.Errorf(ctx, "向量检索失败: %v", err) - return nil, fmt.Errorf("向量检索失败: %w", err) - } - - return &dto.RAGQueryRes{ - Answer: replyMsg.Content, - }, nil -} diff --git a/service/task.go b/service/task.go index 9d16e72..39f66d4 100644 --- a/service/task.go +++ b/service/task.go @@ -2,11 +2,10 @@ package service import ( "context" + "rag/consts/task" "rag/dao" "rag/model/dto" - "rag/common/task" - "gitea.com/red-future/common/db/gfdb" "github.com/gogf/gf/v2/database/gdb" "github.com/gogf/gf/v2/frame/g" diff --git a/update.sql b/update.sql index d3972eb..1da633f 100644 --- a/update.sql +++ b/update.sql @@ -211,9 +211,9 @@ COMMENT ON COLUMN rag_knowledge_task.remark IS '备注'; -- 向量数据集索引表 CREATE TABLE IF NOT EXISTS rag_vector_dataset_index ( -- 基础字段 - id BIGINT PRIMARY KEY, -- 主键ID(非自增) - tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8 - creator VARCHAR(64) NOT NULL, + id BIGINT PRIMARY KEY, -- 主键ID(非自增) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8 + creator VARCHAR(64) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updater VARCHAR(64) NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -260,16 +260,16 @@ COMMENT ON COLUMN rag_vector_dataset_index.description IS '描述'; --------------------pgsql创建rag_vector_dataset_index表语句--------------------------- ---------------------pgsql创建rag_vector_document_chunk表语句--------------------------- +--------------------pgsql创建rag_vector_document_vector表语句--------------------------- CREATE EXTENSION IF NOT EXISTS vector; -- 文档分块向量表 -CREATE TABLE IF NOT EXISTS rag_vector_document_chunk ( +CREATE TABLE IF NOT EXISTS rag_vector_document_vector ( -- 基础字段 - id BIGINT PRIMARY KEY, -- 主键ID(非自增) - tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8 - creator VARCHAR(64) NOT NULL, + id BIGINT PRIMARY KEY, -- 主键ID(非自增) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8 + creator VARCHAR(64) NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, updater VARCHAR(64) NOT NULL, updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -292,30 +292,30 @@ CREATE TABLE IF NOT EXISTS rag_vector_document_chunk ( ); -- 索引 -CREATE INDEX idx_chunk_tenant_id ON rag_vector_document_chunk(tenant_id); -CREATE INDEX idx_chunk_dataset_id ON rag_vector_document_chunk(dataset_id); -CREATE INDEX idx_chunk_document_id ON rag_vector_document_chunk(document_id); -CREATE INDEX idx_chunk_content_hash ON rag_vector_document_chunk(content_hash); -CREATE INDEX idx_chunk_status ON rag_vector_document_chunk(status); -CREATE INDEX idx_chunk_vector_status ON rag_vector_document_chunk(vector_status); +CREATE INDEX idx_vector_tenant_id ON rag_vector_document_vector(tenant_id); +CREATE INDEX idx_vector_dataset_id ON rag_vector_document_vector(dataset_id); +CREATE INDEX idx_vector_document_id ON rag_vector_document_vector(document_id); +CREATE INDEX idx_vector_content_hash ON rag_vector_document_vector(content_hash); +CREATE INDEX idx_vector_status ON rag_vector_document_vector(status); +CREATE INDEX idx_vector_vector_status ON rag_vector_document_vector(vector_status); -- 注释 -COMMENT ON TABLE rag_vector_document_chunk IS '文档分块向量表'; -COMMENT ON COLUMN rag_vector_document_chunk.id IS '主键ID(非自增)'; -COMMENT ON COLUMN rag_vector_document_chunk.tenant_id IS '租户ID'; -COMMENT ON COLUMN rag_vector_document_chunk.creator IS '创建人'; -COMMENT ON COLUMN rag_vector_document_chunk.created_at IS '创建时间'; -COMMENT ON COLUMN rag_vector_document_chunk.updater IS '更新人'; -COMMENT ON COLUMN rag_vector_document_chunk.updated_at IS '更新时间'; -COMMENT ON COLUMN rag_vector_document_chunk.deleted_at IS '删除时间(软删)'; -COMMENT ON COLUMN rag_vector_document_chunk.status IS '状态'; -COMMENT ON COLUMN rag_vector_document_chunk.vector_status IS '向量生成状态'; -COMMENT ON COLUMN rag_vector_document_chunk.dataset_id IS '数据集ID'; -COMMENT ON COLUMN rag_vector_document_chunk.document_id IS '文档ID'; -COMMENT ON COLUMN rag_vector_document_chunk.content IS '分块内容'; -COMMENT ON COLUMN rag_vector_document_chunk.content_hash IS '内容哈希'; -COMMENT ON COLUMN rag_vector_document_chunk.chunk_index IS '分块序号'; -COMMENT ON COLUMN rag_vector_document_chunk.vector IS '向量数据'; -COMMENT ON COLUMN rag_vector_document_chunk.metadata IS '扩展元数据'; +COMMENT ON TABLE rag_vector_document_vector IS '文档分块向量表'; +COMMENT ON COLUMN rag_vector_document_vector.id IS '主键ID(非自增)'; +COMMENT ON COLUMN rag_vector_document_vector.tenant_id IS '租户ID'; +COMMENT ON COLUMN rag_vector_document_vector.creator IS '创建人'; +COMMENT ON COLUMN rag_vector_document_vector.created_at IS '创建时间'; +COMMENT ON COLUMN rag_vector_document_vector.updater IS '更新人'; +COMMENT ON COLUMN rag_vector_document_vector.updated_at IS '更新时间'; +COMMENT ON COLUMN rag_vector_document_vector.deleted_at IS '删除时间(软删)'; +COMMENT ON COLUMN rag_vector_document_vector.status IS '状态'; +COMMENT ON COLUMN rag_vector_document_vector.vector_status IS '向量生成状态'; +COMMENT ON COLUMN rag_vector_document_vector.dataset_id IS '数据集ID'; +COMMENT ON COLUMN rag_vector_document_vector.document_id IS '文档ID'; +COMMENT ON COLUMN rag_vector_document_vector.content IS '分块内容'; +COMMENT ON COLUMN rag_vector_document_vector.content_hash IS '内容哈希'; +COMMENT ON COLUMN rag_vector_document_vector.chunk_index IS '分块序号'; +COMMENT ON COLUMN rag_vector_document_vector.vector IS '向量数据'; +COMMENT ON COLUMN rag_vector_document_vector.metadata IS '扩展元数据'; ---------------------pgsql创建rag_vector_document_chunk表语句--------------------------- \ No newline at end of file +--------------------pgsql创建rag_vector_document_vector表语句--------------------------- \ No newline at end of file