diff --git a/consul/consul.go b/consul/consul.go index b5e79cf..91f0900 100644 --- a/consul/consul.go +++ b/consul/consul.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "sync" + "time" "github.com/gogf/gf/contrib/registry/consul/v2" "github.com/gogf/gf/v2/frame/g" @@ -14,30 +15,96 @@ import ( "github.com/gogf/gf/v2/util/grand" ) -var initOnce sync.Once +var ( + registry gsvc.Registry + consulAddr string + reconnectMutex sync.RWMutex + reconnectDone chan struct{} +) -// Init 初始化 Consul 注册中心(延迟初始化,首次调用时执行) -func Init() { - initOnce.Do(func() { - consulAddr := g.Cfg().MustGet(context.Background(), "consul.address").String() - if consulAddr == "" { - g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化") +// connectConsul 连接 Consul +func connectConsul(ctx context.Context) error { + reconnectMutex.Lock() + defer reconnectMutex.Unlock() + + var err error + registry, err = consul.New(consul.WithAddress(consulAddr)) + if err != nil { + g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err) + return err + } + + gsvc.SetRegistry(registry) + gsel.SetBuilder(gsel.NewBuilderRoundRobin()) + g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr) + + // 启动健康检查和自动重连 + go startHealthCheckAndReconnect() + + return nil +} + +// startHealthCheckAndReconnect 启动健康检查和自动重连 +func startHealthCheckAndReconnect() { + if reconnectDone != nil { + close(reconnectDone) + } + + reconnectDone = make(chan struct{}) + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + ctx := context.Background() + + for { + select { + case <-ticker.C: + // 检查服务发现是否正常工作 + if checkConsulHealth(ctx) { + continue + } + + g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...") + if err := connectConsul(ctx); err != nil { + g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,5秒后重试...", err) + } + + case <-reconnectDone: + g.Log().Info(ctx, "🛑 Consul 健康检查已停止") return } - registry, err := consul.New(consul.WithAddress(consulAddr)) - if err != nil { - g.Log().Errorf(context.Background(), "Consul 初始化失败: %v", err) - return - } - gsvc.SetRegistry(registry) - gsel.SetBuilder(gsel.NewBuilderRoundRobin()) - g.Log().Infof(context.Background(), "✅ Consul 初始化成功: %s", consulAddr) - }) + } +} + +// checkConsulHealth 检查 Consul 健康状态 +func checkConsulHealth(ctx context.Context) bool { + reconnectMutex.RLock() + defer reconnectMutex.RUnlock() + + if registry == nil { + return false + } + + // 尝试获取服务列表来检测连接是否正常 + services, err := registry.Search(ctx, gsvc.SearchInput{}) + if err != nil { + g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err) + return false + } + + g.Log().Debugf(ctx, "✅ Consul 健康检查通过,发现 %d 个服务", len(services)) + return true } func init() { - // 默认自动初始化(保持向后兼容) - Init() + consulAddr = g.Cfg().MustGet(context.Background(), "consul.address").String() + if consulAddr == "" { + g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化") + return + } + if err := connectConsul(context.Background()); err != nil { + g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err) + } } func getLocalIP() (string, error) { // 获取本机所有网络接口 @@ -69,9 +136,14 @@ func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service } func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) { watch, err := gsvc.GetRegistry().Watch(ctx, name) + if err != nil { + err = errors.New("获取服务监听器失败") + return + } + service, err := watch.Proceed() if err != nil || service == nil { - err = errors.New("获取customerService服务实例失败!") + err = errors.New("获取服务实例失败") return } //优先使用客户端IP获取实例(前后端在同一台机器调试)