package main import ( "context" "fmt" netHttp "net/http" "net/http/httputil" "net/url" "strings" "gitea.com/red-future/common/consul" "gitea.com/red-future/common/http" "gitea.com/red-future/common/middleware" _ "gitea.com/red-future/common/swagger" "gitea.com/red-future/common/utils" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/util/gconv" ) func StartServerProxy() { // 初始化Sentinel熔断器 err := middleware.InitCircuitBreaker() if err != nil { g.Log().Errorf(context.Background(), "熔断器初始化失败: %v", err) panic("熔断器初始化失败") } // 绑定中间件:CORS跨域 -> IP限流 -> 用户限流 -> 熔断降级 -> 服务限流 -> 全局限流 http.Httpserver.BindMiddlewareDefault( ghttp.MiddlewareCORS, //middleware.IPLimiter, // IP限流(防DDoS) //middleware.UserLimiter, // 用户限流(防止单用户滥用) //middleware.CircuitBreakerMiddleware, // ⭐ 熔断降级(保护后端服务) //middleware.ServiceLimiter, // 服务限流(保护微服务) //middleware.GlobalLimiter, // Redis全局限流(分布式支持) ) //使用默认http返回结构 // 熔断器健康检查接口 http.Httpserver.BindHandler("/circuit-breaker/health", middleware.CircuitBreakerHealthCheckHandler) // 熔断器手动重置接口(仅限管理后台调用) http.Httpserver.BindHandler("/circuit-breaker/reset", middleware.CircuitBreakerResetHandler) // 熔断器配置重载接口 http.Httpserver.BindHandler("/circuit-breaker/reload", middleware.CircuitBreakerReloadHandler) http.Httpserver.BindHandler("/*", func(r *ghttp.Request) { g.Log().Debugf(r.GetCtx(), "url:%s", r.RequestURI) serverName := strings.Split(r.RequestURI, "/")[1] instanceAddr, err := consul.GetInstanceAddr(r.GetCtx(), serverName) if err != nil { g.Log().Errorf(r.GetCtx(), "serverName:%s不可用", serverName) r.Response.Status = 503 r.Response.WriteJsonExit(map[string]interface{}{ "success": false, "code": 503, "message": fmt.Sprintf("服务 '%s' 暂时不可用", serverName), }) return } // 1. 解析 consul 配置地址 consulAddr := g.Cfg().MustGet(r.GetCtx(), "consul.address").String() consulAddrList := strings.Split(consulAddr, ":") if len(consulAddrList) < 1 { g.Log().Error(r.GetCtx(), "consul.address 配置格式错误") r.Response.WriteJsonExit(map[string]interface{}{ "success": false, "code": 500, "message": fmt.Sprintf("consul.address 配置格式错误:%s", consulAddr), }) return } ipStr := instanceAddr if strings.Contains(instanceAddr, ":") { ipStr = strings.Split(instanceAddr, ":")[0] } // 2. 如果不是本地IP,则替换为consul配置的IP if !utils.IsLocalIP(ipStr) { instanceAddr = strings.Replace(instanceAddr, ipStr, consulAddrList[0], 1) } r.Request.URL.Path = strings.Replace(r.Request.URL.Path, fmt.Sprintf("%s/", serverName), "", 1) r.MakeBodyRepeatableRead(false) u, _ := url.Parse(fmt.Sprintf("%s://%s", "http", instanceAddr)) proxy := httputil.NewSingleHostReverseProxy(u) proxy.ErrorHandler = func(writer netHttp.ResponseWriter, request *netHttp.Request, e error) { writer.WriteHeader(netHttp.StatusBadGateway) } if !strings.Contains(r.RequestURI, "/swagger") && !strings.Contains(r.RequestURI, "/pub/captcha/get") && !strings.Contains(r.RequestURI, "/login") && !strings.Contains(r.RequestURI, "/web/socket/") { user, err := utils.GetUserInfo(r.GetCtx()) if err != nil { g.Log().Errorf(r.GetCtx(), "获取用户信息失败: %v", err) r.Response.Status = 500 r.Response.WriteJsonExit(map[string]interface{}{ "success": false, "code": 500, "message": "获取用户信息失败", }) return } // 将用户信息通过 Header 传递给下游服务 r.Request.Header.Set("X-User-Info", gconv.String(&user)) } proxy.ServeHTTP(r.Response.Writer, r.Request) }) } func main() { StartServerProxy() http.Httpserver.Run() // select {} }