diff --git a/config.yml b/config.yml index d1a0f95..2d83bf6 100644 --- a/config.yml +++ b/config.yml @@ -1,7 +1,70 @@ server: address: ":3000" name: "customer-server" + workerId: 1 +# Database. +database: + default: + - type: "pgsql" + host: "116.204.74.41" + port: "15432" + user: "postgres" + pass: "Bjang09@686^*^" + name: "customer_server" + role: "master" # (可选)数据库主从角色(master/slave),默认为master。如果不使用应用主从机制请不配置或留空即可。 + debug: false # (可选)开启调试模式 + dryRun: false # (可选)ORM空跑(只读不写) + charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 + timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local + maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10) + maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制) + maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒) + maxIdleConnTime: "30s" # (可选,v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置,避免长时间空闲连接占用资源。 + createdAt: "created_at" # (可选)自动创建时间字段名称 + updatedAt: "updated_at" # (可选)自动更新时间字段名称 + deletedAt: "deleted_at" # (可选)软删除时间字段名称 + timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 + - type: "pgsql" + host: "116.204.74.41" + port: "15432" + user: "postgres" + pass: "Bjang09@686^*^" + name: "customer_server" + role: "slave" # (可选)数据库主从角色(master/slave),默认为master。如果不使用应用主从机制请不配置或留空即可。 + debug: false # (可选)开启调试模式 + dryRun: false # (可选)ORM空跑(只读不写) + charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 + timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local + maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10) + maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制) + maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒) + maxIdleConnTime: "30s" # (可选,v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置,避免长时间空闲连接占用资源。 + createdAt: "created_at" # (可选)自动创建时间字段名称 + updatedAt: "updated_at" # (可选)自动更新时间字段名称 + deletedAt: "deleted_at" # (可选)软删除时间字段名称 + timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 + tenant-1: + - type: "pgsql" + host: "localhost" + port: "5432" + user: "postgres" + pass: "123456" + name: "tenant" + role: "master" + prefix: "customer_server_" # (可选)表名前缀 + debug: true # (可选)开启调试模式 + dryRun: false # (可选)ORM空跑(只读不写) + charset: "utf8" # (可选)数据库编码(如: utf8mb4/utf8/gbk/gb2312),一般设置为utf8mb4。默认为utf8。 + timezone: "Asia/Shanghai" # (可选)时区配置,例如:Local + maxIdle: 5 # (可选)连接池最大闲置的连接数(默认10) + maxOpen: 20 # (可选)连接池最大打开的连接数(默认无限制) + maxLifetime: "30s" # (可选)连接对象可重复使用的时间长度(默认30秒) + maxIdleConnTime: "30s" # (可选,v2.10新增)连接池中空闲连接的最大生存时间(默认30秒)。可以通过配置文件或SetConnMaxIdleTime方法设置,避免长时间空闲连接占用资源。 + createdAt: "created_at" # (可选)自动创建时间字段名称 + updatedAt: "updated_at" # (可选)自动更新时间字段名称 + deletedAt: "deleted_at" # (可选)软删除时间字段名称 + timeMaintainDisabled: false # (可选)是否完全关闭时间更新特性,为true时CreatedAt/UpdatedAt/DeletedAt都将失效 jwt: secret: "abcdefghijklmnopqrstuvwxyz" @@ -22,7 +85,7 @@ rate: # --- 内网服务器(192.168.3.200:27017,无认证) --- mongo: default: - address: "mongodb://192.168.3.200:27017/?directConnection=true" + address: "localhost:27017/?directConnection=true" database: "customer_service" logger: level: "all" @@ -30,7 +93,7 @@ mongo: redis: default: - address: 116.204.74.41:6379 + address: localhost:6379 db: 0 idleTimeout: "60s" maxConnLifetime: "90s" @@ -41,13 +104,13 @@ redis: maxActive: 100 consul: - address: 116.204.74.41:8500 + address: localhost:8500 rabbitmq: - host: 116.204.74.41 + host: localhost port: 5672 - username: root - password: root + username: admin + password: 123456 # 响应队列配置(从message迁移) responseExchange: "ragflow.response" responseQueue: "ragflow.response.queue" @@ -55,7 +118,7 @@ rabbitmq: # 不配置instanceName,直接使用os.Hostname()获取容器名/主机名作为实例ID jaeger: - addr: 116.204.74.41:4318 + addr: localhost:4318 # RAGFlow配置(customerservice只需要base_url和api_key用于更新prompt接口) ragflow: @@ -102,8 +165,3 @@ elasticsearch: - "http://116.204.74.41:9200" username: "" password: "" - -database: - logger: - level: "none" # 关闭数据库日志 - stdout: false diff --git a/consts/account/status.go b/consts/account/status.go new file mode 100644 index 0000000..2e40b30 --- /dev/null +++ b/consts/account/status.go @@ -0,0 +1,26 @@ +package account + +import "github.com/gogf/gf/v2/util/gconv" + +var ( + StatusDisable = newStatus(gconv.PtrInt8(0), "disable") + StatusEnable = newStatus(gconv.PtrInt8(1), "enable") +) + +type Status *int8 + +type status struct { + code Status + desc string +} + +func (s status) Code() Status { + return s.code +} +func (s status) Desc() string { + return s.desc +} + +func newStatus(code Status, desc string) status { + return status{code: code, desc: desc} +} diff --git a/consts/account/vector_status.go b/consts/account/vector_status.go new file mode 100644 index 0000000..f0028e4 --- /dev/null +++ b/consts/account/vector_status.go @@ -0,0 +1,28 @@ +package account + +import "github.com/gogf/gf/v2/util/gconv" + +var ( + VectorStatusPending = newVectorStatus(gconv.PtrInt8(1), "pending") + VectorStatusProcessing = newVectorStatus(gconv.PtrInt8(2), "processing") + VectorStatusCompleted = newVectorStatus(gconv.PtrInt8(3), "completed") + VectorStatusFailed = newVectorStatus(gconv.PtrInt8(4), "failed") +) + +type VectorStatus *int8 + +type vectorStatus struct { + code VectorStatus + desc string +} + +func (s vectorStatus) Code() VectorStatus { + return s.code +} +func (s vectorStatus) Desc() string { + return s.desc +} + +func newVectorStatus(code VectorStatus, desc string) vectorStatus { + return vectorStatus{code: code, desc: desc} +} diff --git a/consts/public/redis_key.go b/consts/public/redis_key.go new file mode 100644 index 0000000..695f37b --- /dev/null +++ b/consts/public/redis_key.go @@ -0,0 +1,15 @@ +package public + +const KnowledgeLockEsKey = "rag:knowledge:lock:knowledgeIdEs-%v" +const KnowledgeLockSqlKey = "rag:knowledge:lock:knowledgeIdSql-%v" +const KnowledgeContentHashEsKey = "rag:knowledge:knowledgeId:contentHashEs-%v" +const KnowledgeContentHashSqlKey = "rag:knowledge:knowledgeId:contentHashSql-%v" + +const KnowledgeDocumentChunkTopic = "knowledge:document:chunk:stream" // 请求 Stream 键名(与发消息的key一致) + +const ( + KnowledgeDocumentVectorStatusTopic = "knowledge:document:vector:status:stream" + KnowledgeDocumentVectorStatusConsumer = "knowledge-document-vector-status-consumer" + KnowledgeDocumentVectorStatusBatchSize = 1 + KnowledgeDocumentVectorStatusAutoAck = false +) diff --git a/consts/public/table_name.go b/consts/public/table_name.go new file mode 100644 index 0000000..08421d5 --- /dev/null +++ b/consts/public/table_name.go @@ -0,0 +1,13 @@ +package public + +// sql 数据库表名 +const ( + TableNameAccount = "account" + TableNameDataset = "dataset" + TableNameKeyword = "keyword" +) + +// es 索引名称 +const ( + IndexNameDocumentChunk = "document_chunk" // 文档分块索引 +) diff --git a/go.mod b/go.mod index 5c8e4f6..2a6e4a6 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module customer-server go 1.25.7 -//replace gitea.com/red-future/common => ../common +replace gitea.com/red-future/common v0.0.6 => ../common require ( gitea.com/red-future/common v0.0.6 @@ -52,7 +52,7 @@ require ( github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/magiconair/properties v1.8.10 // indirect - github.com/mailru/easyjson v0.7.7 // indirect + github.com/mailru/easyjson v0.9.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect diff --git a/go.sum b/go.sum index 08d0585..dc431f3 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,4 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -gitea.com/red-future/common v0.0.6 h1:2Otksfcy5V5JCBcqd2eRKh4WwZ/iAiIhJZMr6uM1x+Q= -gitea.com/red-future/common v0.0.6/go.mod h1:UI9N5UUjilbMPF7+/lypZSnqDVHigt14300oSRrAyZg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v1.5.0 h1:W5quZX/G/csjUnuI8SUYlsHs9M38FC7znL0lIO+DvMg= github.com/BurntSushi/toml v1.5.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= @@ -198,8 +196,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= github.com/magiconair/properties v1.8.10/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/mailru/easyjson v0.9.0 h1:PrnmzHw7262yW8sTBwxi1PdJA3Iw/EKBa8psRf7d9a4= +github.com/mailru/easyjson v0.9.0/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= diff --git a/main.go b/main.go index d67fdfb..2a2735a 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,6 @@ import ( "os/signal" "syscall" - "gitea.com/red-future/common/elasticsearch" "gitea.com/red-future/common/http" "gitea.com/red-future/common/jaeger" "gitea.com/red-future/common/rabbitmq" @@ -45,41 +44,41 @@ func main() { controller.XiaohongshuController, // 小红书Webhook接口 }) - // 初始化消费者管理器并注册所有消费者 - mgr := rabbitmq.NewConsumerManager(ctx) - - // 注册响应消费者 - responseConsumer := service.NewResponseConsumer(ctx) - mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop) - - // 注册追问消费者 - followUpConsumer := service.NewFollowUpConsumer(ctx) - mgr.Register("追问消费者", followUpConsumer.Start, followUpConsumer.Stop) - - // 注册会话归档消费者 - sessionArchiveConsumer := service.NewSessionArchiveConsumer(ctx) - mgr.Register("会话归档消费者", sessionArchiveConsumer.Start, sessionArchiveConsumer.Stop) - - // 注册延时落库消费者 - delayedFlushConsumer := service.NewDelayedFlushConsumer(ctx) - mgr.Register("延时落库消费者", delayedFlushConsumer.Start, delayedFlushConsumer.Stop) - - // 初始化并启动所有消费者 - if err := mgr.Init(); err != nil { - glog.Fatalf(ctx, "消费者管理器初始化失败: %v", err) - } - - // 初始化 ES 客户端 - if !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty() { - if err := elasticsearch.Init(ctx); err != nil { - glog.Warningf(ctx, "ES 初始化失败(月度归档功能不可用): %v", err) - } - } - - // 启动月度归档定时任务 - if !g.Cfg().MustGet(ctx, "mongo").IsEmpty() && !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty() { - service.ArchiveService.StartCron(ctx) - } + //// 初始化消费者管理器并注册所有消费者 + //mgr := rabbitmq.NewConsumerManager(ctx) + // + //// 注册响应消费者 + //responseConsumer := service.NewResponseConsumer(ctx) + //mgr.Register("响应消费者", responseConsumer.Start, responseConsumer.Stop) + // + //// 注册追问消费者 + //followUpConsumer := service.NewFollowUpConsumer(ctx) + //mgr.Register("追问消费者", followUpConsumer.Start, followUpConsumer.Stop) + // + //// 注册会话归档消费者 + //sessionArchiveConsumer := service.NewSessionArchiveConsumer(ctx) + //mgr.Register("会话归档消费者", sessionArchiveConsumer.Start, sessionArchiveConsumer.Stop) + // + //// 注册延时落库消费者 + //delayedFlushConsumer := service.NewDelayedFlushConsumer(ctx) + //mgr.Register("延时落库消费者", delayedFlushConsumer.Start, delayedFlushConsumer.Stop) + // + //// 初始化并启动所有消费者 + //if err := mgr.Init(); err != nil { + // glog.Fatalf(ctx, "消费者管理器初始化失败: %v", err) + //} + // + //// 初始化 ES 客户端 + //if !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty() { + // if err := elasticsearch.Init(ctx); err != nil { + // glog.Warningf(ctx, "ES 初始化失败(月度归档功能不可用): %v", err) + // } + //} + // + //// 启动月度归档定时任务 + //if !g.Cfg().MustGet(ctx, "mongo").IsEmpty() && !g.Cfg().MustGet(ctx, "elasticsearch").IsEmpty() { + // service.ArchiveService.StartCron(ctx) + //} // 监听系统信号,支持 Ctrl+C 优雅退出 quit := make(chan os.Signal, 1) @@ -89,7 +88,7 @@ func main() { glog.Info(ctx, "收到停止信号,正在关闭服务...") // 停止所有消费者并关闭RabbitMQ连接 - mgr.Stop() + //mgr.Stop() glog.Info(ctx, "customerservice 服务已停止") } diff --git a/model/entity/account.go b/model/entity/account.go new file mode 100644 index 0000000..05b32bf --- /dev/null +++ b/model/entity/account.go @@ -0,0 +1,30 @@ +package entity + +import ( + "customer-server/consts/account" + + "gitea.com/red-future/common/beans" +) + +type Account struct { + beans.SQLBaseDO `orm:",inline"` + + DatasetIds []string `orm:"dataset_ids" json:"datasetIds" dc:"绑定的数据集ID列表"` + DocumentIds []string `orm:"document_ids" json:"documentIds" dc:"绑定的文档ID列表"` + SpeechcraftIds []string `orm:"speechcraftIds" json:"speechcraftIds" dc:"绑定的话术ID列表"` + AccountName string `orm:"account_name" json:"accountName" dc:"客服账号名称"` + Status account.Status `orm:"status" json:"status" dc:"客服账号状态"` + Greeting string `orm:"greeting" json:"greeting" dc:"开场白"` + Prompt []string `orm:"prompt" json:"prompt" dc:"提示词"` + SelfIdentity string `orm:"self_identity" json:"selfIdentity" dc:"AI身份描述"` + Platform string `orm:"platform" json:"platform" dc:"客服平台"` + + // 小红书平台专属字段(仅platform=xiaohongshu时有效) + AccessToken string `orm:"access_token" json:"accessToken" dc:"小红书AccessToken(14天有效期)"` + AppId int64 `orm:"app_id" json:"appId" dc:"小红书应用ID"` + SecretKey string `orm:"secret_key" json:"secretKey" dc:"小红书加解密密钥"` + XhsUserId string `orm:"xhs_user_id" json:"xhsUserId" dc:"小红书用户ID"` + ContactCardMessage string `orm:"contact_card_message" json:"contactCardMessage" dc:"留资卡文案"` + NameCardMessage string `orm:"name_card_message" json:"nameCardMessage" dc:"名片文案"` + CardTriggerCount int `orm:"card_trigger_count" json:"cardTriggerCount" dc:"卡片触发次数"` +} diff --git a/model/entity/scripted_speech.go b/model/entity/scripted_speech.go new file mode 100644 index 0000000..4c801dc --- /dev/null +++ b/model/entity/scripted_speech.go @@ -0,0 +1,14 @@ +package entity + +import ( + "gitea.com/red-future/common/beans" +) + +type ScriptedSpeech struct { + beans.SQLBaseDO `orm:",inline"` + + AccountId int64 `orm:"account_id" json:"accountId" dc:"账号ID"` + DatasetId int64 `orm:"dataset_id" json:"datasetId" dc:"数据集ID"` + QuestionContent string `orm:"question_content" json:"questionContent" dc:"问题内容"` + AnswerContent string `orm:"answer_content" json:"answerContent" dc:"回答内容"` +} diff --git a/update.sql b/update.sql index 807d512..2486ee3 100644 --- a/update.sql +++ b/update.sql @@ -1 +1,43 @@ ------------张斌2025-06-16 15:00:00-------------- \ No newline at end of file +-----------张斌2025-06-16 15:00:00-------------- + +--------------------pgsql创建rag_keyword表语句--------------------------- +-- 关键词表(文档关键词+权重) +CREATE TABLE IF NOT EXISTS rag_keyword ( + -- 基础字段(完全对齐项目规范) + id BIGINT PRIMARY KEY, -- 主键ID(非自增) + tenant_id BIGINT NOT NULL DEFAULT 0, -- 租户ID int8 + creator VARCHAR(64) NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updater VARCHAR(64) NOT NULL, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + deleted_at timestamp(6), + + -- 业务字段 + dataset_id BIGINT NOT NULL, -- 数据集ID + document_id BIGINT NOT NULL, -- 文件ID + word VARCHAR(255) NOT NULL, -- 关键词 + weight SMALLINT NOT NULL DEFAULT 0 -- 权重 + ); + +-- 索引(按业务高频查询) +CREATE INDEX idx_keyword_tenant_id ON rag_keyword(tenant_id); +CREATE INDEX idx_keyword_dataset_id ON rag_keyword(dataset_id); +CREATE INDEX idx_keyword_document_id ON rag_keyword(document_id); +CREATE INDEX idx_keyword_word ON rag_keyword(word); +CREATE INDEX idx_keyword_deleted_at ON rag_keyword(deleted_at); + +-- 表和字段注释 +COMMENT ON TABLE rag_keyword IS 'RAG关键词表(文档关键词+权重)'; +COMMENT ON COLUMN rag_keyword.id IS '主键ID(非自增)'; +COMMENT ON COLUMN rag_keyword.tenant_id IS '租户ID'; +COMMENT ON COLUMN rag_keyword.creator IS '创建人'; +COMMENT ON COLUMN rag_keyword.created_at IS '创建时间'; +COMMENT ON COLUMN rag_keyword.updater IS '更新人'; +COMMENT ON COLUMN rag_keyword.updated_at IS '更新时间'; +COMMENT ON COLUMN rag_keyword.deleted_at IS '删除时间(软删)'; +COMMENT ON COLUMN rag_keyword.dataset_id IS '数据集ID'; +COMMENT ON COLUMN rag_keyword.document_id IS '文档ID'; +COMMENT ON COLUMN rag_keyword.word IS '关键词'; +COMMENT ON COLUMN rag_keyword.weight IS '权重'; + +--------------------pgsql创建rag_keyword表语句--------------------------- \ No newline at end of file diff --git a/util/util.go b/util/util.go index b990fb8..09a20fa 100644 --- a/util/util.go +++ b/util/util.go @@ -10,12 +10,13 @@ import ( "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/util/gconv" "go.mongodb.org/mongo-driver/v2/bson" ) // GetTenantInfo 获取租户信息 // 优先从 token 获取,失败则从请求参数 customerServiceId 查询 customer_service_account 表 -func GetTenantInfo(ctx context.Context) (user beans.User, err error) { +func GetTenantInfo(ctx context.Context) (user *beans.User, err error) { // 1. 优先从 token 获取 user, err = utils.GetUserInfo(ctx) if err == nil { @@ -51,7 +52,7 @@ func GetTenantInfo(ctx context.Context) (user beans.User, err error) { cacheKey := fmt.Sprintf("tenant:account:%s", accountName) cached, cacheErr := redis.RedisClient().Get(ctx, cacheKey) if cacheErr == nil && !g.IsEmpty(cached) { - user.TenantId = cached.Interface() + user.TenantId = gconv.Uint64(cached.Interface()) user.UserName = accountName return user, nil } @@ -59,7 +60,7 @@ func GetTenantInfo(ctx context.Context) (user beans.User, err error) { // 4. 缓存未命中,查询 customer_service_account 表 filter := bson.M{"accountName": accountName, "isDeleted": false} var account struct { - TenantId interface{} `bson:"tenantId"` + TenantId uint64 `bson:"tenantId"` } if findErr := mongo.GetDB().Collection("customer_service_account").FindOne(ctx, filter).Decode(&account); findErr != nil { return user, gerror.Newf("通过 accountName 查询租户失败: %v", findErr)