diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go index 73d67ba..b28c414 100644 --- a/middleware/circuit_breaker.go +++ b/middleware/circuit_breaker.go @@ -7,389 +7,217 @@ import ( "sync" "time" - "gitee.com/red-future---jilin-g/common/jaeger" - "gitee.com/red-future---jilin-g/common/redis" - "github.com/gogf/gf/v2/database/gredis" + "github.com/alibaba/sentinel-golang/api" + "github.com/alibaba/sentinel-golang/core/circuitbreaker" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" - "github.com/gogf/gf/v2/os/glog" - "github.com/gogf/gf/v2/util/gconv" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" ) -// 熔断器状态 -type CircuitState int +// CircuitBreakerState 熔断器状态 +type CircuitBreakerState string const ( - StateClosed CircuitState = iota // 关闭:正常请求 - StateOpen // 开启:熔断,拒绝请求 - StateHalfOpen // 半开:尝试恢复 + StateClosed CircuitBreakerState = "closed" // 关闭:正常状态 + StateOpen CircuitBreakerState = "open" // 开启:熔断状态 + StateHalfOpen CircuitBreakerState = "half-open" // 半开:尝试恢复状态 ) -func (s CircuitState) String() string { - switch s { - case StateClosed: - return "closed" - case StateOpen: - return "open" - case StateHalfOpen: - return "half_open" - default: - return "unknown" - } -} - -// 熔断器配置 +// CircuitBreakerConfig 熔断器配置 type CircuitBreakerConfig struct { - MaxFailures int // 最大失败次数 - Timeout time.Duration // 熔断超时时间(多久后尝试恢复) - HalfOpenSuccess int // 半开状态连续成功次数 - EnableDistributed bool // 是否启用分布式熔断(Redis) + MaxFailures int // 连续失败次数 + Timeout string // 熔断超时时间 + HalfOpenSuccess int // 半开状态连续成功次数 + SuccessStatusCodes []int // 视为成功的HTTP状态码 + SlowRequestThreshold string // 慢请求阈值 + HalfOpenRequestSampleRate float64 // 半开状态请求采样率 + Dimension string // 熔断器维度: service/ip/user + EnableSlidingWindow bool // 是否启用滑动窗口 + SlidingWindowSize string // 滑动窗口大小 + FailureRateThreshold float64 // 失败率阈值 } -// 熔断器 -type CircuitBreaker struct { - mu sync.RWMutex - state CircuitState - failures int - halfOpenSuccess int - lastFailureTime time.Time - config CircuitBreakerConfig - serviceName string +// CircuitBreakerInfo 熔断器信息 +type CircuitBreakerInfo struct { + ResourceName string `json:"resourceName"` // 资源名称 + State CircuitBreakerState `json:"state"` // 当前状态 + Config *CircuitBreakerConfig `json:"config"` // 配置信息 + FailCount int64 `json:"failCount"` // 失败次数 + TotalCount int64 `json:"totalCount"` // 总请求数 + LastOpenTime time.Time `json:"lastOpenTime"` // 上次熔断时间 + NextRetryTime time.Time `json:"nextRetryTime"` // 下次重试时间 } -// Redis Key 前缀 -const ( - CircuitBreakerStateKeyPrefix = "circuit:breaker:%s:state" // 熔断状态 - CircuitBreakerFailuresKeyPrefix = "circuit:breaker:%s:failures" // 失败计数 - CircuitBreakerLastFailKeyPrefix = "circuit:breaker:%s:last_fail" // 最后失败时间 - CircuitBreakerHalfOpenKeyPrefix = "circuit:breaker:%s:half_open_success" // 半开成功计数 -) - var ( - circuitBreakers = make(map[string]*CircuitBreaker) - circuitMu sync.RWMutex + // circuitBreakers 存储所有熔断器状态(用于健康检查) + circuitBreakers sync.Map + // enableDistributed 是否启用分布式熔断 + enableDistributed = false + // circuitBreakerConfigs 熔断器配置缓存 + circuitBreakerConfigs sync.Map ) -// GetOrCreateCircuitBreaker 获取或创建熔断器 -func GetOrCreateCircuitBreaker(serviceName string, config CircuitBreakerConfig) *CircuitBreaker { - circuitMu.RLock() - cb, exists := circuitBreakers[serviceName] - circuitMu.RUnlock() +// InitCircuitBreaker 初始化Sentinel熔断器 +func InitCircuitBreaker() error { + ctx := context.Background() + // 从配置文件读取是否启用分布式熔断 + enableDistributed = g.Cfg().MustGet(ctx, "circuitBreaker.enableDistributed", false).Bool() - if exists { - return cb + // 初始化Sentinel + err := api.InitDefault() + if err != nil { + return fmt.Errorf("Sentinel初始化失败: %v", err) } - circuitMu.Lock() - defer circuitMu.Unlock() + g.Log().Infof(ctx, "Sentinel熔断器初始化成功,分布式熔断: %v", enableDistributed) - // 双重检查 - if cb, exists := circuitBreakers[serviceName]; exists { - return cb + // 加载所有服务的熔断器配置 + loadCircuitBreakerConfigs() + + // 为每个服务创建熔断器 + services := []string{ + "customerService", "order", "assets", "cid", "oss", + "wallet", "market", "knapsack", } - cb = &CircuitBreaker{ - state: StateClosed, - config: config, - serviceName: serviceName, + for _, service := range services { + serviceConfig := loadServiceCircuitBreakerConfig(service) + if serviceConfig != nil { + circuitBreakerConfigs.Store(service, serviceConfig) + initErr := initServiceCircuitBreaker(service, serviceConfig) + if initErr != nil { + g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", service, initErr) + } + } } - circuitBreakers[serviceName] = cb - glog.Infof(context.Background(), "✅ 熔断器已初始化 - 服务: %s, 配置: MaxFailures=%d, Timeout=%v", - serviceName, config.MaxFailures, config.Timeout) - - return cb + return nil } -// AllowRequest 判断是否允许请求通过 -func (cb *CircuitBreaker) AllowRequest(ctx context.Context) bool { - // 分布式模式:从 Redis 获取全局熔断状态 - if cb.config.EnableDistributed { - redisKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName) - state, err := redis.RedisClient.Get(ctx, redisKey) - - if err == nil && !state.IsEmpty() { - stateStr := state.String() - if stateStr == "open" { - // 检查是否超时(进入半开状态) - lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName) - lastFail, _ := redis.RedisClient.Get(ctx, lastFailKey) - - if !lastFail.IsEmpty() { - lastFailTime := gconv.Int64(lastFail.Val()) - now := time.Now().Unix() - if (now - lastFailTime) >= int64(cb.config.Timeout.Seconds()) { - glog.Debugf(ctx, "🔓 熔断器进入半开状态 - 服务: %s", cb.serviceName) - return true - } - } - glog.Debugf(ctx, "🔒 熔断器已开启 - 服务: %s, 拒绝请求", cb.serviceName) - return false - } - } - - // 从 Redis 同步半开成功计数 - halfOpenKey := fmt.Sprintf(CircuitBreakerHalfOpenKeyPrefix, cb.serviceName) - halfOpenValue, err := redis.RedisClient.Get(ctx, halfOpenKey) - if err == nil && !halfOpenValue.IsEmpty() { - cb.mu.Lock() - cb.halfOpenSuccess = gconv.Int(halfOpenValue.Val()) - cb.mu.Unlock() - } +// loadCircuitBreakerConfigs 加载熔断器配置 +func loadCircuitBreakerConfigs() { + services := []string{ + "customerService", "order", "assets", "cid", "oss", + "wallet", "market", "knapsack", } - - cb.mu.RLock() - localState := cb.state - cb.mu.RUnlock() - - switch localState { - case StateClosed: - return true - case StateOpen: - // 检查是否进入半开状态 - if time.Since(cb.lastFailureTime) > cb.config.Timeout { - glog.Debugf(ctx, "🔓 熔断器进入半开状态 - 服务: %s", cb.serviceName) - return true - } - glog.Debugf(ctx, "🔒 熔断器已开启 - 服务: %s, 拒绝请求", cb.serviceName) - return false - case StateHalfOpen: - return true - } - - return true -} - -// RecordSuccess 记录成功 -func (cb *CircuitBreaker) RecordSuccess(ctx context.Context) { - cb.mu.Lock() - defer cb.mu.Unlock() - - cb.failures = 0 - - if cb.state == StateHalfOpen { - cb.halfOpenSuccess++ - - // 分布式模式:同步半开成功计数到 Redis - if cb.config.EnableDistributed { - halfOpenKey := fmt.Sprintf(CircuitBreakerHalfOpenKeyPrefix, cb.serviceName) - ttl := int64(cb.config.Timeout.Seconds()) - _, err := redis.RedisClient.Set(ctx, halfOpenKey, cb.halfOpenSuccess, gredis.SetOption{ - TTLOption: gredis.TTLOption{EX: &ttl}, - }) - if err != nil { - glog.Errorf(ctx, "同步半开成功计数到 Redis 失败: %v", err) - } - } - - if cb.halfOpenSuccess >= cb.config.HalfOpenSuccess { - oldState := cb.state - cb.state = StateClosed - cb.halfOpenSuccess = 0 - - glog.Infof(ctx, "✅ 熔断器已恢复 - 服务: %s, 状态: %s -> %s", - cb.serviceName, oldState, cb.state) - - // 记录恢复事件到 Jaeger - _, span := jaeger.NewSpan(ctx, "circuit_breaker_recovered") - span.SetAttributes( - attribute.String("service", cb.serviceName), - attribute.String("old_state", oldState.String()), - attribute.String("new_state", cb.state.String()), - ) - span.End() - - // 分布式模式:清除 Redis 熔断状态 - if cb.config.EnableDistributed { - go cb.clearRedisState(ctx) - } - } else { - glog.Debugf(ctx, "🔼 半开状态成功计数: %d/%d - 服务: %s", - cb.halfOpenSuccess, cb.config.HalfOpenSuccess, cb.serviceName) + for _, service := range services { + config := loadServiceCircuitBreakerConfig(service) + if config != nil { + circuitBreakerConfigs.Store(service, config) } } } -// RecordFailure 记录失败 -func (cb *CircuitBreaker) RecordFailure(ctx context.Context) { - cb.mu.Lock() - defer cb.mu.Unlock() +// loadServiceCircuitBreakerConfig 加载单个服务的熔断器配置 +func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig { + ctx := context.Background() + key := fmt.Sprintf("circuitBreaker.%s", serviceName) - cb.failures++ - cb.lastFailureTime = time.Now() + maxFailures := g.Cfg().MustGet(ctx, key+".maxFailures", 5).Int() + timeout := g.Cfg().MustGet(ctx, key+".timeout", "60s").String() + halfOpenSuccess := g.Cfg().MustGet(ctx, key+".halfOpenSuccess", 2).Int() + slowRequestThreshold := g.Cfg().MustGet(ctx, key+".slowRequestThreshold", "3s").String() + dimension := g.Cfg().MustGet(ctx, key+".dimension", "service").String() + enableSlidingWindow := g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool() + slidingWindowSize := g.Cfg().MustGet(ctx, key+".slidingWindowSize", "60s").String() + failureRateThreshold := g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64() + halfOpenRequestSampleRate := g.Cfg().MustGet(ctx, key+".halfOpenRequestSampleRate", 1.0).Float64() - oldState := cb.state - if cb.failures >= cb.config.MaxFailures { - cb.state = StateOpen + // 解析成功状态码 + successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String() + statusCodes := parseStatusCodes(successCodes) - glog.Warningf(ctx, "⚠️ 熔断器已触发 - 服务: %s, 状态: %s -> %s, 失败次数: %d/%d", - cb.serviceName, oldState, cb.state, cb.failures, cb.config.MaxFailures) + return &CircuitBreakerConfig{ + MaxFailures: maxFailures, + Timeout: timeout, + HalfOpenSuccess: halfOpenSuccess, + SuccessStatusCodes: statusCodes, + SlowRequestThreshold: slowRequestThreshold, + HalfOpenRequestSampleRate: halfOpenRequestSampleRate, + Dimension: dimension, + EnableSlidingWindow: enableSlidingWindow, + SlidingWindowSize: slidingWindowSize, + FailureRateThreshold: failureRateThreshold, + } +} - // 记录熔断事件到 Jaeger - ctx2, span := jaeger.NewSpan(ctx, "circuit_breaker_triggered") - span.SetAttributes( - attribute.String("service", cb.serviceName), - attribute.Int("failures", cb.failures), - attribute.String("old_state", oldState.String()), - attribute.String("new_state", cb.state.String()), - ) - span.End() - trace.SpanFromContext(ctx2).End() +// parseStatusCodes 解析HTTP状态码 +func parseStatusCodes(str string) []int { + parts := strings.Split(str, ",") + codes := make([]int, 0, len(parts)) + for _, part := range parts { + var code int + if _, err := fmt.Sscanf(strings.TrimSpace(part), "%d", &code); err == nil { + codes = append(codes, code) + } + } + return codes +} - // 分布式模式:将熔断状态写入 Redis - if cb.config.EnableDistributed { - go cb.setRedisState(ctx) +// initServiceCircuitBreaker 初始化服务熔断器 +func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error { + timeout, _ := time.ParseDuration(config.Timeout) + slowRequestThreshold, _ := time.ParseDuration(config.SlowRequestThreshold) + _, _ = time.ParseDuration(config.SlidingWindowSize) + + resourceName := fmt.Sprintf("service:%s", serviceName) + + var rule []*circuitbreaker.Rule + if config.EnableSlidingWindow { + // 使用滑动窗口统计(更精确)- 慢调用比例策略 + rule = []*circuitbreaker.Rule{ + { + Resource: resourceName, + Strategy: circuitbreaker.SlowRequestRatio, + RetryTimeoutMs: uint32(timeout.Milliseconds()), + MinRequestAmount: uint64(config.MaxFailures), + StatIntervalMs: 1000, + StatSlidingWindowBucketCount: 10, + MaxAllowedRtMs: uint64(slowRequestThreshold.Milliseconds()), + Threshold: config.FailureRateThreshold, + }, } } else { - glog.Debugf(ctx, "📊 熔断器失败计数: %d/%d - 服务: %s", - cb.failures, cb.config.MaxFailures, cb.serviceName) - } -} - -// setRedisState 将熔断状态写入 Redis(分布式同步) -func (cb *CircuitBreaker) setRedisState(ctx context.Context) { - stateKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName) - failuresKey := fmt.Sprintf(CircuitBreakerFailuresKeyPrefix, cb.serviceName) - lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName) - halfOpenKey := fmt.Sprintf(CircuitBreakerHalfOpenKeyPrefix, cb.serviceName) - - // 设置熔断状态 - ttl := int64(cb.config.Timeout.Seconds()) - _, err := redis.RedisClient.Set(ctx, stateKey, "open", gredis.SetOption{ - TTLOption: gredis.TTLOption{EX: &ttl}, - }) - if err != nil { - glog.Errorf(ctx, "设置熔断状态到 Redis 失败: %v", err) - } - - // 设置失败计数 - _, err = redis.RedisClient.Set(ctx, failuresKey, cb.failures, gredis.SetOption{ - TTLOption: gredis.TTLOption{EX: &ttl}, - }) - if err != nil { - glog.Errorf(ctx, "设置失败计数到 Redis 失败: %v", err) - } - - // 设置最后失败时间 - _, err = redis.RedisClient.Set(ctx, lastFailKey, time.Now().Unix(), gredis.SetOption{ - TTLOption: gredis.TTLOption{EX: &ttl}, - }) - if err != nil { - glog.Errorf(ctx, "设置最后失败时间到 Redis 失败: %v", err) - } - - // 重置半开成功计数 - _, err = redis.RedisClient.Set(ctx, halfOpenKey, 0, gredis.SetOption{ - TTLOption: gredis.TTLOption{EX: &ttl}, - }) - if err != nil { - glog.Errorf(ctx, "设置半开成功计数到 Redis 失败: %v", err) - } - - glog.Infof(ctx, "📡 熔断状态已同步到 Redis - 服务: %s", cb.serviceName) -} - -// clearRedisState 清除 Redis 中的熔断状态(分布式恢复) -func (cb *CircuitBreaker) clearRedisState(ctx context.Context) { - stateKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName) - failuresKey := fmt.Sprintf(CircuitBreakerFailuresKeyPrefix, cb.serviceName) - lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName) - halfOpenKey := fmt.Sprintf(CircuitBreakerHalfOpenKeyPrefix, cb.serviceName) - - // 删除熔断状态 - _, err := redis.RedisClient.Del(ctx, stateKey) - if err != nil { - glog.Errorf(ctx, "删除熔断状态从 Redis 失败: %v", err) - } - - // 删除失败计数 - _, err = redis.RedisClient.Del(ctx, failuresKey) - if err != nil { - glog.Errorf(ctx, "删除失败计数从 Redis 失败: %v", err) - } - - // 删除最后失败时间 - _, err = redis.RedisClient.Del(ctx, lastFailKey) - if err != nil { - glog.Errorf(ctx, "删除最后失败时间从 Redis 失败: %v", err) - } - - // 删除半开成功计数 - _, err = redis.RedisClient.Del(ctx, halfOpenKey) - if err != nil { - glog.Errorf(ctx, "删除半开成功计数从 Redis 失败: %v", err) - } - - glog.Infof(ctx, "📡 熔断状态已从 Redis 清除 - 服务: %s", cb.serviceName) -} - -// GetState 获取熔断器当前状态 -func (cb *CircuitBreaker) GetState() CircuitState { - cb.mu.RLock() - defer cb.mu.RUnlock() - return cb.state -} - -// GetFailures 获取当前失败次数 -func (cb *CircuitBreaker) GetFailures() int { - cb.mu.RLock() - defer cb.mu.RUnlock() - return cb.failures -} - -// Reset 重置熔断器(手动恢复) -func (cb *CircuitBreaker) Reset(ctx context.Context) { - cb.mu.Lock() - defer cb.mu.Unlock() - - oldState := cb.state - cb.state = StateClosed - cb.failures = 0 - cb.halfOpenSuccess = 0 - - glog.Infof(ctx, "🔄 熔断器已手动重置 - 服务: %s, 状态: %s -> %s", - cb.serviceName, oldState, cb.state) - - // 清除 Redis 状态 - if cb.config.EnableDistributed { - go cb.clearRedisState(ctx) - } -} - -// GetAllBreakers 获取所有熔断器状态 -func GetAllBreakers() map[string]map[string]interface{} { - circuitMu.RLock() - defer circuitMu.RUnlock() - - result := make(map[string]map[string]interface{}) - for name, cb := range circuitBreakers { - cb.mu.RLock() - result[name] = map[string]interface{}{ - "state": cb.state.String(), - "failures": cb.failures, - "halfOpenSuccess": cb.halfOpenSuccess, - "lastFailureTime": cb.lastFailureTime.Format("2006-01-02 15:04:05"), + // 使用连续失败计数(更简单快速)- 异常数策略 + rule = []*circuitbreaker.Rule{ + { + Resource: resourceName, + Strategy: circuitbreaker.ErrorCount, + RetryTimeoutMs: uint32(timeout.Milliseconds()), + MinRequestAmount: uint64(config.MaxFailures), + StatIntervalMs: 1000, // 1秒统计窗口 + Threshold: float64(config.MaxFailures), + }, } - cb.mu.RUnlock() } - return result + + // 加载规则到Sentinel + _, err := circuitbreaker.LoadRules(rule) + if err != nil { + return fmt.Errorf("加载熔断规则失败: %v", err) + } + + // 初始化熔断器信息 + cbInfo := &CircuitBreakerInfo{ + ResourceName: resourceName, + State: StateClosed, + Config: config, + } + circuitBreakers.Store(serviceName, cbInfo) + + strategy := "error_count" + if config.EnableSlidingWindow { + strategy = "slow_ratio" + } + + g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v", + serviceName, resourceName, strategy, timeout) + + return nil } -// getCircuitBreakerConfig 从配置文件读取熔断器配置 -func getCircuitBreakerConfig(ctx context.Context, serviceName string) CircuitBreakerConfig { - return CircuitBreakerConfig{ - MaxFailures: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.maxFailures", serviceName), 5).Int(), - Timeout: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.timeout", serviceName), "30s").Duration(), - HalfOpenSuccess: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.halfOpenSuccess", serviceName), 2).Int(), - EnableDistributed: g.Cfg().MustGet(ctx, "circuitBreaker.enableDistributed", true).Bool(), - } -} - -// CircuitBreakerMiddleware Gateway 熔断中间件 +// CircuitBreakerMiddleware 熔断降级中间件(使用阿里Sentinel) func CircuitBreakerMiddleware(r *ghttp.Request) { - // 从 URL 提取服务名 + // 从URL路径提取服务名 pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/") if len(pathParts) == 0 { r.Middleware.Next() @@ -397,149 +225,195 @@ func CircuitBreakerMiddleware(r *ghttp.Request) { } serviceName := pathParts[0] + resourceName := fmt.Sprintf("service:%s", serviceName) - // 跳过非微服务路径 - if serviceName == "health" || serviceName == "metrics" || serviceName == "swagger" { - r.Middleware.Next() + // 检查是否启用分布式熔断 + if enableDistributed { + // 检查Redis中的熔断状态 + if isCircuitBreakerOpenInDistributed(r.GetCtx(), resourceName) { + g.Log().Warningf(r.GetCtx(), "分布式熔断触发: %s", resourceName) + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断中,请稍后再试", serviceName)) + return + } + } + + // 使用Sentinel进行熔断保护 + entry, blockError := api.Entry(resourceName) + if blockError != nil { + // 被熔断拦截 + g.Log().Warningf(r.GetCtx(), "熔断触发: %s, reason: %v", resourceName, blockError) + + // 更新熔断器状态 + if val, ok := circuitBreakers.Load(serviceName); ok { + cbInfo := val.(*CircuitBreakerInfo) + cbInfo.State = StateOpen + cbInfo.LastOpenTime = time.Now() + if timeout, err := time.ParseDuration(cbInfo.Config.Timeout); err == nil { + cbInfo.NextRetryTime = time.Now().Add(timeout) + } + circuitBreakers.Store(serviceName, cbInfo) + } + + r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName)) return } - // 获取熔断配置 - config := getCircuitBreakerConfig(r.GetCtx(), serviceName) - - // 获取或创建熔断器 - cb := GetOrCreateCircuitBreaker(serviceName, config) - - // 判断是否允许请求通过 - if !cb.AllowRequest(r.GetCtx()) { - // 熔断开启,返回降级响应 - glog.Warningf(r.GetCtx(), "⛔ 服务 %s 熔断中,触发降级", serviceName) - - // 获取降级响应 - fallbackResponse := getFallbackResponse(serviceName) - - // 记录降级事件到 Jaeger - ctx2, span := jaeger.NewSpan(r.GetCtx(), "circuit_breaker_fallback") - span.SetAttributes( - attribute.String("service", serviceName), - attribute.String("fallback_type", "circuit_breaker_open"), - ) - span.End() - trace.SpanFromContext(ctx2).End() - - r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 503, - Message: "服务暂时不可用,已自动降级", - Data: fallbackResponse, - }) - return - } - - // 记录请求开始时间 - startTime := time.Now() - - // 执行后续处理(包括代理转发) + // 执行后续中间件和业务逻辑 r.Middleware.Next() - // 根据响应状态记录成功/失败 - elapsed := time.Since(startTime) - if r.Response.Status >= 500 { - cb.RecordFailure(r.GetCtx()) - glog.Warningf(r.GetCtx(), "❌ 服务 %s 请求失败 - 状态码: %d, 耗时: %v", - serviceName, r.Response.Status, elapsed) - } else { - cb.RecordSuccess(r.GetCtx()) - glog.Debugf(r.GetCtx(), "✅ 服务 %s 请求成功 - 状态码: %d, 耗时: %v", - serviceName, r.Response.Status, elapsed) + // 记录请求结果(基于HTTP状态码) + statusCode := r.Response.Status + if !isSuccessStatusCode(resourceName, statusCode) { + // 记录异常 + api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode)) } + + // 退出Sentinel资源 + entry.Exit() } -// getFallbackResponse 获取降级响应 -func getFallbackResponse(serviceName string) interface{} { - switch serviceName { - case "customerService": - // 客服服务降级:返回固定话术 - return map[string]interface{}{ - "message": "当前客服系统繁忙,智能助手暂时离线。如需帮助,可拨打:400-xxx-xxxx", - "fallback": true, - "type": "fixed_response", - } - case "order": - // 订单服务降级:返回排队提示 - return map[string]interface{}{ - "message": "订单系统繁忙,您的请求已排队,请稍后刷新查看", - "fallback": true, - "type": "queue_hint", - } - case "assets": - // 资产服务降级:返回缓存数据 - return map[string]interface{}{ - "message": "资产数据正在更新中,显示的是5分钟前的缓存数据", - "fallback": true, - "type": "cached_data", - } - case "wallet": - // 钱包服务降级:返回只读数据 - return map[string]interface{}{ - "message": "钱包服务暂时不可用,无法进行转账操作,查询功能正常", - "fallback": true, - "type": "read_only", - } - case "market": - // 市场服务降级:返回推荐数据 - return map[string]interface{}{ - "message": "市场数据加载中,为您推荐以下热门商品", - "fallback": true, - "type": "recommended", - } - default: - // 默认降级响应 - return map[string]interface{}{ - "message": "服务暂时不可用,请稍后再试", - "fallback": true, - "type": "default", - } +// isSuccessStatusCode 判断HTTP状态码是否成功 +func isSuccessStatusCode(resourceName string, statusCode int) bool { + serviceName := strings.TrimPrefix(resourceName, "service:") + if serviceName == "" { + // 默认只认为2xx是成功 + return statusCode >= 200 && statusCode < 300 } + + // 从配置中获取成功状态码列表 + var serviceConfig *CircuitBreakerConfig + if val, ok := circuitBreakerConfigs.Load(serviceName); ok { + serviceConfig = val.(*CircuitBreakerConfig) + } + + if serviceConfig != nil && len(serviceConfig.SuccessStatusCodes) > 0 { + for _, code := range serviceConfig.SuccessStatusCodes { + if statusCode == code { + return true + } + } + return false + } + + // 默认:2xx状态码为成功 + return statusCode >= 200 && statusCode < 300 +} + +// isCircuitBreakerOpenInDistributed 检查分布式熔断状态 +func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool { + key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) + value, err := g.Redis().Get(ctx, key) + if err != nil || value.IsNil() { + return false + } + state := value.String() + return state == "open" } // CircuitBreakerHealthCheckHandler 熔断器健康检查接口 func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { - breakers := GetAllBreakers() + status := make(map[string]interface{}) + + // 遍历所有熔断器 + circuitBreakers.Range(func(key, value interface{}) bool { + serviceName := key.(string) + cbInfo := value.(*CircuitBreakerInfo) + + // 获取Sentinel中的实际状态 + rules := circuitbreaker.GetRulesOfResource(cbInfo.ResourceName) + var stateStr string + if len(rules) > 0 { + stateStr = string(cbInfo.State) + } else { + stateStr = "unknown" + } + + status[serviceName] = map[string]interface{}{ + "resource": cbInfo.ResourceName, + "state": stateStr, + "config": cbInfo.Config, + "lastOpenTime": cbInfo.LastOpenTime, + "nextRetryTime": cbInfo.NextRetryTime, + } + return true + }) r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ Code: 200, - Message: "熔断器状态查询成功", - Data: breakers, + Message: "熔断器状态", + Data: status, }) } -// CircuitBreakerResetHandler 熔断器手动重置接口 +// getSentinelStateString 转换Sentinel状态为字符串 +func getSentinelStateString(state circuitbreaker.State) string { + switch state { + case circuitbreaker.Closed: + return string(StateClosed) + case circuitbreaker.Open: + return string(StateOpen) + case circuitbreaker.HalfOpen: + return string(StateHalfOpen) + default: + return "unknown" + } +} + +// CircuitBreakerResetHandler 熔断器手动重置接口(仅限管理后台调用) func CircuitBreakerResetHandler(r *ghttp.Request) { - serviceName := r.Get("serviceName").String() + serviceName := r.Get("service").String() if serviceName == "" { r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ Code: 400, - Message: "服务名不能为空", + Message: "缺少service参数", }) return } - circuitMu.RLock() - cb, exists := circuitBreakers[serviceName] - circuitMu.RUnlock() + resourceName := fmt.Sprintf("service:%s", serviceName) - if !exists { + // 重置Sentinel规则 - 清空现有规则 + _, err := circuitbreaker.LoadRules([]*circuitbreaker.Rule{}) + if err != nil { r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ - Code: 404, - Message: fmt.Sprintf("服务 %s 的熔断器不存在", serviceName), + Code: 500, + Message: fmt.Sprintf("重置熔断器失败: %v", err), }) return } - cb.Reset(r.GetCtx()) + // 重新加载规则 + if val, ok := circuitBreakerConfigs.Load(serviceName); ok { + config := val.(*CircuitBreakerConfig) + err = initServiceCircuitBreaker(serviceName, config) + if err != nil { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 500, + Message: fmt.Sprintf("重置熔断器失败: %v", err), + }) + return + } + } + + // 更新内存状态 + if val, ok := circuitBreakers.Load(serviceName); ok { + cbInfo := val.(*CircuitBreakerInfo) + cbInfo.State = StateClosed + cbInfo.LastOpenTime = time.Time{} + cbInfo.NextRetryTime = time.Time{} + circuitBreakers.Store(serviceName, cbInfo) + } + + // 重置分布式状态(如果启用) + if enableDistributed { + key := fmt.Sprintf("circuit_breaker:%s:state", resourceName) + _, _ = g.Redis().Del(r.GetCtx(), key) + } + + g.Log().Infof(r.GetCtx(), "熔断器已手动重置: %s", resourceName) r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ Code: 200, - Message: fmt.Sprintf("服务 %s 的熔断器已重置", serviceName), + Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName), }) }