Files
common/ragflow/client.go
2026-03-12 08:51:25 +08:00

164 lines
4.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package ragflow
import (
"context"
"net/url"
"strings"
"sync"
"sync/atomic"
commonHttp "gitee.com/red-future---jilin-g/common/http"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
)
var (
// globalClient 全局 RAGFlow 客户端(单例,延迟初始化)
globalClient *Client
clientOnce sync.Once
)
// initClient 延迟初始化客户端
func initClient() {
clientOnce.Do(func() {
ctx := context.Background()
// 读取配置
endpoints, apiKey := loadConfig(ctx)
// 如果配置不完整,跳过初始化
if len(endpoints) == 0 || apiKey == "" {
g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在 config.yml 中添加 ragflow.base_url 或在 Consul 中配置 ragflow.endpoints")
return
}
globalClient = &Client{
Endpoints: endpoints,
APIKey: apiKey,
}
if len(endpoints) == 1 {
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoint=%s", endpoints[0])
} else {
g.Log().Infof(ctx, "✅ RAGFlow 客户端初始化成功: endpoints=%v (负载均衡)", endpoints)
}
})
}
// loadConfig 从配置加载 RAGFlow 配置(支持实例级配置)
// 优先级:
// 1. Consul实例级配置 ragflow.endpoints (数组)
// 2. Consul全局配置 ragflow.endpoints (数组)
// 3. config.yml的 ragflow.base_url (单个URL向后兼容)
func loadConfig(ctx context.Context) (endpoints []string, apiKey string) {
// 尝试从Consul读取endpoints支持实例级配置
// 注意这里不能直接导入customerService/service包会造成循环依赖
// 所以只能从config.yml读取Consul配置需要在customerservice层面调用时传入
// 读取API Key
apiKey = g.Cfg().MustGet(ctx, "ragflow.api_key", "").String()
// 尝试读取endpoints数组从config.yml或Consul同步的配置
endpointsConfig := g.Cfg().MustGet(ctx, "ragflow.endpoints")
if !endpointsConfig.IsEmpty() {
endpoints = endpointsConfig.Strings()
// 去除尾部斜杠
for i := range endpoints {
endpoints[i] = strings.TrimSuffix(endpoints[i], "/")
}
return
}
// Fallback到单个base_url向后兼容
baseURL := g.Cfg().MustGet(ctx, "ragflow.base_url", "").String()
if baseURL != "" {
endpoints = []string{strings.TrimSuffix(baseURL, "/")}
}
return
}
// GetGlobalClient 获取全局客户端(延迟初始化)
func GetGlobalClient() *Client {
initClient()
return globalClient
}
// Client RAGFlow API 客户端(支持负载均衡)
type Client struct {
Endpoints []string // RAGFlow实例列表
APIKey string // API密钥
currentIndex atomic.Uint64 // 当前轮询索引(原子操作)
}
// getNextEndpoint 获取下一个endpoint轮询算法
func (c *Client) getNextEndpoint() string {
if len(c.Endpoints) == 0 {
return ""
}
if len(c.Endpoints) == 1 {
return c.Endpoints[0]
}
// 原子递增并取模,实现轮询
idx := c.currentIndex.Add(1) % uint64(len(c.Endpoints))
return c.Endpoints[idx]
}
// CommonResponse 通用响应结构
type CommonResponse struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// IsSuccess 检查响应是否成功
func (r *CommonResponse) IsSuccess() bool {
return r.Code == 0
}
// request 发送 HTTP 请求使用统一的common/http包支持负载均衡
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
endpoint := c.getNextEndpoint()
if endpoint == "" {
return gerror.New("RAGFlow endpoints not configured")
}
fullURL := endpoint + path
headers := map[string]string{
"Authorization": "Bearer " + c.APIKey,
"Content-Type": "application/json",
}
switch method {
case "GET":
err = commonHttp.Get(ctx, fullURL, headers, result, body)
case "POST":
err = commonHttp.Post(ctx, fullURL, headers, result, body)
case "PUT":
err = commonHttp.Put(ctx, fullURL, headers, result, body)
case "DELETE":
err = commonHttp.Delete(ctx, fullURL, headers, result, body)
default:
return gerror.Newf("unsupported method: %s", method)
}
if err != nil {
return gerror.Newf("RAGFlow API request failed: %v", err)
}
return
}
// buildQueryString 构建查询字符串
func buildQueryString(params map[string]interface{}) string {
if len(params) == 0 {
return ""
}
parts := make([]string, 0, len(params))
for k, v := range params {
parts = append(parts, url.QueryEscape(k)+"="+url.QueryEscape(g.NewVar(v).String()))
}
return strings.Join(parts, "&")
}