From bf235c709d2642afe62a687cdb851d035daa04dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=96=8C?= <259278618@qq.com> Date: Wed, 31 Dec 2025 23:38:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=88=86=E5=B8=83?= =?UTF-8?q?=E5=BC=8F=E7=86=94=E6=96=AD=E5=99=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- middleware/circuit_breaker.go | 508 ++++++++++++++++++++++++++++++++++ 1 file changed, 508 insertions(+) create mode 100644 middleware/circuit_breaker.go diff --git a/middleware/circuit_breaker.go b/middleware/circuit_breaker.go new file mode 100644 index 0000000..7678220 --- /dev/null +++ b/middleware/circuit_breaker.go @@ -0,0 +1,508 @@ +package middleware + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "gitee.com/red-future---jilin-g/common/jaeger" + "gitee.com/red-future---jilin-g/common/redis" + "github.com/gogf/gf/v2/database/gredis" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/util/gconv" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// 熔断器状态 +type CircuitState int + +const ( + StateClosed CircuitState = iota // 关闭:正常请求 + StateOpen // 开启:熔断,拒绝请求 + StateHalfOpen // 半开:尝试恢复 +) + +func (s CircuitState) String() string { + switch s { + case StateClosed: + return "closed" + case StateOpen: + return "open" + case StateHalfOpen: + return "half_open" + default: + return "unknown" + } +} + +// 熔断器配置 +type CircuitBreakerConfig struct { + MaxFailures int // 最大失败次数 + Timeout time.Duration // 熔断超时时间(多久后尝试恢复) + HalfOpenSuccess int // 半开状态连续成功次数 + EnableDistributed bool // 是否启用分布式熔断(Redis) +} + +// 熔断器 +type CircuitBreaker struct { + mu sync.RWMutex + state CircuitState + failures int + halfOpenSuccess int + lastFailureTime time.Time + config CircuitBreakerConfig + serviceName string +} + +// Redis Key 前缀 +const ( + CircuitBreakerStateKeyPrefix = "circuit:breaker:%s:state" // 熔断状态 + CircuitBreakerFailuresKeyPrefix = "circuit:breaker:%s:failures" // 失败计数 + CircuitBreakerLastFailKeyPrefix = "circuit:breaker:%s:last_fail" // 最后失败时间 + CircuitBreakerHalfOpenKeyPrefix = "circuit:breaker:%s:half_open_success" // 半开成功计数 +) + +var ( + circuitBreakers = make(map[string]*CircuitBreaker) + circuitMu sync.RWMutex +) + +// GetOrCreateCircuitBreaker 获取或创建熔断器 +func GetOrCreateCircuitBreaker(serviceName string, config CircuitBreakerConfig) *CircuitBreaker { + circuitMu.RLock() + cb, exists := circuitBreakers[serviceName] + circuitMu.RUnlock() + + if exists { + return cb + } + + circuitMu.Lock() + defer circuitMu.Unlock() + + // 双重检查 + if cb, exists := circuitBreakers[serviceName]; exists { + return cb + } + + cb = &CircuitBreaker{ + state: StateClosed, + config: config, + serviceName: serviceName, + } + circuitBreakers[serviceName] = cb + + glog.Infof(context.Background(), "✅ 熔断器已初始化 - 服务: %s, 配置: MaxFailures=%d, Timeout=%v", + serviceName, config.MaxFailures, config.Timeout) + + return cb +} + +// AllowRequest 判断是否允许请求通过 +func (cb *CircuitBreaker) AllowRequest(ctx context.Context) bool { + // 分布式模式:从 Redis 获取全局熔断状态 + if cb.config.EnableDistributed { + redisKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName) + state, err := redis.RedisClient.Get(ctx, redisKey) + + if err == nil && !state.IsEmpty() { + stateStr := state.String() + if stateStr == "open" { + // 检查是否超时(进入半开状态) + lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName) + lastFail, _ := redis.RedisClient.Get(ctx, lastFailKey) + + if !lastFail.IsEmpty() { + lastFailTime := gconv.Int64(lastFail.Val()) + now := time.Now().Unix() + if (now - lastFailTime) >= int64(cb.config.Timeout.Seconds()) { + glog.Debugf(ctx, "🔓 熔断器进入半开状态 - 服务: %s", cb.serviceName) + return true + } + } + glog.Debugf(ctx, "🔒 熔断器已开启 - 服务: %s, 拒绝请求", cb.serviceName) + return false + } + } + } + + cb.mu.RLock() + localState := cb.state + cb.mu.RUnlock() + + switch localState { + case StateClosed: + return true + case StateOpen: + // 检查是否进入半开状态 + if time.Since(cb.lastFailureTime) > cb.config.Timeout { + glog.Debugf(ctx, "🔓 熔断器进入半开状态 - 服务: %s", cb.serviceName) + return true + } + glog.Debugf(ctx, "🔒 熔断器已开启 - 服务: %s, 拒绝请求", cb.serviceName) + return false + case StateHalfOpen: + return true + } + + return true +} + +// RecordSuccess 记录成功 +func (cb *CircuitBreaker) RecordSuccess(ctx context.Context) { + cb.mu.Lock() + defer cb.mu.Unlock() + + cb.failures = 0 + + if cb.state == StateHalfOpen { + cb.halfOpenSuccess++ + + if cb.halfOpenSuccess >= cb.config.HalfOpenSuccess { + oldState := cb.state + cb.state = StateClosed + cb.halfOpenSuccess = 0 + + glog.Infof(ctx, "✅ 熔断器已恢复 - 服务: %s, 状态: %s -> %s", + cb.serviceName, oldState, cb.state) + + // 记录恢复事件到 Jaeger + _, span := jaeger.NewSpan(ctx, "circuit_breaker_recovered") + span.SetAttributes( + attribute.String("service", cb.serviceName), + attribute.String("old_state", oldState.String()), + attribute.String("new_state", cb.state.String()), + ) + span.End() + + // 分布式模式:清除 Redis 熔断状态 + if cb.config.EnableDistributed { + go cb.clearRedisState(ctx) + } + } else { + glog.Debugf(ctx, "🔼 半开状态成功计数: %d/%d - 服务: %s", + cb.halfOpenSuccess, cb.config.HalfOpenSuccess, cb.serviceName) + } + } +} + +// RecordFailure 记录失败 +func (cb *CircuitBreaker) RecordFailure(ctx context.Context) { + cb.mu.Lock() + defer cb.mu.Unlock() + + cb.failures++ + cb.lastFailureTime = time.Now() + + oldState := cb.state + if cb.failures >= cb.config.MaxFailures { + cb.state = StateOpen + + glog.Warningf(ctx, "⚠️ 熔断器已触发 - 服务: %s, 状态: %s -> %s, 失败次数: %d/%d", + cb.serviceName, oldState, cb.state, cb.failures, cb.config.MaxFailures) + + // 记录熔断事件到 Jaeger + ctx2, span := jaeger.NewSpan(ctx, "circuit_breaker_triggered") + span.SetAttributes( + attribute.String("service", cb.serviceName), + attribute.Int("failures", cb.failures), + attribute.String("old_state", oldState.String()), + attribute.String("new_state", cb.state.String()), + ) + span.End() + trace.SpanFromContext(ctx2).End() + + // 分布式模式:将熔断状态写入 Redis + if cb.config.EnableDistributed { + go cb.setRedisState(ctx) + } + } else { + glog.Debugf(ctx, "📊 熔断器失败计数: %d/%d - 服务: %s", + cb.failures, cb.config.MaxFailures, cb.serviceName) + } +} + +// setRedisState 将熔断状态写入 Redis(分布式同步) +func (cb *CircuitBreaker) setRedisState(ctx context.Context) { + stateKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName) + failuresKey := fmt.Sprintf(CircuitBreakerFailuresKeyPrefix, cb.serviceName) + lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName) + + // 设置熔断状态 + ttl := int64(cb.config.Timeout.Seconds()) + _, err := redis.RedisClient.Set(ctx, stateKey, "open", gredis.SetOption{ + TTLOption: gredis.TTLOption{EX: &ttl}, + }) + if err != nil { + glog.Errorf(ctx, "设置熔断状态到 Redis 失败: %v", err) + } + + // 设置失败计数 + _, err = redis.RedisClient.Set(ctx, failuresKey, cb.failures, gredis.SetOption{ + TTLOption: gredis.TTLOption{EX: &ttl}, + }) + if err != nil { + glog.Errorf(ctx, "设置失败计数到 Redis 失败: %v", err) + } + + // 设置最后失败时间 + _, err = redis.RedisClient.Set(ctx, lastFailKey, time.Now().Unix(), gredis.SetOption{ + TTLOption: gredis.TTLOption{EX: &ttl}, + }) + if err != nil { + glog.Errorf(ctx, "设置最后失败时间到 Redis 失败: %v", err) + } + + glog.Infof(ctx, "📡 熔断状态已同步到 Redis - 服务: %s", cb.serviceName) +} + +// clearRedisState 清除 Redis 中的熔断状态(分布式恢复) +func (cb *CircuitBreaker) clearRedisState(ctx context.Context) { + stateKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName) + failuresKey := fmt.Sprintf(CircuitBreakerFailuresKeyPrefix, cb.serviceName) + lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName) + + // 删除熔断状态 + _, err := redis.RedisClient.Del(ctx, stateKey) + if err != nil { + glog.Errorf(ctx, "删除熔断状态从 Redis 失败: %v", err) + } + + // 删除失败计数 + _, err = redis.RedisClient.Del(ctx, failuresKey) + if err != nil { + glog.Errorf(ctx, "删除失败计数从 Redis 失败: %v", err) + } + + // 删除最后失败时间 + _, err = redis.RedisClient.Del(ctx, lastFailKey) + if err != nil { + glog.Errorf(ctx, "删除最后失败时间从 Redis 失败: %v", err) + } + + glog.Infof(ctx, "📡 熔断状态已从 Redis 清除 - 服务: %s", cb.serviceName) +} + +// GetState 获取熔断器当前状态 +func (cb *CircuitBreaker) GetState() CircuitState { + cb.mu.RLock() + defer cb.mu.RUnlock() + return cb.state +} + +// GetFailures 获取当前失败次数 +func (cb *CircuitBreaker) GetFailures() int { + cb.mu.RLock() + defer cb.mu.RUnlock() + return cb.failures +} + +// Reset 重置熔断器(手动恢复) +func (cb *CircuitBreaker) Reset(ctx context.Context) { + cb.mu.Lock() + defer cb.mu.Unlock() + + oldState := cb.state + cb.state = StateClosed + cb.failures = 0 + cb.halfOpenSuccess = 0 + + glog.Infof(ctx, "🔄 熔断器已手动重置 - 服务: %s, 状态: %s -> %s", + cb.serviceName, oldState, cb.state) + + // 清除 Redis 状态 + if cb.config.EnableDistributed { + go cb.clearRedisState(ctx) + } +} + +// GetAllBreakers 获取所有熔断器状态 +func GetAllBreakers() map[string]map[string]interface{} { + circuitMu.RLock() + defer circuitMu.RUnlock() + + result := make(map[string]map[string]interface{}) + for name, cb := range circuitBreakers { + cb.mu.RLock() + result[name] = map[string]interface{}{ + "state": cb.state.String(), + "failures": cb.failures, + "halfOpenSuccess": cb.halfOpenSuccess, + "lastFailureTime": cb.lastFailureTime.Format("2006-01-02 15:04:05"), + } + cb.mu.RUnlock() + } + return result +} + +// getCircuitBreakerConfig 从配置文件读取熔断器配置 +func getCircuitBreakerConfig(ctx context.Context, serviceName string) CircuitBreakerConfig { + return CircuitBreakerConfig{ + MaxFailures: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.maxFailures", serviceName), 5).Int(), + Timeout: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.timeout", serviceName), "30s").Duration(), + HalfOpenSuccess: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.halfOpenSuccess", serviceName), 2).Int(), + EnableDistributed: g.Cfg().MustGet(ctx, "circuitBreaker.enableDistributed", true).Bool(), + } +} + +// CircuitBreakerMiddleware Gateway 熔断中间件 +func CircuitBreakerMiddleware(r *ghttp.Request) { + // 从 URL 提取服务名 + pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/") + if len(pathParts) == 0 { + r.Middleware.Next() + return + } + + serviceName := pathParts[0] + + // 跳过非微服务路径 + if serviceName == "health" || serviceName == "metrics" || serviceName == "swagger" { + r.Middleware.Next() + return + } + + // 获取熔断配置 + config := getCircuitBreakerConfig(r.GetCtx(), serviceName) + + // 获取或创建熔断器 + cb := GetOrCreateCircuitBreaker(serviceName, config) + + // 判断是否允许请求通过 + if !cb.AllowRequest(r.GetCtx()) { + // 熔断开启,返回降级响应 + glog.Warningf(r.GetCtx(), "⛔ 服务 %s 熔断中,触发降级", serviceName) + + // 获取降级响应 + fallbackResponse := getFallbackResponse(serviceName) + + // 记录降级事件到 Jaeger + ctx2, span := jaeger.NewSpan(r.GetCtx(), "circuit_breaker_fallback") + span.SetAttributes( + attribute.String("service", serviceName), + attribute.String("fallback_type", "circuit_breaker_open"), + ) + span.End() + trace.SpanFromContext(ctx2).End() + + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 503, + Message: "服务暂时不可用,已自动降级", + Data: fallbackResponse, + }) + return + } + + // 记录请求开始时间 + startTime := time.Now() + + // 执行后续处理(包括代理转发) + r.Middleware.Next() + + // 根据响应状态记录成功/失败 + elapsed := time.Since(startTime) + if r.Response.Status >= 500 { + cb.RecordFailure(r.GetCtx()) + glog.Warningf(r.GetCtx(), "❌ 服务 %s 请求失败 - 状态码: %d, 耗时: %v", + serviceName, r.Response.Status, elapsed) + } else { + cb.RecordSuccess(r.GetCtx()) + glog.Debugf(r.GetCtx(), "✅ 服务 %s 请求成功 - 状态码: %d, 耗时: %v", + serviceName, r.Response.Status, elapsed) + } +} + +// getFallbackResponse 获取降级响应 +func getFallbackResponse(serviceName string) interface{} { + switch serviceName { + case "customerService": + // 客服服务降级:返回固定话术 + return map[string]interface{}{ + "message": "当前客服系统繁忙,智能助手暂时离线。如需帮助,可拨打:400-xxx-xxxx", + "fallback": true, + "type": "fixed_response", + } + case "order": + // 订单服务降级:返回排队提示 + return map[string]interface{}{ + "message": "订单系统繁忙,您的请求已排队,请稍后刷新查看", + "fallback": true, + "type": "queue_hint", + } + case "assets": + // 资产服务降级:返回缓存数据 + return map[string]interface{}{ + "message": "资产数据正在更新中,显示的是5分钟前的缓存数据", + "fallback": true, + "type": "cached_data", + } + case "wallet": + // 钱包服务降级:返回只读数据 + return map[string]interface{}{ + "message": "钱包服务暂时不可用,无法进行转账操作,查询功能正常", + "fallback": true, + "type": "read_only", + } + case "market": + // 市场服务降级:返回推荐数据 + return map[string]interface{}{ + "message": "市场数据加载中,为您推荐以下热门商品", + "fallback": true, + "type": "recommended", + } + default: + // 默认降级响应 + return map[string]interface{}{ + "message": "服务暂时不可用,请稍后再试", + "fallback": true, + "type": "default", + } + } +} + +// CircuitBreakerHealthCheckHandler 熔断器健康检查接口 +func CircuitBreakerHealthCheckHandler(r *ghttp.Request) { + breakers := GetAllBreakers() + + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 200, + Message: "熔断器状态查询成功", + Data: breakers, + }) +} + +// CircuitBreakerResetHandler 熔断器手动重置接口 +func CircuitBreakerResetHandler(r *ghttp.Request) { + serviceName := r.Get("serviceName").String() + if serviceName == "" { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 400, + Message: "服务名不能为空", + }) + return + } + + circuitMu.RLock() + cb, exists := circuitBreakers[serviceName] + circuitMu.RUnlock() + + if !exists { + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 404, + Message: fmt.Sprintf("服务 %s 的熔断器不存在", serviceName), + }) + return + } + + cb.Reset(r.GetCtx()) + + r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{ + Code: 200, + Message: fmt.Sprintf("服务 %s 的熔断器已重置", serviceName), + }) +}