From 6351655ced233cc337e5e6687c3750e27fa063f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Tue, 3 Mar 2026 16:51:01 +0800 Subject: [PATCH] .gitignore --- rpc/rpcx.go | 148 ++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 132 insertions(+), 16 deletions(-) diff --git a/rpc/rpcx.go b/rpc/rpcx.go index 226a8df..6412391 100644 --- a/rpc/rpcx.go +++ b/rpc/rpcx.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "sync" "time" "gitea.com/red-future/common/consul" @@ -12,30 +13,104 @@ import ( rpcxClient "github.com/smallnest/rpcx/client" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - trace "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace" ) var ( // pluginsContainer rpcx插件容器(全局统一设置) // init()中添加链路追踪插件,所有client共用此容器 pluginsContainer = rpcxClient.NewPluginContainer() + + // clientPool 连接池缓存,key为服务名,value为客户端实例 + clientPool = make(map[string]*rpcxClient.OneClient) + + // poolMutex 连接池锁 + poolMutex sync.RWMutex + + // healthCheckInterval 健康检查间隔(秒) + healthCheckInterval = 30 + + // lastHealthCheckTime 上次健康检查时间,key为服务名 + lastHealthCheckTime = make(map[string]time.Time) ) func init() { // 全局设置链路追踪插件,所有client共用 pluginsContainer.Add(&TracingPlugin{}) + + // 启动后台健康检查协程 + go healthCheckLoop() } -// NewXClient 创建rpcx客户端 -// serviceName: 服务名称 -// 通过consul发现服务实例,并返回rpcx客户端 -func newXClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { +// healthCheckLoop 后台健康检查循环 +func healthCheckLoop() { + ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second) + defer ticker.Stop() + + for range ticker.C { + checkAllConnections() + } +} + +// checkAllConnections 检查所有缓存连接的健康状态 +func checkAllConnections() { + poolMutex.Lock() + defer poolMutex.Unlock() + + now := time.Now() + for serviceName := range clientPool { + // 检查连接是否需要健康检查 + if lastCheck, ok := lastHealthCheckTime[serviceName]; ok { + if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second { + continue + } + } + + // 简单的健康检查策略: + // 1. 定期从consul重新获取服务地址,如果地址变化说明服务可能迁移了 + // 2. 下次调用失败时会触发重新创建连接 + // 3. 不主动断开连接,依赖实际调用的错误来触发重建 + lastHealthCheckTime[serviceName] = now + g.Log().Debugf(context.Background(), "服务[%s]连接健康检查完成", serviceName) + } +} + +// isClientHealthy 检查client是否健康(简化版) +// 实际健康检查依赖调用失败时触发重建 +func isClientHealthy(client *rpcxClient.OneClient) bool { + // rpcx有内置的重连机制,我们信任client对象的有效性 + // 只要client不为nil就认为是健康的 + // 实际的错误会在调用时暴露,触发重新创建 + return client != nil +} + +// getOrCreateClient 从连接池获取或创建客户端(带连接池) +func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { if g.IsEmpty(serviceName) { return nil, errors.New("服务名称不能为空") } - // 使用consul.GetInstanceAddr获取服务实例地址 - // 每次都重新获取,确保使用最新的服务地址(支持服务重启、迁移等场景) + // 先尝试从连接池获取 + poolMutex.RLock() + client, exists := clientPool[serviceName] + poolMutex.RUnlock() + + // 如果存在且健康,直接返回 + if exists && isClientHealthy(client) { + g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName) + return client, nil + } + + // 不存在或不健康,重新创建 + poolMutex.Lock() + defer poolMutex.Unlock() + + // 双重检查,防止并发时重复创建 + if client, exists := clientPool[serviceName]; exists && isClientHealthy(client) { + return client, nil + } + + // 获取服务实例地址 addr, err := consul.GetInstanceAddr(ctx, serviceName) if err != nil { g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err) @@ -44,27 +119,32 @@ func newXClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) - // 使用Peer2PeerDiscovery直接连接指定服务 + // 创建服务发现 discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") if err != nil { g.Log().Errorf(ctx, "创建服务发现失败: %v", err) return nil, err } - // 使用OneClient,因为是单点连接 - client := rpcxClient.NewOneClient( + // 创建新客户端 + newClient := rpcxClient.NewOneClient( rpcxClient.Failtry, rpcxClient.RandomSelect, discovery, rpcxClient.DefaultOption, ) + newClient.SetPlugins(pluginsContainer) - // 设置插件(使用全局统一的pluginsContainer) - client.SetPlugins(pluginsContainer) + // 更新连接池 + if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil { + oldClient.Close() + } + clientPool[serviceName] = newClient + lastHealthCheckTime[serviceName] = time.Now() - g.Log().Infof(ctx, "rpcx客户端[%s]创建成功", serviceName) + g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName) - return client, nil + return newClient, nil } // Call 调用rpcx服务方法 @@ -73,26 +153,62 @@ func newXClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, // args: 请求参数 // reply: 响应结果 func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error { - client, err := newXClient(ctx, serviceName) + // 从连接池获取客户端(不再关闭连接) + client, err := getOrCreateClient(ctx, serviceName) if err != nil { g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err) return err } - defer client.Close() // 设置超时 callCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() + // 调用服务方法 err = client.Call(callCtx, serviceName, serviceMethod, args, reply) if err != nil { g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err) + + // 如果调用失败,检查连接是否需要重新创建 + poolMutex.Lock() + if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client { + // 标记为不健康,下次请求时会重新创建 + delete(lastHealthCheckTime, serviceName) + } + poolMutex.Unlock() + return err } return nil } +// Close 关闭指定服务的连接(用于清理连接池) +func Close(serviceName string) { + poolMutex.Lock() + defer poolMutex.Unlock() + + if client, ok := clientPool[serviceName]; ok { + client.Close() + delete(clientPool, serviceName) + delete(lastHealthCheckTime, serviceName) + g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName) + } +} + +// CloseAll 关闭所有连接(用于优雅停机) +func CloseAll() { + poolMutex.Lock() + defer poolMutex.Unlock() + + for serviceName, client := range clientPool { + client.Close() + g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName) + } + clientPool = make(map[string]*rpcxClient.OneClient) + lastHealthCheckTime = make(map[string]time.Time) +} + // TracingPlugin rpcx链路追踪插件 // 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口 type TracingPlugin struct{}