package rpc import ( "context" "encoding/json" "errors" "sync" "time" "gitea.com/red-future/common/consul" "gitea.com/red-future/common/jaeger" "github.com/gogf/gf/v2/frame/g" rpcxClient "github.com/smallnest/rpcx/client" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" ) var ( // pluginsContainer rpcx插件容器(全局统一设置) // init()中添加链路追踪插件,所有client共用此容器 pluginsContainer = rpcxClient.NewPluginContainer() // clientPool 连接池缓存,key为服务名,value为客户端实例 clientPool = make(map[string]*rpcxClient.OneClient) // poolMutex 连接池锁 poolMutex sync.RWMutex // healthCheckInterval 健康检查间隔(秒) healthCheckInterval = 30 // lastHealthCheckTime 上次健康检查时间,key为服务名 lastHealthCheckTime = make(map[string]time.Time) // serviceAddrCache 服务地址缓存,key为服务名,value为地址 serviceAddrCache = make(map[string]string) ) func init() { // 全局设置链路追踪插件,所有client共用 pluginsContainer.Add(&TracingPlugin{}) // 启动后台健康检查协程 go healthCheckLoop() } // healthCheckLoop 后台健康检查循环 func healthCheckLoop() { ticker := time.NewTicker(time.Duration(healthCheckInterval) * time.Second) defer ticker.Stop() for range ticker.C { checkAllConnections() } } // checkAllConnections 检查所有缓存连接的健康状态 func checkAllConnections() { poolMutex.Lock() defer poolMutex.Unlock() now := time.Now() for serviceName := range clientPool { // 检查连接是否需要健康检查 if lastCheck, ok := lastHealthCheckTime[serviceName]; ok { if now.Sub(lastCheck) < time.Duration(healthCheckInterval)*time.Second { continue } } // 从consul重新获取服务地址,检查是否发生变化 ctx := context.Background() currentAddr, err := consul.GetInstanceAddr(ctx, serviceName) if err != nil { g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v,保持现有连接", serviceName, err) lastHealthCheckTime[serviceName] = now continue } // 检查地址是否发生变化 if oldAddr, ok := serviceAddrCache[serviceName]; ok && oldAddr != currentAddr { g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s,重建连接", serviceName, oldAddr, currentAddr) // 关闭旧连接并从连接池移除,下次请求时会创建新连接 if client, exists := clientPool[serviceName]; exists { client.Close() delete(clientPool, serviceName) } // 更新缓存的新地址 serviceAddrCache[serviceName] = currentAddr } else { // 地址未变化,更新检查时间 if !ok { serviceAddrCache[serviceName] = currentAddr } g.Log().Debugf(ctx, "服务[%s]地址未变化,保持现有连接", serviceName) } lastHealthCheckTime[serviceName] = now } } // isClientHealthy 检查client是否健康(简化版) // 实际健康检查依赖调用失败时触发重建 func isClientHealthy(client *rpcxClient.OneClient) bool { // rpcx有内置的重连机制,我们信任client对象的有效性 // 只要client不为nil就认为是健康的 // 实际的错误会在调用时暴露,触发重新创建 return client != nil } // getOrCreateClient 从连接池获取或创建客户端(带连接池) func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) { if g.IsEmpty(serviceName) { return nil, errors.New("服务名称不能为空") } // 先尝试从连接池获取 poolMutex.RLock() client, exists := clientPool[serviceName] poolMutex.RUnlock() // 如果存在且健康,直接返回 if exists && isClientHealthy(client) { g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName) return client, nil } // 不存在或不健康,重新创建 poolMutex.Lock() defer poolMutex.Unlock() // 双重检查,防止并发时重复创建 if client, exists := clientPool[serviceName]; exists && isClientHealthy(client) { return client, nil } // 获取服务实例地址 addr, err := consul.GetInstanceAddr(ctx, serviceName) if err != nil { g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err) return nil, err } g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr) // 缓存服务地址,用于健康检查时对比 serviceAddrCache[serviceName] = addr // 创建服务发现 discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "") if err != nil { g.Log().Errorf(ctx, "创建服务发现失败: %v", err) return nil, err } // 创建新客户端 newClient := rpcxClient.NewOneClient( rpcxClient.Failtry, rpcxClient.RandomSelect, discovery, rpcxClient.DefaultOption, ) newClient.SetPlugins(pluginsContainer) // 更新连接池 if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil { oldClient.Close() } clientPool[serviceName] = newClient lastHealthCheckTime[serviceName] = time.Now() g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName) return newClient, nil } // Call 调用rpcx服务方法 // serviceName: 服务名称 // serviceMethod: 服务方法 // args: 请求参数 // reply: 响应结果 func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error { // 从连接池获取客户端(不再关闭连接) client, err := getOrCreateClient(ctx, serviceName) if err != nil { g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err) return err } // 设置超时 callCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() // 调用服务方法 err = client.Call(callCtx, serviceName, serviceMethod, args, reply) if err != nil { g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, err) // 如果调用失败,检查连接是否需要重新创建 poolMutex.Lock() if pooledClient, ok := clientPool[serviceName]; ok && pooledClient == client { // 标记为不健康,下次请求时会重新创建 delete(lastHealthCheckTime, serviceName) } poolMutex.Unlock() return err } return nil } // Close 关闭指定服务的连接(用于清理连接池) func Close(serviceName string) { poolMutex.Lock() defer poolMutex.Unlock() if client, ok := clientPool[serviceName]; ok { client.Close() delete(clientPool, serviceName) delete(lastHealthCheckTime, serviceName) delete(serviceAddrCache, serviceName) g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName) } } // CloseAll 关闭所有连接(用于优雅停机) func CloseAll() { poolMutex.Lock() defer poolMutex.Unlock() for serviceName, client := range clientPool { client.Close() g.Log().Infof(context.Background(), "rpcx客户端[%s]已关闭", serviceName) } clientPool = make(map[string]*rpcxClient.OneClient) lastHealthCheckTime = make(map[string]time.Time) serviceAddrCache = make(map[string]string) } // TracingPlugin rpcx链路追踪插件 // 实现 rpcx 的 PreCallPlugin 和 PostCallPlugin 接口 type TracingPlugin struct{} // PreCall 调用前拦截 - 创建jaeger span func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) (err error) { // 创建span,名称格式: ServiceName.Method spanName := serviceName + "." + serviceMethod ctx, span := jaeger.NewSpan(ctx, spanName) // 记录服务和方法信息 span.SetAttributes( attribute.String("rpc.service", serviceName), attribute.String("rpc.method", serviceMethod), attribute.String("rpc.system", "rpcx"), ) var data []byte // 记录请求参数(序列化为JSON) if args != nil { if data, err = json.Marshal(args); err == nil { argsStr := string(data) // 限制长度,避免过大 if len(argsStr) > 2000 { argsStr = argsStr[:2000] + "... (truncated)" } span.SetAttributes(attribute.String("rpc.request", argsStr)) } } g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod) return } // PostCall 调用后拦截 - 记录结果和错误 func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error { span := trace.SpanFromContext(ctx) if span != nil && span.IsRecording() { defer span.End() // 记录响应结果 if reply != nil { if data, err := json.Marshal(reply); err == nil { replyStr := string(data) // 限制长度,避免过大 if len(replyStr) > 2000 { replyStr = replyStr[:2000] + "... (truncated)" } span.SetAttributes(attribute.String("rpc.response", replyStr)) } } // 处理错误 if err != nil { jaeger.RecordError(ctx, err, "rpcx调用失败") span.SetStatus(codes.Error, err.Error()) g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err) } else { g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod) } } return nil }