2 Commits

Author SHA1 Message Date
41b2a37fc0 Dockerfile 2026-04-01 14:19:50 +08:00
0ad6bc9438 Dockerfile 2026-04-01 14:11:07 +08:00

View File

@@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"net" "net"
"net/http"
"sync" "sync"
"time" "time"
@@ -20,6 +21,8 @@ var (
consulAddr string consulAddr string
reconnectMutex sync.RWMutex reconnectMutex sync.RWMutex
reconnectDone chan struct{} reconnectDone chan struct{}
connected bool
httpClient *http.Client
) )
// connectConsul 连接 Consul // connectConsul 连接 Consul
@@ -27,6 +30,11 @@ func connectConsul(ctx context.Context) error {
reconnectMutex.Lock() reconnectMutex.Lock()
defer reconnectMutex.Unlock() defer reconnectMutex.Unlock()
// 如果已经连接,不再重复连接
if connected && registry != nil {
return nil
}
var err error var err error
registry, err = consul.New(consul.WithAddress(consulAddr)) registry, err = consul.New(consul.WithAddress(consulAddr))
if err != nil { if err != nil {
@@ -36,11 +44,9 @@ func connectConsul(ctx context.Context) error {
gsvc.SetRegistry(registry) gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin()) gsel.SetBuilder(gsel.NewBuilderRoundRobin())
connected = true
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr) g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
// 启动健康检查和自动重连
go startHealthCheckAndReconnect()
return nil return nil
} }
@@ -51,11 +57,16 @@ func startHealthCheckAndReconnect() {
} }
reconnectDone = make(chan struct{}) reconnectDone = make(chan struct{})
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop() defer ticker.Stop()
ctx := context.Background() ctx := context.Background()
// 初始化HTTP客户端用于健康检查
httpClient = &http.Client{
Timeout: 5 * time.Second,
}
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
@@ -65,8 +76,15 @@ func startHealthCheckAndReconnect() {
} }
g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...") g.Log().Warning(ctx, "⚠️ Consul 连接异常,尝试重新连接...")
// 重置连接状态并重连
reconnectMutex.Lock()
connected = false
registry = nil
reconnectMutex.Unlock()
if err := connectConsul(ctx); err != nil { if err := connectConsul(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,5秒后重试...", err) g.Log().Errorf(ctx, "❌ Consul 重连失败: %v,30秒后重试...", err)
} }
case <-reconnectDone: case <-reconnectDone:
@@ -81,18 +99,26 @@ func checkConsulHealth(ctx context.Context) bool {
reconnectMutex.RLock() reconnectMutex.RLock()
defer reconnectMutex.RUnlock() defer reconnectMutex.RUnlock()
if registry == nil { if registry == nil || !connected {
return false return false
} }
// 尝试获取服务列表来检测连接是否正常 // 使用consul原生API进行健康检查
services, err := registry.Search(ctx, gsvc.SearchInput{}) // 调用 /v1/agent/self 接口检测连接状态
url := fmt.Sprintf("http://%s/v1/agent/self", consulAddr)
resp, err := httpClient.Get(url)
if err != nil { if err != nil {
g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err) g.Log().Debugf(ctx, "Consul 健康检查失败: %v", err)
return false return false
} }
defer resp.Body.Close()
g.Log().Debugf(ctx, "✅ Consul 健康检查通过,发现 %d 个服务", len(services)) if resp.StatusCode != http.StatusOK {
g.Log().Debugf(ctx, "Consul 健康检查失败,状态码: %d", resp.StatusCode)
return false
}
g.Log().Debugf(ctx, "✅ Consul 健康检查通过")
return true return true
} }
@@ -102,8 +128,12 @@ func init() {
g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化") g.Log().Warning(context.Background(), "⚠️ Consul 配置未找到,跳过初始化")
return return
} }
if err := connectConsul(context.Background()); err != nil { if err := connectConsul(context.Background()); err != nil {
g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err) g.Log().Errorf(context.Background(), "❌ Consul 初始化失败: %v", err)
} else {
// 连接成功后启动健康检查和自动重连
go startHealthCheckAndReconnect()
} }
} }
func getLocalIP() (string, error) { func getLocalIP() (string, error) {