Files
common/middleware/circuit_breaker.go
2026-03-12 08:51:25 +08:00

572 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package middleware
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
)
// CircuitBreakerState 熔断器状态
type CircuitBreakerState string
const (
StateClosed CircuitBreakerState = "closed" // 关闭:正常状态
StateOpen CircuitBreakerState = "open" // 开启:熔断状态
StateHalfOpen CircuitBreakerState = "half-open" // 半开:尝试恢复状态
)
// CircuitBreakerConfig 熔断器配置
type CircuitBreakerConfig struct {
MaxFailures int // 连续失败次数
Timeout string // 熔断超时时间
HalfOpenSuccess int // 半开状态连续成功次数
SuccessStatusCodes []int // 视为成功的HTTP状态码
SlowRequestThreshold string // 慢请求阈值
HalfOpenRequestSampleRate float64 // 半开状态请求采样率
Dimension string // 熔断器维度: service/ip/user
EnableSlidingWindow bool // 是否启用滑动窗口
SlidingWindowSize string // 滑动窗口大小
FailureRateThreshold float64 // 失败率阈值
EnableFallback bool // 是否启用降级
FallbackMessage string // 降级提示消息
}
// CircuitBreakerMetrics 熔断器指标
type CircuitBreakerMetrics struct {
TotalRequests atomic.Int64 // 总请求数
PassRequests atomic.Int64 // 通过请求数
BlockRequests atomic.Int64 // 阻塞请求数
FailureRequests atomic.Int64 // 失败请求数
OpenCount atomic.Int64 // 熔断开启次数
}
// CircuitBreakerInfo 熔断器信息
type CircuitBreakerInfo struct {
ResourceName string `json:"resourceName"` // 资源名称
State CircuitBreakerState `json:"state"` // 当前状态
Config *CircuitBreakerConfig `json:"config"` // 配置信息
FailCount int64 `json:"failCount"` // 失败次数
TotalCount int64 `json:"totalCount"` // 总请求数
LastOpenTime time.Time `json:"lastOpenTime"` // 上次熔断时间
NextRetryTime time.Time `json:"nextRetryTime"` // 下次重试时间
Metrics *CircuitBreakerMetrics `json:"metrics"` // 指标统计
mu sync.RWMutex // 保护状态更新
}
var (
// circuitBreakers 存储所有熔断器状态(用于健康检查)
circuitBreakers sync.Map
// enableDistributed 是否启用分布式熔断
enableDistributed = false
// circuitBreakerConfigs 熔断器配置缓存
circuitBreakerConfigs sync.Map
// distributedSyncLock 分布式同步锁
distributedSyncLock sync.Mutex
)
// InitCircuitBreaker 初始化Sentinel熔断器
func InitCircuitBreaker() error {
ctx := context.Background()
// 从配置文件读取是否启用分布式熔断
enableDistributed = g.Cfg().MustGet(ctx, "circuitBreaker.enableDistributed", false).Bool()
// 初始化Sentinel
err := api.InitDefault()
if err != nil {
return fmt.Errorf("Sentinel初始化失败: %v", err)
}
g.Log().Infof(ctx, "Sentinel熔断器初始化成功分布式熔断: %v", enableDistributed)
// 动态从配置文件读取服务列表
services := g.Cfg().MustGet(ctx, "circuitBreaker.services", []string{
"customerService", "order", "assets", "cid", "oss",
"wallet", "market", "knapsack",
}).Strings()
if len(services) == 0 {
return fmt.Errorf("未配置熔断器服务列表")
}
// 为每个服务创建熔断器
for _, service := range services {
serviceConfig := loadServiceCircuitBreakerConfig(service)
if serviceConfig != nil {
circuitBreakerConfigs.Store(service, serviceConfig)
initErr := initServiceCircuitBreaker(service, serviceConfig)
if initErr != nil {
g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", service, initErr)
} else {
g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", service)
}
}
}
g.Log().Infof(ctx, "共初始化 %d 个服务熔断器", len(services))
return nil
}
// ReloadCircuitBreakerConfig 动态重新加载熔断器配置
func ReloadCircuitBreakerConfig(serviceName string) error {
ctx := context.Background()
// 重新加载配置
serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
if serviceConfig == nil {
return fmt.Errorf("未找到服务 %s 的配置", serviceName)
}
// 更新配置缓存
circuitBreakerConfigs.Store(serviceName, serviceConfig)
// 重新初始化熔断器
err := initServiceCircuitBreaker(serviceName, serviceConfig)
if err != nil {
return fmt.Errorf("重新初始化熔断器失败: %v", err)
}
g.Log().Infof(ctx, "服务 %s 熔断器配置重新加载成功", serviceName)
return nil
}
// loadServiceCircuitBreakerConfig 加载单个服务的熔断器配置
func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig {
ctx := context.Background()
key := fmt.Sprintf("circuitBreaker.%s", serviceName)
maxFailures := g.Cfg().MustGet(ctx, key+".maxFailures", 5).Int()
timeout := g.Cfg().MustGet(ctx, key+".timeout", "60s").String()
halfOpenSuccess := g.Cfg().MustGet(ctx, key+".halfOpenSuccess", 2).Int()
slowRequestThreshold := g.Cfg().MustGet(ctx, key+".slowRequestThreshold", "3s").String()
dimension := g.Cfg().MustGet(ctx, key+".dimension", "service").String()
enableSlidingWindow := g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool()
slidingWindowSize := g.Cfg().MustGet(ctx, key+".slidingWindowSize", "60s").String()
failureRateThreshold := g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64()
halfOpenRequestSampleRate := g.Cfg().MustGet(ctx, key+".halfOpenRequestSampleRate", 1.0).Float64()
enableFallback := g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool()
fallbackMessage := g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String()
// 解析成功状态码
successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String()
statusCodes := parseStatusCodes(successCodes)
return &CircuitBreakerConfig{
MaxFailures: maxFailures,
Timeout: timeout,
HalfOpenSuccess: halfOpenSuccess,
SuccessStatusCodes: statusCodes,
SlowRequestThreshold: slowRequestThreshold,
HalfOpenRequestSampleRate: halfOpenRequestSampleRate,
Dimension: dimension,
EnableSlidingWindow: enableSlidingWindow,
SlidingWindowSize: slidingWindowSize,
FailureRateThreshold: failureRateThreshold,
EnableFallback: enableFallback,
FallbackMessage: fallbackMessage,
}
}
// parseStatusCodes 解析HTTP状态码
func parseStatusCodes(str string) []int {
parts := strings.Split(str, ",")
codes := make([]int, 0, len(parts))
for _, part := range parts {
var code int
if _, err := fmt.Sscanf(strings.TrimSpace(part), "%d", &code); err == nil {
codes = append(codes, code)
}
}
return codes
}
// initServiceCircuitBreaker 初始化服务熔断器
func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error {
timeout, _ := time.ParseDuration(config.Timeout)
slowRequestThreshold, _ := time.ParseDuration(config.SlowRequestThreshold)
_, _ = time.ParseDuration(config.SlidingWindowSize)
resourceName := fmt.Sprintf("service:%s", serviceName)
var rule []*circuitbreaker.Rule
if config.EnableSlidingWindow {
// 使用滑动窗口统计(更精确)- 慢调用比例策略
rule = []*circuitbreaker.Rule{
{
Resource: resourceName,
Strategy: circuitbreaker.SlowRequestRatio,
RetryTimeoutMs: uint32(timeout.Milliseconds()),
MinRequestAmount: uint64(config.MaxFailures),
StatIntervalMs: 1000,
StatSlidingWindowBucketCount: 10,
MaxAllowedRtMs: uint64(slowRequestThreshold.Milliseconds()),
Threshold: config.FailureRateThreshold,
},
}
} else {
// 使用连续失败计数(更简单快速)- 异常数策略
rule = []*circuitbreaker.Rule{
{
Resource: resourceName,
Strategy: circuitbreaker.ErrorCount,
RetryTimeoutMs: uint32(timeout.Milliseconds()),
MinRequestAmount: uint64(config.MaxFailures),
StatIntervalMs: 1000, // 1秒统计窗口
Threshold: float64(config.MaxFailures),
},
}
}
// 加载规则到Sentinel
_, err := circuitbreaker.LoadRules(rule)
if err != nil {
return fmt.Errorf("加载熔断规则失败: %v", err)
}
// 初始化熔断器信息
cbInfo := &CircuitBreakerInfo{
ResourceName: resourceName,
State: StateClosed,
Config: config,
Metrics: &CircuitBreakerMetrics{},
}
circuitBreakers.Store(serviceName, cbInfo)
strategy := "error_count"
if config.EnableSlidingWindow {
strategy = "slow_ratio"
}
g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v",
serviceName, resourceName, strategy, timeout)
return nil
}
// CircuitBreakerMiddleware 熔断降级中间件使用阿里Sentinel
func CircuitBreakerMiddleware(r *ghttp.Request) {
startTime := time.Now()
// 从URL路径提取服务名
pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
if len(pathParts) == 0 {
r.Middleware.Next()
return
}
serviceName := pathParts[0]
resourceName := fmt.Sprintf("service:%s", serviceName)
// 获取熔断器信息
val, ok := circuitBreakers.Load(serviceName)
if !ok {
// 未配置熔断器,直接放行
r.Middleware.Next()
return
}
cbInfo := val.(*CircuitBreakerInfo)
cbInfo.Metrics.TotalRequests.Add(1)
// 检查是否启用分布式熔断
if enableDistributed {
if isCircuitBreakerOpenInDistributed(r.GetCtx(), resourceName) {
cbInfo.Metrics.BlockRequests.Add(1)
g.Log().Warningf(r.GetCtx(), "分布式熔断触发: %s", resourceName)
sendFallbackResponse(r, serviceName, cbInfo.Config)
return
}
}
// 使用Sentinel进行熔断保护
entry, blockError := api.Entry(resourceName)
if blockError != nil {
// 被熔断拦截
cbInfo.Metrics.BlockRequests.Add(1)
cbInfo.Metrics.OpenCount.Add(1)
g.Log().Warningf(r.GetCtx(), "熔断触发: %s, reason: %v", resourceName, blockError)
// 更新熔断器状态
cbInfo.mu.Lock()
cbInfo.State = StateOpen
cbInfo.LastOpenTime = time.Now()
if timeout, err := time.ParseDuration(cbInfo.Config.Timeout); err == nil {
cbInfo.NextRetryTime = time.Now().Add(timeout)
}
cbInfo.mu.Unlock()
// 同步到分布式存储
if enableDistributed {
syncCircuitBreakerStateToDistributed(r.GetCtx(), resourceName, "open")
}
sendFallbackResponse(r, serviceName, cbInfo.Config)
return
}
// 执行后续中间件和业务逻辑
r.Middleware.Next()
// 记录请求结果基于HTTP状态码
statusCode := r.Response.Status
duration := time.Since(startTime)
if !isSuccessStatusCode(resourceName, statusCode) {
// 记录异常
cbInfo.Metrics.FailureRequests.Add(1)
api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode))
g.Log().Debugf(r.GetCtx(), "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration)
} else {
cbInfo.Metrics.PassRequests.Add(1)
}
// 退出Sentinel资源
entry.Exit()
}
// sendFallbackResponse 发送降级响应
func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig) {
if config.EnableFallback && config.FallbackMessage != "" {
// 自定义降级消息
r.Response.WriteStatusExit(503, config.FallbackMessage)
} else {
// 默认消息
r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName))
}
}
// isSuccessStatusCode 判断HTTP状态码是否成功
func isSuccessStatusCode(resourceName string, statusCode int) bool {
serviceName := strings.TrimPrefix(resourceName, "service:")
if serviceName == "" {
// 默认只认为2xx是成功
return statusCode >= 200 && statusCode < 300
}
// 从配置中获取成功状态码列表
var serviceConfig *CircuitBreakerConfig
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
serviceConfig = val.(*CircuitBreakerConfig)
}
if serviceConfig != nil && len(serviceConfig.SuccessStatusCodes) > 0 {
for _, code := range serviceConfig.SuccessStatusCodes {
if statusCode == code {
return true
}
}
return false
}
// 默认2xx状态码为成功
return statusCode >= 200 && statusCode < 300
}
// isCircuitBreakerOpenInDistributed 检查分布式熔断状态
func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool {
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
value, err := g.Redis().Get(ctx, key)
if err != nil || value.IsNil() {
return false
}
state := value.String()
return state == "open"
}
// syncCircuitBreakerStateToDistributed 同步熔断器状态到分布式存储
func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string) {
distributedSyncLock.Lock()
defer distributedSyncLock.Unlock()
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
// 设置过期时间为5分钟使用SetEX
_, err := g.Redis().Do(ctx, "SETEX", key, 300, state)
if err != nil {
g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", err)
}
}
// CircuitBreakerHealthCheckHandler 熔断器健康检查接口
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
status := make(map[string]interface{})
totalServices := 0
openServices := 0
// 遍历所有熔断器
circuitBreakers.Range(func(key, value interface{}) bool {
serviceName := key.(string)
cbInfo := value.(*CircuitBreakerInfo)
totalServices++
cbInfo.mu.RLock()
isOpen := cbInfo.State == StateOpen
if isOpen {
openServices++
}
status[serviceName] = map[string]interface{}{
"resource": cbInfo.ResourceName,
"state": string(cbInfo.State),
"lastOpenTime": cbInfo.LastOpenTime,
"nextRetryTime": cbInfo.NextRetryTime,
"totalRequests": cbInfo.Metrics.TotalRequests.Load(),
"passRequests": cbInfo.Metrics.PassRequests.Load(),
"blockRequests": cbInfo.Metrics.BlockRequests.Load(),
"failureRequests": cbInfo.Metrics.FailureRequests.Load(),
"openCount": cbInfo.Metrics.OpenCount.Load(),
}
cbInfo.mu.RUnlock()
return true
})
summary := map[string]interface{}{
"totalServices": totalServices,
"openServices": openServices,
"closedServices": totalServices - openServices,
"distributed": enableDistributed,
}
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: "熔断器状态",
Data: map[string]interface{}{
"summary": summary,
"services": status,
},
})
}
// getSentinelStateString 转换Sentinel状态为字符串
func getSentinelStateString(state circuitbreaker.State) string {
switch state {
case circuitbreaker.Closed:
return string(StateClosed)
case circuitbreaker.Open:
return string(StateOpen)
case circuitbreaker.HalfOpen:
return string(StateHalfOpen)
default:
return "unknown"
}
}
// CircuitBreakerResetHandler 熔断器手动重置接口(仅限管理后台调用)
func CircuitBreakerResetHandler(r *ghttp.Request) {
serviceName := r.Get("service").String()
if serviceName == "" {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 400,
Message: "缺少service参数",
})
return
}
resourceName := fmt.Sprintf("service:%s", serviceName)
// 获取当前服务的所有规则
currentRules := circuitbreaker.GetRulesOfResource(resourceName)
// 只删除当前服务的规则
if len(currentRules) > 0 {
_, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{})
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重置熔断器失败: %v", err),
})
return
}
}
// 重新加载该服务的规则
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
config := val.(*CircuitBreakerConfig)
err := initServiceCircuitBreaker(serviceName, config)
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重置熔断器失败: %v", err),
})
return
}
}
// 更新内存状态
if val, ok := circuitBreakers.Load(serviceName); ok {
cbInfo := val.(*CircuitBreakerInfo)
cbInfo.mu.Lock()
cbInfo.State = StateClosed
cbInfo.LastOpenTime = time.Time{}
cbInfo.NextRetryTime = time.Time{}
cbInfo.mu.Unlock()
}
// 重置分布式状态(如果启用)
if enableDistributed {
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
_, _ = g.Redis().Del(r.GetCtx(), key)
}
g.Log().Infof(r.GetCtx(), "熔断器已手动重置: %s", resourceName)
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName),
})
}
// CircuitBreakerReloadHandler 熔断器配置重载接口
func CircuitBreakerReloadHandler(r *ghttp.Request) {
serviceName := r.Get("service").String()
if serviceName == "" {
// 重载所有服务
services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker.services", []string{}).Strings()
successCount := 0
failCount := 0
for _, service := range services {
err := ReloadCircuitBreakerConfig(service)
if err != nil {
g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err)
failCount++
} else {
successCount++
}
}
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount),
Data: map[string]interface{}{
"success": successCount,
"failed": failCount,
},
})
return
}
// 重载单个服务
err := ReloadCircuitBreakerConfig(serviceName)
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重载失败: %v", err),
})
return
}
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName),
})
}