Files
common/middleware/circuit_breaker.go
2026-03-12 08:51:25 +08:00

1131 lines
35 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package middleware
import (
"context"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
)
// CircuitBreakerState 熔断器状态
type CircuitBreakerState string
const (
StateClosed CircuitBreakerState = "closed" // 关闭:正常状态
StateOpen CircuitBreakerState = "open" // 开启:熔断状态
)
// 熔断器状态常量用于atomic.Int64
const (
stateClosed int64 = 0
stateOpen int64 = 1
)
// getState 获取熔断器状态字符串
func (cb *CircuitBreakerInfo) getState() CircuitBreakerState {
if cb.State.Load() == stateOpen {
return StateOpen
}
return StateClosed
}
// setState 设置熔断器状态atomic操作无锁
func (cb *CircuitBreakerInfo) setState(state CircuitBreakerState) CircuitBreakerState {
var newState int64
if state == StateOpen {
newState = stateOpen
} else {
newState = stateClosed
}
oldState := cb.State.Swap(newState)
if oldState == stateOpen {
return StateOpen
}
return StateClosed
}
// CircuitBreakerConfig 熔断器配置
type CircuitBreakerConfig struct {
Enabled bool // 是否启用熔断器
MaxFailures int // 连续失败次数
Timeout string // 熔断超时时间
TimeoutParsed time.Duration // 缓存的超时时间(性能优化)
SuccessStatusCodes []int // 视为成功的HTTP状态码
SlowRequestThreshold string // 慢请求阈值
SlowRequestThresholdParsed time.Duration // 缓存的慢请求阈值(性能优化)
EnableSlidingWindow bool // 是否启用滑动窗口
FailureRateThreshold float64 // 失败率阈值
EnableFallback bool // 是否启用降级
FallbackMessage string // 降级提示消息
RequestTimeout int // 请求超时时间(毫秒)
DistributedTTL int // 分布式熔断状态TTL
AdminIPs []string // 允许重置熔断器的管理员IP列表
StatIntervalMs int // 统计窗口时长毫秒默认1000ms
MinRequestAmount int // 最小请求数量默认与MaxFailures相同
AdminCIDRs []string // 允许重置熔断器的管理员CIDR列表P0支持IP段
// P1预编译的CIDR网络掩码性能优化
CIDRNetMasks []*net.IPNet
}
// CircuitBreakerMetrics 熔断器指标
type CircuitBreakerMetrics struct {
TotalRequests atomic.Int64 // 总请求数
PassRequests atomic.Int64 // 通过请求数
BlockRequests atomic.Int64 // 阻塞请求数
FailureRequests atomic.Int64 // 失败请求数
SlowRequests atomic.Int64 // 慢请求数P2可观测性
OpenCount atomic.Int64 // 熔断开启次数
LastResetTime atomic.Int64 // 上次重置时间Unix时间戳
// 使用atomic.Int64实现简单的时间戳存储避免使用mutex
LastOpenTime atomic.Int64 // 上次熔断时间Unix时间戳
NextRetryTime atomic.Int64 // 下次重试时间Unix时间戳
}
// CircuitBreakerInfo 熔断器信息
type CircuitBreakerInfo struct {
ResourceName string `json:"resourceName"` // 资源名称
State atomic.Int64 `json:"state"` // 当前状态0:closed, 1:open使用atomic避免mutex
Config *CircuitBreakerConfig `json:"config"` // 配置信息
Metrics *CircuitBreakerMetrics `json:"metrics"` // 指标统计
// 预编译的成功状态码集合P1性能优化
SuccessCodeMap map[int]bool
// P1预编译的CIDR网络掩码避免重复解析
CIDRNetMasks []*net.IPNet
}
var (
// circuitBreakers 存储所有熔断器状态(用于健康检查)
circuitBreakers sync.Map
// circuitBreakerConfigs 熔断器配置缓存
circuitBreakerConfigs sync.Map
// distributedSyncLocks 分布式同步锁(按服务名分片)
distributedSyncLocks sync.Map
// stateChangeListeners 状态变化监听器
stateChangeListeners sync.Map
// stateChangeListenersRegistered 默认监听器是否已注册
stateChangeListenersRegistered sync.Map
// P1使用map代替slice优化IP查找性能
allowedAdminIPsMap map[string]bool
// allowedAdminIPsMutex 保护白名单缓存的并发访问
allowedAdminIPsMutex sync.RWMutex
// P1预编译的CIDR网络掩码列表
allowedAdminCIDRs []*net.IPNet
// allowedAdminCIDRsMutex 保护CIDR列表的并发访问
allowedAdminCIDRsMutex sync.RWMutex
// totalServicesCount 缓存总服务数P1性能优化
totalServicesCount atomic.Int64
)
// InitCircuitBreaker 初始化Sentinel熔断器
func InitCircuitBreaker() error {
ctx := context.Background()
// 初始化Sentinel
err := api.InitDefault()
if err != nil {
return fmt.Errorf("sentinel初始化失败: %v", err)
}
// 注册熔断器状态变化监听器
registerStateChangeListeners()
g.Log().Infof(ctx, "Sentinel熔断器初始化成功")
// 扫描配置文件中所有配置了熔断器的服务
services := g.Cfg().MustGet(ctx, "circuitBreaker").Map()
// 过滤掉非服务配置的key
serviceNames := filterServiceNames(services)
if len(serviceNames) == 0 {
g.Log().Infof(ctx, "未配置任何服务熔断器")
return nil
}
// P1缓存总服务数
totalServicesCount.Store(int64(len(serviceNames)))
// 为每个服务创建熔断器
enabledCount := 0
for _, serviceName := range serviceNames {
serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
if serviceConfig != nil && serviceConfig.Enabled {
circuitBreakerConfigs.Store(serviceName, serviceConfig)
initErr := initServiceCircuitBreaker(serviceName, serviceConfig)
if initErr != nil {
g.Log().Errorf(ctx, "服务 %s 熔断器初始化失败: %v", serviceName, initErr)
} else {
g.Log().Infof(ctx, "服务 %s 熔断器初始化成功", serviceName)
enabledCount++
}
} else {
g.Log().Infof(ctx, "服务 %s 熔断器未启用", serviceName)
}
}
// P1更新管理员IP白名单缓存在所有服务配置加载完成后
updateAdminIPsCache()
g.Log().Infof(ctx, "共初始化 %d 个服务熔断器,其中 %d 个已启用", len(serviceNames), enabledCount)
return nil
}
// ReloadCircuitBreakerConfig 动态重新加载熔断器配置
func ReloadCircuitBreakerConfig(serviceName string) error {
ctx := context.Background()
// 重新加载配置
serviceConfig := loadServiceCircuitBreakerConfig(serviceName)
if serviceConfig == nil {
return fmt.Errorf("未找到服务 %s 的配置", serviceName)
}
// 更新配置缓存
circuitBreakerConfigs.Store(serviceName, serviceConfig)
// 重新初始化熔断器
err := initServiceCircuitBreaker(serviceName, serviceConfig)
if err != nil {
return fmt.Errorf("重新初始化熔断器失败: %v", err)
}
g.Log().Infof(ctx, "服务 %s 熔断器配置重新加载成功", serviceName)
return nil
}
// loadServiceCircuitBreakerConfig 加载单个服务的熔断器配置
func loadServiceCircuitBreakerConfig(serviceName string) *CircuitBreakerConfig {
ctx := context.Background()
key := fmt.Sprintf("circuitBreaker.%s", serviceName)
enabled := g.Cfg().MustGet(ctx, key+".enabled", true).Bool()
maxFailures := g.Cfg().MustGet(ctx, key+".maxFailures", 5).Int()
timeout := g.Cfg().MustGet(ctx, key+".timeout", "60s").String()
slowRequestThreshold := g.Cfg().MustGet(ctx, key+".slowRequestThreshold", "3s").String()
enableSlidingWindow := g.Cfg().MustGet(ctx, key+".enableSlidingWindow", false).Bool()
failureRateThreshold := g.Cfg().MustGet(ctx, key+".failureRateThreshold", 0.5).Float64()
enableFallback := g.Cfg().MustGet(ctx, key+".enableFallback", false).Bool()
fallbackMessage := g.Cfg().MustGet(ctx, key+".fallbackMessage", "").String()
requestTimeout := g.Cfg().MustGet(ctx, key+".requestTimeout", 30000).Int()
distributedTTL := g.Cfg().MustGet(ctx, key+".distributedTTL", 300).Int()
adminIPs := g.Cfg().MustGet(ctx, key+".adminIPs", "").String()
adminCIDRs := g.Cfg().MustGet(ctx, key+".adminCIDRs", "").String() // P0支持CIDR
statIntervalMs := g.Cfg().MustGet(ctx, key+".statIntervalMs", 1000).Int()
minRequestAmount := g.Cfg().MustGet(ctx, key+".minRequestAmount", 0).Int()
// 解析成功状态码使用map用于快速查找
successCodes := g.Cfg().MustGet(ctx, key+".successStatusCodes", "200,201,204").String()
statusCodes := parseStatusCodesSlice(successCodes)
// 解析时间(缓存结果,性能优化)
timeoutParsed, err := time.ParseDuration(timeout)
if err != nil {
timeoutParsed = 60 * time.Second
g.Log().Warningf(ctx, "服务 %s 的 timeout 解析失败,使用默认值: %v", serviceName, err)
}
slowRequestThresholdParsed, err := time.ParseDuration(slowRequestThreshold)
if err != nil {
slowRequestThresholdParsed = 3 * time.Second
g.Log().Warningf(ctx, "服务 %s 的 slowRequestThreshold 解析失败,使用默认值: %v", serviceName, err)
}
// 解析管理员IP列表
adminIPList := parseAdminIPs(adminIPs)
// P1预编译CIDR为net.IPNet支持IPv4和IPv6
cidrNets, err := parseAdminCIDRs(adminCIDRs)
if err != nil {
g.Log().Warningf(ctx, "服务 %s 的 adminCIDRs 解析失败: %v", serviceName, err)
cidrNets = nil
}
// 如果minRequestAmount未配置则使用maxFailures作为默认值
if minRequestAmount == 0 {
minRequestAmount = maxFailures
}
return &CircuitBreakerConfig{
Enabled: enabled,
MaxFailures: maxFailures,
Timeout: timeout,
TimeoutParsed: timeoutParsed,
SuccessStatusCodes: statusCodes,
SlowRequestThreshold: slowRequestThreshold,
SlowRequestThresholdParsed: slowRequestThresholdParsed,
EnableSlidingWindow: enableSlidingWindow,
FailureRateThreshold: failureRateThreshold,
EnableFallback: enableFallback,
FallbackMessage: fallbackMessage,
RequestTimeout: requestTimeout,
DistributedTTL: distributedTTL,
AdminIPs: adminIPList,
CIDRNetMasks: cidrNets, // P1预编译的CIDR网络掩码
StatIntervalMs: statIntervalMs,
MinRequestAmount: minRequestAmount,
}
}
// parseStatusCodes 解析HTTP状态码返回map用于快速查找
func parseStatusCodes(str string) map[int]bool {
parts := strings.Split(str, ",")
codeMap := make(map[int]bool, len(parts))
for _, part := range parts {
var code int
if _, err := fmt.Sscanf(strings.TrimSpace(part), "%d", &code); err == nil {
codeMap[code] = true
}
}
return codeMap
}
// parseStatusCodesSlice 解析HTTP状态码返回切片用于配置
func parseStatusCodesSlice(str string) []int {
parts := strings.Split(str, ",")
codes := make([]int, 0, len(parts))
for _, part := range parts {
var code int
if _, err := fmt.Sscanf(strings.TrimSpace(part), "%d", &code); err == nil {
codes = append(codes, code)
}
}
return codes
}
// parseAdminCIDRs 解析管理员CIDR列表P1预编译为net.IPNetP0支持IPv6
func parseAdminCIDRs(str string) ([]*net.IPNet, error) {
if str == "" {
return nil, nil
}
parts := strings.Split(str, ",")
nets := make([]*net.IPNet, 0, len(parts))
for _, part := range parts {
cidr := strings.TrimSpace(part)
if cidr != "" {
// 使用net.ParseCIDR解析CIDR支持IPv4和IPv6
_, ipNet, err := net.ParseCIDR(cidr)
if err != nil {
return nil, fmt.Errorf("解析CIDR失败: %s, error: %v", cidr, err)
}
nets = append(nets, ipNet)
}
}
return nets, nil
}
// updateAdminIPsCache 更新管理员IP白名单缓存P1使用map优化性能P1预编译CIDR
func updateAdminIPsCache() {
ipMap := make(map[string]bool)
cidrNets := make([]*net.IPNet, 0)
// 收集所有服务的adminIPs配置
circuitBreakerConfigs.Range(func(key, value interface{}) bool {
config := value.(*CircuitBreakerConfig)
if len(config.AdminIPs) > 0 {
for _, ip := range config.AdminIPs {
if !ipMap[ip] {
ipMap[ip] = true
}
}
}
// P1使用预编译的CIDR网络掩码
if len(config.CIDRNetMasks) > 0 {
for _, cidrNet := range config.CIDRNetMasks {
cidrNets = append(cidrNets, cidrNet)
}
}
return true
})
// 更新缓存
allowedAdminIPsMutex.Lock()
allowedAdminIPsMap = ipMap
allowedAdminIPsMutex.Unlock()
allowedAdminCIDRsMutex.Lock()
allowedAdminCIDRs = cidrNets
allowedAdminCIDRsMutex.Unlock()
}
// isIPInCIDR 检查IP是否在CIDR范围内P1使用预编译的net.IPNetP0支持IPv6
func isIPInCIDR(ipStr string, cidrNet *net.IPNet) bool {
ip := net.ParseIP(ipStr)
if ip == nil {
return false
}
return cidrNet.Contains(ip)
}
// parseAdminIPs 解析管理员IP列表
func parseAdminIPs(str string) []string {
if str == "" {
return nil
}
parts := strings.Split(str, ",")
ips := make([]string, 0, len(parts))
for _, part := range parts {
ip := strings.TrimSpace(part)
if ip != "" {
ips = append(ips, ip)
}
}
return ips
}
// filterServiceNames 过滤服务名排除非服务配置的key
func filterServiceNames(services map[string]interface{}) []string {
excludeKeys := map[string]bool{
"services": true,
"enableDistributed": true,
"requestTimeout": true,
"distributedTTL": true,
}
serviceNames := make([]string, 0, len(services))
for key := range services {
if !excludeKeys[key] {
serviceNames = append(serviceNames, key)
}
}
return serviceNames
}
// initServiceCircuitBreaker 初始化服务熔断器
func initServiceCircuitBreaker(serviceName string, config *CircuitBreakerConfig) error {
// 验证配置参数
if err := validateCircuitBreakerConfig(config); err != nil {
return fmt.Errorf("配置验证失败: %v", err)
}
// 使用缓存的时间值(性能优化)
timeout := config.TimeoutParsed
slowRequestThreshold := config.SlowRequestThresholdParsed
resourceName := fmt.Sprintf("service:%s", serviceName)
var rule []*circuitbreaker.Rule
if config.EnableSlidingWindow {
// 使用滑动窗口统计(更精确)- 慢调用比例策略
rule = []*circuitbreaker.Rule{
{
Resource: resourceName,
Strategy: circuitbreaker.SlowRequestRatio,
RetryTimeoutMs: uint32(timeout.Milliseconds()),
MinRequestAmount: uint64(config.MinRequestAmount),
StatIntervalMs: uint32(config.StatIntervalMs),
StatSlidingWindowBucketCount: 10,
MaxAllowedRtMs: uint64(slowRequestThreshold.Milliseconds()),
Threshold: config.FailureRateThreshold,
},
}
} else {
// 使用连续失败计数(更简单快速)- 异常数策略
rule = []*circuitbreaker.Rule{
{
Resource: resourceName,
Strategy: circuitbreaker.ErrorCount,
RetryTimeoutMs: uint32(timeout.Milliseconds()),
MinRequestAmount: uint64(config.MinRequestAmount),
StatIntervalMs: uint32(config.StatIntervalMs),
Threshold: float64(config.MaxFailures),
},
}
}
// 先清理旧规则(健壮性改进)
_, _ = circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{})
// 加载新规则到Sentinel
_, err := circuitbreaker.LoadRules(rule)
if err != nil {
return fmt.Errorf("加载熔断规则失败: %v", err)
}
// 初始化熔断器信息P1直接从slice构建map避免重复解析
successCodeMap := make(map[int]bool, len(config.SuccessStatusCodes))
for _, code := range config.SuccessStatusCodes {
successCodeMap[code] = true
}
cbInfo := &CircuitBreakerInfo{
ResourceName: resourceName,
Config: config,
Metrics: &CircuitBreakerMetrics{},
SuccessCodeMap: successCodeMap,
CIDRNetMasks: config.CIDRNetMasks,
}
cbInfo.State.Store(stateClosed)
cbInfo.Metrics.LastResetTime.Store(time.Now().Unix())
circuitBreakers.Store(serviceName, cbInfo)
strategy := "error_count"
if config.EnableSlidingWindow {
strategy = "slow_ratio"
}
g.Log().Infof(context.Background(), "服务 %s 熔断器初始化成功: resource=%s, strategy=%s, timeout=%v, threshold=%.2f",
serviceName, resourceName, strategy, timeout, rule[0].Threshold)
return nil
}
// CircuitBreakerMiddleware 熔断降级中间件使用阿里Sentinel
func CircuitBreakerMiddleware(r *ghttp.Request) {
startTime := time.Now()
ctx := r.GetCtx()
// 从URL路径提取服务名并获取配置P1合并重复验证
serviceName := extractServiceName(r.URL.Path)
if serviceName == "" {
r.Middleware.Next()
return
}
// 获取熔断器信息(包含配置)
cbInfoVal, ok := circuitBreakers.Load(serviceName)
if !ok {
// 未配置熔断器,直接放行
r.Middleware.Next()
return
}
cbInfo := cbInfoVal.(*CircuitBreakerInfo)
config := cbInfo.Config
if !config.Enabled {
// 熔断器未启用,直接放行
r.Middleware.Next()
return
}
cbInfo.Metrics.TotalRequests.Add(1)
// 提前构造resourceName性能优化
resourceName := cbInfo.ResourceName
// 设置请求超时(使用服务独立配置)
if config.RequestTimeout > 0 {
ctx, cancel := context.WithTimeout(ctx, time.Duration(config.RequestTimeout)*time.Millisecond)
r.SetCtx(ctx)
defer cancel()
}
// 检查是否启用分布式熔断
if config.DistributedTTL > 0 {
if isCircuitBreakerOpenInDistributed(ctx, resourceName) {
cbInfo.Metrics.BlockRequests.Add(1)
g.Log().Warningf(ctx, "分布式熔断触发: %s", resourceName)
sendFallbackResponse(r, serviceName, config, "distributed")
return
}
}
// 使用Sentinel进行熔断保护
entry, blockError := api.Entry(resourceName)
if blockError != nil {
// 被熔断拦截
cbInfo.Metrics.BlockRequests.Add(1)
cbInfo.Metrics.OpenCount.Add(1)
g.Log().Warningf(ctx, "熔断触发: %s, reason: %v", resourceName, blockError)
// 使用atomic更新状态无锁
oldStateStr := cbInfo.setState(StateOpen)
now := time.Now()
cbInfo.Metrics.LastOpenTime.Store(now.Unix())
cbInfo.Metrics.NextRetryTime.Store(now.Add(config.TimeoutParsed).Unix())
// 通知状态变化(如果状态改变)
if oldStateStr != StateOpen {
notifyStateChange(serviceName, oldStateStr, StateOpen)
}
// 同步到分布式存储
if config.DistributedTTL > 0 {
syncCircuitBreakerStateToDistributed(ctx, resourceName, "open", config.DistributedTTL)
}
sendFallbackResponse(r, serviceName, config, "blocked")
return
}
// 执行后续中间件和业务逻辑
r.Middleware.Next()
// 记录请求结果基于HTTP状态码
statusCode := r.Response.Status
duration := time.Since(startTime)
// 判断是否为慢请求P2可观测性
if duration > config.SlowRequestThresholdParsed {
cbInfo.Metrics.SlowRequests.Add(1)
}
// 使用cbInfo.SuccessCodeMap判断状态码性能优化
if !isSuccessStatusCode(cbInfo, statusCode) {
// 记录异常
cbInfo.Metrics.FailureRequests.Add(1)
api.TraceError(entry, fmt.Errorf("request failed with status: %d", statusCode))
g.Log().Debugf(ctx, "服务 %s 请求失败: status=%d, duration=%v", serviceName, statusCode, duration)
} else {
cbInfo.Metrics.PassRequests.Add(1)
// 更新状态为关闭如果之前是开启状态使用atomic操作
if cbInfo.getState() != StateClosed {
oldStateStr := cbInfo.setState(StateClosed)
if oldStateStr != StateClosed {
notifyStateChange(serviceName, oldStateStr, StateClosed)
}
}
}
// 退出Sentinel资源
entry.Exit()
}
// sendFallbackResponse 发送降级响应P0添加日志记录
func sendFallbackResponse(r *ghttp.Request, serviceName string, config *CircuitBreakerConfig, reason string) {
// P0记录降级日志便于问题排查
g.Log().Warningf(r.GetCtx(), "熔断器降级: service=%s, reason=%s, clientIP=%s", serviceName, reason, r.GetClientIp())
if config.EnableFallback && config.FallbackMessage != "" {
// 自定义降级消息
r.Response.WriteStatusExit(503, config.FallbackMessage)
} else {
// 根据原因返回不同的状态码和消息
switch reason {
case "blocked":
r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 熔断保护中,请稍后再试", serviceName))
case "distributed":
r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 分布式熔断中", serviceName))
default:
r.Response.WriteStatusExit(503, fmt.Sprintf("服务 '%s' 暂时不可用,请稍后再试", serviceName))
}
}
}
// isSuccessStatusCode 判断HTTP状态码是否成功使用cbInfo.SuccessCodeMap优化性能
func isSuccessStatusCode(cbInfo *CircuitBreakerInfo, statusCode int) bool {
if cbInfo.SuccessCodeMap != nil && len(cbInfo.SuccessCodeMap) > 0 {
return cbInfo.SuccessCodeMap[statusCode]
}
// 默认2xx状态码为成功
return statusCode >= 200 && statusCode < 300
}
// extractServiceName 从URL路径提取服务名P0添加URL编码处理
func extractServiceName(path string) string {
// 去除首尾斜杠并分割
path = strings.Trim(path, "/")
if path == "" {
return ""
}
parts := strings.Split(path, "/")
if len(parts) == 0 {
return ""
}
serviceName := parts[0]
// P0处理URL编码将 %2F 等转义字符还原
// 注意在goframe的网关中间件中路径通常已经被框架处理过
// 但为了安全性,这里对包含%的情况进行简单处理
if strings.Contains(serviceName, "%") {
// 尝试解码URL编码的字符串
// 使用path.Unescape而不是url.QueryUnescape因为我们处理的是路径片段
decoded, err := pathUnescape(serviceName)
if err == nil {
serviceName = decoded
}
// 如果解码失败继续使用原始serviceName
}
// 验证服务名是否在已配置的熔断器中
if _, ok := circuitBreakerConfigs.Load(serviceName); ok {
return serviceName
}
return ""
}
// pathUnescape 路径片段的URL解码P0安全性改进
// 注意Go 1.8+ 可以使用 path.Unescape这里提供兼容实现
func pathUnescape(s string) (string, error) {
// 使用strings.Builder优化性能
var builder strings.Builder
builder.Grow(len(s))
for i := 0; i < len(s); i++ {
switch s[i] {
case '%':
// 处理百分号编码
if i+2 >= len(s) {
// 不完整的编码,保留原样
builder.WriteByte(s[i])
continue
}
// 解析十六进制数字
high := hexDigit(s[i+1])
low := hexDigit(s[i+2])
if high == 0xFF || low == 0xFF {
// 无效的十六进制,保留原样
builder.WriteByte(s[i])
continue
}
builder.WriteByte((high << 4) | low)
i += 2
case '+':
// 路径片段中的+通常不需要解码为空格
builder.WriteByte('+')
default:
builder.WriteByte(s[i])
}
}
return builder.String(), nil
}
// hexDigit 将十六进制字符转换为对应的数值
func hexDigit(c byte) byte {
switch {
case '0' <= c && c <= '9':
return c - '0'
case 'a' <= c && c <= 'f':
return c - 'a' + 10
case 'A' <= c && c <= 'F':
return c - 'A' + 10
default:
return 0xFF // 无效字符
}
}
// isCircuitBreakerOpenInDistributed 检查分布式熔断状态
func isCircuitBreakerOpenInDistributed(ctx context.Context, resourceName string) bool {
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
redis := g.Redis()
if redis == nil {
return false
}
value, err := redis.Get(ctx, key)
if err != nil || value.IsNil() {
return false
}
state := value.String()
return state == "open"
}
// getDistributedLock 获取分布式锁(按服务名分片)
func getDistributedLock(serviceName string) *sync.Mutex {
lock, _ := distributedSyncLocks.LoadOrStore(serviceName, &sync.Mutex{})
return lock.(*sync.Mutex)
}
// syncCircuitBreakerStateToDistributed 同步熔断器状态到分布式存储
func syncCircuitBreakerStateToDistributed(ctx context.Context, resourceName, state string, ttl int) {
// 提取服务名用于锁分片
serviceName := strings.TrimPrefix(resourceName, "service:")
lock := getDistributedLock(serviceName)
lock.Lock()
defer lock.Unlock()
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
redis := g.Redis()
if redis == nil {
g.Log().Errorf(ctx, "Redis客户端未初始化无法同步熔断状态")
return
}
_, err := redis.Do(ctx, "SETEX", key, ttl, state)
if err != nil {
g.Log().Errorf(ctx, "同步熔断状态到Redis失败: %v", err)
}
}
// validateCircuitBreakerConfig 验证熔断器配置
func validateCircuitBreakerConfig(config *CircuitBreakerConfig) error {
if config.MaxFailures <= 0 {
return fmt.Errorf("maxFailures必须大于0当前值: %d", config.MaxFailures)
}
if config.FailureRateThreshold < 0 || config.FailureRateThreshold > 1 {
return fmt.Errorf("failureRateThreshold必须在0.0-1.0之间,当前值: %.2f", config.FailureRateThreshold)
}
if len(config.SuccessStatusCodes) == 0 {
return fmt.Errorf("successStatusCodes不能为空")
}
if config.RequestTimeout < 0 || config.RequestTimeout > 300000 {
return fmt.Errorf("requestTimeout必须在0-300000毫秒之间当前值: %d", config.RequestTimeout)
}
if config.DistributedTTL < 0 || config.DistributedTTL > 3600 {
return fmt.Errorf("distributedTTL必须在0-3600秒之间当前值: %d", config.DistributedTTL)
}
if config.StatIntervalMs < 100 || config.StatIntervalMs > 60000 {
return fmt.Errorf("statIntervalMs必须在100-60000毫秒之间当前值: %d", config.StatIntervalMs)
}
if config.MinRequestAmount < 1 || config.MinRequestAmount > 10000 {
return fmt.Errorf("minRequestAmount必须在1-10000之间当前值: %d", config.MinRequestAmount)
}
// 验证时间字符串格式(如果缓存为空,说明解析失败)
if config.TimeoutParsed == 0 {
return fmt.Errorf("timeout格式错误应为有效的时间字符串如30s, 1m当前值: %s", config.Timeout)
}
if config.SlowRequestThresholdParsed == 0 {
return fmt.Errorf("slowRequestThreshold格式错误应为有效的时间字符串如3s, 1m当前值: %s", config.SlowRequestThreshold)
}
return nil
}
// registerStateChangeListeners 注册状态变化监听器
func registerStateChangeListeners() {
// 检查是否已注册,防止重复注册(健壮性改进)
if _, exists := stateChangeListenersRegistered.LoadOrStore("default", true); exists {
return
}
// 注册默认监听器(区分日志级别)
RegisterStateChangeListener("default", func(serviceName string, fromState, toState CircuitBreakerState) {
// Open状态使用Warning级别Closed状态使用Info级别
if toState == StateOpen {
g.Log().Warningf(context.Background(), "熔断器状态变化: service=%s, %s -> %s",
serviceName, fromState, toState)
} else {
g.Log().Infof(context.Background(), "熔断器状态变化: service=%s, %s -> %s",
serviceName, fromState, toState)
}
})
}
// StateChangeListener 状态变化监听器类型
type StateChangeListener func(serviceName string, fromState, toState CircuitBreakerState)
// RegisterStateChangeListener 注册状态变化监听器
func RegisterStateChangeListener(name string, listener StateChangeListener) {
stateChangeListeners.Store(name, listener)
}
// UnregisterStateChangeListener 取消注册状态变化监听器
func UnregisterStateChangeListener(name string) {
stateChangeListeners.Delete(name)
}
// notifyStateChange 通知所有监听器状态变化
func notifyStateChange(serviceName string, fromState, toState CircuitBreakerState) {
stateChangeListeners.Range(func(key, value interface{}) bool {
if listener, ok := value.(StateChangeListener); ok {
listener(serviceName, fromState, toState)
}
return true
})
}
// CircuitBreakerHealthCheckHandler 熔断器健康检查接口P0添加IP白名单验证P1添加分页支持P1优化性能
func CircuitBreakerHealthCheckHandler(r *ghttp.Request) {
// P0权限验证
if !isAdminIP(r) {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 403,
Message: "权限不足,禁止访问",
})
return
}
status := make(map[string]interface{})
totalServices := 0
openServices := 0
// P1分页参数
page := r.Get("page").Int()
size := r.Get("size").Int()
if page < 0 {
page = 0
}
if size <= 0 || size > 100 {
size = 20 // 默认20条最多100条
}
// P1使用缓存的totalServicesCount避免每次遍历
total := int(totalServicesCount.Load())
start := page * size
// P1只遍历分页范围内的服务通过计数跳过
end := start + size
if end > total {
end = total
}
current := 0
circuitBreakers.Range(func(key, value interface{}) bool {
// 跳过前面的页
if current < start {
current++
return true
}
// 只处理当前页
if current >= end {
return false
}
serviceName := key.(string)
cbInfo := value.(*CircuitBreakerInfo)
totalServices++
isOpen := cbInfo.getState() == StateOpen
if isOpen {
openServices++
}
// 从Metrics中读取数据使用atomic
lastResetTime := cbInfo.Metrics.LastResetTime.Load()
var lastResetTimeStr string
if lastResetTime > 0 {
lastResetTimeStr = time.Unix(lastResetTime, 0).Format("2006-01-02 15:04:05")
}
lastOpenTime := cbInfo.Metrics.LastOpenTime.Load()
var lastOpenTimeStr string
if lastOpenTime > 0 {
lastOpenTimeStr = time.Unix(lastOpenTime, 0).Format("2006-01-02 15:04:05")
}
nextRetryTime := cbInfo.Metrics.NextRetryTime.Load()
var nextRetryTimeStr string
if nextRetryTime > 0 {
nextRetryTimeStr = time.Unix(nextRetryTime, 0).Format("2006-01-02 15:04:05")
}
status[serviceName] = map[string]interface{}{
"resource": cbInfo.ResourceName,
"state": string(cbInfo.getState()),
"lastOpenTime": lastOpenTimeStr,
"nextRetryTime": nextRetryTimeStr,
"totalRequests": cbInfo.Metrics.TotalRequests.Load(),
"passRequests": cbInfo.Metrics.PassRequests.Load(),
"blockRequests": cbInfo.Metrics.BlockRequests.Load(),
"failureRequests": cbInfo.Metrics.FailureRequests.Load(),
"slowRequests": cbInfo.Metrics.SlowRequests.Load(),
"openCount": cbInfo.Metrics.OpenCount.Load(),
"lastResetTime": lastResetTimeStr,
}
current++
return true
})
summary := map[string]interface{}{
"totalServices": totalServices,
"openServices": openServices,
"closedServices": totalServices - openServices,
}
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: "熔断器状态",
Data: map[string]interface{}{
"summary": summary,
"services": status,
"page": page,
"size": size,
"total": total,
},
})
}
// isAdminIP 检查请求IP是否在管理员白名单中P1使用map优化性能P0支持IPv6 CIDR
func isAdminIP(r *ghttp.Request) bool {
clientIP := r.GetClientIp()
if clientIP == "" {
return false
}
// 读取缓存的IP白名单P1使用map实现O(1)查找)
allowedAdminIPsMutex.RLock()
allowedIPs := allowedAdminIPsMap
allowedAdminIPsMutex.RUnlock()
// 如果没有配置白名单允许所有IP向后兼容
if len(allowedIPs) == 0 {
allowedAdminCIDRsMutex.RLock()
hasCIDRs := len(allowedAdminCIDRs) > 0
allowedAdminCIDRsMutex.RUnlock()
// 如果也没有CIDR则允许所有IP
if !hasCIDRs {
return true
}
}
// 精确IP匹配P1map查找O(1)
if allowedIPs[clientIP] {
return true
}
// P1使用预编译的CIDR网络掩码匹配支持IPv4和IPv6
allowedAdminCIDRsMutex.RLock()
cidrNets := allowedAdminCIDRs
allowedAdminCIDRsMutex.RUnlock()
if len(cidrNets) > 0 {
clientNetIP := net.ParseIP(clientIP)
if clientNetIP != nil {
for _, cidrNet := range cidrNets {
if cidrNet.Contains(clientNetIP) {
return true
}
}
}
}
g.Log().Warningf(r.GetCtx(), "熔断器操作请求被拒绝IP不在白名单中: %s", clientIP)
return false
}
// CircuitBreakerResetHandler 熔断器手动重置接口(仅限管理后台调用)
func CircuitBreakerResetHandler(r *ghttp.Request) {
serviceName := r.Get("service").String()
if serviceName == "" {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 400,
Message: "缺少service参数",
})
return
}
// 权限验证检查IP是否在白名单中
if !isAdminIP(r) {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 403,
Message: "权限不足,禁止访问",
})
return
}
resourceName := fmt.Sprintf("service:%s", serviceName)
// 获取当前服务的所有规则
currentRules := circuitbreaker.GetRulesOfResource(resourceName)
// 只删除当前服务的规则
if len(currentRules) > 0 {
_, err := circuitbreaker.LoadRulesOfResource(resourceName, []*circuitbreaker.Rule{})
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重置熔断器失败: %v", err),
})
return
}
}
// 重新加载该服务的规则
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
config := val.(*CircuitBreakerConfig)
err := initServiceCircuitBreaker(serviceName, config)
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重置熔断器失败: %v", err),
})
return
}
}
// 更新内存状态并重置指标使用atomic操作
if val, ok := circuitBreakers.Load(serviceName); ok {
cbInfo := val.(*CircuitBreakerInfo)
cbInfo.State.Store(stateClosed)
cbInfo.Metrics.LastOpenTime.Store(0)
cbInfo.Metrics.NextRetryTime.Store(0)
// 重置指标
cbInfo.Metrics.TotalRequests.Store(0)
cbInfo.Metrics.PassRequests.Store(0)
cbInfo.Metrics.BlockRequests.Store(0)
cbInfo.Metrics.FailureRequests.Store(0)
cbInfo.Metrics.SlowRequests.Store(0)
cbInfo.Metrics.OpenCount.Store(0)
cbInfo.Metrics.LastResetTime.Store(time.Now().Unix())
}
// 重置分布式状态(如果启用)
if val, ok := circuitBreakerConfigs.Load(serviceName); ok {
config := val.(*CircuitBreakerConfig)
if config.DistributedTTL > 0 {
key := fmt.Sprintf("circuit_breaker:%s:state", resourceName)
redis := g.Redis()
if redis != nil {
_, _ = redis.Del(r.GetCtx(), key)
}
}
}
g.Log().Infof(r.GetCtx(), "熔断器已手动重置: %s", resourceName)
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("服务 '%s' 的熔断器已重置", serviceName),
})
}
// CircuitBreakerReloadHandler 熔断器配置重载接口
func CircuitBreakerReloadHandler(r *ghttp.Request) {
serviceName := r.Get("service").String()
// 权限验证检查IP是否在白名单中P0级别安全问题
if !isAdminIP(r) {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 403,
Message: "权限不足,禁止访问",
})
return
}
if serviceName == "" {
// 重载所有服务 - 扫描配置文件中所有服务
services := g.Cfg().MustGet(r.GetCtx(), "circuitBreaker").Map()
// 过滤出服务名
serviceNames := filterServiceNames(services)
successCount := 0
failCount := 0
for _, service := range serviceNames {
err := ReloadCircuitBreakerConfig(service)
if err != nil {
g.Log().Errorf(r.GetCtx(), "服务 %s 配置重载失败: %v", service, err)
failCount++
} else {
successCount++
}
}
// 更新管理员IP白名单缓存
updateAdminIPsCache()
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("配置重载完成: 成功 %d, 失败 %d", successCount, failCount),
Data: map[string]interface{}{
"success": successCount,
"failed": failCount,
},
})
return
}
// 重载单个服务
err := ReloadCircuitBreakerConfig(serviceName)
if err != nil {
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 500,
Message: fmt.Sprintf("重载失败: %v", err),
})
return
}
r.Response.WriteJsonExit(ghttp.DefaultHandlerResponse{
Code: 200,
Message: fmt.Sprintf("服务 '%s' 的熔断器配置已重载", serviceName),
})
}