feat: 添加分布式熔断器

This commit is contained in:
2025-12-31 23:38:33 +08:00
parent 4269b4fd79
commit bf235c709d

View File

@@ -0,0 +1,508 @@
package middleware
import (
"context"
"fmt"
"strings"
"sync"
"time"
"gitee.com/red-future---jilin-g/common/jaeger"
"gitee.com/red-future---jilin-g/common/redis"
"github.com/gogf/gf/v2/database/gredis"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/util/gconv"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// 熔断器状态
type CircuitState int
const (
StateClosed CircuitState = iota // 关闭:正常请求
StateOpen // 开启:熔断,拒绝请求
StateHalfOpen // 半开:尝试恢复
)
func (s CircuitState) String() string {
switch s {
case StateClosed:
return "closed"
case StateOpen:
return "open"
case StateHalfOpen:
return "half_open"
default:
return "unknown"
}
}
// 熔断器配置
type CircuitBreakerConfig struct {
MaxFailures int // 最大失败次数
Timeout time.Duration // 熔断超时时间(多久后尝试恢复)
HalfOpenSuccess int // 半开状态连续成功次数
EnableDistributed bool // 是否启用分布式熔断Redis
}
// 熔断器
type CircuitBreaker struct {
mu sync.RWMutex
state CircuitState
failures int
halfOpenSuccess int
lastFailureTime time.Time
config CircuitBreakerConfig
serviceName string
}
// Redis Key 前缀
const (
CircuitBreakerStateKeyPrefix = "circuit:breaker:%s:state" // 熔断状态
CircuitBreakerFailuresKeyPrefix = "circuit:breaker:%s:failures" // 失败计数
CircuitBreakerLastFailKeyPrefix = "circuit:breaker:%s:last_fail" // 最后失败时间
CircuitBreakerHalfOpenKeyPrefix = "circuit:breaker:%s:half_open_success" // 半开成功计数
)
var (
circuitBreakers = make(map[string]*CircuitBreaker)
circuitMu sync.RWMutex
)
// GetOrCreateCircuitBreaker 获取或创建熔断器
func GetOrCreateCircuitBreaker(serviceName string, config CircuitBreakerConfig) *CircuitBreaker {
circuitMu.RLock()
cb, exists := circuitBreakers[serviceName]
circuitMu.RUnlock()
if exists {
return cb
}
circuitMu.Lock()
defer circuitMu.Unlock()
// 双重检查
if cb, exists := circuitBreakers[serviceName]; exists {
return cb
}
cb = &CircuitBreaker{
state: StateClosed,
config: config,
serviceName: serviceName,
}
circuitBreakers[serviceName] = cb
glog.Infof(context.Background(), "✅ 熔断器已初始化 - 服务: %s, 配置: MaxFailures=%d, Timeout=%v",
serviceName, config.MaxFailures, config.Timeout)
return cb
}
// AllowRequest 判断是否允许请求通过
func (cb *CircuitBreaker) AllowRequest(ctx context.Context) bool {
// 分布式模式:从 Redis 获取全局熔断状态
if cb.config.EnableDistributed {
redisKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName)
state, err := redis.RedisClient.Get(ctx, redisKey)
if err == nil && !state.IsEmpty() {
stateStr := state.String()
if stateStr == "open" {
// 检查是否超时(进入半开状态)
lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName)
lastFail, _ := redis.RedisClient.Get(ctx, lastFailKey)
if !lastFail.IsEmpty() {
lastFailTime := gconv.Int64(lastFail.Val())
now := time.Now().Unix()
if (now - lastFailTime) >= int64(cb.config.Timeout.Seconds()) {
glog.Debugf(ctx, "🔓 熔断器进入半开状态 - 服务: %s", cb.serviceName)
return true
}
}
glog.Debugf(ctx, "🔒 熔断器已开启 - 服务: %s, 拒绝请求", cb.serviceName)
return false
}
}
}
cb.mu.RLock()
localState := cb.state
cb.mu.RUnlock()
switch localState {
case StateClosed:
return true
case StateOpen:
// 检查是否进入半开状态
if time.Since(cb.lastFailureTime) > cb.config.Timeout {
glog.Debugf(ctx, "🔓 熔断器进入半开状态 - 服务: %s", cb.serviceName)
return true
}
glog.Debugf(ctx, "🔒 熔断器已开启 - 服务: %s, 拒绝请求", cb.serviceName)
return false
case StateHalfOpen:
return true
}
return true
}
// RecordSuccess 记录成功
func (cb *CircuitBreaker) RecordSuccess(ctx context.Context) {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failures = 0
if cb.state == StateHalfOpen {
cb.halfOpenSuccess++
if cb.halfOpenSuccess >= cb.config.HalfOpenSuccess {
oldState := cb.state
cb.state = StateClosed
cb.halfOpenSuccess = 0
glog.Infof(ctx, "✅ 熔断器已恢复 - 服务: %s, 状态: %s -> %s",
cb.serviceName, oldState, cb.state)
// 记录恢复事件到 Jaeger
_, span := jaeger.NewSpan(ctx, "circuit_breaker_recovered")
span.SetAttributes(
attribute.String("service", cb.serviceName),
attribute.String("old_state", oldState.String()),
attribute.String("new_state", cb.state.String()),
)
span.End()
// 分布式模式:清除 Redis 熔断状态
if cb.config.EnableDistributed {
go cb.clearRedisState(ctx)
}
} else {
glog.Debugf(ctx, "🔼 半开状态成功计数: %d/%d - 服务: %s",
cb.halfOpenSuccess, cb.config.HalfOpenSuccess, cb.serviceName)
}
}
}
// RecordFailure 记录失败
func (cb *CircuitBreaker) RecordFailure(ctx context.Context) {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.failures++
cb.lastFailureTime = time.Now()
oldState := cb.state
if cb.failures >= cb.config.MaxFailures {
cb.state = StateOpen
glog.Warningf(ctx, "⚠️ 熔断器已触发 - 服务: %s, 状态: %s -> %s, 失败次数: %d/%d",
cb.serviceName, oldState, cb.state, cb.failures, cb.config.MaxFailures)
// 记录熔断事件到 Jaeger
ctx2, span := jaeger.NewSpan(ctx, "circuit_breaker_triggered")
span.SetAttributes(
attribute.String("service", cb.serviceName),
attribute.Int("failures", cb.failures),
attribute.String("old_state", oldState.String()),
attribute.String("new_state", cb.state.String()),
)
span.End()
trace.SpanFromContext(ctx2).End()
// 分布式模式:将熔断状态写入 Redis
if cb.config.EnableDistributed {
go cb.setRedisState(ctx)
}
} else {
glog.Debugf(ctx, "📊 熔断器失败计数: %d/%d - 服务: %s",
cb.failures, cb.config.MaxFailures, cb.serviceName)
}
}
// setRedisState 将熔断状态写入 Redis分布式同步
func (cb *CircuitBreaker) setRedisState(ctx context.Context) {
stateKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName)
failuresKey := fmt.Sprintf(CircuitBreakerFailuresKeyPrefix, cb.serviceName)
lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName)
// 设置熔断状态
ttl := int64(cb.config.Timeout.Seconds())
_, err := redis.RedisClient.Set(ctx, stateKey, "open", gredis.SetOption{
TTLOption: gredis.TTLOption{EX: &ttl},
})
if err != nil {
glog.Errorf(ctx, "设置熔断状态到 Redis 失败: %v", err)
}
// 设置失败计数
_, err = redis.RedisClient.Set(ctx, failuresKey, cb.failures, gredis.SetOption{
TTLOption: gredis.TTLOption{EX: &ttl},
})
if err != nil {
glog.Errorf(ctx, "设置失败计数到 Redis 失败: %v", err)
}
// 设置最后失败时间
_, err = redis.RedisClient.Set(ctx, lastFailKey, time.Now().Unix(), gredis.SetOption{
TTLOption: gredis.TTLOption{EX: &ttl},
})
if err != nil {
glog.Errorf(ctx, "设置最后失败时间到 Redis 失败: %v", err)
}
glog.Infof(ctx, "📡 熔断状态已同步到 Redis - 服务: %s", cb.serviceName)
}
// clearRedisState 清除 Redis 中的熔断状态(分布式恢复)
func (cb *CircuitBreaker) clearRedisState(ctx context.Context) {
stateKey := fmt.Sprintf(CircuitBreakerStateKeyPrefix, cb.serviceName)
failuresKey := fmt.Sprintf(CircuitBreakerFailuresKeyPrefix, cb.serviceName)
lastFailKey := fmt.Sprintf(CircuitBreakerLastFailKeyPrefix, cb.serviceName)
// 删除熔断状态
_, err := redis.RedisClient.Del(ctx, stateKey)
if err != nil {
glog.Errorf(ctx, "删除熔断状态从 Redis 失败: %v", err)
}
// 删除失败计数
_, err = redis.RedisClient.Del(ctx, failuresKey)
if err != nil {
glog.Errorf(ctx, "删除失败计数从 Redis 失败: %v", err)
}
// 删除最后失败时间
_, err = redis.RedisClient.Del(ctx, lastFailKey)
if err != nil {
glog.Errorf(ctx, "删除最后失败时间从 Redis 失败: %v", err)
}
glog.Infof(ctx, "📡 熔断状态已从 Redis 清除 - 服务: %s", cb.serviceName)
}
// GetState 获取熔断器当前状态
func (cb *CircuitBreaker) GetState() CircuitState {
cb.mu.RLock()
defer cb.mu.RUnlock()
return cb.state
}
// GetFailures 获取当前失败次数
func (cb *CircuitBreaker) GetFailures() int {
cb.mu.RLock()
defer cb.mu.RUnlock()
return cb.failures
}
// Reset 重置熔断器(手动恢复)
func (cb *CircuitBreaker) Reset(ctx context.Context) {
cb.mu.Lock()
defer cb.mu.Unlock()
oldState := cb.state
cb.state = StateClosed
cb.failures = 0
cb.halfOpenSuccess = 0
glog.Infof(ctx, "🔄 熔断器已手动重置 - 服务: %s, 状态: %s -> %s",
cb.serviceName, oldState, cb.state)
// 清除 Redis 状态
if cb.config.EnableDistributed {
go cb.clearRedisState(ctx)
}
}
// GetAllBreakers 获取所有熔断器状态
func GetAllBreakers() map[string]map[string]interface{} {
circuitMu.RLock()
defer circuitMu.RUnlock()
result := make(map[string]map[string]interface{})
for name, cb := range circuitBreakers {
cb.mu.RLock()
result[name] = map[string]interface{}{
"state": cb.state.String(),
"failures": cb.failures,
"halfOpenSuccess": cb.halfOpenSuccess,
"lastFailureTime": cb.lastFailureTime.Format("2006-01-02 15:04:05"),
}
cb.mu.RUnlock()
}
return result
}
// getCircuitBreakerConfig 从配置文件读取熔断器配置
func getCircuitBreakerConfig(ctx context.Context, serviceName string) CircuitBreakerConfig {
return CircuitBreakerConfig{
MaxFailures: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.maxFailures", serviceName), 5).Int(),
Timeout: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.timeout", serviceName), "30s").Duration(),
HalfOpenSuccess: g.Cfg().MustGet(ctx, fmt.Sprintf("circuitBreaker.%s.halfOpenSuccess", serviceName), 2).Int(),
EnableDistributed: g.Cfg().MustGet(ctx, "circuitBreaker.enableDistributed", true).Bool(),
}
}
// CircuitBreakerMiddleware Gateway 熔断中间件
func CircuitBreakerMiddleware(r *ghttp.Request) {
// 从 URL 提取服务名
pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
if len(pathParts) == 0 {
r.Middleware.Next()
return
}
serviceName := pathParts[0]
// 跳过非微服务路径
if serviceName == "health" || serviceName == "metrics" || serviceName == "swagger" {
r.Middleware.Next()
return
}
// 获取熔断配置
config := getCircuitBreakerConfig(r.GetCtx(), serviceName)
// 获取或创建熔断器
cb := GetOrCreateCircuitBreaker(serviceName, config)
// 判断是否允许请求通过
if !cb.AllowRequest(r.GetCtx()) {
// 熔断开启,返回降级响应
glog.Warningf(r.GetCtx(), "⛔ 服务 %s 熔断中,触发降级", serviceName)
// 获取降级响应
fallbackResponse := getFallbackResponse(serviceName)
// 记录降级事件到 Jaeger
ctx2, span := jaeger.NewSpan(r.GetCtx(), "circuit_breaker_fallback")
span.SetAttributes(
attribute.String("service", serviceName),
attribute.String("fallback_type", "circuit_breaker_open"),
)
span.End()
trace.SpanFromContext(ctx2).End()
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 503,
Message: "服务暂时不可用,已自动降级",
Data: fallbackResponse,
})
return
}
// 记录请求开始时间
startTime := time.Now()
// 执行后续处理(包括代理转发)
r.Middleware.Next()
// 根据响应状态记录成功/失败
elapsed := time.Since(startTime)
if r.Response.Status >= 500 {
cb.RecordFailure(r.GetCtx())
glog.Warningf(r.GetCtx(), "❌ 服务 %s 请求失败 - 状态码: %d, 耗时: %v",
serviceName, r.Response.Status, elapsed)
} else {
cb.RecordSuccess(r.GetCtx())
glog.Debugf(r.GetCtx(), "✅ 服务 %s 请求成功 - 状态码: %d, 耗时: %v",
serviceName, r.Response.Status, elapsed)
}
}
// getFallbackResponse 获取降级响应
func getFallbackResponse(serviceName string) interface{} {
switch serviceName {
case "customerService":
// 客服服务降级:返回固定话术
return map[string]interface{}{
"message": "当前客服系统繁忙智能助手暂时离线。如需帮助可拨打400-xxx-xxxx",
"fallback": true,
"type": "fixed_response",
}
case "order":
// 订单服务降级:返回排队提示
return map[string]interface{}{
"message": "订单系统繁忙,您的请求已排队,请稍后刷新查看",
"fallback": true,
"type": "queue_hint",
}
case "assets":
// 资产服务降级:返回缓存数据
return map[string]interface{}{
"message": "资产数据正在更新中显示的是5分钟前的缓存数据",
"fallback": true,
"type": "cached_data",
}
case "wallet":
// 钱包服务降级:返回只读数据
return map[string]interface{}{
"message": "钱包服务暂时不可用,无法进行转账操作,查询功能正常",
"fallback": true,
"type": "read_only",
}
case "market":
// 市场服务降级:返回推荐数据
return map[string]interface{}{
"message": "市场数据加载中,为您推荐以下热门商品",
"fallback": true,
"type": "recommended",
}
default:
// 默认降级响应
return map[string]interface{}{
"message": "服务暂时不可用,请稍后再试",
"fallback": true,
"type": "default",
}
}
}
// CircuitBreakerHealthCheckHandler 熔断器健康检查接口
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
breakers := GetAllBreakers()
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: "熔断器状态查询成功",
Data: breakers,
})
}
// CircuitBreakerResetHandler 熔断器手动重置接口
func CircuitBreakerResetHandler(r *ghttp.Request) {
serviceName := r.Get("serviceName").String()
if serviceName == "" {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 400,
Message: "服务名不能为空",
})
return
}
circuitMu.RLock()
cb, exists := circuitBreakers[serviceName]
circuitMu.RUnlock()
if !exists {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 404,
Message: fmt.Sprintf("服务 %s 的熔断器不存在", serviceName),
})
return
}
cb.Reset(r.GetCtx())
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("服务 %s 的熔断器已重置", serviceName),
})
}