// Package config 提供全局配置管理和Consul监听 // // 本包实现了基于Consul的配置热更新机制,所有服务导入common包即可自动获得配置监听能力 package config import ( "context" "encoding/json" "sync" "time" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "github.com/hashicorp/consul/api" ) // Direction 咨询方向配置 type Direction struct { Name string `json:"name"` // 方向名称(如:气血、减肥) ChatId string `json:"chat_id"` // RAGFlow对话ID } var ( directionsCache []Direction // 本地缓存(内存读取,超快) cacheMu sync.RWMutex // 读写锁(支持多goroutine并发读) startOnce sync.Once // 确保只启动一次监听 consulClient *api.Client // Consul客户端(复用连接) ) // init 包初始化函数(所有服务导入common包时自动执行) // // Fallback顺序:Consul → config.yml func init() { ctx := context.Background() // 检查Consul是否配置 consulAddr := g.Cfg().MustGet(ctx, "consul.address").String() if consulAddr == "" { glog.Warning(ctx, "Consul未配置,使用本地配置") loadDirectionsFromLocal(ctx) return } // 初始化Consul客户端 config := api.DefaultConfig() config.Address = consulAddr client, err := api.NewClient(config) if err != nil { glog.Errorf(ctx, "Consul客户端初始化失败: %v,fallback到本地配置", err) loadDirectionsFromLocal(ctx) return } consulClient = client // 启动后台监听(单例,确保只启动一次) startOnce.Do(func() { go startConsulWatcher(ctx) glog.Info(ctx, "Consul配置监听已启动") }) } // GetDirections 获取咨询方向配置(从内存缓存读取) // // 返回: // // []Direction: 方向列表 // // 特点: // - 高性能:读内存缓存,无网络IO // - 线程安全:使用读锁,支持并发读取 // - 自动更新:后台监听Consul,配置变化时自动更新缓存 // // 使用示例: // // dirs := config.GetDirections() // for _, dir := range dirs { // fmt.Printf("%s -> %s\n", dir.Name, dir.ChatId) // } func GetDirections() []Direction { cacheMu.RLock() defer cacheMu.RUnlock() // 返回副本,避免外部修改缓存 result := make([]Direction, len(directionsCache)) copy(result, directionsCache) return result } // GetDirectionChatId 根据方向名称获取对应的ChatId // // 参数: // // name: 方向名称(如:"气血"、"减肥") // // 返回: // // chatId: 对应的RAGFlow对话ID,未找到返回空字符串 // // 使用示例: // // chatId := config.GetDirectionChatId("气血") func GetDirectionChatId(name string) string { cacheMu.RLock() defer cacheMu.RUnlock() for _, dir := range directionsCache { if dir.Name == name { return dir.ChatId } } return "" } // startConsulWatcher 后台监听Consul配置变化(Blocking Query长连接) // // 工作原理: // 1. 使用Consul Blocking Query API(长连接,只在变化时返回) // 2. 收到变化通知后更新本地缓存 // 3. 自动重连(网络异常时自动恢复) // // 资源消耗: // - 一个长连接(保持5分钟) // - 配置未变化时几乎不占用CPU和网络 // - 对比轮询:节省99%资源 // // 注意: // - 此函数在后台goroutine中运行 // - 使用Blocking Query避免轮询 func startConsulWatcher(ctx context.Context) { const consulKey = "ragflow/directions" kv := consulClient.KV() var lastIndex uint64 // 初始化时先读取一次配置 if err := loadDirectionsFromConsul(ctx, kv); err != nil { glog.Warningf(ctx, "初始化加载Consul配置失败: %v", err) } // 持续监听配置变化 for { // Consul Blocking Query(长连接模式) // WaitIndex: 指定版本号,只在配置变化时返回 // WaitTime: 最长等待时间(超时后返回,客户端重新请求) pair, meta, err := kv.Get(consulKey, &api.QueryOptions{ WaitIndex: lastIndex, WaitTime: 5 * time.Minute, }) if err != nil { glog.Errorf(ctx, "Consul查询失败: %v", err) time.Sleep(5 * time.Second) // 错误时等待5秒重试 continue } // 配置版本号变化,说明有更新 if meta.LastIndex != lastIndex { lastIndex = meta.LastIndex // 配置被删除 if pair == nil { glog.Warning(ctx, "Consul配置已删除: "+consulKey) cacheMu.Lock() directionsCache = []Direction{} cacheMu.Unlock() continue } // 解析并更新缓存 var dirs []Direction if err := json.Unmarshal(pair.Value, &dirs); err != nil { glog.Errorf(ctx, "解析Consul配置失败: %v", err) continue } cacheMu.Lock() directionsCache = dirs cacheMu.Unlock() glog.Infof(ctx, "Consul配置已更新: %d个方向", len(dirs)) } } } // loadDirectionsFromConsul 从Consul加载配置(初始化时调用) func loadDirectionsFromConsul(ctx context.Context, kv *api.KV) error { const consulKey = "ragflow/directions" pair, _, err := kv.Get(consulKey, nil) if err != nil { // Consul查询失败,fallback到本地配置 glog.Warningf(ctx, "Consul查询失败: %v,fallback到本地配置", err) loadDirectionsFromLocal(ctx) return err } if pair == nil { glog.Warning(ctx, "Consul中未找到配置: "+consulKey+",fallback到本地配置") loadDirectionsFromLocal(ctx) return nil } var dirs []Direction if err := json.Unmarshal(pair.Value, &dirs); err != nil { glog.Errorf(ctx, "解析Consul配置失败: %v,fallback到本地配置", err) loadDirectionsFromLocal(ctx) return err } cacheMu.Lock() directionsCache = dirs cacheMu.Unlock() glog.Infof(ctx, "已加载Consul配置: %d个方向", len(dirs)) return nil } // loadDirectionsFromLocal 从本地config.yml加载配置(fallback机制) func loadDirectionsFromLocal(ctx context.Context) { directionsConfig := g.Cfg().MustGet(ctx, "ragflow.directions") if directionsConfig.IsEmpty() { glog.Warning(ctx, "本地配置中也未找到 ragflow.directions") return } var dirs []Direction if err := directionsConfig.Scan(&dirs); err != nil { glog.Errorf(ctx, "解析本地配置失败: %v", err) return } cacheMu.Lock() directionsCache = dirs cacheMu.Unlock() glog.Infof(ctx, "已加载config.yml配置: %d个方向", len(dirs)) }