// Package jaeger wires GoFrame's tracing into an OTLP/HTTP exporter (for
// example a Jaeger collector) and provides helpers for creating spans,
// recording errors and tracing HTTP requests.
package jaeger

import (
	"context"
	"encoding/json"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/net/gtrace"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
	"go.opentelemetry.io/otel/trace"
)

const (
	// gtraceScope is the instrumentation scope name used for spans created
	// through NewSpan; it is also on the export allowlist.
	gtraceScope = "github.com/gogf/gf/v2/net/gtrace"
	// ghttpServerScope is the instrumentation scope name of GoFrame's HTTP
	// server spans.
	ghttpServerScope = "github.com/gogf/gf/v2/net/ghttp.Server"

	// shutdownTimeout bounds how long ShutDown waits for the tracer provider
	// to flush and stop.
	shutdownTimeout = time.Second

	// maxResponseAttrBytes is the maximum number of response bytes copied
	// into the "response" span attribute before truncation.
	maxResponseAttrBytes = 1000
)

var (
	// ShutDown flushes and stops the tracer provider. It is a no-op until
	// Init has completed successfully.
	ShutDown = func(ctx context.Context) {}

	initOnce sync.Once
)

// Init sets up the global OpenTelemetry tracer provider and propagators.
//
// The body runs at most once (guarded by sync.Once); later calls are no-ops.
// Note that this package's init function calls Init, so the setup happens at
// import time. When the "jaeger.addr" configuration value is empty, or when
// the exporter or resource cannot be created, tracing is left disabled and
// ShutDown stays a no-op.
func Init() {
	initOnce.Do(func() {
		ctx := context.Background()
		jaegerAgent := g.Cfg().MustGet(ctx, "jaeger.addr").String()
		serverName := g.Cfg().MustGet(ctx, "server.name").String()

		if jaegerAgent == "" {
			g.Log().Warning(ctx, "⚠️ Jaeger 配置未找到,跳过初始化")
			return
		}

		traceExp, err := otlptrace.New(ctx, otlptracehttp.NewClient(
			otlptracehttp.WithEndpoint(jaegerAgent),
			otlptracehttp.WithURLPath("/v1/traces"),
			otlptracehttp.WithInsecure(),
			otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
		))
		if err != nil {
			g.Log().Errorf(ctx, "OTLP exporter 创建失败: %v", err)
			return
		}

		res, err := resource.New(ctx,
			resource.WithFromEnv(),
			resource.WithProcess(),
			resource.WithTelemetrySDK(),
			resource.WithHost(),
			resource.WithAttributes(
				semconv.ServiceNameKey.String(serverName),
			),
		)
		if err != nil {
			g.Log().Errorf(ctx, "Resource 创建失败: %v", err)
			// The exporter was already started; stop it so it is not leaked.
			// Best-effort: we are abandoning initialization anyway.
			_ = traceExp.Shutdown(ctx)
			return
		}

		// Build the TracerProvider with an allowlisting span processor: only
		// spans from the ghttp.Server and gtrace scopes reach the OTLP
		// exporter; spans from other scopes (gdb, gredis, ghttp.Client, ...)
		// are dropped in the export pipeline.
		tp := sdktrace.NewTracerProvider(
			sdktrace.WithSampler(sdktrace.AlwaysSample()),
			sdktrace.WithResource(res),
			sdktrace.WithSpanProcessor(&allowlistProcessor{
				next: sdktrace.NewBatchSpanProcessor(traceExp),
				allowed: map[string]bool{
					ghttpServerScope: true,
					gtraceScope:      true,
				},
			}),
		)
		otel.SetTracerProvider(tp)
		otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{},
			propagation.Baggage{},
		))

		ShutDown = func(ctx context.Context) {
			// Bound the flush so a slow or unreachable collector cannot
			// stall process shutdown.
			ctx, cancel := context.WithTimeout(ctx, shutdownTimeout)
			defer cancel()
			if err := tp.Shutdown(ctx); err != nil {
				g.Log().Errorf(ctx, "Shutdown tracerProvider failed err:%+v", err)
				return
			}
			g.Log().Debug(ctx, "Shutdown tracerProvider success")
		}

		g.Log().Infof(ctx, "✅ Jaeger 初始化成功: %s(仅 HTTP Server 链路)", jaegerAgent)
	})
}

// init runs Init when the package is imported.
func init() {
	Init()
}

// allowlistProcessor forwards only spans whose instrumentation scope name is
// in allowed to the wrapped span processor.
type allowlistProcessor struct {
	next    sdktrace.SpanProcessor
	allowed map[string]bool
}

// Compile-time check that allowlistProcessor implements SpanProcessor.
var _ sdktrace.SpanProcessor = (*allowlistProcessor)(nil)

// OnStart forwards every span start to the wrapped processor.
func (p *allowlistProcessor) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) {
	p.next.OnStart(parent, s)
}

// OnEnd forwards the finished span to the wrapped processor only when its
// instrumentation scope is on the allowlist; all other spans (gdb, gredis,
// ghttp.Client, ...) are dropped here.
func (p *allowlistProcessor) OnEnd(s sdktrace.ReadOnlySpan) {
	if p.allowed[s.InstrumentationScope().Name] {
		p.next.OnEnd(s)
	}
}

// Shutdown shuts down the wrapped processor.
func (p *allowlistProcessor) Shutdown(ctx context.Context) error {
	return p.next.Shutdown(ctx)
}

// ForceFlush flushes the wrapped processor.
func (p *allowlistProcessor) ForceFlush(ctx context.Context) error {
	return p.next.ForceFlush(ctx)
}

// NewSpan starts a span named spanName on the gtrace tracer and returns the
// derived context together with the span wrapped as a *gtrace.Span.
func NewSpan(ctx context.Context, spanName string, opts ...trace.SpanStartOption) (context.Context, *gtrace.Span) {
	ctx, span := otel.Tracer(gtraceScope).Start(ctx, spanName, opts...)
	return ctx, &gtrace.Span{Span: span}
}

// RecordError logs err and, when the span carried by ctx is recording, marks
// that span as failed.
//
// A nil err is ignored. Only the first element of msg is used; when it is
// non-empty it prefixes the log line and the span status description and is
// also stored in the "error.msg" attribute.
func RecordError(ctx context.Context, err error, msg ...string) {
	if err == nil {
		return
	}

	var prefix string
	if len(msg) > 0 {
		prefix = msg[0]
	}

	if prefix != "" {
		g.Log().Errorf(ctx, "%s: %+v", prefix, err)
	} else {
		g.Log().Errorf(ctx, "%+v", err)
	}

	// SpanFromContext never returns nil; without an active span it returns a
	// no-op span that is not recording.
	span := trace.SpanFromContext(ctx)
	if !span.IsRecording() {
		return
	}

	span.RecordError(err)
	attrs := []attribute.KeyValue{
		attribute.Bool("error", true),
		attribute.String("error.message", err.Error()),
	}
	status := err.Error()
	if prefix != "" {
		attrs = append(attrs, attribute.String("error.msg", prefix))
		status = prefix + ": " + status
	}
	span.SetAttributes(attrs...)
	span.SetStatus(codes.Error, status)
}

// NewTracer is an HTTP middleware that wraps the request in a span.
//
// The span is named after the handler's "summary" meta tag. It records the
// request parameters, a truncated copy of the buffered response, and the
// HTTP status code. The span is marked as an error when the request carries
// an error or the status is 500 or above.
func NewTracer(r *ghttp.Request) {
	var spanName string
	if h := r.GetServeHandler(); h != nil {
		spanName = h.GetMetaTag("summary")
	}

	ctx, span := NewSpan(r.Context(), spanName)
	r.SetCtx(ctx)
	defer span.End()

	span.SetAttributes(attribute.String("request", getParams(r)))

	r.Middleware.Next()

	span.SetAttributes(
		attribute.String("response", truncateForSpan(r.Response.BufferString(), maxResponseAttrBytes)),
		attribute.Int("http.status_code", r.Response.Status),
	)

	if err := r.GetError(); err != nil {
		RecordError(ctx, err)
		return
	}

	if r.Response.Status >= 500 {
		span.SetAttributes(attribute.Bool("error", true))
		span.SetStatus(codes.Error, "http status "+strconv.Itoa(r.Response.Status))
	}
}

// truncateForSpan returns s with invalid UTF-8 removed. When the cleaned
// string is longer than limit bytes, it is cut to at most limit bytes and
// "... (truncated)" is appended.
func truncateForSpan(s string, limit int) string {
	clean := strings.ToValidUTF8(s, "")
	if len(clean) <= limit {
		return clean
	}
	// A byte-index cut can land inside a multi-byte rune; re-validating drops
	// the dangling partial rune so the result is still valid UTF-8.
	return strings.ToValidUTF8(clean[:limit], "") + "... (truncated)"
}

// getParams returns the request parameters as a JSON object string for use
// as a span attribute.
//
// For POST requests the body is decoded as a JSON object. For GET requests
// the first value of each form key is used, converted to an int when it
// parses as one. Other methods yield "{}". Extraction is best-effort: decode
// and parse failures are ignored and whatever was collected is returned.
func getParams(r *ghttp.Request) string {
	params := map[string]any{}

	switch r.Method {
	case http.MethodPost:
		// Best-effort: a body that is not a JSON object leaves params empty.
		_ = json.Unmarshal(r.GetBody(), &params)
	case http.MethodGet:
		// Best-effort: on a parse error, r.Form holds whatever was parsed.
		_ = r.Request.ParseForm()
		for k, v := range r.Form {
			if len(v) == 0 {
				continue
			}
			if n, err := strconv.Atoi(v[0]); err == nil {
				params[k] = n
			} else {
				params[k] = v[0]
			}
		}
	}

	// Marshal errors are ignored deliberately: tracing must not fail the
	// request, and an empty attribute is an acceptable fallback.
	rp, _ := json.Marshal(params)
	return string(rp)
}