package rpc import ( "context" "sync" "github.com/gogf/gf/v2/frame/g" consulClient "github.com/rpcxio/rpcx-consul/client" "github.com/smallnest/rpcx/client" ) var ( xclientMap = make(map[string]client.XClient) xclientMu sync.RWMutex ) // CallWithConsul 基于Consul服务发现调用RPC func CallWithConsul(ctx context.Context, serviceName string, args, reply interface{}) error { xclient, err := getOrCreatXClient(serviceName) if err != nil { return err } err = xclient.Call(ctx, "Mul", args, reply) if err != nil { // 调用失败,清理失效客户端 removeXClient(serviceName) } return err } func removeXClient(serviceName string) { xclientMu.Lock() defer xclientMu.Unlock() if c, ok := xclientMap[serviceName]; ok { c.Close() delete(xclientMap, serviceName) } } func getOrCreatXClient(serviceName string) (client.XClient, error) { // 第一次:读锁,快速判断是否存在 xclientMu.RLock() if c, ok := xclientMap[serviceName]; ok { xclientMu.RUnlock() return c, nil } xclientMu.RUnlock() // 没找到,加写锁准备创建 xclientMu.Lock() defer xclientMu.Unlock() // 第二次:双重检查,防止刚被别人创建完 if c, ok := xclientMap[serviceName]; ok { return c, nil } consulAddr := g.Cfg().MustGet(nil, "consul.address").String() d, err := consulClient.NewConsulDiscovery("rpcx", serviceName, []string{consulAddr}, nil) if err != nil { return nil, err } c := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption) xclientMap[serviceName] = c return c, nil }