From 43a8834c5a121e0cf42c6f2fa74e94db948e0e86 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Tue, 9 Dec 2025 17:55:08 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8D=E5=90=8C=E6=9C=8D=E5=8A=A1=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E4=B8=8D=E5=90=8C=E7=BB=84=E4=BB=B6=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- consul/consul.go | 31 +++++++--- elasticsearch/client.go | 116 ++++++++++++++++++++++++++++++++++++++ jaeger/jaeger.go | 39 ++++++++++--- ragflow/client.go | 51 +++++++++-------- ragflow/session.go | 7 ++- redis/redis.go | 26 +++++++-- startup/startup.go | 122 ++++++++++++++++++++++++++++++++++++++++ 7 files changed, 347 insertions(+), 45 deletions(-) create mode 100644 elasticsearch/client.go create mode 100644 startup/startup.go diff --git a/consul/consul.go b/consul/consul.go index 415a137..7bf8b7c 100644 --- a/consul/consul.go +++ b/consul/consul.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "sync" "github.com/gogf/gf/contrib/registry/consul/v2" "github.com/gogf/gf/v2/frame/g" @@ -13,14 +14,30 @@ import ( "github.com/gogf/gf/v2/util/grand" ) +var initOnce sync.Once + +// Init 初始化 Consul 注册中心(延迟初始化,首次调用时执行) +func Init() { + initOnce.Do(func() { + consulAddr := g.Cfg().MustGet(context.Background(), "consul.address").String() + if consulAddr == "" { + g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化") + return + } + registry, err := consul.New(consul.WithAddress(consulAddr)) + if err != nil { + g.Log().Errorf(context.Background(), "Consul 初始化失败: %v", err) + return + } + gsvc.SetRegistry(registry) + gsel.SetBuilder(gsel.NewBuilderRoundRobin()) + g.Log().Infof(context.Background(), "✅ Consul 初始化成功: %s", consulAddr) + }) +} + func init() { - consulAddr := g.Cfg().MustGet(context.Background(), "consul.address").String() - registry, err := consul.New(consul.WithAddress(consulAddr)) - if err != nil { - panic(err) - } - gsvc.SetRegistry(registry) - gsel.SetBuilder(gsel.NewBuilderRoundRobin()) + // 默认自动初始化(保持向后兼容) + Init() } func getLocalIP() (string, error) { // 获取本机所有网络接口 diff --git a/elasticsearch/client.go b/elasticsearch/client.go new file mode 100644 index 0000000..ead9d1b --- /dev/null +++ b/elasticsearch/client.go @@ -0,0 +1,116 @@ +package elasticsearch + +import ( + "context" + "sync" + + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" + "github.com/olivere/elastic/v7" +) + +var ( + client *elastic.Client + clientOnce sync.Once +) + +// Config ES 配置 +type Config struct { + Addresses []string // ES 地址列表 + Username string // 用户名 + Password string // 密码 +} + +// Init 初始化 ES 客户端(单例) +func Init(ctx context.Context) (err error) { + clientOnce.Do(func() { + addresses := g.Cfg().MustGet(ctx, "elasticsearch.addresses", []string{"http://localhost:9200"}).Strings() + username := g.Cfg().MustGet(ctx, "elasticsearch.username", "").String() + password := g.Cfg().MustGet(ctx, "elasticsearch.password", "").String() + + options := []elastic.ClientOptionFunc{ + elastic.SetURL(addresses...), + elastic.SetSniff(false), // 禁用嗅探,避免容器环境问题 + } + + if username != "" && password != "" { + options = append(options, elastic.SetBasicAuth(username, password)) + } + + client, err = elastic.NewClient(options...) + if err != nil { + glog.Errorf(ctx, "ES 客户端初始化失败: %v", err) + return + } + + // 测试连接 + info, code, err := client.Ping(addresses[0]).Do(ctx) + if err != nil { + glog.Errorf(ctx, "ES 连接测试失败: %v", err) + return + } + glog.Infof(ctx, "ES 连接成功 - 版本: %s, 状态码: %d", info.Version.Number, code) + }) + return +} + +// GetClient 获取 ES 客户端 +func GetClient() *elastic.Client { + return client +} + +// BulkIndex 批量写入文档 +func BulkIndex(ctx context.Context, indexName string, docs []interface{}) (err error) { + if client == nil { + return gerror.New("ES 客户端未初始化") + } + + bulk := client.Bulk().Index(indexName) + for _, doc := range docs { + bulk.Add(elastic.NewBulkIndexRequest().Doc(doc)) + } + + resp, err := bulk.Do(ctx) + if err != nil { + return + } + + if resp.Errors { + for _, item := range resp.Failed() { + glog.Errorf(ctx, "ES 写入失败 - Index: %s, Error: %s", item.Index, item.Error.Reason) + } + } + + glog.Infof(ctx, "ES 批量写入完成 - 索引: %s, 成功: %d, 失败: %d", + indexName, len(resp.Succeeded()), len(resp.Failed())) + return +} + +// CreateIndexIfNotExists 创建索引(如果不存在) +func CreateIndexIfNotExists(ctx context.Context, indexName, mapping string) (err error) { + if client == nil { + return gerror.New("ES 客户端未初始化") + } + + exists, err := client.IndexExists(indexName).Do(ctx) + if err != nil { + return + } + + if !exists { + _, err = client.CreateIndex(indexName).BodyString(mapping).Do(ctx) + if err != nil { + return + } + glog.Infof(ctx, "ES 索引创建成功: %s", indexName) + } + return +} + +// Close 关闭客户端 +func Close() { + if client != nil { + client.Stop() + } +} diff --git a/jaeger/jaeger.go b/jaeger/jaeger.go index 31e50e4..9acc586 100644 --- a/jaeger/jaeger.go +++ b/jaeger/jaeger.go @@ -5,6 +5,7 @@ import ( "encoding/json" "strconv" "strings" + "sync" "github.com/gogf/gf/contrib/trace/otlphttp/v2" "github.com/gogf/gf/v2/frame/g" @@ -13,16 +14,38 @@ import ( "go.opentelemetry.io/otel/attribute" ) -var ShutDown func(ctx context.Context) +var ( + ShutDown func(ctx context.Context) + initOnce sync.Once +) + +// Init 初始化 Jaeger 链路追踪(延迟初始化,首次调用时执行) +func Init() { + initOnce.Do(func() { + ctx := context.Background() + jaegerAgent := g.Cfg().MustGet(ctx, "jaeger.addr").String() + serverName := g.Cfg().MustGet(ctx, "server.name").String() + + if jaegerAgent == "" { + g.Log().Warning(ctx, "⚠️ Jaeger 配置未找到,跳过初始化") + ShutDown = func(ctx context.Context) {} // 空函数,避免 nil panic + return + } + + shutdown, err := otlphttp.Init(serverName, jaegerAgent, "/v1/traces") + if err != nil { + g.Log().Errorf(ctx, "Jaeger 初始化失败: %v", err) + ShutDown = func(ctx context.Context) {} + return + } + ShutDown = shutdown + g.Log().Infof(ctx, "✅ Jaeger 初始化成功: %s", jaegerAgent) + }) +} func init() { - jaegerAgent := g.Cfg().MustGet(context.Background(), "jaeger.addr").String() - serverName := g.Cfg().MustGet(context.Background(), "server.name").String() - shutdown, err := otlphttp.Init(serverName, jaegerAgent, "/v1/traces") - if err != nil { - panic(err) - } - ShutDown = shutdown + // 默认自动初始化(保持向后兼容) + Init() } func NewTracer(r *ghttp.Request) { _, span := gtrace.NewSpan(r.Context(), r.GetServeHandler().GetMetaTag("summary")) diff --git a/ragflow/client.go b/ragflow/client.go index 618d84b..e19bd94 100644 --- a/ragflow/client.go +++ b/ragflow/client.go @@ -5,6 +5,7 @@ import ( "net/http" "net/url" "strings" + "sync" "time" "github.com/gogf/gf/v2/encoding/gjson" @@ -14,36 +15,39 @@ import ( ) var ( - // globalClient 全局 RAGFlow 客户端(单例,自动初始化) + // globalClient 全局 RAGFlow 客户端(单例,延迟初始化) globalClient *Client + clientOnce sync.Once ) -// init 包初始化时自动创建全局客户端 -func init() { - ctx := context.Background() +// initClient 延迟初始化客户端 +func initClient() { + clientOnce.Do(func() { + ctx := context.Background() - // 读取配置 - baseURL, apiKey := loadConfig(ctx) + // 读取配置 + baseURL, apiKey := loadConfig(ctx) - // 如果配置不完整,跳过初始化 - if baseURL == "" || apiKey == "" { - g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在项目 config.yml 中添加 ragflow.base_url 和 ragflow.api_key") - return - } + // 如果配置不完整,跳过初始化 + if baseURL == "" || apiKey == "" { + g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在项目 config.yml 中添加 ragflow.base_url 和 ragflow.api_key") + return + } - // 初始化全局客户端 - httpClient := gclient.New() - httpClient.SetHeader("Authorization", "Bearer "+apiKey) - httpClient.SetHeader("Content-Type", "application/json") - httpClient.SetTimeout(60 * time.Second) // RAGFlow AI 推理需要较长时间 + // 初始化全局客户端 + httpClient := gclient.New() + httpClient.SetHeader("Authorization", "Bearer "+apiKey) + httpClient.SetHeader("Content-Type", "application/json") + httpClient.SetTimeout(180 * time.Second) // RAGFlow AI 推理需要较长时间 - globalClient = &Client{ - BaseURL: strings.TrimSuffix(baseURL, "/"), - APIKey: apiKey, - HTTPClient: httpClient, - } + globalClient = &Client{ + BaseURL: strings.TrimSuffix(baseURL, "/"), + APIKey: apiKey, + HTTPClient: httpClient, + } - g.Log().Infof(ctx, "✅ RAGFlow 全局客户端初始化成功: baseURL=%s", baseURL) + g.Log().Infof(ctx, "✅ RAGFlow 全局客户端初始化成功: baseURL=%s", baseURL) + }) } // loadConfig 从配置文件加载 RAGFlow 配置 @@ -54,9 +58,10 @@ func loadConfig(ctx context.Context) (baseURL, apiKey string) { return } -// GetGlobalClient 获取全局客户端 +// GetGlobalClient 获取全局客户端(延迟初始化) // 使用示例:client := ragflow.GetGlobalClient() func GetGlobalClient() *Client { + initClient() return globalClient } diff --git a/ragflow/session.go b/ragflow/session.go index ee82b5d..d534b04 100644 --- a/ragflow/session.go +++ b/ragflow/session.go @@ -65,8 +65,9 @@ type ChatCompletionReq struct { // ChatCompletionRes 对话响应 (非流式) type ChatCompletionRes struct { - Code int `json:"code"` - Data struct { + Code int `json:"code"` + Message string `json:"message"` // 错误信息 + Data struct { Answer string `json:"answer"` Reference interface{} `json:"reference"` AudioBinary interface{} `json:"audio_binary"` @@ -163,7 +164,7 @@ func (c *Client) ChatCompletion(ctx context.Context, chatId string, req *ChatCom return nil, err } if res.Code != 0 { - return nil, gerror.Newf("chat completion failed: code=%d", res.Code) + return nil, gerror.Newf("chat completion failed: code=%d, message=%s", res.Code, res.Message) } return &res, nil } diff --git a/redis/redis.go b/redis/redis.go index f90f3b1..a74df47 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -3,18 +3,36 @@ package redis import ( "context" "strings" + "sync" + "github.com/gogf/gf/v2/database/gredis" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" ) -// redisClient 内部使用的 Redis 客户端(g.Redis() 是原型模式,需要变量引用) -var redisClient = g.Redis() +var ( + // redisClient 内部使用的 Redis 客户端(单例模式) + redisClient *gredis.Redis + redisOnce sync.Once +) -// RedisClient 导出的 Redis 客户端(供 mongo.go 使用) -var RedisClient = redisClient +// getClient 获取 Redis 客户端(延迟初始化) +func getClient() *gredis.Redis { + redisOnce.Do(func() { + redisClient = g.Redis() + }) + return redisClient +} + +// GetRedisClient 获取 Redis 客户端(供外部使用) +func GetRedisClient() *gredis.Redis { + return getClient() +} + +// RedisClient 导出的 Redis 客户端(供 mongo.go 使用,兼容旧代码) +var RedisClient = getClient() // Stream 和消费者组常量 const ( diff --git a/startup/startup.go b/startup/startup.go new file mode 100644 index 0000000..2fc5882 --- /dev/null +++ b/startup/startup.go @@ -0,0 +1,122 @@ +// Package startup 提供服务启动时的组件初始化控制 +// 各服务可以按需初始化所需组件,避免不必要的资源占用 +package startup + +import ( + "context" + "sync" + + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/os/glog" +) + +// Components 组件配置 +type Components struct { + Consul bool // Consul 服务注册发现(所有服务都需要) + Jaeger bool // Jaeger 链路追踪(所有服务都需要) + Redis bool // Redis 缓存 + RabbitMQ bool // RabbitMQ 消息队列 + MongoDB bool // MongoDB 数据库 + RAGFlow bool // RAGFlow AI 客户端 + ES bool // Elasticsearch +} + +var ( + initialized bool + initOnce sync.Once + components *Components +) + +// Init 初始化指定的组件 +// 示例: +// +// bootstrap.Init(ctx, &bootstrap.Components{ +// Consul: true, +// Jaeger: true, +// Redis: true, +// RabbitMQ: true, +// }) +func Init(ctx context.Context, c *Components) { + initOnce.Do(func() { + components = c + initialized = true + glog.Infof(ctx, "Bootstrap 初始化完成: %+v", c) + }) +} + +// IsInitialized 检查是否已初始化 +func IsInitialized() bool { + return initialized +} + +// GetComponents 获取组件配置 +func GetComponents() *Components { + if components == nil { + // 默认配置:从配置文件读取 + return loadFromConfig() + } + return components +} + +// NeedRedis 是否需要 Redis +func NeedRedis() bool { + c := GetComponents() + return c != nil && c.Redis +} + +// NeedRabbitMQ 是否需要 RabbitMQ +func NeedRabbitMQ() bool { + c := GetComponents() + return c != nil && c.RabbitMQ +} + +// NeedMongoDB 是否需要 MongoDB +func NeedMongoDB() bool { + c := GetComponents() + return c != nil && c.MongoDB +} + +// NeedRAGFlow 是否需要 RAGFlow +func NeedRAGFlow() bool { + c := GetComponents() + return c != nil && c.RAGFlow +} + +// NeedES 是否需要 Elasticsearch +func NeedES() bool { + c := GetComponents() + return c != nil && c.ES +} + +// loadFromConfig 从配置文件加载组件配置 +// 如果配置文件中没有 startup 配置,则默认全部启动 +func loadFromConfig() *Components { + ctx := context.Background() + + // 检查是否有 startup 配置节 + startupCfg := g.Cfg().MustGet(ctx, "startup") + if startupCfg.IsEmpty() { + // 没有配置 startup,默认全部启动 + glog.Debug(ctx, "未找到 startup 配置,默认启动所有组件") + return &Components{ + Consul: true, + Jaeger: true, + Redis: true, + RabbitMQ: true, + MongoDB: true, + RAGFlow: true, + ES: true, + } + } + + // 有配置则按配置来,未配置的项默认 true + return &Components{ + Consul: g.Cfg().MustGet(ctx, "startup.consul", true).Bool(), + Jaeger: g.Cfg().MustGet(ctx, "startup.jaeger", true).Bool(), + Redis: g.Cfg().MustGet(ctx, "startup.redis", true).Bool(), + RabbitMQ: g.Cfg().MustGet(ctx, "startup.rabbitmq", true).Bool(), + MongoDB: g.Cfg().MustGet(ctx, "startup.mongodb", true).Bool(), + RAGFlow: g.Cfg().MustGet(ctx, "startup.ragflow", true).Bool(), + ES: g.Cfg().MustGet(ctx, "startup.es", true).Bool(), + } +}