Files
common/consul/consul.go
2026-04-20 19:46:33 +08:00

279 lines
6.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package consul
import (
"context"
"errors"
"fmt"
"net"
"net/http"
"sync"
"time"
"github.com/gogf/gf/contrib/registry/consul/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gsel"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/os/gcfg"
"github.com/gogf/gf/v2/util/grand"
"github.com/hashicorp/consul/api"
"github.com/r3labs/diff/v2"
)
var (
registry gsvc.Registry
consulAddr string
reconnectMutex sync.RWMutex
reconnectDone chan struct{}
connected bool
httpClient *http.Client
)
// connectConsul 连接 Consul
func connectConsul(ctx context.Context) error {
reconnectMutex.Lock()
defer reconnectMutex.Unlock()
// 如果已经连接,不再重复连接
if connected && registry != nil {
return nil
}
var err error
registry, err = consul.New(consul.WithAddress(consulAddr))
if err != nil {
g.Log().Errorf(ctx, "❌ Consul 连接失败: %v", err)
return err
}
gsvc.SetRegistry(registry)
gsel.SetBuilder(gsel.NewBuilderRoundRobin())
connected = true
g.Log().Infof(ctx, "✅ Consul 初始化成功: %s", consulAddr)
return nil
}
func init() {
ctx := context.Background()
consulAddr = g.Cfg().MustGet(ctx, "consul.address").String()
if consulAddr == "" {
g.Log().Debug(ctx, "📄 [Consul] 配置文件中未设置 consul.address跳过 Consul 初始化")
return
}
if err := connectConsul(ctx); err != nil {
g.Log().Errorf(ctx, "❌ Consul 初始化失败: %v", err)
}
loadConfigFromConsul()
}
func loadConfigFromConsul() {
ctx := context.Background()
serviceName := g.Cfg().MustGet(ctx, "server.name", "admin-go").String()
fmt.Printf("服务名称: %s\n", serviceName)
consulKey := fmt.Sprintf("config/%s/%s", serviceName, serviceName)
fmt.Printf("从 Consul 读取配置键: %s\n", consulKey)
consulData, lastIndex, err := loadFromConsul(consulKey)
if err == nil && len(consulData) > 0 {
adapter, err := gcfg.NewAdapterContent()
if err != nil {
fmt.Printf("创建配置适配器失败: %v\n", err)
return
}
adapter.SetContent(string(consulData))
g.Cfg().SetAdapter(adapter)
fmt.Printf("已从 Consul 成功加载初始配置\n")
go watchConsulConfig(consulKey, lastIndex)
} else {
fmt.Printf("从 Consul 获取配置失败,使用本地配置文件\n")
}
}
func loadFromConsul(consulKey string) ([]byte, uint64, error) {
ctx := context.Background()
consulAddress := g.Cfg().MustGet(ctx, "consul.address", "127.0.0.1:8500").String()
fmt.Printf("Consul 地址: %s\n", consulAddress)
config := api.DefaultConfig()
config.Address = consulAddress
client, err := api.NewClient(config)
if err != nil {
fmt.Printf("创建 Consul 客户端失败: %v\n", err)
return nil, 0, err
}
kv := client.KV()
opts := &api.QueryOptions{
WaitIndex: 0,
WaitTime: 60 * time.Second,
}
pair, meta, err := kv.Get(consulKey, opts)
if err != nil {
fmt.Printf("从 Consul 读取配置失败: %v\n", err)
return nil, 0, err
}
if pair == nil {
fmt.Printf("Consul 中未找到配置键: %s\n", consulKey)
return nil, 0, nil
}
fmt.Printf("已从 Consul 加载配置, Index: %d\n", meta.LastIndex)
return pair.Value, meta.LastIndex, nil
}
func watchConsulConfig(consulKey string, lastIndex uint64) {
ctx := context.Background()
consulAddress := g.Cfg().MustGet(ctx, "consul.address", "127.0.0.1:8500").String()
config := api.DefaultConfig()
config.Address = consulAddress
client, err := api.NewClient(config)
if err != nil {
fmt.Printf("创建 Consul 监听客户端失败: %v\n", err)
return
}
for {
opts := &api.QueryOptions{
WaitIndex: lastIndex,
WaitTime: 60 * time.Second,
}
pair, meta, err := client.KV().Get(consulKey, opts)
if err != nil {
fmt.Printf("Consul 监听出错: %v, 5秒后重试...\n", err)
time.Sleep(5 * time.Second)
continue
}
if meta.LastIndex == lastIndex {
continue
}
if pair == nil {
fmt.Printf("Consul 配置被删除: %s\n", consulKey)
lastIndex = meta.LastIndex
continue
}
fmt.Printf("检测到 Consul 配置变更: %s, New Index: %d\n", consulKey, meta.LastIndex)
updateLocalConfig(pair.Value)
lastIndex = meta.LastIndex
}
}
func updateLocalConfig(content []byte) {
ctx := context.Background()
oldConfig, _ := g.Cfg().Data(ctx)
adapter, err := gcfg.NewAdapterContent()
if err != nil {
fmt.Printf("创建新配置适配器失败: %v\n", err)
return
}
adapter.SetContent(string(content))
g.Cfg().SetAdapter(adapter)
newConfig, _ := g.Cfg().Data(ctx)
changelog, err := diff.Diff(oldConfig, newConfig)
if err != nil {
fmt.Printf("配置对比失败: %v\n", err)
return
}
if len(changelog) == 0 {
fmt.Printf("配置已热更新成功(内容无实质变化)\n")
return
}
fmt.Printf("=== 检测到配置变更 (%d 项) ===\n", len(changelog))
for _, change := range changelog {
switch change.Type {
case "create":
fmt.Printf("[+] 新增: %s = %v\n", change.Path, change.To)
case "update":
fmt.Printf("[~] 修改: %s: %v -> %v\n", change.Path, change.From, change.To)
case "delete":
fmt.Printf("[-] 删除: %s (原值: %v)\n", change.Path, change.From)
}
}
fmt.Printf("=================================\n")
}
func getLocalIP() (string, error) {
// 获取本机所有网络接口
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
// 检查是否是IP地址
if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() {
if ipNet.IP.To4() != nil {
// 返回第一个非回环的IPv4地址
return ipNet.IP.String(), nil
}
}
}
return "", fmt.Errorf("无法找到本地IP地址")
}
func getInstanceAddrByIp(ctx context.Context, ip string, services []gsvc.Service) (addr string) {
for _, s := range services {
if s.GetEndpoints()[0].Host() == ip {
addr = s.GetEndpoints()[0].String()
return
}
}
return
}
func GetInstanceAddr(ctx context.Context, name string) (addr string, err error) {
watch, err := gsvc.GetRegistry().Watch(ctx, name)
if err != nil {
err = errors.New("获取服务监听器失败")
return
}
service, err := watch.Proceed()
if err != nil || service == nil {
err = errors.New("获取服务实例失败")
return
}
//优先使用客户端IP获取实例(前后端在同一台机器调试)
addr = getInstanceAddrByIp(ctx, g.RequestFromCtx(ctx).GetClientIp(), service)
if !g.IsEmpty(addr) {
return
}
//优先使用gateway同IP的服务实例(前后端不同机器调试)
addr, err = getLocalIP()
if err != nil {
return
}
addr = getInstanceAddrByIp(ctx, addr, service)
if !g.IsEmpty(addr) {
return
}
//随机获取一个服务实例
maxService := grand.N(0, len(service)-1)
addr = service[maxService].GetEndpoints()[0].String()
return
}