Files
common/k3sconfig/k3sconfig.go

223 lines
6.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package k3sconfig
import (
"context"
"fmt"
"os"
"time"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcfg"
"github.com/r3labs/diff/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type K8sConfig struct {
ApiServer string `json:"apiServer"`
Token string `json:"token"`
Kubeconfig string `json:"kubeconfig"`
Namespace string `json:"namespace"`
}
func init() {
ctx := context.Background()
consulAddr := g.Cfg().MustGet(ctx, "consul.address").String()
if consulAddr != "" {
g.Log().Debug(ctx, "📄 [K3sConfig] 检测到 consul.address 配置,使用 Consul跳过 K3s 加载")
return
}
loadFromK3sCluster(ctx)
}
func loadFromK3sCluster(ctx context.Context) {
serviceName := g.Cfg().MustGet(ctx, "server.name", "").String()
if serviceName == "" {
panic("❌ [K3sConfig] 配置文件中未设置 server.name")
}
k8sConfig := getK8sConfig(ctx)
namespace := k8sConfig.Namespace
if namespace == "" {
namespace = "default"
}
configMapName := fmt.Sprintf("%s-config", serviceName)
g.Log().Info(ctx, "🔗 [K3sConfig] 从 K3s 集群加载配置:", configMapName)
var config *rest.Config
var err error
if k8sConfig.ApiServer != "" && k8sConfig.Token != "" {
g.Log().Infof(ctx, "📍 [K3sConfig] 使用远程 K8s API Server: %s", k8sConfig.ApiServer)
config = &rest.Config{
Host: k8sConfig.ApiServer,
BearerToken: k8sConfig.Token,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
}
} else if k8sConfig.Kubeconfig != "" {
g.Log().Infof(ctx, "📍 [K3sConfig] 使用 Kubeconfig 文件: %s", k8sConfig.Kubeconfig)
config, err = clientcmd.BuildConfigFromFlags("", k8sConfig.Kubeconfig)
if err != nil {
panic(fmt.Sprintf("❌ [K3sConfig] 构建 K8s 配置失败: %v", err))
}
} else {
kubeconfigEnv := os.Getenv("KUBECONFIG")
if kubeconfigEnv != "" {
g.Log().Infof(ctx, "📍 [K3sConfig] 使用环境变量 KUBECONFIG: %s", kubeconfigEnv)
config, err = clientcmd.BuildConfigFromFlags("", kubeconfigEnv)
if err != nil {
panic(fmt.Sprintf("❌ [K3sConfig] 构建 K8s 配置失败: %v", err))
}
} else {
home, _ := os.UserHomeDir()
defaultKubeconfig := fmt.Sprintf("%s/.kube/config", home)
g.Log().Infof(ctx, "📍 [K3sConfig] 使用默认 Kubeconfig: %s", defaultKubeconfig)
config, err = clientcmd.BuildConfigFromFlags("", defaultKubeconfig)
if err != nil {
panic(fmt.Sprintf("❌ [K3sConfig] 构建 K8s 配置失败: %v", err))
}
}
}
g.Log().Infof(ctx, "🔌 [K3sConfig] K8s API Server: %s", config.Host)
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("❌ [K3sConfig] 创建 K8s 客户端失败: %v", err))
}
cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, metav1.GetOptions{})
if err != nil {
panic(fmt.Sprintf("❌ [K3sConfig] 获取 ConfigMap 失败: %v", err))
}
g.Log().Infof(ctx, "📦 [K3sConfig] ConfigMap 信息 - Name: %s, Namespace: %s, ResourceVersion: %s",
cm.Name, cm.Namespace, cm.ResourceVersion)
configData, ok := cm.Data["config.yml"]
if !ok {
g.Log().Debugf(ctx, "📋 [K3sConfig] ConfigMap 可用键: %v", getMapKeys(cm.Data))
panic("❌ [K3sConfig] ConfigMap 中未找到 config.yml 键")
}
g.Log().Infof(ctx, "📄 [K3sConfig] 获取到的配置内容长度: %d 字节", len(configData))
adapter, err := gcfg.NewAdapterContent()
if err != nil {
panic(fmt.Sprintf("❌ [K3sConfig] 创建配置适配器失败: %v", err))
}
adapter.SetContent(configData)
g.Cfg().SetAdapter(adapter)
g.Log().Infof(ctx, "✅ [K3sConfig] 成功从 K3s 加载配置: %s/%s", namespace, configMapName)
go watchK3sConfig(ctx, clientset, namespace, configMapName, cm.ResourceVersion)
}
func getK8sConfig(ctx context.Context) K8sConfig {
return K8sConfig{
ApiServer: g.Cfg().MustGet(ctx, "k8s.apiServer", "").String(),
Token: g.Cfg().MustGet(ctx, "k8s.token", "").String(),
Kubeconfig: g.Cfg().MustGet(ctx, "k8s.kubeconfig", "").String(),
Namespace: g.Cfg().MustGet(ctx, "k8s.namespace", "default").String(),
}
}
func watchK3sConfig(ctx context.Context, clientset *kubernetes.Clientset, namespace, configMapName string, lastResourceVersion string) {
g.Log().Info(ctx, "👀 [K3sConfig] 开始监听 ConfigMap 变更...")
for {
time.Sleep(10 * time.Second)
cm, err := clientset.CoreV1().ConfigMaps(namespace).Get(ctx, configMapName, metav1.GetOptions{
ResourceVersion: "0",
})
if err != nil {
g.Log().Errorf(ctx, "❌ [K3sConfig] 监听 ConfigMap 失败: %v", err)
continue
}
if cm.ResourceVersion == lastResourceVersion {
continue
}
g.Log().Infof(ctx, "🔔 [K3sConfig] 检测到 ConfigMap 变更, Old Version: %s, New Version: %s",
lastResourceVersion, cm.ResourceVersion)
configData, ok := cm.Data["config.yml"]
if !ok {
g.Log().Error(ctx, "❌ [K3sConfig] ConfigMap 中未找到 config.yml 键")
continue
}
updateK3sConfig(configData)
lastResourceVersion = cm.ResourceVersion
}
}
func updateK3sConfig(content string) {
ctx := context.Background()
oldConfig, _ := g.Cfg().Data(ctx)
adapter, err := gcfg.NewAdapterContent()
if err != nil {
g.Log().Errorf(ctx, "❌ [K3sConfig] 创建配置适配器失败: %v", err)
return
}
adapter.SetContent(content)
g.Cfg().SetAdapter(adapter)
newConfig, _ := g.Cfg().Data(ctx)
changelog, err := diff.Diff(oldConfig, newConfig)
if err != nil {
g.Log().Errorf(ctx, "❌ [K3sConfig] 配置对比失败: %v", err)
return
}
if len(changelog) == 0 {
g.Log().Info(ctx, "✅ [K3sConfig] 配置已热更新成功(内容无实质变化)")
return
}
g.Log().Infof(ctx, "=== [K3sConfig] 检测到配置变更 (%d 项) ===", len(changelog))
for _, change := range changelog {
switch change.Type {
case "create":
g.Log().Infof(ctx, "[+] 新增: %s = %v", change.Path, change.To)
case "update":
g.Log().Infof(ctx, "[~] 修改: %s: %v -> %v", change.Path, change.From, change.To)
case "delete":
g.Log().Infof(ctx, "[-] 删除: %s (原值: %v)", change.Path, change.From)
}
}
g.Log().Info(ctx, "=================================")
}
func getMapKeys(m map[string]string) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
return keys
}
func loadConfigFromFile(ctx context.Context, configPath string) {
adapter, err := gcfg.NewAdapterFile(configPath)
if err != nil {
g.Log().Errorf(ctx, "❌ [K3sConfig] 创建文件适配器失败: %v", err)
return
}
g.Cfg().SetAdapter(adapter)
g.Log().Info(ctx, "✅ [K3sConfig] 已成功加载配置:", configPath)
}