package ragflow import ( "context" "net/url" "strings" "sync" "sync/atomic" "gitee.com/red-future---jilin-g/common/http" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/frame/g" ) var ( // globalClient 全局 RAGFlow 客户端(单例,延迟初始化) globalClient *Client clientOnce sync.Once ) // initClient 延迟初始化客户端 func initClient() { clientOnce.Do(func() { ctx := context.Background() // 读取配置 endpoints, apiKey := loadConfig(ctx) // 如果配置不完整,跳过初始化 if len(endpoints) == 0 || apiKey == "" { g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在 config.yml 中添加 ragflow.base_url 或在 Consul 中配置 ragflow.endpoints") return } globalClient = &Client{ Endpoints: endpoints, APIKey: apiKey, } if len(endpoints) == 1 { g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoint=%s", endpoints[0]) } else { g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoints=%v (负载均衡)", endpoints) } }) } // loadConfig 从配置加载 RAGFlow 配置(支持实例级配置) // 优先级: // 1. Consul实例级配置 ragflow.endpoints (数组) // 2. Consul全局配置 ragflow.endpoints (数组) // 3. config.yml的 ragflow.base_url (单个URL,向后兼容) func loadConfig(ctx context.Context) (endpoints []string, apiKey string) { // 尝试从Consul读取endpoints(支持实例级配置) // 注意:这里不能直接导入customerService/service包,会造成循环依赖 // 所以只能从config.yml读取,Consul配置需要在customerservice层面调用时传入 // 读取API Key apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String() // 尝试读取endpoints数组(从config.yml或Consul同步的配置) endpointsConfig := g.Cfg().MustGet(ctx, "ragflow.endpoints") if !endpointsConfig.IsEmpty() { endpoints = endpointsConfig.Strings() // 去除尾部斜杠 for i := range endpoints { endpoints[i] = strings.TrimSuffix(endpoints[i], "/") } return } // Fallback到单个base_url(向后兼容) baseURL := g.Cfg().MustGet(ctx, "ragflow.base_url", "").String() if baseURL != "" { endpoints = []string{strings.TrimSuffix(baseURL, "/")} } return } // GetGlobalClient 获取全局客户端(延迟初始化) func GetGlobalClient() *Client { initClient() return globalClient } // Client RAGFlow API 客户端(支持负载均衡) type Client struct { Endpoints []string // RAGFlow实例列表 APIKey string // API密钥 currentIndex atomic.Uint64 // 当前轮询索引(原子操作) } // getNextEndpoint 获取下一个endpoint(轮询算法) func (c *Client) getNextEndpoint() string { if len(c.Endpoints) == 0 { return "" } if len(c.Endpoints) == 1 { return c.Endpoints[0] } // 原子递增并取模,实现轮询 idx := c.currentIndex.Add(1) % uint64(len(c.Endpoints)) return c.Endpoints[idx] } // CommonResponse 通用响应结构 type CommonResponse struct { Code int `json:"code"` Message string `json:"message"` Data interface{} `json:"data,omitempty"` } // IsSuccess 检查响应是否成功 func (r *CommonResponse) IsSuccess() bool { return r.Code == 0 } // request 发送 HTTP 请求 // // 为什么不使用 common/http 包: // // 1. common/http/http.go:61 会用内部请求的Authorization覆盖RAGFlow API key: // Httpclient.SetHeader("Authorization", g.RequestFromCtx(ctx).GetHeader("Authorization")) // 这会导致RAGFlow API认证失败,因为内部token不是RAGFlow的API key // // 2. common/http/http.go:69-74 强制解析为内部API响应格式(ghttp.DefaultHandlerResponse): // resultStrut := &ghttp.DefaultHandlerResponse{} // if err = gconv.Struct(result, &resultStrut); err != nil { // err = errors.New(resultStrut.Message) // } else if resultStrut.Code == 200 || resultStrut.Code == 0 { // gconv.Struct(resultStrut.Data, target) // } // RAGFlow API返回格式与内部API不同,会导致解析失败 // // 因此直接使用 g.Client() 调用第三方API,避免上述问题 func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) { endpoint := c.getNextEndpoint() if endpoint == "" { return gerror.New("RAGFlow endpoints not configured") } fullURL := endpoint + path // 添加详细日志:请求信息 g.Log().Infof(ctx, "RAGFlow请求: %s %s", method, fullURL) if body != nil { bodyJSON := g.NewVar(body).String() g.Log().Infof(ctx, "RAGFlow请求体: %s", bodyJSON) } // 创建新的HTTP客户端实例(避免共享状态) var headers = make(map[string]string) headers["Authorization"] = "Bearer " + c.APIKey headers["Content-Type"] = "application/json" switch method { case "GET": err = http.Get(ctx, fullURL, headers, result, body) case "POST": err = http.Post(ctx, fullURL, headers, result, body) case "PUT": err = http.Put(ctx, fullURL, headers, result, body) case "DELETE": if body != nil { err = http.Delete(ctx, fullURL, headers, result, body) } else { err = http.Delete(ctx, fullURL, headers, result) } default: return gerror.Newf("unsupported method: %s", method) } return } // buildQueryString 构建查询字符串 func buildQueryString(params map[string]interface{}) string { if len(params) == 0 { return "" } parts := make([]string, 0, len(params)) for k, v := range params { parts = append(parts, url.QueryEscape(k)+"="+url.QueryEscape(g.NewVar(v).String())) } return strings.Join(parts, "&") }