From 60902a9839cf65702fca062cce0f75da8168f5c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Thu, 1 Jan 2026 10:37:01 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=86=94=E6=96=AD=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/circuit_breaker.go | 327 +++++++++++++++++++++++----------- 1 file changed, 225 insertions(+), 102 deletions(-) diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go index 688560f..e025b56 100644 --- a/middleware/circuit_breaker.go +++ b/middleware/circuit_breaker.go @@ -25,18 +25,17 @@ const ( // CircuitBreakerConfig 熔断器配置 type CircuitBreakerConfig struct { - MaxFailures int // 连续失败次数 - Timeout string // 熔断超时时间 - HalfOpenSuccess int // 半开状态连续成功次数 - SuccessStatusCodes []int // 视为成功的HTTP状态码 - SlowRequestThreshold string // 慢请求阈值 - HalfOpenRequestSampleRate float64 // 半开状态请求采样率 - Dimension string // 熔断器维度: service/ip/user - EnableSlidingWindow bool // 是否启用滑动窗口 - SlidingWindowSize string // 滑动窗口大小 - FailureRateThreshold float64 // 失败率阈值 - EnableFallback bool // 是否启用降级 - FallbackMessage string // 降级提示消息 + Enabled bool // 是否启用熔断器 + MaxFailures int // 连续失败次数 + Timeout string // 熔断超时时间 + SuccessStatusCodes []int // 视为成功的HTTP状态码 + SlowRequestThreshold string // 慢请求阈值 + EnableSlidingWindow bool // 是否启用滑动窗口 + FailureRateThreshold float64 // 失败率阈值 + EnableFallback bool // 是否启用降级 + FallbackMessage string // 降级提示消息 + RequestTimeout int // 请求超时时间(毫秒) + DistributedTTL int // 分布式熔断状态TTL(秒) } // CircuitBreakerMetrics 熔断器指标 @@ -64,19 +63,17 @@ type CircuitBreakerInfo struct { var ( // circuitBreakers 存储所有熔断器状态(用于健康检查) circuitBreakers sync.Map - // enableDistributed 是否启用分布式熔断 - enableDistributed = false // circuitBreakerConfigs 熔断器配置缓存 circuitBreakerConfigs sync.Map // distributedSyncLock 分布式同步锁 distributedSyncLock sync.Mutex + // stateChangeListeners 状态变化监听器 + stateChangeListeners sync.Map ) // InitCircuitBreaker 初始化Sentinel熔断器 func InitCircuitBreaker() error { ctx := context.Background() - // 从配置文件读取是否启用分布式熔断 - enableDistributed = g.Cfg().MustGet(ctx, "circuitBreaker.enableDistributed", false).Bool() // 初始化Sentinel err := api.InitDefault() @@ -84,33 +81,46 @@ func InitCircuitBreaker() error { return fmt.Errorf("Sentinel初始化失败: %v", err) } - g.Log().Infof(ctx, "Sentinel熔断器初始化成功,分布式熔断: %v", enableDistributed) + // 注册熔断器状态变化监听器 + registerStateChangeListeners() - // 动态从配置文件读取服务列表 - services := g.Cfg().MustGet(ctx, "circuitBreaker.services", []string{ - "customerService", "order", "assets", "cid", "oss", - "wallet", "market", "knapsack", - }).Strings() + g.Log().Infof(ctx, "Sentinel熔断器初始化成功") - if len(services) == 0 { - return fmt.Errorf("未配置熔断器服务列表") - } + // 扫描配置文件中所有配置了熔断器的服务 + services := g.Cfg().MustGet(ctx, "circuitBreaker").Map() - // 为每个服务创建熔断器 - for _, service := range services { - serviceConfig := loadServiceCircuitBreakerConfig(service) - if serviceConfig != nil { - circuitBreakerConfigs.Store(service, serviceConfig) - initErr := initServiceCircuitBreaker(service, serviceConfig) - if initErr != nil { - g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", service, initErr) - } else { - g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", service) - } + // 过滤掉非服务配置的key + serviceNames := make([]string, 0) + for key := range services { + if key != "services" && key != "enableDistributed" && key != "requestTimeout" && key != "distributedTTL" { + serviceNames = append(serviceNames, key) } } - g.Log().Infof(ctx, "共初始化 %d 个服务熔断器", len(services)) + if len(serviceNames) == 0 { + g.Log().Infof(ctx, "未配置任何服务熔断器") + return nil + } + + // 为每个服务创建熔断器 + enabledCount := 0 + for _, serviceName := range serviceNames { + serviceConfig := loadServiceCircuitBreakerConfig(serviceName) + if serviceConfig != nil && serviceConfig.Enabled { + circuitBreakerConfigs.Store(serviceName, serviceConfig) + initErr := initServiceCircuitBreaker(serviceName, serviceConfig) + if initErr != nil { + g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, initErr) + } else { + g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", serviceName) + enabledCount++ + } + } else { + g.Log().Infof(ctx, "服务 %s 熔断器未启用", serviceName) + } + } + + g.Log().Infof(ctx, "共初始化 %d 个服务熔断器,其中 %d 个已启用", len(serviceNames), enabledCount) return nil } @@ -142,35 +152,33 @@ func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig { ctx := context.Background() key := fmt.Sprintf("circuitBreaker.%s", serviceName) + enabled := g.Cfg().MustGet(ctx, key+".enabled", true).Bool() maxFailures := g.Cfg().MustGet(ctx, key+".maxFailures", 5).Int() timeout := g.Cfg().MustGet(ctx, key+".timeout", "60s").String() - halfOpenSuccess := g.Cfg().MustGet(ctx, key+".halfOpenSuccess", 2).Int() slowRequestThreshold := g.Cfg().MustGet(ctx, key+".slowRequestThreshold", "3s").String() - dimension := g.Cfg().MustGet(ctx, key+".dimension", "service").String() enableSlidingWindow := g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool() - slidingWindowSize := g.Cfg().MustGet(ctx, key+".slidingWindowSize", "60s").String() failureRateThreshold := g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64() - halfOpenRequestSampleRate := g.Cfg().MustGet(ctx, key+".halfOpenRequestSampleRate", 1.0).Float64() enableFallback := g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool() fallbackMessage := g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String() + requestTimeout := g.Cfg().MustGet(ctx, key+".requestTimeout", 30000).Int() + distributedTTL := g.Cfg().MustGet(ctx, key+".distributedTTL", 300).Int() // 解析成功状态码 successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String() statusCodes := parseStatusCodes(successCodes) return &CircuitBreakerConfig{ - MaxFailures: maxFailures, - Timeout: timeout, - HalfOpenSuccess: halfOpenSuccess, - SuccessStatusCodes: statusCodes, - SlowRequestThreshold: slowRequestThreshold, - HalfOpenRequestSampleRate: halfOpenRequestSampleRate, - Dimension: dimension, - EnableSlidingWindow: enableSlidingWindow, - SlidingWindowSize: slidingWindowSize, - FailureRateThreshold: failureRateThreshold, - EnableFallback: enableFallback, - FallbackMessage: fallbackMessage, + Enabled: enabled, + MaxFailures: maxFailures, + Timeout: timeout, + SuccessStatusCodes: statusCodes, + SlowRequestThreshold: slowRequestThreshold, + EnableSlidingWindow: enableSlidingWindow, + FailureRateThreshold: failureRateThreshold, + EnableFallback: enableFallback, + FallbackMessage: fallbackMessage, + RequestTimeout: requestTimeout, + DistributedTTL: distributedTTL, } } @@ -189,9 +197,20 @@ func parseStatusCodes(str string) []int { // initServiceCircuitBreaker 初始化服务熔断器 func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error { - timeout, _ := time.ParseDuration(config.Timeout) - slowRequestThreshold, _ := time.ParseDuration(config.SlowRequestThreshold) - _, _ = time.ParseDuration(config.SlidingWindowSize) + // 验证配置参数 + if err := validateCircuitBreakerConfig(config); err != nil { + return fmt.Errorf("配置验证失败: %v", err) + } + + timeout, err := time.ParseDuration(config.Timeout) + if err != nil { + return fmt.Errorf("解析超时时间失败: %v", err) + } + + slowRequestThreshold, err := time.ParseDuration(config.SlowRequestThreshold) + if err != nil { + return fmt.Errorf("解析慢请求阈值失败: %v", err) + } resourceName := fmt.Sprintf("service:%s", serviceName) @@ -225,7 +244,7 @@ func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) } // 加载规则到Sentinel - _, err := circuitbreaker.LoadRules(rule) + _, err = circuitbreaker.LoadRules(rule) if err != nil { return fmt.Errorf("加载熔断规则失败: %v", err) } @@ -244,8 +263,8 @@ func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) strategy = "slow_ratio" } - g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v", - serviceName, resourceName, strategy, timeout) + g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f", + serviceName, resourceName, strategy, timeout, rule[0].Threshold) return nil } @@ -253,6 +272,7 @@ func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) // CircuitBreakerMiddleware 熔断降级中间件(使用阿里Sentinel) func CircuitBreakerMiddleware(r *ghttp.Request) { startTime := time.Now() + ctx := r.GetCtx() // 从URL路径提取服务名 pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/") @@ -262,25 +282,47 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { } serviceName := pathParts[0] - resourceName := fmt.Sprintf("service:%s", serviceName) - // 获取熔断器信息 - val, ok := circuitBreakers.Load(serviceName) + // 获取熔断器配置 + val, ok := circuitBreakerConfigs.Load(serviceName) if !ok { // 未配置熔断器,直接放行 r.Middleware.Next() return } - cbInfo := val.(*CircuitBreakerInfo) + config := val.(*CircuitBreakerConfig) + if !config.Enabled { + // 熔断器未启用,直接放行 + r.Middleware.Next() + return + } + + // 获取熔断器信息 + cbInfoVal, ok := circuitBreakers.Load(serviceName) + if !ok { + r.Middleware.Next() + return + } + cbInfo := cbInfoVal.(*CircuitBreakerInfo) cbInfo.Metrics.TotalRequests.Add(1) + // 设置请求超时(使用服务独立配置) + if config.RequestTimeout > 0 { + ctx, cancel := context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond) + r.SetCtx(ctx) + defer cancel() + } + + resourceName := fmt.Sprintf("service:%s", serviceName) + // 检查是否启用分布式熔断 - if enableDistributed { - if isCircuitBreakerOpenInDistributed(r.GetCtx(), resourceName) { + if config.DistributedTTL > 0 { + if isCircuitBreakerOpenInDistributed(ctx, resourceName) { cbInfo.Metrics.BlockRequests.Add(1) - g.Log().Warningf(r.GetCtx(), "分布式熔断触发: %s", resourceName) - sendFallbackResponse(r, serviceName, cbInfo.Config) + g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName) + notifyStateChange(serviceName, StateOpen, StateOpen) + sendFallbackResponse(r, serviceName, config, "distributed") return } } @@ -291,23 +333,29 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { // 被熔断拦截 cbInfo.Metrics.BlockRequests.Add(1) cbInfo.Metrics.OpenCount.Add(1) - g.Log().Warningf(r.GetCtx(), "熔断触发: %s, reason: %v", resourceName, blockError) + g.Log().Warningf(ctx, "熔断触发: %s, reason: %v", resourceName, blockError) // 更新熔断器状态 cbInfo.mu.Lock() + oldState := cbInfo.State cbInfo.State = StateOpen cbInfo.LastOpenTime = time.Now() - if timeout, err := time.ParseDuration(cbInfo.Config.Timeout); err == nil { + if timeout, err := time.ParseDuration(config.Timeout); err == nil { cbInfo.NextRetryTime = time.Now().Add(timeout) } cbInfo.mu.Unlock() - // 同步到分布式存储 - if enableDistributed { - syncCircuitBreakerStateToDistributed(r.GetCtx(), resourceName, "open") + // 通知状态变化(如果状态改变) + if oldState != StateOpen { + notifyStateChange(serviceName, oldState, StateOpen) } - sendFallbackResponse(r, serviceName, cbInfo.Config) + // 同步到分布式存储 + if config.DistributedTTL > 0 { + syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL) + } + + sendFallbackResponse(r, serviceName, config, "blocked") return } @@ -322,9 +370,17 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { // 记录异常 cbInfo.Metrics.FailureRequests.Add(1) api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode)) - g.Log().Debugf(r.GetCtx(), "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration) + g.Log().Debugf(ctx, "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration) } else { cbInfo.Metrics.PassRequests.Add(1) + // 更新状态为关闭(如果之前是开启状态) + cbInfo.mu.Lock() + oldState := cbInfo.State + if cbInfo.State != StateClosed { + cbInfo.State = StateClosed + notifyStateChange(serviceName, oldState, StateClosed) + } + cbInfo.mu.Unlock() } // 退出Sentinel资源 @@ -332,13 +388,22 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { } // sendFallbackResponse 发送降级响应 -func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig) { +func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig, reason string) { if config.EnableFallback && config.FallbackMessage != "" { // 自定义降级消息 r.Response.WriteStatusExit(503, config.FallbackMessage) } else { - // 默认消息 - r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)) + // 根据原因返回不同的状态码和消息 + switch reason { + case "timeout": + r.Response.WriteStatusExit(504, fmt.Sprintf("服务 '%s' 响应超时", serviceName)) + case "blocked": + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断保护中,请稍后再试", serviceName)) + case "distributed": + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 分布式熔断中", serviceName)) + default: + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)) + } } } @@ -372,7 +437,13 @@ func isSuccessStatusCode(resourceName string, statusCode int) bool { // isCircuitBreakerOpenInDistributed 检查分布式熔断状态 func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool { key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) - value, err := g.Redis().Get(ctx, key) + + redis := g.Redis() + if redis == nil { + return false + } + + value, err := redis.Get(ctx, key) if err != nil || value.IsNil() { return false } @@ -381,18 +452,70 @@ func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) } // syncCircuitBreakerStateToDistributed 同步熔断器状态到分布式存储 -func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string) { +func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string, ttl int) { distributedSyncLock.Lock() defer distributedSyncLock.Unlock() key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) - // 设置过期时间为5分钟,使用SetEX - _, err := g.Redis().Do(ctx, "SETEX", key, 300, state) + + redis := g.Redis() + if redis == nil { + g.Log().Errorf(ctx, "Redis客户端未初始化,无法同步熔断状态") + return + } + + _, err := redis.Do(ctx, "SETEX", key, ttl, state) if err != nil { g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", err) } } +// validateCircuitBreakerConfig 验证熔断器配置 +func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error { + if config.MaxFailures <= 0 { + return fmt.Errorf("maxFailures必须大于0") + } + if config.FailureRateThreshold < 0 || config.FailureRateThreshold > 1 { + return fmt.Errorf("failureRateThreshold必须在0.0-1.0之间") + } + if len(config.SuccessStatusCodes) == 0 { + return fmt.Errorf("successStatusCodes不能为空") + } + return nil +} + +// registerStateChangeListeners 注册状态变化监听器 +func registerStateChangeListeners() { + // 示例:注册默认监听器 + RegisterStateChangeListener("default", func(serviceName string, fromState, toState CircuitBreakerState) { + g.Log().Infof(context.Background(), "熔断器状态变化: service=%s, %s -> %s", + serviceName, fromState, toState) + }) +} + +// StateChangeListener 状态变化监听器类型 +type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState) + +// RegisterStateChangeListener 注册状态变化监听器 +func RegisterStateChangeListener(name string, listener StateChangeListener) { + stateChangeListeners.Store(name, listener) +} + +// UnregisterStateChangeListener 取消注册状态变化监听器 +func UnregisterStateChangeListener(name string) { + stateChangeListeners.Delete(name) +} + +// notifyStateChange 通知所有监听器状态变化 +func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) { + stateChangeListeners.Range(func(key, value interface{}) bool { + if listener, ok := value.(StateChangeListener); ok { + listener(serviceName, fromState, toState) + } + return true + }) +} + // CircuitBreakerHealthCheckHandler 熔断器健康检查接口 func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { status := make(map[string]interface{}) @@ -431,7 +554,6 @@ func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { "totalServices": totalServices, "openServices": openServices, "closedServices": totalServices - openServices, - "distributed": enableDistributed, } r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ @@ -444,20 +566,6 @@ func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { }) } -// getSentinelStateString 转换Sentinel状态为字符串 -func getSentinelStateString(state circuitbreaker.State) string { - switch state { - case circuitbreaker.Closed: - return string(StateClosed) - case circuitbreaker.Open: - return string(StateOpen) - case circuitbreaker.HalfOpen: - return string(StateHalfOpen) - default: - return "unknown" - } -} - // CircuitBreakerResetHandler 熔断器手动重置接口(仅限管理后台调用) func CircuitBreakerResetHandler(r *ghttp.Request) { serviceName := r.Get("service").String() @@ -510,9 +618,15 @@ func CircuitBreakerResetHandler(r *ghttp.Request) { } // 重置分布式状态(如果启用) - if enableDistributed { - key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) - _, _ = g.Redis().Del(r.GetCtx(), key) + if val, ok := circuitBreakerConfigs.Load(serviceName); ok { + config := val.(*CircuitBreakerConfig) + if config.DistributedTTL > 0 { + key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) + redis := g.Redis() + if redis != nil { + _, _ = redis.Del(r.GetCtx(), key) + } + } } g.Log().Infof(r.GetCtx(), "熔断器已手动重置: %s", resourceName) @@ -528,12 +642,21 @@ func CircuitBreakerReloadHandler(r *ghttp.Request) { serviceName := r.Get("service").String() if serviceName == "" { - // 重载所有服务 - services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker.services", []string{}).Strings() + // 重载所有服务 - 扫描配置文件中所有服务 + services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker").Map() + + // 过滤出服务名 + serviceNames := make([]string, 0) + for key := range services { + if key != "services" && key != "enableDistributed" && key != "requestTimeout" && key != "distributedTTL" { + serviceNames = append(serviceNames, key) + } + } + successCount := 0 failCount := 0 - for _, service := range services { + for _, service := range serviceNames { err := ReloadCircuitBreakerConfig(service) if err != nil { g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err)