279 lines
6.6 KiB
Go
279 lines
6.6 KiB
Go
package consul
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"net"
|
||
"net/http"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/gogf/gf/contrib/registry/consul/v2"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/net/gsel"
|
||
"github.com/gogf/gf/v2/net/gsvc"
|
||
"github.com/gogf/gf/v2/os/gcfg"
|
||
"github.com/gogf/gf/v2/util/grand"
|
||
"github.com/hashicorp/consul/api"
|
||
"github.com/r3labs/diff/v2"
|
||
)
|
||
|
||
var (
|
||
registry gsvc.Registry
|
||
consulAddr string
|
||
reconnectMutex sync.RWMutex
|
||
reconnectDone chan struct{}
|
||
connected bool
|
||
httpClient *http.Client
|
||
)
|
||
|
||
// connectConsul 连接 Consul
|
||
func connectConsul(ctx context.Context) error {
|
||
reconnectMutex.Lock()
|
||
defer reconnectMutex.Unlock()
|
||
|
||
// 如果已经连接,不再重复连接
|
||
if connected && registry != nil {
|
||
return nil
|
||
}
|
||
|
||
var err error
|
||
registry, err = consul.New(consul.WithAddress(consulAddr))
|
||
if err != nil {
|
||
g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err)
|
||
return err
|
||
}
|
||
|
||
gsvc.SetRegistry(registry)
|
||
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
|
||
connected = true
|
||
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
|
||
|
||
return nil
|
||
}
|
||
|
||
func init() {
|
||
ctx := context.Background()
|
||
|
||
consulAddr = g.Cfg().MustGet(ctx, "consul.address").String()
|
||
if consulAddr == "" {
|
||
g.Log().Debug(ctx, "📄 [Consul] 配置文件中未设置 consul.address,跳过 Consul 初始化")
|
||
return
|
||
}
|
||
|
||
if err := connectConsul(ctx); err != nil {
|
||
g.Log().Errorf(ctx, "❌ Consul 初始化失败: %v", err)
|
||
}
|
||
|
||
loadConfigFromConsul()
|
||
}
|
||
|
||
func loadConfigFromConsul() {
|
||
ctx := context.Background()
|
||
|
||
serviceName := g.Cfg().MustGet(ctx, "server.name", "admin-go").String()
|
||
fmt.Printf("服务名称: %s\n", serviceName)
|
||
|
||
consulKey := fmt.Sprintf("config/%s/%s", serviceName, serviceName)
|
||
fmt.Printf("从 Consul 读取配置键: %s\n", consulKey)
|
||
|
||
consulData, lastIndex, err := loadFromConsul(consulKey)
|
||
if err == nil && len(consulData) > 0 {
|
||
adapter, err := gcfg.NewAdapterContent()
|
||
if err != nil {
|
||
fmt.Printf("创建配置适配器失败: %v\n", err)
|
||
return
|
||
}
|
||
adapter.SetContent(string(consulData))
|
||
g.Cfg().SetAdapter(adapter)
|
||
|
||
fmt.Printf("已从 Consul 成功加载初始配置\n")
|
||
|
||
go watchConsulConfig(consulKey, lastIndex)
|
||
} else {
|
||
fmt.Printf("从 Consul 获取配置失败,使用本地配置文件\n")
|
||
}
|
||
}
|
||
|
||
func loadFromConsul(consulKey string) ([]byte, uint64, error) {
|
||
ctx := context.Background()
|
||
|
||
consulAddress := g.Cfg().MustGet(ctx, "consul.address", "127.0.0.1:8500").String()
|
||
fmt.Printf("Consul 地址: %s\n", consulAddress)
|
||
|
||
config := api.DefaultConfig()
|
||
config.Address = consulAddress
|
||
|
||
client, err := api.NewClient(config)
|
||
if err != nil {
|
||
fmt.Printf("创建 Consul 客户端失败: %v\n", err)
|
||
return nil, 0, err
|
||
}
|
||
|
||
kv := client.KV()
|
||
|
||
opts := &api.QueryOptions{
|
||
WaitIndex: 0,
|
||
WaitTime: 60 * time.Second,
|
||
}
|
||
|
||
pair, meta, err := kv.Get(consulKey, opts)
|
||
if err != nil {
|
||
fmt.Printf("从 Consul 读取配置失败: %v\n", err)
|
||
return nil, 0, err
|
||
}
|
||
|
||
if pair == nil {
|
||
fmt.Printf("Consul 中未找到配置键: %s\n", consulKey)
|
||
return nil, 0, nil
|
||
}
|
||
|
||
fmt.Printf("已从 Consul 加载配置, Index: %d\n", meta.LastIndex)
|
||
return pair.Value, meta.LastIndex, nil
|
||
}
|
||
|
||
func watchConsulConfig(consulKey string, lastIndex uint64) {
|
||
ctx := context.Background()
|
||
|
||
consulAddress := g.Cfg().MustGet(ctx, "consul.address", "127.0.0.1:8500").String()
|
||
config := api.DefaultConfig()
|
||
config.Address = consulAddress
|
||
|
||
client, err := api.NewClient(config)
|
||
if err != nil {
|
||
fmt.Printf("创建 Consul 监听客户端失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
for {
|
||
opts := &api.QueryOptions{
|
||
WaitIndex: lastIndex,
|
||
WaitTime: 60 * time.Second,
|
||
}
|
||
|
||
pair, meta, err := client.KV().Get(consulKey, opts)
|
||
if err != nil {
|
||
fmt.Printf("Consul 监听出错: %v, 5秒后重试...\n", err)
|
||
time.Sleep(5 * time.Second)
|
||
continue
|
||
}
|
||
|
||
if meta.LastIndex == lastIndex {
|
||
continue
|
||
}
|
||
|
||
if pair == nil {
|
||
fmt.Printf("Consul 配置被删除: %s\n", consulKey)
|
||
lastIndex = meta.LastIndex
|
||
continue
|
||
}
|
||
|
||
fmt.Printf("检测到 Consul 配置变更: %s, New Index: %d\n", consulKey, meta.LastIndex)
|
||
|
||
updateLocalConfig(pair.Value)
|
||
|
||
lastIndex = meta.LastIndex
|
||
}
|
||
}
|
||
|
||
func updateLocalConfig(content []byte) {
|
||
ctx := context.Background()
|
||
|
||
oldConfig, _ := g.Cfg().Data(ctx)
|
||
|
||
adapter, err := gcfg.NewAdapterContent()
|
||
if err != nil {
|
||
fmt.Printf("创建新配置适配器失败: %v\n", err)
|
||
return
|
||
}
|
||
adapter.SetContent(string(content))
|
||
g.Cfg().SetAdapter(adapter)
|
||
|
||
newConfig, _ := g.Cfg().Data(ctx)
|
||
|
||
changelog, err := diff.Diff(oldConfig, newConfig)
|
||
if err != nil {
|
||
fmt.Printf("配置对比失败: %v\n", err)
|
||
return
|
||
}
|
||
|
||
if len(changelog) == 0 {
|
||
fmt.Printf("配置已热更新成功(内容无实质变化)\n")
|
||
return
|
||
}
|
||
|
||
fmt.Printf("=== 检测到配置变更 (%d 项) ===\n", len(changelog))
|
||
for _, change := range changelog {
|
||
switch change.Type {
|
||
case "create":
|
||
fmt.Printf("[+] 新增: %s = %v\n", change.Path, change.To)
|
||
case "update":
|
||
fmt.Printf("[~] 修改: %s: %v -> %v\n", change.Path, change.From, change.To)
|
||
case "delete":
|
||
fmt.Printf("[-] 删除: %s (原值: %v)\n", change.Path, change.From)
|
||
}
|
||
}
|
||
fmt.Printf("=================================\n")
|
||
}
|
||
|
||
// getLocalIP returns the first non-loopback IPv4 address reported by
// net.InterfaceAddrs, formatted as a string.
//
// It returns the error from net.InterfaceAddrs unchanged, or a dedicated
// error when no matching address exists.
func getLocalIP() (string, error) {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		return "", err
	}

	for _, candidate := range ifaceAddrs {
		ipNet, isIPNet := candidate.(*net.IPNet)
		// Skip anything that is not an IP network or is a loopback address.
		if !isIPNet || ipNet.IP.IsLoopback() {
			continue
		}
		// Skip non-IPv4 addresses.
		if ipNet.IP.To4() == nil {
			continue
		}
		return ipNet.IP.String(), nil
	}

	return "", fmt.Errorf("无法找到本地IP地址")
}
|
||
func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service) (addr string) {
|
||
for _, s := range services {
|
||
if s.GetEndpoints()[0].Host() == ip {
|
||
addr = s.GetEndpoints()[0].String()
|
||
return
|
||
}
|
||
}
|
||
return
|
||
}
|
||
func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) {
|
||
watch, err := gsvc.GetRegistry().Watch(ctx, name)
|
||
if err != nil {
|
||
err = errors.New("获取服务监听器失败")
|
||
return
|
||
}
|
||
|
||
service, err := watch.Proceed()
|
||
if err != nil || service == nil {
|
||
err = errors.New("获取服务实例失败")
|
||
return
|
||
}
|
||
//优先使用客户端IP获取实例(前后端在同一台机器调试)
|
||
addr = getInstanceAddrByIp(ctx, g.RequestFromCtx(ctx).GetClientIp(), service)
|
||
if !g.IsEmpty(addr) {
|
||
return
|
||
}
|
||
//优先使用gateway同IP的服务实例(前后端不同机器调试)
|
||
addr, err = getLocalIP()
|
||
if err != nil {
|
||
return
|
||
}
|
||
addr = getInstanceAddrByIp(ctx, addr, service)
|
||
if !g.IsEmpty(addr) {
|
||
return
|
||
}
|
||
//随机获取一个服务实例
|
||
maxService := grand.N(0, len(service)-1)
|
||
addr = service[maxService].GetEndpoints()[0].String()
|
||
return
|
||
}
|