From fb0cb27d1d907c518a075359d31ccc76f19498e4 Mon Sep 17 00:00:00 2001 From: Cold <16419454+cold502@user.noreply.gitee.com> Date: Thu, 27 Nov 2025 17:38:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ragflow/agent.go | 132 ++++++++++++++++++++++++++++++++++++++ ragflow/chat.go | 10 ++- ragflow/chunk.go | 10 ++- ragflow/client.go | 109 +++++++++++++++++++++++++++++++ ragflow/dataset.go | 10 ++- ragflow/document.go | 10 ++- ragflow/openai.go | 122 +++++++++++++++++++++++++++++++++++ ragflow/service/client.go | 77 ---------------------- ragflow/session.go | 10 ++- ragflow/system.go | 39 +++++++++++ 10 files changed, 437 insertions(+), 92 deletions(-) create mode 100644 ragflow/agent.go create mode 100644 ragflow/client.go create mode 100644 ragflow/openai.go delete mode 100644 ragflow/service/client.go create mode 100644 ragflow/system.go diff --git a/ragflow/agent.go b/ragflow/agent.go new file mode 100644 index 0000000..7e0ed1e --- /dev/null +++ b/ragflow/agent.go @@ -0,0 +1,132 @@ +package ragflow + +import ( + "context" + "fmt" +) + +// Agent AGENT 管理 +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#agent-管理 + +// Agent Agent 结构体 +type Agent struct { + ID string `json:"id"` + Title string `json:"title"` + Description string `json:"description"` + DSL map[string]interface{} `json:"dsl"` // Canvas DSL 对象 +} + +// CreateAgentReq 创建 Agent 请求 +type CreateAgentReq struct { + Title string `json:"title"` // 必需 + Description string `json:"description,omitempty"` // 可选,默认为 None + DSL map[string]interface{} `json:"dsl"` // 必需,Canvas DSL 对象 +} + +// UpdateAgentReq 更新 Agent 请求 +type UpdateAgentReq struct { + Title string `json:"title,omitempty"` + Description string `json:"description,omitempty"` + DSL map[string]interface{} `json:"dsl,omitempty"` +} + +// ListAgentsReq 列出 Agent 请求 +type ListAgentsReq struct { + Page int `json:"page,omitempty"` + PageSize int `json:"page_size,omitempty"` + OrderBy string `json:"orderby,omitempty"` + Desc bool `json:"desc,omitempty"` + Title string `json:"title,omitempty"` + ID string `json:"id,omitempty"` +} + +// ListAgentsRes 列出 Agent 响应 +type ListAgentsRes struct { + Code int `json:"code"` + Data []*Agent `json:"data"` + Total int `json:"total"` +} + +// CreateAgent 创建 Agent +// POST /api/v1/agents +func (c *Client) CreateAgent(ctx context.Context, req *CreateAgentReq) error { + var res CommonResponse + if err := c.request(ctx, "POST", "/api/v1/agents", req, &res); err != nil { + return fmt.Errorf("create agent failed: %w", err) + } + if !res.IsSuccess() { + return fmt.Errorf("create agent failed: %s", res.Message) + } + return nil +} + +// UpdateAgent 更新 Agent +// PUT /api/v1/agents/{agent_id} +func (c *Client) UpdateAgent(ctx context.Context, agentID string, req *UpdateAgentReq) error { + path := fmt.Sprintf("/api/v1/agents/%s", agentID) + var res CommonResponse + if err := c.request(ctx, "PUT", path, req, &res); err != nil { + return fmt.Errorf("update agent failed: %w", err) + } + if !res.IsSuccess() { + return fmt.Errorf("update agent failed: %s", res.Message) + } + return nil +} + +// DeleteAgent 删除 Agent +// DELETE /api/v1/agents/{agent_id} +func (c *Client) DeleteAgent(ctx context.Context, agentID string) error { + path := fmt.Sprintf("/api/v1/agents/%s", agentID) + var res CommonResponse + if err := c.request(ctx, "DELETE", path, nil, &res); err != nil { + return fmt.Errorf("delete agent failed: %w", err) + } + if !res.IsSuccess() { + return fmt.Errorf("delete agent failed: %s", res.Message) + } + return nil +} + +// ListAgents 列出 Agent +// GET /api/v1/agents +func (c *Client) ListAgents(ctx context.Context, req *ListAgentsReq) (*ListAgentsRes, error) { + path := "/api/v1/agents" + if req != nil { + params := map[string]interface{}{} + if req.Page > 0 { + params["page"] = req.Page + } + if req.PageSize > 0 { + params["page_size"] = req.PageSize + } + if req.OrderBy != "" { + params["orderby"] = req.OrderBy + } + if req.Desc { + params["desc"] = "true" + } else { + params["desc"] = "false" + } + if req.Title != "" { + params["title"] = req.Title + } + if req.ID != "" { + params["id"] = req.ID + } + + query := buildQueryString(params) + if query != "" { + path += "?" + query + } + } + + var res ListAgentsRes + if err := c.request(ctx, "GET", path, nil, &res); err != nil { + return nil, fmt.Errorf("list agents failed: %w", err) + } + if res.Code != 0 { + return nil, fmt.Errorf("list agents failed: code=%d", res.Code) + } + return &res, nil +} diff --git a/ragflow/chat.go b/ragflow/chat.go index addc182..bb19251 100644 --- a/ragflow/chat.go +++ b/ragflow/chat.go @@ -5,7 +5,10 @@ import ( "fmt" ) -// Chat 结构体 +// 聊天助手管理 +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#聊天助手管理 + +// Chat 聊天助手结构体 type Chat struct { Id string `json:"id"` Name string `json:"name"` @@ -131,8 +134,9 @@ func (c *Client) ListChats(ctx context.Context, req *ListChatsReq) (*ListChatsRe params["id"] = req.Id } - for k, v := range params { - path += fmt.Sprintf("%s=%v&", k, v) + query := buildQueryString(params) + if query != "" { + path += "?" + query } var res ListChatsRes diff --git a/ragflow/chunk.go b/ragflow/chunk.go index e2c2182..dad9e59 100644 --- a/ragflow/chunk.go +++ b/ragflow/chunk.go @@ -5,7 +5,10 @@ import ( "fmt" ) -// Chunk 结构体 +// 数据集内知识块管理 +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#数据集内知识块管理 + +// Chunk 知识块结构体 type Chunk struct { Id string `json:"id"` Content string `json:"content"` @@ -120,8 +123,9 @@ func (c *Client) ListChunks(ctx context.Context, datasetId, documentId string, r params["id"] = req.Id } - for k, v := range params { - path += fmt.Sprintf("%s=%v&", k, v) + query := buildQueryString(params) + if query != "" { + path += "?" + query } var res ListChunksRes diff --git a/ragflow/client.go b/ragflow/client.go new file mode 100644 index 0000000..44d028c --- /dev/null +++ b/ragflow/client.go @@ -0,0 +1,109 @@ +package ragflow + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/gogf/gf/v2/net/gclient" +) + +// Client RAGFlow API 客户端 +type Client struct { + BaseURL string + APIKey string + HTTPClient *gclient.Client +} + +// NewClient 创建新的 RAGFlow 客户端 +func NewClient(baseURL, apiKey string) *Client { + client := gclient.New() + client.SetHeader("Authorization", fmt.Sprintf("Bearer %s", apiKey)) + client.SetHeader("Content-Type", "application/json") + + return &Client{ + BaseURL: strings.TrimSuffix(baseURL, "/"), + APIKey: apiKey, + HTTPClient: client, + } +} + +// CommonResponse 通用响应结构 +type CommonResponse struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +// IsSuccess 检查响应是否成功 +func (r *CommonResponse) IsSuccess() bool { + return r.Code == 0 +} + +// request 发送 HTTP 请求 +func (c *Client) request(ctx context.Context, method, path string, body interface{}, result interface{}) error { + fullURL := c.BaseURL + path + + var reqBody io.Reader + if body != nil { + jsonData, err := json.Marshal(body) + if err != nil { + return fmt.Errorf("marshal request body failed: %w", err) + } + reqBody = strings.NewReader(string(jsonData)) + } + + var resp *gclient.Response + var err error + + switch method { + case "GET": + resp, err = c.HTTPClient.Get(ctx, fullURL) + case "POST": + resp, err = c.HTTPClient.Post(ctx, fullURL, reqBody) + case "PUT": + resp, err = c.HTTPClient.Put(ctx, fullURL, reqBody) + case "DELETE": + resp, err = c.HTTPClient.Delete(ctx, fullURL, reqBody) + default: + return fmt.Errorf("unsupported method: %s", method) + } + + if err != nil { + return fmt.Errorf("http request failed: %w", err) + } + defer resp.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("http request failed with status: %d", resp.StatusCode) + } + + respBody, err := resp.ReadAll() + if err != nil { + return fmt.Errorf("read response body failed: %w", err) + } + + if err := json.Unmarshal(respBody, result); err != nil { + return fmt.Errorf("unmarshal response failed: %w", err) + } + + return nil +} + +// buildQueryString 构建查询字符串 +func buildQueryString(params map[string]interface{}) string { + if len(params) == 0 { + return "" + } + + var parts []string + for k, v := range params { + parts = append(parts, fmt.Sprintf("%s=%v", url.QueryEscape(k), url.QueryEscape(fmt.Sprintf("%v", v)))) + } + return strings.Join(parts, "&") +} + diff --git a/ragflow/dataset.go b/ragflow/dataset.go index ea0de16..0301d70 100644 --- a/ragflow/dataset.go +++ b/ragflow/dataset.go @@ -5,7 +5,10 @@ import ( "fmt" ) -// Dataset 结构体 +// 数据集管理 +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#数据集管理 + +// Dataset 数据集结构体 type Dataset struct { Id string `json:"id"` Name string `json:"name"` @@ -120,8 +123,9 @@ func (c *Client) ListDatasets(ctx context.Context, req *ListDatasetsReq) (*ListD } // 拼接 query string - for k, v := range params { - path += fmt.Sprintf("%s=%v&", k, v) + query := buildQueryString(params) + if query != "" { + path += "?" + query } var res ListDatasetsRes diff --git a/ragflow/document.go b/ragflow/document.go index 0d2e5e8..44827ca 100644 --- a/ragflow/document.go +++ b/ragflow/document.go @@ -5,7 +5,10 @@ import ( "fmt" ) -// Document 结构体 +// 数据集内文件管理 +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#数据集内文件管理 + +// Document 文档结构体 type Document struct { Id string `json:"id"` DatasetId string `json:"dataset_id"` @@ -93,8 +96,9 @@ func (c *Client) ListDocuments(ctx context.Context, datasetId string, req *ListD params["create_time_to"] = req.CreateTimeTo } - for k, v := range params { - path += fmt.Sprintf("%s=%v&", k, v) + query := buildQueryString(params) + if query != "" { + path += "?" + query } var res ListDocumentsRes diff --git a/ragflow/openai.go b/ragflow/openai.go new file mode 100644 index 0000000..b56008b --- /dev/null +++ b/ragflow/openai.go @@ -0,0 +1,122 @@ +package ragflow + +import ( + "context" + "encoding/json" + "fmt" +) + +// OpenAICompatibleAPI 与 OpenAI 兼容的 API +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#与-openai-兼容的-api + +// ChatCompletionMessage OpenAI 格式的消息 +type ChatCompletionMessage struct { + Role string `json:"role"` // "user", "assistant", "system" + Content string `json:"content"` +} + +// ChatCompletionRequest OpenAI 格式的聊天补全请求 +type ChatCompletionRequest struct { + Model string `json:"model"` // 模型名称(服务器会自动解析,可设置为任意值) + Messages []ChatCompletionMessage `json:"messages"` // 消息列表,必须至少包含一条 user 消息 + Stream bool `json:"stream,omitempty"` // 是否流式返回,默认 false +} + +// ChatCompletionResponse OpenAI 格式的聊天补全响应(非流式) +type ChatCompletionResponse struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []struct { + Index int `json:"index"` + Message ChatCompletionMessage `json:"message"` + FinishReason string `json:"finish_reason"` + } `json:"choices"` + Usage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` + } `json:"usage"` +} + +// ChatCompletionChunk 流式响应块 +type ChatCompletionChunk struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []struct { + Index int `json:"index"` + Delta struct { + Content string `json:"content"` + Role string `json:"role"` + } `json:"delta"` + FinishReason *string `json:"finish_reason"` + } `json:"choices"` + Usage *struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` + } `json:"usage,omitempty"` +} + +// CreateChatCompletion 创建聊天补全(与聊天助手) +// POST /api/v1/chats_openai/{chat_id}/chat/completions +func (c *Client) CreateChatCompletion(ctx context.Context, chatID string, req *ChatCompletionRequest) (*ChatCompletionResponse, error) { + path := fmt.Sprintf("/api/v1/chats_openai/%s/chat/completions", chatID) + + var resp ChatCompletionResponse + if err := c.request(ctx, "POST", path, req, &resp); err != nil { + return nil, fmt.Errorf("create chat completion failed: %w", err) + } + + return &resp, nil +} + +// CreateAgentCompletion 创建 Agent 补全 +// POST /api/v1/agents_openai/{agent_id}/chat/completions +func (c *Client) CreateAgentCompletion(ctx context.Context, agentID string, req *ChatCompletionRequest) (*ChatCompletionResponse, error) { + path := fmt.Sprintf("/api/v1/agents_openai/%s/chat/completions", agentID) + + var resp ChatCompletionResponse + if err := c.request(ctx, "POST", path, req, &resp); err != nil { + return nil, fmt.Errorf("create agent completion failed: %w", err) + } + + return &resp, nil +} + +// CreateChatCompletionStream 创建流式聊天补全(与聊天助手) +// 注意:流式响应需要特殊处理,这里返回一个可用于读取流的接口 +func (c *Client) CreateChatCompletionStream(ctx context.Context, chatID string, req *ChatCompletionRequest) (*StreamReader, error) { + req.Stream = true + apiPath := fmt.Sprintf("/api/v1/chats_openai/%s/chat/completions", chatID) + + // TODO: 实现流式读取逻辑 + return nil, fmt.Errorf("stream mode not implemented yet") +} + +// StreamReader 流式响应读取器 +type StreamReader struct { + decoder *json.Decoder + close func() error +} + +// ReadChunk 读取下一个响应块 +func (sr *StreamReader) ReadChunk() (*ChatCompletionChunk, error) { + var chunk ChatCompletionChunk + if err := sr.decoder.Decode(&chunk); err != nil { + return nil, err + } + return &chunk, nil +} + +// Close 关闭流 +func (sr *StreamReader) Close() error { + if sr.close != nil { + return sr.close() + } + return nil +} + diff --git a/ragflow/service/client.go b/ragflow/service/client.go deleted file mode 100644 index 4a45dfe..0000000 --- a/ragflow/service/client.go +++ /dev/null @@ -1,77 +0,0 @@ -package service - -import ( - "context" - "fmt" - "time" - - "gitee.com/red-future---jilin-g/common/ragflow/dto" - "github.com/gogf/gf/v2/encoding/gjson" - "github.com/gogf/gf/v2/frame/g" - "github.com/gogf/gf/v2/net/gclient" -) - -type Client struct { - BaseURL string - ApiKey string - Client *gclient.Client -} - -// NewClient 创建一个新的 RAGFlow 客户端 -func NewClient(baseUrl, apiKey string) *Client { - return &Client{ - BaseURL: baseUrl, - ApiKey: apiKey, - Client: g.Client().SetTimeout(30 * time.Second), - } -} - -// request 发送 HTTP 请求 -func (c *Client) request(ctx context.Context, method, path string, data interface{}, result interface{}) error { - url := fmt.Sprintf("%s%s", c.BaseURL, path) - - req := c.Client.Header(map[string]string{ - "Authorization": fmt.Sprintf("Bearer %s", c.ApiKey), - "Content-Type": "application/json", - }) - - var res *gclient.Response - var err error - - switch method { - case "GET": - res, err = req.Get(ctx, url, data) - case "POST": - res, err = req.Post(ctx, url, data) - case "PUT": - res, err = req.Put(ctx, url, data) - case "DELETE": - res, err = req.Delete(ctx, url, data) - default: - return fmt.Errorf("unsupported method: %s", method) - } - - if err != nil { - return err - } - defer res.Close() - - // 读取响应体 - body := res.ReadAllString() - - // 解析响应 - if result != nil { - if err := gjson.DecodeTo(body, result); err != nil { - return fmt.Errorf("failed to decode response: %v, body: %s", err, body) - } - - // 检查业务错误码 - if commonRes, ok := result.(*dto.CommonResponse); ok { - if !commonRes.IsSuccess() { - return fmt.Errorf("api error: code=%d, message=%s", commonRes.Code, commonRes.Message) - } - } - } - - return nil -} diff --git a/ragflow/session.go b/ragflow/session.go index 0c15136..3f8a240 100644 --- a/ragflow/session.go +++ b/ragflow/session.go @@ -5,7 +5,10 @@ import ( "fmt" ) -// Session 结构体 +// 会话管理 +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#会话管理 + +// Session 会话结构体 type Session struct { Id string `json:"id"` Name string `json:"name"` @@ -116,8 +119,9 @@ func (c *Client) ListSessions(ctx context.Context, chatId string, req *ListSessi params["user_id"] = req.UserId } - for k, v := range params { - path += fmt.Sprintf("%s=%v&", k, v) + query := buildQueryString(params) + if query != "" { + path += "?" + query } var res ListSessionsRes diff --git a/ragflow/system.go b/ragflow/system.go new file mode 100644 index 0000000..f294c91 --- /dev/null +++ b/ragflow/system.go @@ -0,0 +1,39 @@ +package ragflow + +import ( + "context" + "fmt" +) + +// System 系统管理 +// 参考: https://ragflow.com.cn/docs/dev/http_api_reference#系统 + +// HealthStatus 健康状态 +type HealthStatus struct { + DB string `json:"db"` // "ok" 或 "nok" + Redis string `json:"redis"` // "ok" 或 "nok" + DocEngine string `json:"doc_engine"` // "ok" 或 "nok" + Storage string `json:"storage"` // "ok" 或 "nok" + Status string `json:"status"` // 整体状态: "ok" 或 "nok" + Meta map[string]interface{} `json:"_meta,omitempty"` // 详细错误信息 +} + +// CheckHealth 检查系统健康状况 +// GET /v1/system/healthz +func (c *Client) CheckHealth(ctx context.Context) (*HealthStatus, error) { + var status HealthStatus + if err := c.request(ctx, "GET", "/v1/system/healthz", nil, &status); err != nil { + return nil, fmt.Errorf("check health failed: %w", err) + } + return &status, nil +} + +// IsHealthy 检查系统是否健康 +func (c *Client) IsHealthy(ctx context.Context) (bool, error) { + status, err := c.CheckHealth(ctx) + if err != nil { + return false, err + } + return status.Status == "ok", nil +} +