From 274bab258bffe89f242288c281882adc4fef8054 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Thu, 1 Jan 2026 07:38:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=86=94=E6=96=AD=E7=AD=96?= =?UTF-8?q?=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/circuit_breaker.go | 266 ++++++++++++++++++++++++++-------- 1 file changed, 209 insertions(+), 57 deletions(-) diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go index b28c414..688560f 100644 --- a/middleware/circuit_breaker.go +++ b/middleware/circuit_breaker.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "github.com/alibaba/sentinel-golang/api" @@ -34,17 +35,30 @@ type CircuitBreakerConfig struct { EnableSlidingWindow bool // 是否启用滑动窗口 SlidingWindowSize string // 滑动窗口大小 FailureRateThreshold float64 // 失败率阈值 + EnableFallback bool // 是否启用降级 + FallbackMessage string // 降级提示消息 +} + +// CircuitBreakerMetrics 熔断器指标 +type CircuitBreakerMetrics struct { + TotalRequests atomic.Int64 // 总请求数 + PassRequests atomic.Int64 // 通过请求数 + BlockRequests atomic.Int64 // 阻塞请求数 + FailureRequests atomic.Int64 // 失败请求数 + OpenCount atomic.Int64 // 熔断开启次数 } // CircuitBreakerInfo 熔断器信息 type CircuitBreakerInfo struct { - ResourceName string `json:"resourceName"` // 资源名称 - State CircuitBreakerState `json:"state"` // 当前状态 - Config *CircuitBreakerConfig `json:"config"` // 配置信息 - FailCount int64 `json:"failCount"` // 失败次数 - TotalCount int64 `json:"totalCount"` // 总请求数 - LastOpenTime time.Time `json:"lastOpenTime"` // 上次熔断时间 - NextRetryTime time.Time `json:"nextRetryTime"` // 下次重试时间 + ResourceName string `json:"resourceName"` // 资源名称 + State CircuitBreakerState `json:"state"` // 当前状态 + Config *CircuitBreakerConfig `json:"config"` // 配置信息 + FailCount int64 `json:"failCount"` // 失败次数 + TotalCount int64 `json:"totalCount"` // 总请求数 + LastOpenTime time.Time `json:"lastOpenTime"` // 上次熔断时间 + NextRetryTime time.Time `json:"nextRetryTime"` // 下次重试时间 + Metrics *CircuitBreakerMetrics `json:"metrics"` // 指标统计 + mu sync.RWMutex // 保护状态更新 } var ( @@ -54,6 +68,8 @@ var ( enableDistributed = false // circuitBreakerConfigs 熔断器配置缓存 circuitBreakerConfigs sync.Map + // distributedSyncLock 分布式同步锁 + distributedSyncLock sync.Mutex ) // InitCircuitBreaker 初始化Sentinel熔断器 @@ -70,15 +86,17 @@ func InitCircuitBreaker() error { g.Log().Infof(ctx, "Sentinel熔断器初始化成功,分布式熔断: %v", enableDistributed) - // 加载所有服务的熔断器配置 - loadCircuitBreakerConfigs() - - // 为每个服务创建熔断器 - services := []string{ + // 动态从配置文件读取服务列表 + services := g.Cfg().MustGet(ctx, "circuitBreaker.services", []string{ "customerService", "order", "assets", "cid", "oss", "wallet", "market", "knapsack", + }).Strings() + + if len(services) == 0 { + return fmt.Errorf("未配置熔断器服务列表") } + // 为每个服务创建熔断器 for _, service := range services { serviceConfig := loadServiceCircuitBreakerConfig(service) if serviceConfig != nil { @@ -86,25 +104,37 @@ func InitCircuitBreaker() error { initErr := initServiceCircuitBreaker(service, serviceConfig) if initErr != nil { g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", service, initErr) + } else { + g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", service) } } } + g.Log().Infof(ctx, "共初始化 %d 个服务熔断器", len(services)) return nil } -// loadCircuitBreakerConfigs 加载熔断器配置 -func loadCircuitBreakerConfigs() { - services := []string{ - "customerService", "order", "assets", "cid", "oss", - "wallet", "market", "knapsack", +// ReloadCircuitBreakerConfig 动态重新加载熔断器配置 +func ReloadCircuitBreakerConfig(serviceName string) error { + ctx := context.Background() + + // 重新加载配置 + serviceConfig := loadServiceCircuitBreakerConfig(serviceName) + if serviceConfig == nil { + return fmt.Errorf("未找到服务 %s 的配置", serviceName) } - for _, service := range services { - config := loadServiceCircuitBreakerConfig(service) - if config != nil { - circuitBreakerConfigs.Store(service, config) - } + + // 更新配置缓存 + circuitBreakerConfigs.Store(serviceName, serviceConfig) + + // 重新初始化熔断器 + err := initServiceCircuitBreaker(serviceName, serviceConfig) + if err != nil { + return fmt.Errorf("重新初始化熔断器失败: %v", err) } + + g.Log().Infof(ctx, "服务 %s 熔断器配置重新加载成功", serviceName) + return nil } // loadServiceCircuitBreakerConfig 加载单个服务的熔断器配置 @@ -121,6 +151,8 @@ func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig { slidingWindowSize := g.Cfg().MustGet(ctx, key+".slidingWindowSize", "60s").String() failureRateThreshold := g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64() halfOpenRequestSampleRate := g.Cfg().MustGet(ctx, key+".halfOpenRequestSampleRate", 1.0).Float64() + enableFallback := g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool() + fallbackMessage := g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String() // 解析成功状态码 successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String() @@ -137,6 +169,8 @@ func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig { EnableSlidingWindow: enableSlidingWindow, SlidingWindowSize: slidingWindowSize, FailureRateThreshold: failureRateThreshold, + EnableFallback: enableFallback, + FallbackMessage: fallbackMessage, } } @@ -201,6 +235,7 @@ func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) ResourceName: resourceName, State: StateClosed, Config: config, + Metrics: &CircuitBreakerMetrics{}, } circuitBreakers.Store(serviceName, cbInfo) @@ -217,6 +252,8 @@ func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) // CircuitBreakerMiddleware 熔断降级中间件(使用阿里Sentinel) func CircuitBreakerMiddleware(r *ghttp.Request) { + startTime := time.Now() + // 从URL路径提取服务名 pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/") if len(pathParts) == 0 { @@ -227,12 +264,23 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { serviceName := pathParts[0] resourceName := fmt.Sprintf("service:%s", serviceName) + // 获取熔断器信息 + val, ok := circuitBreakers.Load(serviceName) + if !ok { + // 未配置熔断器,直接放行 + r.Middleware.Next() + return + } + + cbInfo := val.(*CircuitBreakerInfo) + cbInfo.Metrics.TotalRequests.Add(1) + // 检查是否启用分布式熔断 if enableDistributed { - // 检查Redis中的熔断状态 if isCircuitBreakerOpenInDistributed(r.GetCtx(), resourceName) { + cbInfo.Metrics.BlockRequests.Add(1) g.Log().Warningf(r.GetCtx(), "分布式熔断触发: %s", resourceName) - r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断中,请稍后再试", serviceName)) + sendFallbackResponse(r, serviceName, cbInfo.Config) return } } @@ -241,20 +289,25 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { entry, blockError := api.Entry(resourceName) if blockError != nil { // 被熔断拦截 + cbInfo.Metrics.BlockRequests.Add(1) + cbInfo.Metrics.OpenCount.Add(1) g.Log().Warningf(r.GetCtx(), "熔断触发: %s, reason: %v", resourceName, blockError) // 更新熔断器状态 - if val, ok := circuitBreakers.Load(serviceName); ok { - cbInfo := val.(*CircuitBreakerInfo) - cbInfo.State = StateOpen - cbInfo.LastOpenTime = time.Now() - if timeout, err := time.ParseDuration(cbInfo.Config.Timeout); err == nil { - cbInfo.NextRetryTime = time.Now().Add(timeout) - } - circuitBreakers.Store(serviceName, cbInfo) + cbInfo.mu.Lock() + cbInfo.State = StateOpen + cbInfo.LastOpenTime = time.Now() + if timeout, err := time.ParseDuration(cbInfo.Config.Timeout); err == nil { + cbInfo.NextRetryTime = time.Now().Add(timeout) + } + cbInfo.mu.Unlock() + + // 同步到分布式存储 + if enableDistributed { + syncCircuitBreakerStateToDistributed(r.GetCtx(), resourceName, "open") } - r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)) + sendFallbackResponse(r, serviceName, cbInfo.Config) return } @@ -263,15 +316,32 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { // 记录请求结果(基于HTTP状态码) statusCode := r.Response.Status + duration := time.Since(startTime) + if !isSuccessStatusCode(resourceName, statusCode) { // 记录异常 + cbInfo.Metrics.FailureRequests.Add(1) api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode)) + g.Log().Debugf(r.GetCtx(), "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration) + } else { + cbInfo.Metrics.PassRequests.Add(1) } // 退出Sentinel资源 entry.Exit() } +// sendFallbackResponse 发送降级响应 +func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig) { + if config.EnableFallback && config.FallbackMessage != "" { + // 自定义降级消息 + r.Response.WriteStatusExit(503, config.FallbackMessage) + } else { + // 默认消息 + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)) + } +} + // isSuccessStatusCode 判断HTTP状态码是否成功 func isSuccessStatusCode(resourceName string, statusCode int) bool { serviceName := strings.TrimPrefix(resourceName, "service:") @@ -310,38 +380,67 @@ func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) return state == "open" } +// syncCircuitBreakerStateToDistributed 同步熔断器状态到分布式存储 +func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string) { + distributedSyncLock.Lock() + defer distributedSyncLock.Unlock() + + key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) + // 设置过期时间为5分钟,使用SetEX + _, err := g.Redis().Do(ctx, "SETEX", key, 300, state) + if err != nil { + g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", err) + } +} + // CircuitBreakerHealthCheckHandler 熔断器健康检查接口 func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { status := make(map[string]interface{}) + totalServices := 0 + openServices := 0 // 遍历所有熔断器 circuitBreakers.Range(func(key, value interface{}) bool { serviceName := key.(string) cbInfo := value.(*CircuitBreakerInfo) - // 获取Sentinel中的实际状态 - rules := circuitbreaker.GetRulesOfResource(cbInfo.ResourceName) - var stateStr string - if len(rules) > 0 { - stateStr = string(cbInfo.State) - } else { - stateStr = "unknown" + totalServices++ + cbInfo.mu.RLock() + isOpen := cbInfo.State == StateOpen + if isOpen { + openServices++ } status[serviceName] = map[string]interface{}{ - "resource": cbInfo.ResourceName, - "state": stateStr, - "config": cbInfo.Config, - "lastOpenTime": cbInfo.LastOpenTime, - "nextRetryTime": cbInfo.NextRetryTime, + "resource": cbInfo.ResourceName, + "state": string(cbInfo.State), + "lastOpenTime": cbInfo.LastOpenTime, + "nextRetryTime": cbInfo.NextRetryTime, + "totalRequests": cbInfo.Metrics.TotalRequests.Load(), + "passRequests": cbInfo.Metrics.PassRequests.Load(), + "blockRequests": cbInfo.Metrics.BlockRequests.Load(), + "failureRequests": cbInfo.Metrics.FailureRequests.Load(), + "openCount": cbInfo.Metrics.OpenCount.Load(), } + cbInfo.mu.RUnlock() + return true }) + summary := map[string]interface{}{ + "totalServices": totalServices, + "openServices": openServices, + "closedServices": totalServices - openServices, + "distributed": enableDistributed, + } + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ Code: 200, Message: "熔断器状态", - Data: status, + Data: map[string]interface{}{ + "summary": summary, + "services": status, + }, }) } @@ -372,20 +471,25 @@ func CircuitBreakerResetHandler(r *ghttp.Request) { resourceName := fmt.Sprintf("service:%s", serviceName) - // 重置Sentinel规则 - 清空现有规则 - _, err := circuitbreaker.LoadRules([]*circuitbreaker.Rule{}) - if err != nil { - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 500, - Message: fmt.Sprintf("重置熔断器失败: %v", err), - }) - return + // 获取当前服务的所有规则 + currentRules := circuitbreaker.GetRulesOfResource(resourceName) + + // 只删除当前服务的规则 + if len(currentRules) > 0 { + _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}) + if err != nil { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 500, + Message: fmt.Sprintf("重置熔断器失败: %v", err), + }) + return + } } - // 重新加载规则 + // 重新加载该服务的规则 if val, ok := circuitBreakerConfigs.Load(serviceName); ok { config := val.(*CircuitBreakerConfig) - err = initServiceCircuitBreaker(serviceName, config) + err := initServiceCircuitBreaker(serviceName, config) if err != nil { r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ Code: 500, @@ -398,10 +502,11 @@ func CircuitBreakerResetHandler(r *ghttp.Request) { // 更新内存状态 if val, ok := circuitBreakers.Load(serviceName); ok { cbInfo := val.(*CircuitBreakerInfo) + cbInfo.mu.Lock() cbInfo.State = StateClosed cbInfo.LastOpenTime = time.Time{} cbInfo.NextRetryTime = time.Time{} - circuitBreakers.Store(serviceName, cbInfo) + cbInfo.mu.Unlock() } // 重置分布式状态(如果启用) @@ -417,3 +522,50 @@ func CircuitBreakerResetHandler(r *ghttp.Request) { Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName), }) } + +// CircuitBreakerReloadHandler 熔断器配置重载接口 +func CircuitBreakerReloadHandler(r *ghttp.Request) { + serviceName := r.Get("service").String() + + if serviceName == "" { + // 重载所有服务 + services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker.services", []string{}).Strings() + successCount := 0 + failCount := 0 + + for _, service := range services { + err := ReloadCircuitBreakerConfig(service) + if err != nil { + g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err) + failCount++ + } else { + successCount++ + } + } + + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 200, + Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount), + Data: map[string]interface{}{ + "success": successCount, + "failed": failCount, + }, + }) + return + } + + // 重载单个服务 + err := ReloadCircuitBreakerConfig(serviceName) + if err != nil { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 500, + Message: fmt.Sprintf("重载失败: %v", err), + }) + return + } + + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 200, + Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName), + }) +}