// This file implements a per-endpoint circuit breaker middleware for GoFrame
// HTTP servers. Blocking decisions are delegated to Sentinel circuit breaker
// rules; this file additionally keeps local state/metrics per resource and can
// share the "open" state between instances through Redis.

package middleware

import (
	"context"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"gitee.com/red-future---jilin-g/common/redis"
	"github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/circuitbreaker"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
)

// CircuitBreakerState is the textual name of a circuit breaker state.
type CircuitBreakerState string

// Circuit breaker states as exposed to callers and listeners.
const (
	StateClosed   CircuitBreakerState = "closed"
	StateOpen     CircuitBreakerState = "open"
	StateHalfOpen CircuitBreakerState = "halfopen"
)

// Integer encodings of the states, stored in CircuitBreakerInfo.State.
const (
	stateClosed   int64 = 0
	stateOpen     int64 = 1
	stateHalfOpen int64 = 2
)

// implicitStatusOK is the status assumed for a response whose status was never
// set explicitly.
const implicitStatusOK = 200

// maxRetryTimeout is the largest retry timeout that fits the uint32
// millisecond field of a Sentinel rule.
const maxRetryTimeout = time.Duration(1<<32-1) * time.Millisecond

// CircuitBreakerConfig holds the configuration of one endpoint breaker.
// The *Parsed fields are derived from their string counterparts when the
// configuration is loaded.
type CircuitBreakerConfig struct {
	Enabled bool
	// MaxFailures is the error-count threshold used when the sliding window
	// strategy is disabled.
	MaxFailures int
	// Timeout is how long the breaker stays open before a retry is allowed.
	Timeout       string
	TimeoutParsed time.Duration
	// SuccessStatusCodes lists the HTTP status codes counted as success.
	SuccessStatusCodes []int
	// SlowRequestThreshold is the response time above which a request is
	// counted as slow.
	SlowRequestThreshold       string
	SlowRequestThresholdParsed time.Duration
	// EnableSlidingWindow selects the slow-request-ratio strategy instead of
	// the error-count strategy.
	EnableSlidingWindow  bool
	FailureRateThreshold float64
	EnableFallback       bool
	FallbackMessage      string
	// RequestTimeout is the per-request timeout in milliseconds; 0 disables it.
	RequestTimeout int
	// DistributedTTL is the TTL passed to Redis SETEX for the shared state;
	// 0 disables distributed state sharing.
	DistributedTTL           int
	StatIntervalMs           int
	MinRequestAmount         int
	HalfOpenMaxRequests      int
	HalfOpenSuccessThreshold float64
	// WarmupDuration is the period after initialisation/reset during which
	// requests bypass the breaker.
	WarmupDuration          string
	WarmupDurationParsed    time.Duration
	EnableAdaptiveThreshold bool
	AdaptiveMinThreshold    float64
	AdaptiveMaxThreshold    float64
}

// CircuitBreakerMetrics holds the counters of one breaker. All fields are
// updated atomically.
type CircuitBreakerMetrics struct {
	// Request counters.
	TotalRequests   atomic.Int64
	PassRequests    atomic.Int64
	BlockRequests   atomic.Int64
	FailureRequests atomic.Int64
	SlowRequests    atomic.Int64

	// State transition counters.
	OpenCount     atomic.Int64
	ClosedCount   atomic.Int64
	HalfOpenCount atomic.Int64

	// Unix timestamps in seconds.
	LastResetTime    atomic.Int64
	LastOpenTime     atomic.Int64
	NextRetryTime    atomic.Int64
	LastCloseTime    atomic.Int64
	LastHalfOpenTime atomic.Int64

	// Half-open bookkeeping: HalfOpenRequests is the number of probe requests
	// currently in flight; HalfOpenPassed/HalfOpenFailed count finished probes.
	HalfOpenRequests atomic.Int64
	HalfOpenPassed   atomic.Int64
	HalfOpenFailed   atomic.Int64

	// Response time statistics in nanoseconds.
	TotalResponseTime atomic.Int64
	MinResponseTime   atomic.Int64
	MaxResponseTime   atomic.Int64

	// Fixed-window statistics used to report the recent success rate.
	WindowStartTime atomic.Int64 // Unix seconds at which the window started.
	WindowRequests  atomic.Int64 // Requests seen in the window.
	WindowFailures  atomic.Int64 // Failed requests seen in the window.
}

// Compile-time check that CircuitBreakerMetrics satisfies HalfOpenMetrics.
var _ HalfOpenMetrics = (*CircuitBreakerMetrics)(nil)

// GetHalfOpenRequests returns the in-flight half-open probe counter.
func (m *CircuitBreakerMetrics) GetHalfOpenRequests() *atomic.Int64 { return &m.HalfOpenRequests }

// GetHalfOpenPassed returns the counter of successful half-open probes.
func (m *CircuitBreakerMetrics) GetHalfOpenPassed() *atomic.Int64 { return &m.HalfOpenPassed }

// GetHalfOpenFailed returns the counter of failed half-open probes.
func (m *CircuitBreakerMetrics) GetHalfOpenFailed() *atomic.Int64 { return &m.HalfOpenFailed }

// AddHalfOpenRequests adds delta to the in-flight half-open probe counter.
func (m *CircuitBreakerMetrics) AddHalfOpenRequests(delta int64) { m.HalfOpenRequests.Add(delta) }

// AddHalfOpenPassed adds delta to the counter of successful half-open probes.
func (m *CircuitBreakerMetrics) AddHalfOpenPassed(delta int64) { m.HalfOpenPassed.Add(delta) }

// AddHalfOpenFailed adds delta to the counter of failed half-open probes.
func (m *CircuitBreakerMetrics) AddHalfOpenFailed(delta int64) { m.HalfOpenFailed.Add(delta) }

// HalfOpenMetrics is the set of atomic counters HalfOpenManager operates on.
type HalfOpenMetrics interface {
	GetHalfOpenRequests() *atomic.Int64
	GetHalfOpenPassed() *atomic.Int64
	GetHalfOpenFailed() *atomic.Int64
	AddHalfOpenRequests(delta int64)
	AddHalfOpenPassed(delta int64)
	AddHalfOpenFailed(delta int64)
}

// HalfOpenManager serialises the compound read-modify-write operations on the
// half-open counters so that concurrent requests see consistent values.
type HalfOpenManager struct {
	mu sync.RWMutex
}

// NewHalfOpenManager creates a HalfOpenManager.
func NewHalfOpenManager() *HalfOpenManager {
	return &HalfOpenManager{}
}

var (
	halfOpenManagerInstance *HalfOpenManager
	halfOpenManagerOnce     sync.Once
)

// GetHalfOpenManager returns the process-wide HalfOpenManager, creating it on
// first use.
func GetHalfOpenManager() *HalfOpenManager {
	halfOpenManagerOnce.Do(func() {
		halfOpenManagerInstance = NewHalfOpenManager()
	})
	return halfOpenManagerInstance
}

// TryAcquireHalfOpenSlot reserves one half-open probe slot if fewer than
// maxRequests probes are in flight. It returns whether a slot was reserved and
// the in-flight count after the call. A non-positive maxRequests never grants
// a slot.
//
// Acquiring a slot only changes the in-flight counter; the outcome is counted
// later by RecordHalfOpenResult, so that the success rate is not inflated by
// requests whose result is not yet known.
func (m *HalfOpenManager) TryAcquireHalfOpenSlot(metrics HalfOpenMetrics, maxRequests int) (bool, int) {
	if maxRequests <= 0 {
		return false, 0
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	inFlight := int(metrics.GetHalfOpenRequests().Load())
	if inFlight >= maxRequests {
		return false, inFlight
	}
	metrics.AddHalfOpenRequests(1)
	return true, inFlight + 1
}

// ReleaseHalfOpenSlot gives back a slot obtained from TryAcquireHalfOpenSlot
// without recording an outcome (for example when the request was rejected
// before it reached the handler).
func (m *HalfOpenManager) ReleaseHalfOpenSlot(metrics HalfOpenMetrics) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.releaseSlotLocked(metrics)
}

// releaseSlotLocked decrements the in-flight counter, never below zero (the
// counter may have been reset while the probe was running). m.mu must be held.
func (m *HalfOpenManager) releaseSlotLocked(metrics HalfOpenMetrics) {
	if metrics.GetHalfOpenRequests().Load() > 0 {
		metrics.AddHalfOpenRequests(-1)
	}
}

// RecordHalfOpenResult releases the probe's slot, counts its outcome and
// reports whether the half-open success rate now reaches successThreshold.
// A threshold outside [0, 1] is replaced by 0.5.
func (m *HalfOpenManager) RecordHalfOpenResult(metrics HalfOpenMetrics, isSuccess bool, successThreshold float64) bool {
	if successThreshold < 0 || successThreshold > 1 {
		successThreshold = 0.5
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	m.releaseSlotLocked(metrics)
	if isSuccess {
		metrics.AddHalfOpenPassed(1)
	} else {
		metrics.AddHalfOpenFailed(1)
	}
	return m.checkHalfOpenSuccessThreshold(metrics, successThreshold)
}

// checkHalfOpenSuccessThreshold reports whether passed/(passed+failed) is at
// least successThreshold. It returns false when no probe has finished yet.
func (m *HalfOpenManager) checkHalfOpenSuccessThreshold(metrics HalfOpenMetrics, successThreshold float64) bool {
	passed := metrics.GetHalfOpenPassed().Load()
	total := passed + metrics.GetHalfOpenFailed().Load()
	if total == 0 {
		return false
	}
	return float64(passed)/float64(total) >= successThreshold
}

// ResetHalfOpenStats zeroes all half-open counters.
func (m *HalfOpenManager) ResetHalfOpenStats(metrics HalfOpenMetrics) {
	m.mu.Lock()
	defer m.mu.Unlock()

	metrics.GetHalfOpenRequests().Store(0)
	metrics.GetHalfOpenPassed().Store(0)
	metrics.GetHalfOpenFailed().Store(0)
}

// CircuitBreakerInfo is the runtime record of one endpoint breaker.
type CircuitBreakerInfo struct {
	ResourceName string
	// State holds one of stateClosed, stateOpen, stateHalfOpen.
	State   atomic.Int64
	Config  *CircuitBreakerConfig
	Metrics *CircuitBreakerMetrics
	// SuccessCodeMap is the set form of Config.SuccessStatusCodes.
	SuccessCodeMap map[int]bool
	// AdaptiveThreshold is the ratio threshold chosen at initialisation.
	AdaptiveThreshold float64
	// WarmupEndTime is the Unix time (seconds) until which requests bypass the
	// breaker.
	// NOTE(review): written by the reset handler and read by the middleware
	// without synchronisation — consider making this atomic.
	WarmupEndTime int64
}

var (
	// circuitBreakers maps resource name -> *CircuitBreakerInfo.
	circuitBreakers sync.Map
	// stateChangeListeners maps listener name -> StateChangeListener.
	stateChangeListeners sync.Map
	// stateChangeListenersRegistered guards one-time registration of the
	// default listener.
	stateChangeListenersRegistered sync.Map
)

// Defaults applied when a configuration key is missing.
const (
	defaultMaxFailures              = 5
	defaultTimeout                  = "60s"
	defaultSlowRequestThreshold     = "3s"
	defaultStatIntervalMs           = 1000
	defaultRequestTimeout           = 30000
	defaultDistributedTTL           = 300
	defaultHalfOpenMaxRequests      = 5
	defaultWarmupDuration           = "10s"
	defaultHalfOpenSuccessThreshold = 0.5
)

// getState returns the current state of the breaker.
func (cb *CircuitBreakerInfo) getState() CircuitBreakerState {
	return cb.int64ToState(cb.State.Load())
}

// setState switches the breaker to state, updating the state metrics, and
// returns the previous state.
func (cb *CircuitBreakerInfo) setState(state CircuitBreakerState) CircuitBreakerState {
	return cb.setStateWithMetrics(state, true)
}

// setStateWithMetrics atomically switches the breaker to state and returns the
// previous state. The state metrics are updated only when updateMetrics is
// true and the state actually changed.
func (cb *CircuitBreakerInfo) setStateWithMetrics(state CircuitBreakerState, updateMetrics bool) CircuitBreakerState {
	previous := cb.int64ToState(cb.State.Swap(cb.stateToInt64(state)))
	if updateMetrics && previous != state {
		cb.updateStateMetrics(state)
	}
	return previous
}

// init puts the breaker into the closed state and stamps the reset, close and
// window-start times with the current time.
func (cb *CircuitBreakerInfo) init() {
	now := time.Now().Unix()
	cb.State.Store(stateClosed)
	cb.Metrics.LastResetTime.Store(now)
	cb.Metrics.LastCloseTime.Store(now)
	cb.Metrics.WindowStartTime.Store(now)
}

// stateToInt64 converts a CircuitBreakerState to its integer encoding.
// Unknown values map to stateClosed.
func (cb *CircuitBreakerInfo) stateToInt64(state CircuitBreakerState) int64 {
	switch state {
	case StateOpen:
		return stateOpen
	case StateHalfOpen:
		return stateHalfOpen
	default:
		return stateClosed
	}
}

// int64ToState converts an integer encoding to a CircuitBreakerState.
// Unknown values map to StateClosed.
func (cb *CircuitBreakerInfo) int64ToState(state int64) CircuitBreakerState {
	switch state {
	case stateOpen:
		return StateOpen
	case stateHalfOpen:
		return StateHalfOpen
	default:
		return StateClosed
	}
}

// updateStateMetrics bumps the transition counter and timestamp belonging to
// the state just entered. Entering the open state also schedules the next
// retry time from the configured timeout.
func (cb *CircuitBreakerInfo) updateStateMetrics(state CircuitBreakerState) {
	now := time.Now()

	switch state {
	case StateOpen:
		cb.Metrics.OpenCount.Add(1)
		cb.Metrics.LastOpenTime.Store(now.Unix())
		cb.Metrics.NextRetryTime.Store(now.Add(cb.Config.TimeoutParsed).Unix())
	case StateClosed:
		cb.Metrics.ClosedCount.Add(1)
		cb.Metrics.LastCloseTime.Store(now.Unix())
	case StateHalfOpen:
		cb.Metrics.HalfOpenCount.Add(1)
		cb.Metrics.LastHalfOpenTime.Store(now.Unix())
	}
}

// transitionState switches cbInfo to newState (updating the state metrics),
// notifies the registered listeners when the state actually changed, and
// returns the previous state.
func transitionState(cbInfo *CircuitBreakerInfo, resourceName string, newState CircuitBreakerState) CircuitBreakerState {
	oldState := cbInfo.setStateWithMetrics(newState, true)
	if oldState != newState {
		notifyStateChange(resourceName, oldState, newState)
	}
	return oldState
}

// getCircuitBreakerInfoByResource looks up the breaker registered for
// resourceName. It first tries an exact match and then the name with its
// query string ("?..." suffix) removed. It returns (nil, nil) when no breaker
// is registered.
func getCircuitBreakerInfoByResource(resourceName string) (*CircuitBreakerInfo, *CircuitBreakerConfig) {
	if val, ok := circuitBreakers.Load(resourceName); ok {
		if cbInfo, ok := val.(*CircuitBreakerInfo); ok {
			return cbInfo, cbInfo.Config
		}
	}

	if prefix, _, found := strings.Cut(resourceName, "?"); found && prefix != "" {
		if val, ok := circuitBreakers.Load(prefix); ok {
			if cbInfo, ok := val.(*CircuitBreakerInfo); ok {
				return cbInfo, cbInfo.Config
			}
		}
	}
	return nil, nil
}

// updateResponseTimeStats adds duration to the response time statistics of
// cbInfo and counts the request as slow when it exceeds the configured slow
// request threshold.
func updateResponseTimeStats(cbInfo *CircuitBreakerInfo, duration time.Duration, config *CircuitBreakerConfig) {
	durationNs := duration.Nanoseconds()
	cbInfo.Metrics.TotalResponseTime.Add(durationNs)

	atomicUpdateMin(&cbInfo.Metrics.MinResponseTime, durationNs)
	atomicUpdateMax(&cbInfo.Metrics.MaxResponseTime, durationNs)

	if duration > config.SlowRequestThresholdParsed {
		cbInfo.Metrics.SlowRequests.Add(1)
	}
}

// formatUnixTime formats a Unix timestamp (seconds) as "2006-01-02 15:04:05"
// in local time. Non-positive timestamps yield an empty string.
func formatUnixTime(timestamp int64) string {
	if timestamp <= 0 {
		return ""
	}
	return time.Unix(timestamp, 0).Format("2006-01-02 15:04:05")
}

// InitCircuitBreaker initialises Sentinel, registers the default state change
// listener and creates one breaker per entry of the
// "circuitBreaker.interfaces" configuration. Entries that fail to load or
// initialise are logged and skipped; only a Sentinel initialisation failure is
// returned as an error.
func InitCircuitBreaker() error {
	ctx := context.Background()

	if err := api.InitDefault(); err != nil {
		return fmt.Errorf("sentinel初始化失败: %w", err)
	}

	registerStateChangeListeners()
	g.Log().Infof(ctx, "Sentinel熔断器初始化成功")

	configs := g.Cfg().MustGet(ctx, "circuitBreaker.interfaces").Map()
	if len(configs) == 0 {
		g.Log().Infof(ctx, "未配置任何接口熔断器")
		return nil
	}

	enabledCount := 0
	for resourcePattern, configData := range configs {
		config, err := loadInterfaceCircuitBreakerConfig(ctx, resourcePattern, configData)
		if err != nil {
			g.Log().Errorf(ctx, "加载接口 %s 熔断器配置失败: %v", resourcePattern, err)
			continue
		}
		if config == nil || !config.Enabled {
			continue
		}
		if err := initInterfaceCircuitBreaker(resourcePattern, config); err != nil {
			g.Log().Errorf(ctx, "接口 %s 熔断器初始化失败: %v", resourcePattern, err)
			continue
		}
		g.Log().Infof(ctx, "接口 %s 熔断器初始化成功", resourcePattern)
		enabledCount++
	}

	g.Log().Infof(ctx, "共初始化 %d 个接口熔断器,其中 %d 个已启用", len(configs), enabledCount)
	return nil
}

// loadInterfaceCircuitBreakerConfig builds the configuration of one endpoint
// from its raw configuration map, applying defaults for missing keys.
// Durations that fail to parse are replaced by their defaults (with a warning).
// It returns an error when configData is not a map.
func loadInterfaceCircuitBreakerConfig(ctx context.Context, resourcePattern string, configData any) (*CircuitBreakerConfig, error) {
	configMap, ok := configData.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("接口 %s 配置格式错误: %v", resourcePattern, configData)
	}

	config := &CircuitBreakerConfig{
		Enabled:                  getBoolFromMap(configMap, "enabled", true),
		MaxFailures:              getIntFromMap(configMap, "maxFailures", defaultMaxFailures),
		Timeout:                  getStringFromMap(configMap, "timeout", defaultTimeout),
		SlowRequestThreshold:     getStringFromMap(configMap, "slowRequestThreshold", defaultSlowRequestThreshold),
		EnableSlidingWindow:      getBoolFromMap(configMap, "enableSlidingWindow", false),
		FailureRateThreshold:     getFloatFromMap(configMap, "failureRateThreshold", 0.5),
		EnableFallback:           getBoolFromMap(configMap, "enableFallback", false),
		FallbackMessage:          getStringFromMap(configMap, "fallbackMessage", ""),
		RequestTimeout:           getIntFromMap(configMap, "requestTimeout", defaultRequestTimeout),
		DistributedTTL:           getIntFromMap(configMap, "distributedTTL", defaultDistributedTTL),
		StatIntervalMs:           getIntFromMap(configMap, "statIntervalMs", defaultStatIntervalMs),
		HalfOpenMaxRequests:      getIntFromMap(configMap, "halfOpenMaxRequests", defaultHalfOpenMaxRequests),
		HalfOpenSuccessThreshold: getFloatFromMap(configMap, "halfOpenSuccessThreshold", defaultHalfOpenSuccessThreshold),
		WarmupDuration:           getStringFromMap(configMap, "warmupDuration", defaultWarmupDuration),
		EnableAdaptiveThreshold:  getBoolFromMap(configMap, "enableAdaptiveThreshold", false),
		AdaptiveMinThreshold:     getFloatFromMap(configMap, "adaptiveMinThreshold", 0.3),
		AdaptiveMaxThreshold:     getFloatFromMap(configMap, "adaptiveMaxThreshold", 0.7),
	}

	// minRequestAmount falls back to maxFailures when absent or zero.
	config.MinRequestAmount = getIntFromMap(configMap, "minRequestAmount", 0)
	if config.MinRequestAmount == 0 {
		config.MinRequestAmount = config.MaxFailures
	}

	config.TimeoutParsed, config.Timeout = parseDurationWithDefault(ctx, config.Timeout, defaultTimeout, "timeout")
	config.SlowRequestThresholdParsed, config.SlowRequestThreshold = parseDurationWithDefault(ctx, config.SlowRequestThreshold, defaultSlowRequestThreshold, "slowRequestThreshold")
	config.WarmupDurationParsed, config.WarmupDuration = parseDurationWithDefault(ctx, config.WarmupDuration, defaultWarmupDuration, "warmupDuration")

	successCodes := getStringFromMap(configMap, "successStatusCodes", "200,201,204")
	config.SuccessStatusCodes = parseIntSlice(successCodes)

	return config, nil
}

// getBoolFromMap returns m[key] as a bool. Booleans are returned as is and
// strings are parsed with strconv.ParseBool; anything else (or a missing key)
// yields defaultValue.
func getBoolFromMap(m map[string]any, key string, defaultValue bool) bool {
	switch v := m[key].(type) {
	case bool:
		return v
	case string:
		if b, err := strconv.ParseBool(strings.TrimSpace(v)); err == nil {
			return b
		}
	}
	return defaultValue
}

// getIntFromMap returns m[key] as an int. It accepts the built-in integer and
// float types (floats are truncated), decimal strings, and values whose
// String method yields a decimal integer (number types that are not one of
// the built-in kinds). Anything else, or a missing key, yields defaultValue.
func getIntFromMap(m map[string]any, key string, defaultValue int) int {
	switch v := m[key].(type) {
	case int:
		return v
	case int8:
		return int(v)
	case int16:
		return int(v)
	case int32:
		return int(v)
	case int64:
		return int(v)
	case uint:
		return int(v)
	case uint8:
		return int(v)
	case uint16:
		return int(v)
	case uint32:
		return int(v)
	case uint64:
		return int(v)
	case float32:
		return int(v)
	case float64:
		return int(v)
	case string:
		if i, err := strconv.Atoi(strings.TrimSpace(v)); err == nil {
			return i
		}
	case fmt.Stringer:
		if i, err := strconv.Atoi(strings.TrimSpace(v.String())); err == nil {
			return i
		}
	}
	return defaultValue
}

// getFloatFromMap returns m[key] as a float64. It accepts the built-in float
// and integer types, numeric strings, and values whose String method yields a
// number. Anything else, or a missing key, yields defaultValue.
func getFloatFromMap(m map[string]any, key string, defaultValue float64) float64 {
	switch v := m[key].(type) {
	case float64:
		return v
	case float32:
		return float64(v)
	case int:
		return float64(v)
	case int8:
		return float64(v)
	case int16:
		return float64(v)
	case int32:
		return float64(v)
	case int64:
		return float64(v)
	case uint:
		return float64(v)
	case uint8:
		return float64(v)
	case uint16:
		return float64(v)
	case uint32:
		return float64(v)
	case uint64:
		return float64(v)
	case string:
		if f, err := strconv.ParseFloat(strings.TrimSpace(v), 64); err == nil {
			return f
		}
	case fmt.Stringer:
		if f, err := strconv.ParseFloat(strings.TrimSpace(v.String()), 64); err == nil {
			return f
		}
	}
	return defaultValue
}

// getStringFromMap returns m[key] when it is a string and defaultValue
// otherwise.
func getStringFromMap(m map[string]any, key string, defaultValue string) string {
	if s, ok := m[key].(string); ok {
		return s
	}
	return defaultValue
}

// loadSentinelRule builds the Sentinel rule for resourcePattern from config
// and installs it for that resource only. Existing rules of the resource are
// cleared first so that Sentinel creates a fresh breaker even when the rule is
// unchanged. threshold is the ratio threshold used by the sliding window
// (slow request ratio) strategy; the error count strategy uses
// config.MaxFailures instead.
func loadSentinelRule(resourcePattern string, config *CircuitBreakerConfig, threshold float64) (*circuitbreaker.Rule, error) {
	rule := &circuitbreaker.Rule{
		Resource:         resourcePattern,
		RetryTimeoutMs:   uint32(config.TimeoutParsed.Milliseconds()),
		MinRequestAmount: uint64(config.MinRequestAmount),
		StatIntervalMs:   uint32(config.StatIntervalMs),
	}
	if config.EnableSlidingWindow {
		rule.Strategy = circuitbreaker.SlowRequestRatio
		rule.StatSlidingWindowBucketCount = 10
		rule.MaxAllowedRtMs = uint64(config.SlowRequestThresholdParsed.Milliseconds())
		rule.Threshold = threshold
	} else {
		rule.Strategy = circuitbreaker.ErrorCount
		rule.Threshold = float64(config.MaxFailures)
	}

	if _, err := circuitbreaker.LoadRulesOfResource(resourcePattern, []*circuitbreaker.Rule{}); err != nil {
		return nil, fmt.Errorf("清空熔断规则失败: %w", err)
	}
	// LoadRulesOfResource only touches this resource. LoadRules would replace
	// the rules of every resource, dropping breakers initialised earlier.
	if _, err := circuitbreaker.LoadRulesOfResource(resourcePattern, []*circuitbreaker.Rule{rule}); err != nil {
		return nil, fmt.Errorf("加载熔断规则失败: %w", err)
	}
	return rule, nil
}

// initInterfaceCircuitBreaker validates config, installs the Sentinel rule for
// resourcePattern and registers a fresh CircuitBreakerInfo (closed, with the
// warm-up period starting now) under that name.
func initInterfaceCircuitBreaker(resourcePattern string, config *CircuitBreakerConfig) error {
	if err := validateCircuitBreakerConfig(config); err != nil {
		return err
	}

	// With adaptive thresholds enabled the midpoint of the configured range is
	// used as the ratio threshold.
	threshold := config.FailureRateThreshold
	if config.EnableAdaptiveThreshold {
		threshold = (config.AdaptiveMinThreshold + config.AdaptiveMaxThreshold) / 2
	}

	rule, err := loadSentinelRule(resourcePattern, config, threshold)
	if err != nil {
		return err
	}

	successCodeMap := make(map[int]bool, len(config.SuccessStatusCodes))
	for _, code := range config.SuccessStatusCodes {
		successCodeMap[code] = true
	}

	cbInfo := &CircuitBreakerInfo{
		ResourceName:      resourcePattern,
		Config:            config,
		Metrics:           newCircuitBreakerMetrics(),
		SuccessCodeMap:    successCodeMap,
		AdaptiveThreshold: threshold,
		WarmupEndTime:     time.Now().Add(config.WarmupDurationParsed).Unix(),
	}
	cbInfo.init()
	circuitBreakers.Store(resourcePattern, cbInfo)

	strategy := "error_count"
	if config.EnableSlidingWindow {
		strategy = "slow_ratio"
	}
	g.Log().Infof(context.Background(), "接口 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f",
		resourcePattern, resourcePattern, strategy, config.TimeoutParsed, rule.Threshold)
	return nil
}

// parseIntSlice parses a comma-separated list of integers. Elements that are
// not valid integers are skipped.
func parseIntSlice(str string) []int {
	parts := strings.Split(str, ",")
	result := make([]int, 0, len(parts))
	for _, part := range parts {
		if val, err := strconv.Atoi(strings.TrimSpace(part)); err == nil {
			result = append(result, val)
		}
	}
	return result
}

// parseDurationWithDefault parses durationStr. On failure it logs a warning
// and falls back to defaultStr. It returns the parsed duration together with
// the string that was actually used.
func parseDurationWithDefault(ctx context.Context, durationStr, defaultStr, fieldName string) (time.Duration, string) {
	parsed, err := time.ParseDuration(durationStr)
	if err == nil {
		return parsed, durationStr
	}

	g.Log().Warningf(ctx, "解析%s失败: %s, 使用默认值 %s, error: %v", fieldName, durationStr, defaultStr, err)
	// The defaults passed by callers in this file are valid duration
	// literals, so the error is deliberately ignored here.
	parsed, _ = time.ParseDuration(defaultStr)
	return parsed, defaultStr
}

// atomicUpdateMin lowers *minValue to newValue if newValue is smaller, using a
// compare-and-swap loop.
func atomicUpdateMin(minValue *atomic.Int64, newValue int64) {
	for {
		current := minValue.Load()
		if newValue >= current || minValue.CompareAndSwap(current, newValue) {
			return
		}
	}
}

// atomicUpdateMax raises *maxValue to newValue if newValue is larger, using a
// compare-and-swap loop.
func atomicUpdateMax(maxValue *atomic.Int64, newValue int64) {
	for {
		current := maxValue.Load()
		if newValue <= current || maxValue.CompareAndSwap(current, newValue) {
			return
		}
	}
}

// reset returns every counter to its initial value. Timestamp fields are left
// untouched; callers set LastResetTime themselves.
func (m *CircuitBreakerMetrics) reset() {
	m.TotalRequests.Store(0)
	m.PassRequests.Store(0)
	m.BlockRequests.Store(0)
	m.FailureRequests.Store(0)
	m.SlowRequests.Store(0)

	m.OpenCount.Store(0)
	m.ClosedCount.Store(0)
	m.HalfOpenCount.Store(0)

	m.HalfOpenRequests.Store(0)
	m.HalfOpenPassed.Store(0)
	m.HalfOpenFailed.Store(0)

	m.TotalResponseTime.Store(0)
	// Start the minimum at the largest int64 so the first sample replaces it.
	m.MinResponseTime.Store(1<<63 - 1)
	m.MaxResponseTime.Store(0)

	m.WindowRequests.Store(0)
	m.WindowFailures.Store(0)
}

// newCircuitBreakerMetrics returns metrics with all counters at their initial
// values.
func newCircuitBreakerMetrics() *CircuitBreakerMetrics {
	metrics := &CircuitBreakerMetrics{}
	metrics.reset()
	return metrics
}

// updateWindowStats counts one request in the current 60-second window,
// starting a new window when the current one has expired, and logs a warning
// when at least 10 requests were seen and fewer than half of them succeeded.
func (cb *CircuitBreakerInfo) updateWindowStats(isSuccess bool, ctx context.Context) {
	const windowSize = int64(60) // Window length in seconds.

	now := time.Now().Unix()
	windowStart := cb.Metrics.WindowStartTime.Load()

	// Only the goroutine that wins the compare-and-swap clears the counters.
	if now-windowStart >= windowSize && cb.Metrics.WindowStartTime.CompareAndSwap(windowStart, now) {
		cb.Metrics.WindowRequests.Store(0)
		cb.Metrics.WindowFailures.Store(0)
	}

	cb.Metrics.WindowRequests.Add(1)
	if !isSuccess {
		cb.Metrics.WindowFailures.Add(1)
	}

	total := cb.Metrics.WindowRequests.Load()
	failures := cb.Metrics.WindowFailures.Load()
	if total < 10 {
		return // Too few samples for a meaningful rate.
	}
	if successRate := float64(total-failures) / float64(total); successRate < 0.5 {
		g.Log().Warningf(ctx, "熔断器 %s 窗口内成功率较低: %.2f%%, total=%d, failures=%d",
			cb.ResourceName, successRate*100, total, failures)
	}
}

// validateInRange returns an error when value lies outside [lo, hi].
func validateInRange(name string, value, lo, hi int) error {
	if value < lo || value > hi {
		return fmt.Errorf("%s必须在%d-%d之间", name, lo, hi)
	}
	return nil
}

// validateFloatInRange returns an error when value lies outside [lo, hi].
func validateFloatInRange(name string, value, lo, hi float64) error {
	if value < lo || value > hi {
		return fmt.Errorf("%s必须在%.1f-%.1f之间", name, lo, hi)
	}
	return nil
}

// validateCircuitBreakerConfig checks that config contains usable values and
// returns the first violation found. Durations are checked as well because
// they are converted to unsigned millisecond fields of the Sentinel rule,
// where a negative value would wrap around.
func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error {
	if config.MaxFailures <= 0 {
		return fmt.Errorf("maxFailures必须大于0")
	}
	if config.TimeoutParsed <= 0 || config.TimeoutParsed > maxRetryTimeout {
		return fmt.Errorf("timeout必须大于0且不超过%v", maxRetryTimeout)
	}
	if config.SlowRequestThresholdParsed < 0 {
		return fmt.Errorf("slowRequestThreshold不能为负数")
	}
	if err := validateFloatInRange("failureRateThreshold", config.FailureRateThreshold, 0.0, 1.0); err != nil {
		return err
	}
	if len(config.SuccessStatusCodes) == 0 {
		return fmt.Errorf("successStatusCodes不能为空")
	}
	if err := validateInRange("requestTimeout", config.RequestTimeout, 0, 300000); err != nil {
		return err
	}
	if err := validateInRange("distributedTTL", config.DistributedTTL, 0, 3600); err != nil {
		return err
	}
	if err := validateInRange("statIntervalMs", config.StatIntervalMs, 100, 60000); err != nil {
		return err
	}
	if err := validateInRange("minRequestAmount", config.MinRequestAmount, 1, 10000); err != nil {
		return err
	}
	if err := validateInRange("halfOpenMaxRequests", config.HalfOpenMaxRequests, 1, 100); err != nil {
		return err
	}
	if err := validateFloatInRange("halfOpenSuccessThreshold", config.HalfOpenSuccessThreshold, 0.0, 1.0); err != nil {
		return err
	}

	if config.EnableAdaptiveThreshold {
		if err := validateFloatInRange("adaptiveMinThreshold", config.AdaptiveMinThreshold, 0.0, 1.0); err != nil {
			return err
		}
		if err := validateFloatInRange("adaptiveMaxThreshold", config.AdaptiveMaxThreshold, 0.0, 1.0); err != nil {
			return err
		}
		if config.AdaptiveMinThreshold >= config.AdaptiveMaxThreshold {
			return fmt.Errorf("adaptiveMinThreshold必须小于adaptiveMaxThreshold")
		}
	}
	return nil
}

// CircuitBreakerMiddleware is the circuit breaking / degradation middleware.
//
// For requests whose resource has an enabled breaker it, in order: lets the
// request through unchecked during warm-up; applies the request timeout;
// rejects the request when the shared (Redis) state is open; limits the number
// of concurrent probes while half-open; asks Sentinel for admission; and
// finally runs the handler and records the outcome. Rejected requests receive
// a 503 fallback response.
//
// The breaker is always addressed by its configured resource name
// (CircuitBreakerInfo.ResourceName), also when the request matched through the
// query-less prefix, so that the Sentinel rule, the Redis keys and the reset
// handler all refer to the same resource.
func CircuitBreakerMiddleware(r *ghttp.Request) {
	startTime := time.Now()
	ctx := r.GetCtx()

	requestResource := generateResourceName(r)
	if requestResource == "" {
		r.Middleware.Next()
		return
	}

	cbInfo, config := getCircuitBreakerInfoByResource(requestResource)
	if cbInfo == nil || config == nil || !config.Enabled {
		r.Middleware.Next()
		return
	}

	resourceName := cbInfo.ResourceName
	if resourceName == "" {
		resourceName = requestResource
	}

	cbInfo.Metrics.TotalRequests.Add(1)

	// During warm-up requests are only counted, never blocked or evaluated.
	if time.Now().Unix() < cbInfo.WarmupEndTime {
		r.Middleware.Next()
		return
	}

	if config.RequestTimeout > 0 {
		var ctxCancel context.CancelFunc
		ctx, ctxCancel = context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond)
		defer ctxCancel()
		r.SetCtx(ctx)
	}

	// Shared state: another instance may already have opened the breaker.
	if config.DistributedTTL > 0 && isCircuitBreakerOpenInDistributed(ctx, resourceName) {
		cbInfo.Metrics.BlockRequests.Add(1)
		g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName)
		sendFallbackResponse(r, resourceName, config, "distributed")
		return
	}

	// Half-open: only a limited number of probe requests may be in flight.
	manager := GetHalfOpenManager()
	probing := false       // This request holds a half-open slot.
	probeRecorded := false // The slot was released by recording the outcome.
	if cbInfo.getState() == StateHalfOpen {
		acquired, _ := manager.TryAcquireHalfOpenSlot(cbInfo.Metrics, config.HalfOpenMaxRequests)
		if !acquired {
			cbInfo.Metrics.BlockRequests.Add(1)
			if oldState := transitionState(cbInfo, resourceName, StateOpen); oldState != StateOpen {
				g.Log().Warningf(ctx, "半开状态试探请求超限,恢复熔断: %s", resourceName)
				if config.DistributedTTL > 0 {
					syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL)
				}
			}
			sendFallbackResponse(r, resourceName, config, "halfopen_limit")
			return
		}
		probing = true
		// Give the slot back on every path that does not record an outcome
		// (Sentinel rejection, unusable status, handler panic). A defer is
		// required because the fallback response leaves this function by
		// exiting the request rather than by returning.
		defer func() {
			if !probeRecorded {
				manager.ReleaseHalfOpenSlot(cbInfo.Metrics)
			}
		}()
	}

	entry, blockError := api.Entry(resourceName)
	if blockError != nil {
		if entry != nil {
			entry.Exit()
		}
		cbInfo.Metrics.BlockRequests.Add(1)
		transitionState(cbInfo, resourceName, StateOpen)
		if config.DistributedTTL > 0 {
			syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL)
		}
		sendFallbackResponse(r, resourceName, config, "blocked")
		return
	}
	if entry != nil {
		defer entry.Exit()
	}

	r.Middleware.Next()

	statusCode := r.Response.Status
	if statusCode == 0 {
		// NOTE(review): this assumes the framework leaves Status at 0 when the
		// handler wrote a response without setting a status, in which case the
		// client receives 200 — confirm against the GoFrame version in use.
		statusCode = implicitStatusOK
	}
	if statusCode < 100 || statusCode > 599 {
		return
	}

	duration := time.Since(startTime)
	updateResponseTimeStats(cbInfo, duration, config)

	isSuccess := isSuccessStatusCode(cbInfo, statusCode)
	cbInfo.updateWindowStats(isSuccess, ctx)

	if !isSuccess {
		cbInfo.Metrics.FailureRequests.Add(1)
		if entry != nil {
			api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode))
		}
		g.Log().Debugf(ctx, "接口 %s 请求失败: status=%d, duration=%v", resourceName, statusCode, duration)

		if probing {
			manager.RecordHalfOpenResult(cbInfo.Metrics, false, config.HalfOpenSuccessThreshold)
			probeRecorded = true
		}
		// Re-read the state: it may have changed while the handler ran.
		// A failure while half-open re-opens the breaker.
		if cbInfo.getState() == StateHalfOpen {
			if oldState := transitionState(cbInfo, resourceName, StateOpen); oldState == StateHalfOpen {
				g.Log().Warningf(ctx, "半开状态请求失败,恢复熔断: %s", resourceName)
				if config.DistributedTTL > 0 {
					syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL)
				}
			}
		}
		return
	}

	cbInfo.Metrics.PassRequests.Add(1)

	// Re-read the state: it may have changed while the handler ran.
	currentState := cbInfo.getState()
	switch {
	case probing:
		thresholdReached := manager.RecordHalfOpenResult(cbInfo.Metrics, true, config.HalfOpenSuccessThreshold)
		probeRecorded = true
		if !thresholdReached || currentState != StateHalfOpen {
			break
		}
		// Enough probes succeeded: close the breaker.
		if oldState := transitionState(cbInfo, resourceName, StateClosed); oldState == StateHalfOpen {
			manager.ResetHalfOpenStats(cbInfo.Metrics)
			g.Log().Infof(ctx, "半开状态成功,恢复关闭: %s", resourceName)
			if config.DistributedTTL > 0 {
				syncCircuitBreakerStateToDistributed(ctx, resourceName, "closed", config.DistributedTTL)
			}
		}
	case currentState == StateOpen:
		// Sentinel admitted the request while the local state was still open
		// and it succeeded: bring the local state back to closed.
		transitionState(cbInfo, resourceName, StateClosed)
	}
}

// sendFallbackResponse logs the degradation and answers the request with HTTP
// 503. The configured fallback message is used when fallback is enabled and
// the message is non-empty; otherwise a message depending on reason is sent.
// The request is exited after writing.
func sendFallbackResponse(r *ghttp.Request, resourceName string, config *CircuitBreakerConfig, reason string) {
	g.Log().Warningf(r.GetCtx(), "熔断器降级: resource=%s, reason=%s", resourceName, reason)

	if config.EnableFallback && config.FallbackMessage != "" {
		r.Response.WriteStatusExit(503, config.FallbackMessage)
		return
	}

	var msg string
	switch reason {
	case "blocked":
		msg = fmt.Sprintf("接口 '%s' 熔断保护中,请稍后再试", resourceName)
	case "distributed":
		msg = fmt.Sprintf("接口 '%s' 分布式熔断中", resourceName)
	default:
		msg = fmt.Sprintf("接口 '%s' 暂时不可用,请稍后再试", resourceName)
	}
	r.Response.WriteStatusExit(503, msg)
}

// isSuccessStatusCode reports whether statusCode counts as success for cbInfo.
// Codes outside 100-599 never do. When the breaker has a non-empty success
// code set, membership in that set decides; otherwise any 2xx code is a
// success.
func isSuccessStatusCode(cbInfo *CircuitBreakerInfo, statusCode int) bool {
	if statusCode < 100 || statusCode > 599 {
		return false
	}
	if len(cbInfo.SuccessCodeMap) > 0 {
		return cbInfo.SuccessCodeMap[statusCode]
	}
	return statusCode >= 200 && statusCode < 300
}

// generateResourceName derives the resource name of a request as
// "METHOD:path", followed by "?" and the sorted, encoded query parameters when
// the request has any. Example: GET:/api/users?userId=123
func generateResourceName(r *ghttp.Request) string {
	resourceName := r.Method + ":" + r.URL.Path
	if query := r.URL.Query().Encode(); query != "" {
		// Sorting makes the name independent of the parameter order.
		resourceName += "?" + sortQueryString(query)
	}
	return resourceName
}

// sortQueryString sorts the "&"-separated parts of query lexicographically
// and joins them again. An empty query yields an empty string.
func sortQueryString(query string) string {
	if query == "" {
		return ""
	}
	params := strings.Split(query, "&")
	sort.Strings(params)
	return strings.Join(params, "&")
}

// isCircuitBreakerOpenInDistributed reports whether the shared state stored in
// Redis for resourceName is "open". A missing Redis client, a read error or a
// missing key are all treated as "not open".
func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool {
	redisClient := g.Redis()
	if redisClient == nil {
		return false
	}

	value, err := redisClient.Get(ctx, "circuit_breaker:"+resourceName+":state")
	if err != nil || value.IsNil() {
		return false
	}
	return value.String() == "open"
}

// syncCircuitBreakerStateToDistributed writes state for resourceName to Redis
// with the given TTL (SETEX), guarded by a per-resource distributed lock.
// This is best effort: failures are logged and the update is skipped when the
// lock cannot be obtained.
func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string, ttl int) {
	stateKey := "circuit_breaker:" + resourceName + ":state"
	lockKey := "circuit_breaker:" + resourceName + ":lock"

	redisClient := g.Redis()
	if redisClient == nil {
		g.Log().Warningf(ctx, "Redis未初始化,无法同步分布式熔断状态: %s", resourceName)
		return
	}

	// NOTE(review): the third argument of redis.Lock is presumably the lock
	// TTL; its unit is defined by the common/redis package — confirm there.
	success, err := redis.Lock(ctx, lockKey, 10, func(ctx context.Context) error {
		if _, err := redisClient.Do(ctx, "SETEX", stateKey, ttl, state); err != nil {
			g.Log().Errorf(ctx, "设置分布式熔断状态失败: %s=%s, error: %v", stateKey, state, err)
		} else {
			g.Log().Debugf(ctx, "分布式熔断状态已同步: %s=%s (TTL: %d)", stateKey, state, ttl)
		}
		// The write error is logged above and intentionally not propagated.
		return nil
	})
	if err != nil {
		g.Log().Errorf(ctx, "获取分布式锁失败: %s, error: %v", lockKey, err)
		return
	}
	if !success {
		g.Log().Debugf(ctx, "未获取到分布式锁,跳过状态同步: %s", lockKey)
	}
}

// CircuitBreakerHealthCheckHandler reports the state and metrics of the
// registered breakers as JSON, paginated by the "page" (zero-based) and "size"
// (1-100, default 20) request parameters. Resources are listed in sorted order
// so that pages are stable between calls. The summary covers the breakers of
// the returned page.
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
	page := r.Get("page").Int()
	size := r.Get("size").Int()
	if page < 0 {
		page = 0
	}
	if size <= 0 || size > 100 {
		size = 20
	}

	var resources []string
	circuitBreakers.Range(func(key, _ any) bool {
		if name, ok := key.(string); ok {
			resources = append(resources, name)
		}
		return true
	})
	// sync.Map iterates in no particular order; sort for stable pagination.
	sort.Strings(resources)

	total := len(resources)
	start := page * size
	if start >= total {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: "熔断器状态", Data: map[string]any{
			"summary":  map[string]any{"totalServices": 0, "openServices": 0, "closedServices": 0, "halfOpenServices": 0},
			"services": map[string]any{},
			"page":     page,
			"size":     size,
			"total":    total,
		}})
		return
	}
	end := start + size
	if end > total {
		end = total
	}

	status := make(map[string]any, end-start)
	totalServices, openServices, halfOpenServices := 0, 0, 0

	for _, resourceName := range resources[start:end] {
		val, ok := circuitBreakers.Load(resourceName)
		if !ok {
			continue
		}
		cbInfo, ok := val.(*CircuitBreakerInfo)
		if !ok {
			continue
		}

		totalServices++
		state := cbInfo.getState()
		switch state {
		case StateOpen:
			openServices++
		case StateHalfOpen:
			halfOpenServices++
		}

		metrics := cbInfo.Metrics
		status[resourceName] = map[string]any{
			"resource":         cbInfo.ResourceName,
			"state":            string(state),
			"lastOpenTime":     formatUnixTime(metrics.LastOpenTime.Load()),
			"nextRetryTime":    formatUnixTime(metrics.NextRetryTime.Load()),
			"totalRequests":    metrics.TotalRequests.Load(),
			"passRequests":     metrics.PassRequests.Load(),
			"blockRequests":    metrics.BlockRequests.Load(),
			"failureRequests":  metrics.FailureRequests.Load(),
			"slowRequests":     metrics.SlowRequests.Load(),
			"openCount":        metrics.OpenCount.Load(),
			"lastResetTime":    formatUnixTime(metrics.LastResetTime.Load()),
			"halfOpenRequests": metrics.HalfOpenRequests.Load(),
			"halfOpenPassed":   metrics.HalfOpenPassed.Load(),
		}
	}

	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: "熔断器状态", Data: map[string]any{
		"summary": map[string]any{
			"totalServices":    totalServices,
			"openServices":     openServices,
			"closedServices":   totalServices - openServices - halfOpenServices,
			"halfOpenServices": halfOpenServices,
		},
		"services": status,
		"page":     page,
		"size":     size,
		"total":    total,
	}})
}

// batchProcessResources applies processFunc to every registered resource. It
// returns the number of successes, the number of failures and the error
// message per failed resource. Failures are logged and do not stop the
// iteration.
func batchProcessResources(r *ghttp.Request, processFunc func(resourceName string) error) (int, int, map[string]string) {
	successCount := 0
	failCount := 0
	failures := make(map[string]string)

	circuitBreakers.Range(func(key, _ any) bool {
		resourceName, ok := key.(string)
		if !ok {
			return true
		}
		if err := processFunc(resourceName); err != nil {
			g.Log().Errorf(r.GetCtx(), "资源 %s 处理失败: %v", resourceName, err)
			failCount++
			failures[resourceName] = err.Error()
			return true
		}
		successCount++
		return true
	})

	return successCount, failCount, failures
}

// CircuitBreakerResetHandler resets the breaker named by the "resource"
// request parameter, or every registered breaker when the parameter is empty
// or "*". The result is written as JSON.
func CircuitBreakerResetHandler(r *ghttp.Request) {
	resourceName := r.Get("resource").String()

	if resourceName == "" || resourceName == "*" {
		successCount, failCount, failures := batchProcessResources(r, func(name string) error {
			return resetSingleResource(r, name)
		})
		g.Log().Infof(r.GetCtx(), "批量重置熔断器完成: 成功 %d, 失败 %d", successCount, failCount)
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
			Code:    200,
			Message: fmt.Sprintf("批量重置完成: 成功 %d, 失败 %d", successCount, failCount),
			Data:    map[string]any{"success": successCount, "failed": failCount, "failures": failures},
		})
		return
	}

	if err := resetSingleResource(r, resourceName); err != nil {
		r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 500, Message: fmt.Sprintf("重置熔断器失败: %v", err)})
		return
	}
	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 200, Message: fmt.Sprintf("资源 '%s' 的熔断器已重置", resourceName)})
}

// resetSingleResource resets one breaker: the Sentinel rule is re-installed
// (which discards Sentinel's breaker state), the local state becomes closed,
// the metrics are cleared, a new warm-up period starts and, when distributed
// sharing is enabled, the shared state key is deleted (best effort).
//
// For a resource without a registered breaker only existing Sentinel rules are
// cleared. An error is returned when the Sentinel rules cannot be updated.
func resetSingleResource(r *ghttp.Request, resourceName string) error {
	ctx := r.GetCtx()

	val, _ := circuitBreakers.Load(resourceName)
	cbInfo, _ := val.(*CircuitBreakerInfo)
	if cbInfo == nil {
		if rules := circuitbreaker.GetRulesOfResource(resourceName); len(rules) > 0 {
			if _, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{}); err != nil {
				return err
			}
		}
		g.Log().Infof(ctx, "熔断器已手动重置: %s", resourceName)
		return nil
	}

	config := cbInfo.Config

	// Clearing the rule alone would leave the resource unprotected, so the
	// rule is rebuilt from the stored configuration.
	if _, err := loadSentinelRule(resourceName, config, cbInfo.AdaptiveThreshold); err != nil {
		return err
	}

	cbInfo.State.Store(stateClosed)
	cbInfo.Metrics.reset()
	cbInfo.WarmupEndTime = time.Now().Add(config.WarmupDurationParsed).Unix()
	cbInfo.Metrics.LastResetTime.Store(time.Now().Unix())

	if config.DistributedTTL > 0 {
		if redisClient := g.Redis(); redisClient != nil {
			lockKey := "circuit_breaker:" + resourceName + ":lock"
			success, err := redis.Lock(ctx, lockKey, 10, func(ctx context.Context) error {
				if _, err := redisClient.Del(ctx, "circuit_breaker:"+resourceName+":state"); err != nil {
					g.Log().Warningf(ctx, "清除分布式熔断状态失败: %s, error: %v", resourceName, err)
				}
				// The delete error is logged above and intentionally not
				// propagated.
				return nil
			})
			if err != nil {
				g.Log().Errorf(ctx, "获取分布式锁失败: %s, error: %v", lockKey, err)
			}
			if !success {
				g.Log().Debugf(ctx, "未获取到分布式锁,跳过状态清除: %s", lockKey)
			}
		}
	}

	g.Log().Infof(ctx, "熔断器已手动重置: %s", resourceName)
	return nil
}

// CircuitBreakerReloadHandler is the configuration reload endpoint. Reloading
// is not supported for endpoint-based breakers, so it always answers with
// code 501.
func CircuitBreakerReloadHandler(r *ghttp.Request) {
	r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{Code: 501, Message: "基于接口的熔断器暂不支持配置重载"})
}

// StateChangeListener is called when a breaker changes from fromState to
// toState.
type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState)

// RegisterStateChangeListener registers listener under name, replacing any
// listener previously registered under the same name.
func RegisterStateChangeListener(name string, listener StateChangeListener) {
	stateChangeListeners.Store(name, listener)
}

// notifyStateChange calls every registered listener synchronously.
func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) {
	stateChangeListeners.Range(func(_, value any) bool {
		if listener, ok := value.(StateChangeListener); ok {
			listener(serviceName, fromState, toState)
		}
		return true
	})
}

// registerStateChangeListeners registers the default listener once. The
// default listener logs every state change, at warning level when the breaker
// opens and at info level otherwise.
func registerStateChangeListeners() {
	if _, exists := stateChangeListenersRegistered.LoadOrStore("default", true); exists {
		return
	}

	RegisterStateChangeListener("default", func(serviceName string, fromState, toState CircuitBreakerState) {
		ctx := context.Background()
		if toState == StateOpen {
			g.Log().Warningf(ctx, "熔断器状态变化: service=%s, %s -> %s", serviceName, fromState, toState)
			return
		}
		g.Log().Infof(ctx, "熔断器状态变化: service=%s, %s -> %s", serviceName, fromState, toState)
	})
}