Files
common/rpc/rpcx.go
2026-03-12 08:51:59 +08:00

365 lines
11 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rpc
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"
"gitea.com/red-future/common/consul"
"gitea.com/red-future/common/jaeger"
"github.com/gogf/gf/v2/frame/g"
rpcxClient "github.com/smallnest/rpcx/client"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
// Package-level state shared by every rpcx client this file creates.
var (
	// pluginsContainer is the one rpcx plugin container used by all clients.
	// init() registers the tracing plugin on it.
	pluginsContainer = rpcxClient.NewPluginContainer()

	// clientPool caches client instances; key is the service name.
	clientPool = map[string]*rpcxClient.OneClient{}

	// poolMutex is the connection-pool lock. The functions in this file also
	// hold it whenever they touch lastHealthCheckTime or serviceAddrCache.
	poolMutex sync.RWMutex

	// healthCheckInterval is the health-check period, in seconds.
	healthCheckInterval = 30

	// lastHealthCheckTime records when each service was last health-checked;
	// key is the service name.
	lastHealthCheckTime = map[string]time.Time{}

	// serviceAddrCache remembers the address each pooled client was built
	// for; key is the service name, value is the address.
	serviceAddrCache = map[string]string{}
)
// init registers the tracing plugin on the shared plugin container and starts
// the background health-check goroutine.
func init() {
	// A single plugin instance is enough: every client is given the same
	// pluginsContainer.
	pluginsContainer.Add(new(TracingPlugin))

	// healthCheckLoop has no exit condition, so this goroutine lives as long
	// as the process does.
	go healthCheckLoop()
}
// healthCheckLoop runs checkAllConnections once per healthCheckInterval
// seconds, forever. It is meant to be started in its own goroutine.
func healthCheckLoop() {
	period := time.Duration(healthCheckInterval) * time.Second
	ticker := time.NewTicker(period)
	defer ticker.Stop()

	for {
		// Block until the next tick, then sweep the pool.
		<-ticker.C
		checkAllConnections()
	}
}
// checkAllConnections sweeps every pooled client and evicts the ones that are
// no longer usable.
//
// For each service whose last check is at least healthCheckInterval seconds
// old (or that has no recorded check at all) it:
//  1. pings the client via isClientHealthy and, if that fails, closes the
//     client and drops all cached state for the service;
//  2. otherwise asks consul for the current instance address; when the lookup
//     fails the existing client is kept and only the check time is refreshed;
//  3. when the address differs from the cached one, closes the client and
//     removes it from the pool so the next getOrCreateClient call builds a new
//     one, while remembering the new address.
//
// The whole sweep runs under poolMutex's write lock, including the ping and
// the consul lookup.
func checkAllConnections() {
	poolMutex.Lock()
	defer poolMutex.Unlock()

	ctx := context.Background()
	now := time.Now()
	interval := time.Duration(healthCheckInterval) * time.Second

	// Deleting map entries while ranging over the map is safe in Go.
	for serviceName, client := range clientPool {
		// Skip services that were checked recently enough.
		if lastCheck, ok := lastHealthCheckTime[serviceName]; ok && now.Sub(lastCheck) < interval {
			continue
		}

		// Heartbeat probe.
		if !isClientHealthy(ctx, client, serviceName) {
			g.Log().Warningf(ctx, "检测到服务[%s]连接不健康,将从连接池移除", serviceName)
			client.Close()
			delete(clientPool, serviceName)
			delete(lastHealthCheckTime, serviceName)
			delete(serviceAddrCache, serviceName)
			continue
		}

		// The connection answers; now see whether the service has moved.
		currentAddr, err := consul.GetInstanceAddr(ctx, serviceName)
		if err != nil {
			g.Log().Warningf(ctx, "健康检查时从consul获取服务[%s]地址失败: %v保持现有连接", serviceName, err)
			lastHealthCheckTime[serviceName] = now
			continue
		}

		oldAddr, known := serviceAddrCache[serviceName]
		if known && oldAddr != currentAddr {
			g.Log().Infof(ctx, "检测到服务[%s]地址变化: %s -> %s重建连接", serviceName, oldAddr, currentAddr)
			// Close the stale client and take it out of the pool; the next
			// request will create a fresh one.
			client.Close()
			delete(clientPool, serviceName)
			delete(lastHealthCheckTime, serviceName)
			serviceAddrCache[serviceName] = currentAddr
			// The service no longer has a pooled client, so it must not get a
			// fresh check timestamp below (the original fell through and
			// re-inserted one).
			continue
		}

		// Address unchanged (or seen for the first time): record it if needed
		// and refresh the check time.
		if !known {
			serviceAddrCache[serviceName] = currentAddr
		}
		g.Log().Debugf(ctx, "服务[%s]地址未变化,连接健康", serviceName)
		lastHealthCheckTime[serviceName] = now
	}
}
// isClientHealthy reports whether client looks usable, using a heartbeat
// probe: it calls the "Ping" method on serviceName with a 2-second timeout.
//
// A nil client is unhealthy. A successful Ping is healthy. A Ping that fails
// only because the method or the service does not exist (as judged by
// isMethodNotFoundError / isServiceNotFoundError) is also healthy: the request
// got an answer, the service just offers no Ping. Any other error is logged
// and reported as unhealthy.
func isClientHealthy(ctx context.Context, client *rpcxClient.OneClient, serviceName string) bool {
	if client == nil {
		return false
	}

	// Short deadline so a dead peer cannot stall the caller for long.
	pingCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()

	var reply interface{}
	pingErr := client.Call(pingCtx, serviceName, "Ping", nil, &reply)

	switch {
	case pingErr == nil:
		// The call went through, so the connection works.
		return true
	case isMethodNotFoundError(pingErr), isServiceNotFoundError(pingErr):
		// The peer answered; it simply does not implement Ping.
		return true
	}

	// Everything else (network failure, timeout, ...) counts as unhealthy.
	g.Log().Warningf(ctx, "健康检查失败,服务[%s]连接可能不健康: %v", serviceName, pingErr)
	return false
}
// isMethodNotFoundError reports whether err looks like "the remote side was
// reached but the requested method does not exist", judged by substring
// matching on err.Error(). A nil err yields false.
//
// Matching is case-sensitive. The generic markers "not found" and "no such"
// are kept from the original implementation; the redundant "service not
// found" / "method not found" markers are dropped because "not found" already
// covers them.
func isMethodNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()

	// A DNS failure ("... no such host") contains "no such" but is a
	// transport error, not a missing method; never treat it as a match.
	if strings.Contains(msg, "no such host") {
		return false
	}

	markers := []string{
		// Wording used by the rpcx server for an unknown method
		// ("rpcx: can't find method <name>") — re-verify when upgrading rpcx.
		"can't find method",
		"not found",
		"no such",
	}
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
// isServiceNotFoundError reports whether err looks like "the remote side was
// reached but it does not host the requested service", judged by
// case-sensitive substring matching on err.Error(). A nil err yields false.
func isServiceNotFoundError(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()

	markers := []string{
		// Wording used by the rpcx server for an unknown service path
		// ("rpcx: can't find service <name>") — re-verify when upgrading rpcx.
		"can't find service",
		"no service",
		"service not registered",
	}
	for _, marker := range markers {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
// getOrCreateClient returns the pooled rpcx client for serviceName, creating
// and pooling a new one when none exists or the pooled one fails its health
// probe.
//
// It returns an error when serviceName is empty, when consul cannot supply an
// instance address, or when the peer-to-peer discovery cannot be created.
//
// Flow: look the client up under the read lock and probe it; if that does not
// yield a healthy client, take the write lock, re-check the pool, and
// otherwise resolve the address via consul, build a new client, close the
// previous pooled client (if any) and store the new one.
func getOrCreateClient(ctx context.Context, serviceName string) (*rpcxClient.OneClient, error) {
	if g.IsEmpty(serviceName) {
		return nil, errors.New("服务名称不能为空")
	}

	// Fast path: read-locked lookup, probe done without holding the lock.
	poolMutex.RLock()
	client, exists := clientPool[serviceName]
	poolMutex.RUnlock()

	if exists && isClientHealthy(ctx, client, serviceName) {
		g.Log().Debugf(ctx, "从连接池获取rpcx客户端[%s]", serviceName)
		return client, nil
	}

	// Slow path: missing or unhealthy, rebuild under the write lock.
	poolMutex.Lock()
	defer poolMutex.Unlock()

	// Another goroutine may have replaced the client while we waited for the
	// lock. Only probe when the pooled client is a different instance from
	// the one already found unhealthy above; re-pinging the same instance
	// would just hold the exclusive lock for another probe timeout.
	if pooled, ok := clientPool[serviceName]; ok && pooled != client && isClientHealthy(ctx, pooled, serviceName) {
		return pooled, nil
	}

	// Resolve the service instance address.
	addr, err := consul.GetInstanceAddr(ctx, serviceName)
	if err != nil {
		g.Log().Errorf(ctx, "从consul获取服务[%s]地址失败: %v", serviceName, err)
		return nil, err
	}
	g.Log().Debugf(ctx, "服务[%s]地址: %s", serviceName, addr)

	discovery, err := rpcxClient.NewPeer2PeerDiscovery("tcp@"+addr, "")
	if err != nil {
		g.Log().Errorf(ctx, "创建服务发现失败: %v", err)
		return nil, err
	}

	newClient := rpcxClient.NewOneClient(
		rpcxClient.Failtry,
		rpcxClient.RandomSelect,
		discovery,
		rpcxClient.DefaultOption,
	)
	newClient.SetPlugins(pluginsContainer)

	// Swap the new client into the pool, closing whatever was there before.
	if oldClient, ok := clientPool[serviceName]; ok && oldClient != nil {
		oldClient.Close()
	}
	clientPool[serviceName] = newClient
	// Record the address only once a client for it actually exists, so a
	// failure above cannot leave the cache describing a client that was never
	// built. checkAllConnections compares against this value.
	serviceAddrCache[serviceName] = addr
	lastHealthCheckTime[serviceName] = time.Now()

	g.Log().Infof(ctx, "rpcx客户端[%s]创建并加入连接池", serviceName)
	return newClient, nil
}
// Call invokes a method on an rpcx service through the pooled client.
//
//   - serviceName:   name of the target service (also used as the rpcx service path)
//   - serviceMethod: method to invoke
//   - args:          request payload
//   - reply:         destination for the response
//
// The call is bounded by a 30-second timeout derived from ctx. The client is
// taken from the pool and is not closed afterwards. Any error from obtaining
// the client or from the call itself is logged and returned unchanged.
func Call(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}) error {
	client, err := getOrCreateClient(ctx, serviceName)
	if err != nil {
		g.Log().Errorf(ctx, "获取rpcx客户端失败: %v", err)
		return err
	}

	callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	callErr := client.Call(callCtx, serviceName, serviceMethod, args, reply)
	if callErr == nil {
		return nil
	}

	g.Log().Errorf(ctx, "调用服务[%s]方法[%s]失败: %v", serviceName, serviceMethod, callErr)

	// If the failing client is still the pooled one, forget its last check
	// time. With no recorded time, checkAllConnections will not skip this
	// service on its next sweep.
	poolMutex.Lock()
	pooled, stillPooled := clientPool[serviceName]
	if stillPooled && pooled == client {
		delete(lastHealthCheckTime, serviceName)
	}
	poolMutex.Unlock()

	return callErr
}
// Close closes the pooled client for serviceName and removes every cached
// entry for that service (pool, last check time, address). It does nothing
// when the service has no pooled client.
func Close(serviceName string) {
	poolMutex.Lock()
	defer poolMutex.Unlock()

	pooled, found := clientPool[serviceName]
	if !found {
		return
	}

	pooled.Close()
	delete(clientPool, serviceName)
	delete(lastHealthCheckTime, serviceName)
	delete(serviceAddrCache, serviceName)
	g.Log().Infof(context.Background(), "rpcx客户端[%s]已从连接池移除", serviceName)
}
// CloseAll closes every pooled client and resets the pool, the check-time
// table and the address cache to empty maps. Intended for graceful shutdown.
func CloseAll() {
	poolMutex.Lock()
	defer poolMutex.Unlock()

	logCtx := context.Background()
	for name, pooled := range clientPool {
		pooled.Close()
		g.Log().Infof(logCtx, "rpcx客户端[%s]已关闭", name)
	}

	// Replace the maps rather than deleting entries one by one.
	clientPool = map[string]*rpcxClient.OneClient{}
	lastHealthCheckTime = map[string]time.Time{}
	serviceAddrCache = map[string]string{}
}
// TracingPlugin is the rpcx distributed-tracing plugin.
// It provides the PreCall and PostCall hook methods (rpcx's PreCallPlugin and
// PostCallPlugin interfaces) and carries no state of its own.
type TracingPlugin struct{}
// PreCall is the before-call hook. It starts a jaeger span named
// "<serviceName>.<serviceMethod>", tags it with the service, method and rpc
// system, and attaches the JSON-encoded request (capped at 2000 bytes plus a
// truncation marker) as the "rpc.request" attribute.
//
// It always returns nil. Tracing is best-effort: arguments that cannot be
// JSON-encoded are simply not recorded. The original assigned the marshal
// error to the named result and returned it, which made an un-encodable
// argument fail the RPC itself.
//
// NOTE(review): the span started here is not ended in this method, and the
// derived ctx is local to it — confirm that PostCall can actually reach this
// span, otherwise it is never ended.
func (p *TracingPlugin) PreCall(ctx context.Context, serviceName, serviceMethod string, args interface{}) error {
	// Span name format: ServiceName.Method
	spanName := serviceName + "." + serviceMethod
	ctx, span := jaeger.NewSpan(ctx, spanName)

	span.SetAttributes(
		attribute.String("rpc.service", serviceName),
		attribute.String("rpc.method", serviceMethod),
		attribute.String("rpc.system", "rpcx"),
	)

	if args != nil {
		if data, marshalErr := json.Marshal(args); marshalErr == nil {
			const maxAttrLen = 2000
			argsStr := string(data)
			if len(argsStr) > maxAttrLen {
				// Back up to a rune boundary so a multi-byte UTF-8 character
				// is never cut in half (continuation bytes are 10xxxxxx).
				cut := maxAttrLen
				for cut > 0 && argsStr[cut]&0xC0 == 0x80 {
					cut--
				}
				argsStr = argsStr[:cut] + "... (truncated)"
			}
			span.SetAttributes(attribute.String("rpc.request", argsStr))
		}
	}

	g.Log().Debugf(ctx, "[rpcx] 调用开始: %s.%s", serviceName, serviceMethod)
	return nil
}
// PostCall is the after-call hook. When the span found in ctx is recording it
// attaches the JSON-encoded reply (capped at 2000 bytes plus a truncation
// marker) as the "rpc.response" attribute, records the call error on the span
// if there is one, logs the outcome, and ends the span.
//
// It always returns nil, so the call's own err is never replaced by this
// plugin. A reply that cannot be JSON-encoded is simply not recorded.
//
// NOTE(review): the span is taken from ctx via trace.SpanFromContext. Whether
// that is the span PreCall started depends on how rpcx passes ctx between the
// two hooks — confirm, since this method ends whatever span it finds.
func (p *TracingPlugin) PostCall(ctx context.Context, serviceName, serviceMethod string, args interface{}, reply interface{}, err error) error {
	span := trace.SpanFromContext(ctx)
	if span != nil && span.IsRecording() {
		defer span.End()

		// Record the response.
		if reply != nil {
			if data, marshalErr := json.Marshal(reply); marshalErr == nil {
				const maxAttrLen = 2000
				replyStr := string(data)
				if len(replyStr) > maxAttrLen {
					// Back up to a rune boundary so a multi-byte UTF-8
					// character is never cut in half (continuation bytes are
					// 10xxxxxx).
					cut := maxAttrLen
					for cut > 0 && replyStr[cut]&0xC0 == 0x80 {
						cut--
					}
					replyStr = replyStr[:cut] + "... (truncated)"
				}
				span.SetAttributes(attribute.String("rpc.response", replyStr))
			}
		}

		// Record the outcome of the call.
		if err != nil {
			jaeger.RecordError(ctx, err, "rpcx调用失败")
			span.SetStatus(codes.Error, err.Error())
			g.Log().Errorf(ctx, "[rpcx] 调用失败: %s.%s, 错误: %v", serviceName, serviceMethod, err)
		} else {
			g.Log().Debugf(ctx, "[rpcx] 调用成功: %s.%s", serviceName, serviceMethod)
		}
	}
	return nil
}