Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 41b2a37fc0 | |||
| 0ad6bc9438 |
@@ -5,6 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -20,6 +21,8 @@ var (
|
|||||||
consulAddr string
|
consulAddr string
|
||||||
reconnectMutex sync.RWMutex
|
reconnectMutex sync.RWMutex
|
||||||
reconnectDone chan struct{}
|
reconnectDone chan struct{}
|
||||||
|
connected bool
|
||||||
|
httpClient *http.Client
|
||||||
)
|
)
|
||||||
|
|
||||||
// connectConsul 连接 Consul
|
// connectConsul 连接 Consul
|
||||||
@@ -27,6 +30,11 @@ func connectConsul(ctx context.Context) error {
|
|||||||
reconnectMutex.Lock()
|
reconnectMutex.Lock()
|
||||||
defer reconnectMutex.Unlock()
|
defer reconnectMutex.Unlock()
|
||||||
|
|
||||||
|
// 如果已经连接,不再重复连接
|
||||||
|
if connected && registry != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
registry, err = consul.New(consul.WithAddress(consulAddr))
|
registry, err = consul.New(consul.WithAddress(consulAddr))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -36,11 +44,9 @@ func connectConsul(ctx context.Context) error {
|
|||||||
|
|
||||||
gsvc.SetRegistry(registry)
|
gsvc.SetRegistry(registry)
|
||||||
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
|
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
|
||||||
|
connected = true
|
||||||
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
|
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
|
||||||
|
|
||||||
// 启动健康检查和自动重连
|
|
||||||
go startHealthCheckAndReconnect()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,11 +57,16 @@ func startHealthCheckAndReconnect() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
reconnectDone = make(chan struct{})
|
reconnectDone = make(chan struct{})
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// 初始化HTTP客户端用于健康检查
|
||||||
|
httpClient = &http.Client{
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
@@ -65,8 +76,15 @@ func startHealthCheckAndReconnect() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
|
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
|
||||||
|
|
||||||
|
// 重置连接状态并重连
|
||||||
|
reconnectMutex.Lock()
|
||||||
|
connected = false
|
||||||
|
registry = nil
|
||||||
|
reconnectMutex.Unlock()
|
||||||
|
|
||||||
if err := connectConsul(ctx); err != nil {
|
if err := connectConsul(ctx); err != nil {
|
||||||
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,5秒后重试...", err)
|
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,30秒后重试...", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-reconnectDone:
|
case <-reconnectDone:
|
||||||
@@ -81,18 +99,26 @@ func checkConsulHealth(ctx context.Context) bool {
|
|||||||
reconnectMutex.RLock()
|
reconnectMutex.RLock()
|
||||||
defer reconnectMutex.RUnlock()
|
defer reconnectMutex.RUnlock()
|
||||||
|
|
||||||
if registry == nil {
|
if registry == nil || !connected {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// 尝试获取服务列表来检测连接是否正常
|
// 使用consul原生API进行健康检查
|
||||||
services, err := registry.Search(ctx, gsvc.SearchInput{})
|
// 调用 /v1/agent/self 接口检测连接状态
|
||||||
|
url := fmt.Sprintf("http://%s/v1/agent/self", consulAddr)
|
||||||
|
resp, err := httpClient.Get(url)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err)
|
g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
g.Log().Debugf(ctx, "✅ Consul 健康检查通过,发现 %d 个服务", len(services))
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
g.Log().Debugf(ctx, "Consul 健康检查失败,状态码: %d", resp.StatusCode)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
g.Log().Debugf(ctx, "✅ Consul 健康检查通过")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,8 +128,12 @@ func init() {
|
|||||||
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
|
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := connectConsul(context.Background()); err != nil {
|
if err := connectConsul(context.Background()); err != nil {
|
||||||
g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err)
|
g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err)
|
||||||
|
} else {
|
||||||
|
// 连接成功后启动健康检查和自动重连
|
||||||
|
go startHealthCheckAndReconnect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func getLocalIP() (string, error) {
|
func getLocalIP() (string, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user