feat: 支持多模型提供商 embedding

This commit is contained in:
2026-04-01 13:38:33 +08:00
parent bcbe6eba78
commit 2e4a0a89f1
9 changed files with 631 additions and 447 deletions

View File

@@ -1,364 +1,347 @@
package rpc
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"
"gitea.com/red-future/common/consul"
"gitea.com/red-future/common/jaeger"
"github.com/gogf/gf/v2/frame/g"
rpcxClient "github.com/smallnest/rpcx/client"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
var (
// pluginsContainer rpcx插件容器全局统一设置
// init()中添加链路追踪插件所有client共用此容器
pluginsContainer = rpcxClient.NewPluginContainer()
// clientPool 连接池缓存key为服务名value为客户端实例
clientPool = make(map[string]*rpcxClient.OneClient)
// poolMutex 连接池锁
poolMutex sync.RWMutex
// healthCheckInterval 健康检查间隔(秒)
healthCheckInterval = 30
// lastHealthCheckTime 上次健康检查时间key为服务名
lastHealthCheckTime = make(map[string]time.Time)
// serviceAddrCache 服务地址缓存key为服务名value为地址
serviceAddrCache = make(map[string]string)
)
func init() {
// 全局设置链路追踪插件所有client共用
pluginsContainer.Add(&TracingPlugin{})
// 启动后台健康检查协程
go healthCheckLoop()
}
// healthCheckLoop 后台健康检查循环
func healthCheckLoop() {
ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second)
defer ticker.Stop()
for range ticker.C {
checkAllConnections()
}
}
// checkAllConnections 检查所有缓存连接的健康状态
func checkAllConnections() {
poolMutex.Lock()
defer poolMutex.Unlock()
now := time.Now()
for serviceName, client := range clientPool {
// 检查连接是否需要健康检查
if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
continue
}
}
ctx := context.Background()
// 检查连接健康状态(心跳检测)
if !isClientHealthy(ctx, client, serviceName) {
g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
client.Close()
delete(clientPool, serviceName)
delete(lastHealthCheckTime, serviceName)
delete(serviceAddrCache, serviceName)
continue
}
// 连接健康,检查服务地址是否发生变化
currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
if err != nil {
g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v保持现有连接", serviceName, err)
lastHealthCheckTime[serviceName] = now
continue
}
// 检查地址是否发生变化
if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s重建连接", serviceName, oldAddr, currentAddr)
// 关闭旧连接并从连接池移除,下次请求时会创建新连接
client.Close()
delete(clientPool, serviceName)
delete(lastHealthCheckTime, serviceName)
// 更新缓存的新地址
serviceAddrCache[serviceName] = currentAddr
} else {
// 地址未变化,更新检查时间
if !ok {
serviceAddrCache[serviceName] = currentAddr
}
g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
}
lastHealthCheckTime[serviceName] = now
}
}
// isClientHealthy 检查client是否健康
// 使用心跳检测方式:尝试调用服务的心跳方法
func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
if client == nil {
return false
}
// 设置较短的超时时间,避免阻塞
pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
// 尝试调用健康检查方法
// 大多数服务都会提供 Ping 或 Health 方法
// 如果服务没有提供这些方法,会返回错误,我们认为是健康的
// 因为连接本身是正常的,只是方法不存在
var reply interface{}
err := client.Call(pingCtx, serviceName, "Ping", nil, &reply)
// 如果调用成功,连接肯定健康
if err == nil {
return true
}
// 如果是方法不存在的错误说明连接是健康的只是服务没有Ping方法
// 这种情况下我们认为是健康的
if isMethodNotFoundError(err) || isServiceNotFoundError(err) {
return true
}
// 其他错误(网络错误、超时等)说明连接不健康
g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err)
return false
}
// isMethodNotFoundError 判断是否是方法未找到错误
func isMethodNotFoundError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
// rpcx 方法不存在的常见错误信息
return strings.Contains(errStr, "not found") ||
strings.Contains(errStr, "no such") ||
strings.Contains(errStr, "service not found") ||
strings.Contains(errStr, "method not found")
}
// isServiceNotFoundError 判断是否是服务未找到错误
func isServiceNotFoundError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "no service") ||
strings.Contains(errStr, "service not registered")
}
// getOrCreateClient 从连接池获取或创建客户端(带连接池)
func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
if g.IsEmpty(serviceName) {
return nil, errors.New("服务名称不能为空")
}
// 先尝试从连接池获取
poolMutex.RLock()
client, exists := clientPool[serviceName]
poolMutex.RUnlock()
// 如果存在且健康,直接返回
if exists && isClientHealthy(ctx, client, serviceName) {
g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
return client, nil
}
// 不存在或不健康,重新创建
poolMutex.Lock()
defer poolMutex.Unlock()
// 双重检查,防止并发时重复创建
if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) {
return client, nil
}
// 获取服务实例地址
addr, err := consul.GetInstanceAddr(ctx, serviceName)
if err != nil {
g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
return nil, err
}
g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)
// 缓存服务地址,用于健康检查时对比
serviceAddrCache[serviceName] = addr
// 创建服务发现
discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
if err != nil {
g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
return nil, err
}
// 创建新客户端
newClient := rpcxClient.NewOneClient(
rpcxClient.Failtry,
rpcxClient.RandomSelect,
discovery,
rpcxClient.DefaultOption,
)
newClient.SetPlugins(pluginsContainer)
// 更新连接池
if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil {
oldClient.Close()
}
clientPool[serviceName] = newClient
lastHealthCheckTime[serviceName] = time.Now()
g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
return newClient, nil
}
// Call 调用rpcx服务方法
// serviceName: 服务名称
// serviceMethod: 服务方法
// args: 请求参数
// reply: 响应结果
func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
// 从连接池获取客户端(不再关闭连接)
client, err := getOrCreateClient(ctx, serviceName)
if err != nil {
g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
return err
}
// 设置超时
callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
// 调用服务方法
err = client.Call(callCtx, serviceName, serviceMethod, args, reply)
if err != nil {
g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err)
// 如果调用失败,检查连接是否需要重新创建
poolMutex.Lock()
if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client {
// 标记为不健康,下次请求时会重新创建
delete(lastHealthCheckTime, serviceName)
}
poolMutex.Unlock()
return err
}
return nil
}
// Close 关闭指定服务的连接(用于清理连接池)
func Close(serviceName string) {
poolMutex.Lock()
defer poolMutex.Unlock()
if client, ok := clientPool[serviceName]; ok {
client.Close()
delete(clientPool, serviceName)
delete(lastHealthCheckTime, serviceName)
delete(serviceAddrCache, serviceName)
g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
}
}
// CloseAll 关闭所有连接(用于优雅停机)
func CloseAll() {
poolMutex.Lock()
defer poolMutex.Unlock()
for serviceName, client := range clientPool {
client.Close()
g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName)
}
clientPool = make(map[string]*rpcxClient.OneClient)
lastHealthCheckTime = make(map[string]time.Time)
serviceAddrCache = make(map[string]string)
}
// TracingPlugin rpcx链路追踪插件
// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口
type TracingPlugin struct{}
// PreCall 调用前拦截 - 创建jaeger span
func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) {
// 创建span名称格式: ServiceName.Method
spanName := serviceName + "." + serviceMethod
ctx, span := jaeger.NewSpan(ctx, spanName)
// 记录服务和方法信息
span.SetAttributes(
attribute.String("rpc.service", serviceName),
attribute.String("rpc.method", serviceMethod),
attribute.String("rpc.system", "rpcx"),
)
var data []byte
// 记录请求参数序列化为JSON
if args != nil {
if data, err = json.Marshal(args); err == nil {
argsStr := string(data)
// 限制长度,避免过大
if len(argsStr) > 2000 {
argsStr = argsStr[:2000] + "... (truncated)"
}
span.SetAttributes(attribute.String("rpc.request", argsStr))
}
}
g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
return
}
// PostCall 调用后拦截 - 记录结果和错误
func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
span := trace.SpanFromContext(ctx)
if span != nil && span.IsRecording() {
defer span.End()
// 记录响应结果
if reply != nil {
if data, err := json.Marshal(reply); err == nil {
replyStr := string(data)
// 限制长度,避免过大
if len(replyStr) > 2000 {
replyStr = replyStr[:2000] + "... (truncated)"
}
span.SetAttributes(attribute.String("rpc.response", replyStr))
}
}
// 处理错误
if err != nil {
jaeger.RecordError(ctx, err, "rpcx调用失败")
span.SetStatus(codes.Error, err.Error())
g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
} else {
g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
}
}
return nil
}
//var (
// // pluginsContainer rpcx插件容器全局统一设置
// // init()中添加链路追踪插件所有client共用此容器
// pluginsContainer = rpcxClient.NewPluginContainer()
//
// // clientPool 连接池缓存key为服务名value为客户端实例
// clientPool = make(map[string]*rpcxClient.OneClient)
//
// // poolMutex 连接池锁
// poolMutex sync.RWMutex
//
// // healthCheckInterval 健康检查间隔(秒)
// healthCheckInterval = 30
//
// // lastHealthCheckTime 上次健康检查时间key为服务名
// lastHealthCheckTime = make(map[string]time.Time)
//
// // serviceAddrCache 服务地址缓存key为服务名value为地址
// serviceAddrCache = make(map[string]string)
//)
//
//func init() {
// // 全局设置链路追踪插件所有client共用
// pluginsContainer.Add(&TracingPlugin{})
//
// // 启动后台健康检查协程
// go healthCheckLoop()
//}
//
//// healthCheckLoop 后台健康检查循环
//func healthCheckLoop() {
// ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second)
// defer ticker.Stop()
//
// for range ticker.C {
// checkAllConnections()
// }
//}
//
//// checkAllConnections 检查所有缓存连接的健康状态
//func checkAllConnections() {
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// now := time.Now()
// for serviceName, client := range clientPool {
// // 检查连接是否需要健康检查
// if lastCheck, ok := lastHealthCheckTime[serviceName]; ok {
// if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second {
// continue
// }
// }
//
// ctx := context.Background()
//
// // 检查连接健康状态(心跳检测)
// if !isClientHealthy(ctx, client, serviceName) {
// g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
// client.Close()
// delete(clientPool, serviceName)
// delete(lastHealthCheckTime, serviceName)
// delete(serviceAddrCache, serviceName)
// continue
// }
//
// // 连接健康,检查服务地址是否发生变化
// currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
// if err != nil {
// g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v保持现有连接", serviceName, err)
// lastHealthCheckTime[serviceName] = now
// continue
// }
//
// // 检查地址是否发生变化
// if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr {
// g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s重建连接", serviceName, oldAddr, currentAddr)
// // 关闭旧连接并从连接池移除,下次请求时会创建新连接
// client.Close()
// delete(clientPool, serviceName)
// delete(lastHealthCheckTime, serviceName)
// // 更新缓存的新地址
// serviceAddrCache[serviceName] = currentAddr
// } else {
// // 地址未变化,更新检查时间
// if !ok {
// serviceAddrCache[serviceName] = currentAddr
// }
// g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
// }
//
// lastHealthCheckTime[serviceName] = now
// }
//}
//
//// isClientHealthy 检查client是否健康
//// 使用心跳检测方式:尝试调用服务的心跳方法
//func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
// if client == nil {
// return false
// }
//
// // 设置较短的超时时间,避免阻塞
// pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
// defer cancel()
//
// // 尝试调用健康检查方法
// // 大多数服务都会提供 Ping 或 Health 方法
// // 如果服务没有提供这些方法,会返回错误,我们认为是健康的
// // 因为连接本身是正常的,只是方法不存在
// var reply interface{}
// err := client.Call(pingCtx, serviceName, "Ping", nil, &reply)
//
// // 如果调用成功,连接肯定健康
// if err == nil {
// return true
// }
//
// // 如果是方法不存在的错误说明连接是健康的只是服务没有Ping方法
// // 这种情况下我们认为是健康的
// if isMethodNotFoundError(err) || isServiceNotFoundError(err) {
// return true
// }
//
// // 其他错误(网络错误、超时等)说明连接不健康
// g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, err)
// return false
//}
//
//// isMethodNotFoundError 判断是否是方法未找到错误
//func isMethodNotFoundError(err error) bool {
// if err == nil {
// return false
// }
// errStr := err.Error()
// // rpcx 方法不存在的常见错误信息
// return strings.Contains(errStr, "not found") ||
// strings.Contains(errStr, "no such") ||
// strings.Contains(errStr, "service not found") ||
// strings.Contains(errStr, "method not found")
//}
//
//// isServiceNotFoundError 判断是否是服务未找到错误
//func isServiceNotFoundError(err error) bool {
// if err == nil {
// return false
// }
// errStr := err.Error()
// return strings.Contains(errStr, "no service") ||
// strings.Contains(errStr, "service not registered")
//}
//
//// getOrCreateClient 从连接池获取或创建客户端(带连接池)
//func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
// if g.IsEmpty(serviceName) {
// return nil, errors.New("服务名称不能为空")
// }
//
// // 先尝试从连接池获取
// poolMutex.RLock()
// client, exists := clientPool[serviceName]
// poolMutex.RUnlock()
//
// // 如果存在且健康,直接返回
// if exists && isClientHealthy(ctx, client, serviceName) {
// g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
// return client, nil
// }
//
// // 不存在或不健康,重新创建
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// // 双重检查,防止并发时重复创建
// if client, exists := clientPool[serviceName]; exists && isClientHealthy(ctx, client, serviceName) {
// return client, nil
// }
//
// // 获取服务实例地址
// addr, err := consul.GetInstanceAddr(ctx, serviceName)
// if err != nil {
// g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
// return nil, err
// }
//
// g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)
//
// // 缓存服务地址,用于健康检查时对比
// serviceAddrCache[serviceName] = addr
//
// // 创建服务发现
// discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
// if err != nil {
// g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
// return nil, err
// }
//
// // 创建新客户端
// newClient := rpcxClient.NewOneClient(
// rpcxClient.Failtry,
// rpcxClient.RandomSelect,
// discovery,
// rpcxClient.DefaultOption,
// )
// newClient.SetPlugins(pluginsContainer)
//
// // 更新连接池
// if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil {
// oldClient.Close()
// }
// clientPool[serviceName] = newClient
// lastHealthCheckTime[serviceName] = time.Now()
//
// g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
//
// return newClient, nil
//}
//
//// Call 调用rpcx服务方法
//// serviceName: 服务名称
//// serviceMethod: 服务方法
//// args: 请求参数
//// reply: 响应结果
//func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
// // 从连接池获取客户端(不再关闭连接)
// client, err := getOrCreateClient(ctx, serviceName)
// if err != nil {
// g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
// return err
// }
//
// // 设置超时
// callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
// defer cancel()
//
// // 调用服务方法
// err = client.Call(callCtx, serviceName, serviceMethod, args, reply)
// if err != nil {
// g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err)
//
// // 如果调用失败,检查连接是否需要重新创建
// poolMutex.Lock()
// if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client {
// // 标记为不健康,下次请求时会重新创建
// delete(lastHealthCheckTime, serviceName)
// }
// poolMutex.Unlock()
//
// return err
// }
//
// return nil
//}
//
//// Close 关闭指定服务的连接(用于清理连接池)
//func Close(serviceName string) {
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// if client, ok := clientPool[serviceName]; ok {
// client.Close()
// delete(clientPool, serviceName)
// delete(lastHealthCheckTime, serviceName)
// delete(serviceAddrCache, serviceName)
// g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
// }
//}
//
//// CloseAll 关闭所有连接(用于优雅停机)
//func CloseAll() {
// poolMutex.Lock()
// defer poolMutex.Unlock()
//
// for serviceName, client := range clientPool {
// client.Close()
// g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName)
// }
// clientPool = make(map[string]*rpcxClient.OneClient)
// lastHealthCheckTime = make(map[string]time.Time)
// serviceAddrCache = make(map[string]string)
//}
//
//// TracingPlugin rpcx链路追踪插件
//// 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口
//type TracingPlugin struct{}
//
//// PreCall 调用前拦截 - 创建jaeger span
//func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) {
// // 创建span名称格式: ServiceName.Method
// spanName := serviceName + "." + serviceMethod
// ctx, span := jaeger.NewSpan(ctx, spanName)
//
// // 记录服务和方法信息
// span.SetAttributes(
// attribute.String("rpc.service", serviceName),
// attribute.String("rpc.method", serviceMethod),
// attribute.String("rpc.system", "rpcx"),
// )
// var data []byte
// // 记录请求参数序列化为JSON
// if args != nil {
// if data, err = json.Marshal(args); err == nil {
// argsStr := string(data)
// // 限制长度,避免过大
// if len(argsStr) > 2000 {
// argsStr = argsStr[:2000] + "... (truncated)"
// }
// span.SetAttributes(attribute.String("rpc.request", argsStr))
// }
// }
//
// g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
//
// return
//}
//
//// PostCall 调用后拦截 - 记录结果和错误
//func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
// span := trace.SpanFromContext(ctx)
// if span != nil && span.IsRecording() {
// defer span.End()
//
// // 记录响应结果
// if reply != nil {
// if data, err := json.Marshal(reply); err == nil {
// replyStr := string(data)
// // 限制长度,避免过大
// if len(replyStr) > 2000 {
// replyStr = replyStr[:2000] + "... (truncated)"
// }
// span.SetAttributes(attribute.String("rpc.response", replyStr))
// }
// }
//
// // 处理错误
// if err != nil {
// jaeger.RecordError(ctx, err, "rpcx调用失败")
// span.SetStatus(codes.Error, err.Error())
// g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
// } else {
// g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
// }
// }
//
// return nil
//}