diff --git a/rpc/client.go b/rpc/client.go new file mode 100644 index 0000000..9f1c244 --- /dev/null +++ b/rpc/client.go @@ -0,0 +1,69 @@ +package rpc + +import ( + "context" + "sync" + + "github.com/gogf/gf/v2/frame/g" + consulClient "github.com/rpcxio/rpcx-consul/client" + "github.com/smallnest/rpcx/client" +) + +var ( + xclientMap = make(map[string]client.XClient) + xclientMu sync.RWMutex +) + +// CallWithConsul 基于Consul服务发现调用RPC +func CallWithConsul(ctx context.Context, serviceName string, args, reply interface{}) error { + xclient, err := getOrCreatXClient(serviceName) + if err != nil { + return err + } + + err = xclient.Call(ctx, "Mul", args, reply) + if err != nil { + // 调用失败,清理失效客户端 + removeXClient(serviceName) + } + return err +} + +func removeXClient(serviceName string) { + xclientMu.Lock() + defer xclientMu.Unlock() + if c, ok := xclientMap[serviceName]; ok { + c.Close() + delete(xclientMap, serviceName) + } +} + +func getOrCreatXClient(serviceName string) (client.XClient, error) { + // 第一次:读锁,快速判断是否存在 + xclientMu.RLock() + if c, ok := xclientMap[serviceName]; ok { + xclientMu.RUnlock() + return c, nil + } + xclientMu.RUnlock() + + // 没找到,加写锁准备创建 + xclientMu.Lock() + defer xclientMu.Unlock() + + // 第二次:双重检查,防止刚被别人创建完 + if c, ok := xclientMap[serviceName]; ok { + return c, nil + } + + consulAddr := g.Cfg().MustGet(nil, "consul.address").String() + + d, err := consulClient.NewConsulDiscovery("rpcx", serviceName, []string{consulAddr}, nil) + if err != nil { + return nil, err + } + + c := client.NewXClient(serviceName, client.Failtry, client.RandomSelect, d, client.DefaultOption) + xclientMap[serviceName] = c + return c, nil +} diff --git a/rpc/server.go b/rpc/server.go new file mode 100644 index 0000000..f8a2440 --- /dev/null +++ b/rpc/server.go @@ -0,0 +1,93 @@ +package rpc + +import ( + "context" + "errors" + "fmt" + "net" + "strconv" + "strings" + "time" + + "github.com/gogf/gf/v2/frame/g" + "github.com/rpcxio/rpcx-consul/serverplugin" + "github.com/smallnest/rpcx/server" +) + +var rpcServer *server.Server + +// Serve 启动RPC服务 +func Serve(ctx context.Context, service interface{}) error { + // 获取本机IP + ip, err := getLocalIP() + if err != nil { + return fmt.Errorf("获取IP失败: %w", err) + } + + // 解析端口 + addrConfig := g.Cfg().MustGet(ctx, "server.address").String() + portStr := strings.TrimPrefix(addrConfig, ":") + port, err := strconv.Atoi(portStr) + if err != nil { + return errors.New("端口解析失败") + } + serviceAddr := fmt.Sprintf("%s:%d", ip, port) + + // 创建服务端 + rpcServer = server.NewServer() + + // 添加 Consul 注册插件 + consulAddr := g.Cfg().MustGet(ctx, "consul.address").String() + if consulAddr != "" { + plugin := &serverplugin.ConsulRegisterPlugin{ + ServiceAddress: "tcp@" + serviceAddr, + ConsulServers: []string{consulAddr}, + BasePath: "rpcx", + UpdateInterval: time.Minute, + } + if err := plugin.Start(); err != nil { + return err + } + rpcServer.Plugins.Add(plugin) + g.Log().Infof(ctx, "Consul注册成功: %s", serviceAddr) + } + + // 注册服务 + err = rpcServer.Register(service, "") + if err != nil { + g.Log().Errorf(ctx, "注册服务失败: %v", err) + return nil + } + // 优雅关闭 + //gproc.AddShutdownFunc(func(ctx context.Context) { + // if rpcServer != nil { + // _ = rpcServer.Shutdown(ctx) + // } + //}) + + // 异步启动 + go func() { + g.Log().Infof(ctx, "RPC服务启动: %s", serviceAddr) + if err := rpcServer.Serve("tcp", serviceAddr); err != nil { + g.Log().Fatalf(ctx, "RPC服务启动失败: %v", err) + } + }() + + return nil +} + +func getLocalIP() (string, error) { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "", err + } + + for _, addr := range addrs { + if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() { + if ipNet.IP.To4() != nil { + return ipNet.IP.String(), nil + } + } + } + return "", fmt.Errorf("未找到本机IP") +}