Files
common/ragflow/client.go
2026-03-12 08:51:07 +08:00

176 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package ragflow
import (
"context"
"net"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gclient"
)
var (
// globalClient is the package-wide RAGFlow client singleton. It is created
// lazily by initClient and stays nil if the configuration is incomplete.
globalClient *Client
// clientOnce ensures the initialization in initClient runs at most once.
clientOnce sync.Once
)
// initClient 延迟初始化客户端
func initClient() {
clientOnce.Do(func() {
ctx := context.Background()
// 读取配置
baseURL, apiKey := loadConfig(ctx)
// 如果配置不完整,跳过初始化
if baseURL == "" || apiKey == "" {
g.Log().Warning(ctx, "⚠️ RAGFlow 配置未找到,请在项目 config.yml 中添加 ragflow.base_url 和 ragflow.api_key")
return
}
// 自定义 Transport增大连接池解决并发连接不足导致的超时
transport := &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: 200, // 最大空闲连接数
MaxIdleConnsPerHost: 100, // 每个 host 最大空闲连接数(关键!默认只有 2
MaxConnsPerHost: 100, // 每个 host 最大连接数
IdleConnTimeout: 90 * time.Second, // 空闲连接超时
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: 180 * time.Second, // 等待响应头超时(关键!)
}
// 初始化全局客户端
httpClient := gclient.New()
httpClient.SetBrowserMode(false)
httpClient.SetHeader("Authorization", "Bearer "+apiKey)
httpClient.SetHeader("Content-Type", "application/json")
httpClient.SetTimeout(180 * time.Second) // RAGFlow AI 推理需要较长时间
// 设置自定义 Transport
httpClient.Client.Transport = transport
globalClient = &Client{
BaseURL: strings.TrimSuffix(baseURL, "/"),
APIKey: apiKey,
HTTPClient: httpClient,
}
g.Log().Infof(ctx, "✅ RAGFlow 全局客户端初始化成功: baseURL=%s", baseURL)
})
}
// loadConfig reads the RAGFlow settings from the GoFrame global configuration
// (the project's config.yml).
//
// It returns the values of ragflow.base_url and ragflow.api_key. A key that is
// absent, or that cannot be read, yields an empty string; read errors are
// logged as warnings rather than raised as panics, so the caller's
// "configuration incomplete" handling can take over.
func loadConfig(ctx context.Context) (baseURL, apiKey string) {
	read := func(key string) string {
		v, err := g.Cfg().Get(ctx, key, "")
		if err != nil {
			g.Log().Warningf(ctx, "[RAGFlow] read config %q failed: %v", key, err)
			return ""
		}
		if v == nil {
			return ""
		}
		return v.String()
	}

	return read("ragflow.base_url"), read("ragflow.api_key")
}
// GetGlobalClient returns the package-wide RAGFlow client, initializing it on
// first use via initClient.
//
// The result is nil when initClient skipped setup because ragflow.base_url or
// ragflow.api_key was empty, so callers should check for nil before use.
//
// Example: client := ragflow.GetGlobalClient()
func GetGlobalClient() *Client {
initClient()
return globalClient
}
// Client is a RAGFlow API client.
type Client struct {
// BaseURL is prepended to every request path; initClient stores it without a trailing "/".
BaseURL string
// APIKey is the RAGFlow API key; initClient also sends it as a Bearer Authorization header.
APIKey string
HTTPClient *gclient.Client // HTTP client used to send requests
}
// CommonResponse is the generic response envelope.
type CommonResponse struct {
	// Code is the status code from the "code" JSON field; 0 means success.
	Code int `json:"code"`
	// Message is the text carried in the "message" JSON field.
	Message string `json:"message"`
	// Data is the payload from the "data" JSON field; omitted from JSON when nil.
	Data interface{} `json:"data,omitempty"`
}

// IsSuccess reports whether the response carries the success code 0.
// A nil receiver is reported as unsuccessful instead of panicking.
func (r *CommonResponse) IsSuccess() bool {
	return r != nil && r.Code == 0
}
// request sends one HTTP request to the RAGFlow API and decodes the JSON reply.
//
// Parameters:
//   - ctx: parent context; the call is additionally bounded by a 180-second timeout.
//   - method: GET, POST, PUT or DELETE; any other value is rejected.
//   - path: appended verbatim to c.BaseURL.
//   - body: JSON-encoded and sent as the payload when non-nil (not sent for GET).
//   - result: destination the response body is decoded into; when nil the
//     body is read and logged but not decoded.
//
// An error is returned when the client is not initialized, the body cannot be
// encoded, the method is unsupported, the request fails, the status is not
// 200, or the response cannot be decoded. Underlying errors are wrapped, so
// errors.Is and errors.As can still inspect the cause (for example a context
// deadline).
func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) (err error) {
	// GetGlobalClient returns nil when the configuration is missing; fail
	// cleanly instead of dereferencing a nil client.
	if c == nil || c.HTTPClient == nil {
		return gerror.New("ragflow client is not initialized")
	}

	fullURL := c.BaseURL + path

	var reqBody string
	if body != nil {
		jsonData, encErr := gjson.Encode(body)
		if encErr != nil {
			return gerror.Wrap(encErr, "marshal request body failed")
		}
		reqBody = string(jsonData)
	}

	// 180-second timeout: RAGFlow AI inference can take a long time.
	reqCtx, cancel := context.WithTimeout(ctx, 180*time.Second)
	defer cancel()

	var resp *gclient.Response
	switch method {
	case http.MethodGet:
		resp, err = c.HTTPClient.Get(reqCtx, fullURL)
	case http.MethodPost:
		resp, err = c.HTTPClient.Post(reqCtx, fullURL, reqBody)
	case http.MethodPut:
		resp, err = c.HTTPClient.Put(reqCtx, fullURL, reqBody)
	case http.MethodDelete:
		resp, err = c.HTTPClient.Delete(reqCtx, fullURL, reqBody)
	default:
		return gerror.Newf("unsupported method: %s", method)
	}
	if err != nil {
		g.Log().Errorf(ctx, "[RAGFlow HTTP] 请求失败: method=%s, url=%s, error=%v", method, fullURL, err)
		return gerror.Wrap(err, "request failed")
	}
	defer resp.Close()

	respBody := resp.ReadAll()
	// Log the response details.
	g.Log().Debugf(ctx, "[RAGFlow HTTP] 响应: status=%d, body=%s", resp.StatusCode, string(respBody))

	if resp.StatusCode != http.StatusOK {
		g.Log().Errorf(ctx, "[RAGFlow HTTP] 非200响应: status=%d, body=%s", resp.StatusCode, string(respBody))
		return gerror.Newf("http status %d: %s", resp.StatusCode, string(respBody))
	}

	// The caller is not interested in the payload; nothing to decode.
	if result == nil {
		return nil
	}

	if err = gjson.DecodeTo(respBody, result); err != nil {
		g.Log().Errorf(ctx, "[RAGFlow HTTP] 解析响应失败: body=%s, error=%v", string(respBody), err)
		return gerror.Wrap(err, "unmarshal response failed")
	}
	return nil
}
// buildQueryString encodes params as a URL query string ("k1=v1&k2=v2").
//
// Each value is converted to its string form with g.NewVar, and both keys and
// values are query-escaped. Pairs are emitted sorted by key, so the same input
// always produces the same string regardless of map iteration order. An empty
// or nil map yields "". The result has no leading "?".
func buildQueryString(params map[string]interface{}) string {
	if len(params) == 0 {
		return ""
	}

	values := make(url.Values, len(params))
	for k, v := range params {
		values.Set(k, g.NewVar(v).String())
	}
	// Encode escapes keys and values with QueryEscape and sorts by key.
	return values.Encode()
}