diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a4d3cc11..c577cbb21 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ This changelog keeps track of work items that have been completed and are ready ### New - **General**: TODO ([#TODO](https://github.com/kedacore/http-add-on/issues/TODO)) +- **Scaler**: Add OpenTelemetry metrics and distributed tracing to the external scaler ([#965](https://github.com/kedacore/http-add-on/issues/965)) ### Improvements diff --git a/go.mod b/go.mod index 45a7d0108..b53bfd11e 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/stretchr/testify v1.11.1 github.com/tsenart/vegeta/v12 v12.13.0 + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 go.opentelemetry.io/otel v1.43.0 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.43.0 @@ -121,7 +122,7 @@ require ( golang.org/x/tools v0.44.0 // indirect gomodules.xyz/jsonpatch/v2 v2.5.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 4599ee80d..2e25387f9 100644 --- a/go.sum +++ b/go.sum @@ -212,6 +212,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= 
+go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0 h1:0Qx7VGBacMm9ZENQ7TnNObTYI4ShC+lHI16seduaxZo= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.68.0/go.mod h1:Sje3i3MjSPKTSPvVWCaL8ugBzJwik3u4smCjUeuupqg= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0/go.mod h1:BuhAPThV8PBHBvg8ZzZ/Ok3idOdhWIodywz2xEcRbJo= go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I= @@ -308,8 +310,8 @@ gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA= google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg= -google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d h1:wT2n40TBqFY6wiwazVK9/iTWbsQrgk5ZfCSVFLO9LQA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20260406210006-6f92a3bedf2d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/grpc v1.81.1 h1:VnnIIZ88UzOOKLukQi+ImGz8O1Wdp8nAGGnvOfEIWQQ= google.golang.org/grpc v1.81.1/go.mod h1:xGH9GfzOyMTGIOXBJmXt+BX/V0kcdQbdcuwQ/zNw42I= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/interceptor/config/metrics.go b/interceptor/config/metrics.go index 2ecdac4b4..6ffc2b958 100644 --- a/interceptor/config/metrics.go +++ 
b/interceptor/config/metrics.go @@ -1,21 +1,14 @@ package config import ( - "github.com/caarlos0/env/v11" + "github.com/kedacore/http-add-on/pkg/observability" ) // Metrics is the configuration for configuring metrics in the interceptor. -type Metrics struct { - // Sets whether or not to enable the Prometheus metrics exporter - OtelPrometheusExporterEnabled bool `env:"OTEL_PROM_EXPORTER_ENABLED" envDefault:"true"` - // Sets the port which the Prometheus compatible metrics endpoint should be served on - OtelPrometheusExporterPort int `env:"OTEL_PROM_EXPORTER_PORT" envDefault:"2223"` - // Sets whether or not to enable the OTEL metrics exporter - OtelHTTPExporterEnabled bool `env:"OTEL_EXPORTER_OTLP_METRICS_ENABLED" envDefault:"false"` -} +type Metrics = observability.MetricsConfig // MustParseMetrics parses standard configs and returns the // newly created config. Panics if parsing fails. func MustParseMetrics() Metrics { - return env.Must(env.ParseAs[Metrics]()) + return observability.MustParseMetricsConfig() } diff --git a/interceptor/config/tracing.go b/interceptor/config/tracing.go index a25c18c72..23378e2c5 100644 --- a/interceptor/config/tracing.go +++ b/interceptor/config/tracing.go @@ -1,19 +1,14 @@ package config import ( - "github.com/caarlos0/env/v11" + "github.com/kedacore/http-add-on/pkg/observability" ) // Tracing is the configuration for configuring tracing through the interceptor. -type Tracing struct { - // States whether tracing should be enabled, False by default - Enabled bool `env:"OTEL_EXPORTER_OTLP_TRACES_ENABLED" envDefault:"false"` - // Sets what tracing export to use, must be one of: console, http/protobuf, grpc - Exporter string `env:"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL" envDefault:"console"` -} +type Tracing = observability.TracingConfig // MustParseTracing parses standard configs and returns the // newly created config. It panics if parsing fails. 
func MustParseTracing() Tracing { - return env.Must(env.ParseAs[Tracing]()) + return observability.MustParseTracingConfig() } diff --git a/interceptor/metrics/provider.go b/interceptor/metrics/provider.go index 76d9ef8a4..eac566051 100644 --- a/interceptor/metrics/provider.go +++ b/interceptor/metrics/provider.go @@ -1,53 +1,14 @@ package metrics import ( - "context" - "fmt" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/exporters/prometheus" sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" "github.com/kedacore/http-add-on/interceptor/config" - "github.com/kedacore/http-add-on/pkg/build" + "github.com/kedacore/http-add-on/pkg/observability" ) // NewMeterProvider creates a MeterProvider with Prometheus and/or OTLP readers. // Without readers, all instrument operations become no-ops. func NewMeterProvider(cfg config.Metrics, opts ...sdkmetric.Option) (*sdkmetric.MeterProvider, error) { - var options []sdkmetric.Option - - if cfg.OtelPrometheusExporterEnabled { - promExporter, err := prometheus.New( - // Skip scope labels since all metrics come from one OTel meter - prometheus.WithoutScopeInfo(), - ) - if err != nil { - return nil, fmt.Errorf("creating prometheus exporter: %w", err) - } - options = append(options, sdkmetric.WithReader(promExporter)) - } - - if cfg.OtelHTTPExporterEnabled { - otlpExporter, err := otlpmetrichttp.New(context.Background()) - if err != nil { - return nil, fmt.Errorf("creating OTLP exporter: %w", err) - } - options = append(options, sdkmetric.WithReader( - sdkmetric.NewPeriodicReader(otlpExporter), - )) - } - - options = append(options, sdkmetric.WithResource( - resource.NewSchemaless( - attribute.String("service.name", serviceName), - attribute.String("service.version", build.Version()), - ), - )) - - options = append(options, opts...) 
- - return sdkmetric.NewMeterProvider(options...), nil + return observability.NewMeterProvider(serviceName, cfg, opts...) } diff --git a/interceptor/tracing/tracing.go b/interceptor/tracing/tracing.go index 74a8eeec4..ddbd7f418 100644 --- a/interceptor/tracing/tracing.go +++ b/interceptor/tracing/tracing.go @@ -2,67 +2,17 @@ package tracing import ( "context" - "errors" - "strings" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" - "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" "github.com/kedacore/http-add-on/interceptor/config" + "github.com/kedacore/http-add-on/pkg/observability" ) const serviceName = "keda-http-interceptor" func SetupOTelSDK(ctx context.Context, tCfg config.Tracing) (shutdown func(context.Context) error, err error) { - var shutdownFuncs []func(context.Context) error - - // shutdown calls cleanup functions registered via shutdownFuncs. - // The errors from the calls are joined. - // Each registered cleanup will be invoked once. 
- shutdown = func(ctx context.Context) error { - var err error - for _, fn := range shutdownFuncs { - err = errors.Join(err, fn(ctx)) - } - shutdownFuncs = nil - return err - } - - handleErr := func(inErr error) { - err = errors.Join(inErr, shutdown(ctx)) - } - - res, err := newResource(serviceName) - if err != nil { - handleErr(err) - return - } - - prop := NewPropagator() - otel.SetTextMapPropagator(prop) - - tracerProvider, err := newTraceProvider(ctx, res, tCfg) - if err != nil { - handleErr(err) - return - } - shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) - otel.SetTracerProvider(tracerProvider) - - return -} - -func newResource(serviceName string) (*resource.Resource, error) { - return resource.Merge(resource.Default(), - resource.NewSchemaless( - attribute.String("service.name", serviceName), - )) + return observability.SetupTracing(ctx, serviceName, tCfg) } func NewPropagator() propagation.TextMapPropagator { @@ -71,29 +21,3 @@ func NewPropagator() propagation.TextMapPropagator { propagation.Baggage{}, ) } - -func newTraceProvider(ctx context.Context, res *resource.Resource, tCfg config.Tracing) (*trace.TracerProvider, error) { - traceExporter, err := newExporter(ctx, tCfg) - if err != nil { - return nil, err - } - - traceProvider := trace.NewTracerProvider( - trace.WithBatcher(traceExporter), - trace.WithResource(res), - ) - return traceProvider, nil -} - -func newExporter(ctx context.Context, tCfg config.Tracing) (trace.SpanExporter, error) { - switch strings.ToLower(tCfg.Exporter) { - case "console": - return stdouttrace.New() - case "http/protobuf": - return otlptracehttp.New(ctx) - case "grpc": - return otlptracegrpc.New(ctx) - default: - return nil, errors.New("no valid tracing exporter defined") - } -} diff --git a/pkg/observability/config.go b/pkg/observability/config.go new file mode 100644 index 000000000..11f6693d3 --- /dev/null +++ b/pkg/observability/config.go @@ -0,0 +1,28 @@ +package observability + +import ( + 
"github.com/caarlos0/env/v11" +) + +// MetricsConfig is the configuration for OTEL metrics export. +type MetricsConfig struct { + OtelPrometheusExporterEnabled bool `env:"OTEL_PROM_EXPORTER_ENABLED" envDefault:"true"` + OtelPrometheusExporterPort int `env:"OTEL_PROM_EXPORTER_PORT" envDefault:"2223"` + OtelHTTPExporterEnabled bool `env:"OTEL_EXPORTER_OTLP_METRICS_ENABLED" envDefault:"false"` +} + +// TracingConfig is the configuration for OTEL tracing. +type TracingConfig struct { + Enabled bool `env:"OTEL_EXPORTER_OTLP_TRACES_ENABLED" envDefault:"false"` + Exporter string `env:"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL" envDefault:"console"` +} + +// MustParseMetricsConfig parses metrics config from environment variables. +func MustParseMetricsConfig() MetricsConfig { + return env.Must(env.ParseAs[MetricsConfig]()) +} + +// MustParseTracingConfig parses tracing config from environment variables. +func MustParseTracingConfig() TracingConfig { + return env.Must(env.ParseAs[TracingConfig]()) +} diff --git a/pkg/observability/metrics.go b/pkg/observability/metrics.go new file mode 100644 index 000000000..e70bc86c6 --- /dev/null +++ b/pkg/observability/metrics.go @@ -0,0 +1,51 @@ +package observability + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" + "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + + "github.com/kedacore/http-add-on/pkg/build" +) + +// NewMeterProvider creates a MeterProvider with Prometheus and/or OTLP readers. +// Without readers, all instrument operations become no-ops. 
+func NewMeterProvider(serviceName string, cfg MetricsConfig, opts ...sdkmetric.Option) (*sdkmetric.MeterProvider, error) { + var options []sdkmetric.Option + + if cfg.OtelPrometheusExporterEnabled { + promExporter, err := prometheus.New( + prometheus.WithoutScopeInfo(), // skip scope labels on the exported Prometheus metrics + ) + if err != nil { + return nil, fmt.Errorf("creating prometheus exporter: %w", err) + } + options = append(options, sdkmetric.WithReader(promExporter)) + } + + if cfg.OtelHTTPExporterEnabled { + otlpExporter, err := otlpmetrichttp.New(context.Background()) + if err != nil { + return nil, fmt.Errorf("creating OTLP exporter: %w", err) + } + options = append(options, sdkmetric.WithReader( + sdkmetric.NewPeriodicReader(otlpExporter), + )) + } + + options = append(options, sdkmetric.WithResource( + resource.NewSchemaless( + attribute.String("service.name", serviceName), + attribute.String("service.version", build.Version()), + ), + )) + + options = append(options, opts...) + + return sdkmetric.NewMeterProvider(options...), nil +} diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go new file mode 100644 index 000000000..4d5181896 --- /dev/null +++ b/pkg/observability/tracing.go @@ -0,0 +1,89 @@ +package observability + +import ( + "context" + "errors" + "strings" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" +) + +// SetupTracing sets the global text-map propagator and TracerProvider for serviceName and returns a function that shuts the provider down. 
+func SetupTracing(ctx context.Context, serviceName string, cfg TracingConfig) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + shutdown = func(ctx context.Context) error { // runs each registered cleanup once and joins their errors + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + res, err := newResource(serviceName) + if err != nil { + handleErr(err) + return + } + + prop := propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) + otel.SetTextMapPropagator(prop) + + tracerProvider, err := newTraceProvider(ctx, res, cfg.Exporter) + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + + return +} + +func newResource(serviceName string) (*resource.Resource, error) { + return resource.Merge(resource.Default(), + resource.NewSchemaless( + attribute.String("service.name", serviceName), + )) +} + +func newTraceProvider(ctx context.Context, res *resource.Resource, exporter string) (*trace.TracerProvider, error) { + traceExporter, err := newExporter(ctx, exporter) + if err != nil { + return nil, err + } + + traceProvider := trace.NewTracerProvider( + trace.WithBatcher(traceExporter), + trace.WithResource(res), + ) + return traceProvider, nil +} + +func newExporter(ctx context.Context, exporter string) (trace.SpanExporter, error) { + switch strings.ToLower(exporter) { + case "console": + return stdouttrace.New() + case "http/protobuf": + return otlptracehttp.New(ctx) + case "grpc": + return otlptracegrpc.New(ctx) + default: + return nil, errors.New("no valid tracing exporter defined") + } +} diff --git a/scaler/config.go b/scaler/config.go index 002feda51..859ab1494 100644 --- a/scaler/config.go +++ b/scaler/config.go @@ -4,6 +4,8 @@ import ( "time" 
"github.com/caarlos0/env/v11" + + "github.com/kedacore/http-add-on/pkg/observability" ) type config struct { @@ -30,6 +32,9 @@ type config struct { ProfilingAddr string `env:"PROFILING_BIND_ADDRESS" envDefault:""` // StreamIntervalMS is the interval in milliseconds between stream ticks StreamIntervalMS int `env:"KEDA_HTTP_SCALER_STREAM_INTERVAL_MS" envDefault:"200"` + + Metrics observability.MetricsConfig `envPrefix:""` + Tracing observability.TracingConfig `envPrefix:""` } func mustParseConfig() config { diff --git a/scaler/main.go b/scaler/main.go index 4a2e4818a..fe60529e5 100644 --- a/scaler/main.go +++ b/scaler/main.go @@ -13,10 +13,13 @@ import ( "net/http" _ "net/http/pprof" //nolint:gosec // G108: pprof intentionally exposed, gated by --profiling-addr "os" + "runtime" "time" "github.com/go-logr/logr" "github.com/kedacore/keda/v2/pkg/scalers/externalscaler" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -29,8 +32,11 @@ import ( "github.com/kedacore/http-add-on/pkg/build" kedacache "github.com/kedacore/http-add-on/pkg/cache" + kedahttp "github.com/kedacore/http-add-on/pkg/http" "github.com/kedacore/http-add-on/pkg/k8s" + "github.com/kedacore/http-add-on/pkg/observability" "github.com/kedacore/http-add-on/pkg/util" + "github.com/kedacore/http-add-on/scaler/metrics" ) var setupLog = ctrl.Log.WithName("setup") @@ -40,6 +46,7 @@ var setupLog = ctrl.Log.WithName("setup") // +kubebuilder:rbac:groups=http.keda.sh,resources=interceptorroutes,verbs=get;list;watch func main() { + defer os.Exit(1) // NOTE(review): deferred first so it runs last, after the other deferred shutdowns, when runtime.Goexit is called; it also runs when main returns normally, so a clean shutdown would exit with status 1 as well; confirm that is intended cfg := mustParseConfig() namespace := cfg.TargetNamespace svcName := cfg.TargetService @@ -53,10 +60,33 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) + setupLog.Info( + "starting scaler", + "metricsConfig", cfg.Metrics, + "tracingConfig", cfg.Tracing, + ) + + provider, err := 
metrics.NewMeterProvider(cfg.Metrics) + if err != nil { + setupLog.Error(err, "failed to create meter provider") + runtime.Goexit() + } + defer func() { + if err := provider.Shutdown(context.Background()); err != nil { + setupLog.Error(err, "error shutting down meter provider") + } + }() + + instruments, err := metrics.NewInstruments(provider) + if err != nil { + setupLog.Error(err, "failed to create metric instruments") + runtime.Goexit() + } + k8sCfg, err := ctrl.GetConfig() if err != nil { setupLog.Error(err, "Kubernetes client config not found") - os.Exit(1) + runtime.Goexit() } ctrlCache, err := cache.New(k8sCfg, cache.Options{ @@ -65,16 +95,29 @@ func main() { }) if err != nil { setupLog.Error(err, "creating cache") - os.Exit(1) + runtime.Goexit() } - pinger := newQueuePinger(ctrl.Log, k8s.EndpointsFuncForControllerClient(ctrlCache), namespace, svcName, deplName, targetPortStr) + pinger := newQueuePinger(ctrl.Log, k8s.EndpointsFuncForControllerClient(ctrlCache), namespace, svcName, deplName, targetPortStr, instruments) ctx := ctrl.SetupSignalHandler() ctx = util.ContextWithLogger(ctx, setupLog) eg, ctx := errgroup.WithContext(ctx) + if cfg.Tracing.Enabled { + shutdown, err := observability.SetupTracing(ctx, metrics.ServiceName, cfg.Tracing) + if err != nil { + setupLog.Error(err, "error setting up tracer") + runtime.Goexit() + } + defer func() { + if shutdownErr := shutdown(context.Background()); shutdownErr != nil { + setupLog.Error(shutdownErr, "error during tracer shutdown") + } + }() + } + // start the controller-runtime cache eg.Go(func() error { setupLog.Info("starting the controller-runtime cache") @@ -95,7 +138,7 @@ func main() { // Wait for cache to sync before starting components that depend on it if !ctrlCache.WaitForCacheSync(ctx) { setupLog.Error(nil, "cache failed to sync") - os.Exit(1) + runtime.Goexit() } eg.Go(func() error { @@ -120,11 +163,21 @@ func main() { return nil }) + if cfg.Metrics.OtelPrometheusExporterEnabled { + eg.Go(func() error 
{ + if err := runMetricsServer(ctx, ctrl.Log, cfg.Metrics); !util.IsIgnoredErr(err) { + setupLog.Error(err, "could not start the Prometheus metrics server") + return err + } + return nil + }) + } + build.PrintComponentInfo(ctrl.Log, "Scaler") if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { setupLog.Error(err, "fatal error") - os.Exit(1) + runtime.Goexit() } setupLog.Info("Bye!") @@ -139,7 +192,12 @@ func startGrpcServer(ctx context.Context, cfg config, lggr logr.Logger, pinger * return err } - grpcServer := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if cfg.Tracing.Enabled { + grpcOpts = append(grpcOpts, grpc.StatsHandler(otelgrpc.NewServerHandler())) + } + + grpcServer := grpc.NewServer(grpcOpts...) reflection.Register(grpcServer) hs := health.NewServer() @@ -180,3 +238,11 @@ func startGrpcServer(ctx context.Context, cfg config, lggr logr.Logger, pinger * return grpcServer.Serve(lis) } + +func runMetricsServer(ctx context.Context, lggr logr.Logger, metricsCfg observability.MetricsConfig) error { + lggr.Info("starting the prometheus metrics server", "port", metricsCfg.OtelPrometheusExporterPort, "path", "/metrics") + addr := fmt.Sprintf("0.0.0.0:%d", metricsCfg.OtelPrometheusExporterPort) + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + return kedahttp.ServeContext(ctx, addr, mux, nil) +} diff --git a/scaler/metrics/instruments.go b/scaler/metrics/instruments.go new file mode 100644 index 000000000..b91669978 --- /dev/null +++ b/scaler/metrics/instruments.go @@ -0,0 +1,85 @@ +package metrics + +import ( + "context" + "fmt" + "time" + + api "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" +) + +const ( + meterName = "keda-external-scaler" + + // ServiceName is the OTEL service.name used for both metrics and tracing. 
+ ServiceName = "keda-http-external-scaler" + + MetricPingerFetchDuration = "scaler.pinger.fetch.duration" + MetricPingerFetchErrors = "scaler.pinger.fetch.errors" + MetricPingerEndpoints = "scaler.pinger.endpoints" +) + +// Instruments holds all metric instruments for the external scaler. +type Instruments struct { + pingerFetchDuration api.Float64Histogram // duration of each fetch cycle, in seconds + pingerFetchErrors api.Int64Counter // number of failed fetch cycles + pingerEndpoints api.Int64Gauge // endpoint count reported by the latest fetch cycle +} + +// NewNoopInstruments returns Instruments built on a MeterProvider that has no readers, for use in tests. It panics if instrument creation fails. +func NewNoopInstruments() *Instruments { + i, err := NewInstruments(sdkmetric.NewMeterProvider()) + if err != nil { + panic("creating noop instruments: " + err.Error()) + } + return i +} + +// NewInstruments creates metric instruments from a MeterProvider. +func NewInstruments(provider *sdkmetric.MeterProvider) (*Instruments, error) { + meter := provider.Meter(meterName) + + pingerFetchDuration, err := meter.Float64Histogram( + MetricPingerFetchDuration, + api.WithDescription("Duration of a queue pinger fetch cycle across all interceptor pods"), + api.WithUnit("s"), + api.WithExplicitBucketBoundaries( + 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, + ), + ) + if err != nil { + return nil, fmt.Errorf("creating pinger fetch duration histogram: %w", err) + } + + pingerFetchErrors, err := meter.Int64Counter( + MetricPingerFetchErrors, + api.WithDescription("Total failed queue pinger fetch cycles"), + ) + if err != nil { + return nil, fmt.Errorf("creating pinger fetch errors counter: %w", err) + } + + pingerEndpoints, err := meter.Int64Gauge( + MetricPingerEndpoints, + api.WithDescription("Number of interceptor endpoints the scaler is polling"), + ) + if err != nil { + return nil, fmt.Errorf("creating pinger endpoints gauge: %w", err) + } + + return &Instruments{ + pingerFetchDuration: pingerFetchDuration, + pingerFetchErrors: pingerFetchErrors, + pingerEndpoints: pingerEndpoints, + }, nil +} + +// RecordFetch records a completed 
pinger fetch cycle. +func (i *Instruments) RecordFetch(duration time.Duration, endpointCount int, fetchErr error) { + i.pingerFetchDuration.Record(context.Background(), duration.Seconds()) + i.pingerEndpoints.Record(context.Background(), int64(endpointCount)) + if fetchErr != nil { + i.pingerFetchErrors.Add(context.Background(), 1) + } +} diff --git a/scaler/metrics/prometheus_test.go b/scaler/metrics/prometheus_test.go new file mode 100644 index 000000000..010537b7e --- /dev/null +++ b/scaler/metrics/prometheus_test.go @@ -0,0 +1,107 @@ +package metrics + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + promexporter "go.opentelemetry.io/otel/exporters/prometheus" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + + "github.com/kedacore/http-add-on/pkg/observability" +) + +func testRegistry(t *testing.T) (*prometheus.Registry, *Instruments) { + t.Helper() + + registry := prometheus.NewRegistry() + exporter, err := promexporter.New( + promexporter.WithRegisterer(registry), + promexporter.WithoutScopeInfo(), + ) + if err != nil { + t.Fatalf("creating prometheus exporter: %v", err) + } + + provider, err := NewMeterProvider( + observability.MetricsConfig{OtelPrometheusExporterEnabled: false}, + sdkmetric.WithReader(exporter), + ) + if err != nil { + t.Fatalf("NewMeterProvider() error: %v", err) + } + t.Cleanup(func() { _ = provider.Shutdown(context.Background()) }) + + instruments, err := NewInstruments(provider) + if err != nil { + t.Fatalf("NewInstruments() error: %v", err) + } + + return registry, instruments +} + +func TestPrometheus_FetchDuration(t *testing.T) { + registry, instruments := testRegistry(t) + + instruments.RecordFetch(50*time.Millisecond, 3, nil) + instruments.RecordFetch(200*time.Millisecond, 3, nil) + + expected := ` + # HELP scaler_pinger_fetch_duration_seconds Duration of a queue pinger fetch cycle across all 
interceptor pods + # TYPE scaler_pinger_fetch_duration_seconds histogram + scaler_pinger_fetch_duration_seconds_bucket{le="0.005"} 0 + scaler_pinger_fetch_duration_seconds_bucket{le="0.01"} 0 + scaler_pinger_fetch_duration_seconds_bucket{le="0.025"} 0 + scaler_pinger_fetch_duration_seconds_bucket{le="0.05"} 1 + scaler_pinger_fetch_duration_seconds_bucket{le="0.075"} 1 + scaler_pinger_fetch_duration_seconds_bucket{le="0.1"} 1 + scaler_pinger_fetch_duration_seconds_bucket{le="0.25"} 2 + scaler_pinger_fetch_duration_seconds_bucket{le="0.5"} 2 + scaler_pinger_fetch_duration_seconds_bucket{le="0.75"} 2 + scaler_pinger_fetch_duration_seconds_bucket{le="1"} 2 + scaler_pinger_fetch_duration_seconds_bucket{le="2.5"} 2 + scaler_pinger_fetch_duration_seconds_bucket{le="5"} 2 + scaler_pinger_fetch_duration_seconds_bucket{le="+Inf"} 2 + scaler_pinger_fetch_duration_seconds_sum 0.25 + scaler_pinger_fetch_duration_seconds_count 2 + ` + if err := testutil.CollectAndCompare(registry, strings.NewReader(expected), "scaler_pinger_fetch_duration_seconds"); err != nil { + t.Fatalf("unexpected metrics output:\n%v", err) + } +} + +func TestPrometheus_FetchErrors(t *testing.T) { + registry, instruments := testRegistry(t) + + instruments.RecordFetch(10*time.Millisecond, 2, errors.New("connection refused")) + instruments.RecordFetch(10*time.Millisecond, 2, nil) + instruments.RecordFetch(10*time.Millisecond, 0, errors.New("no endpoints")) + + expected := ` + # HELP scaler_pinger_fetch_errors_total Total failed queue pinger fetch cycles + # TYPE scaler_pinger_fetch_errors_total counter + scaler_pinger_fetch_errors_total 2 + ` + if err := testutil.CollectAndCompare(registry, strings.NewReader(expected), "scaler_pinger_fetch_errors_total"); err != nil { + t.Fatalf("unexpected metrics output:\n%v", err) + } +} + +func TestPrometheus_Endpoints(t *testing.T) { + registry, instruments := testRegistry(t) + + instruments.RecordFetch(10*time.Millisecond, 5, nil) + + expected := ` + # HELP 
scaler_pinger_endpoints Number of interceptor endpoints the scaler is polling + # TYPE scaler_pinger_endpoints gauge + scaler_pinger_endpoints 5 + ` + if err := testutil.CollectAndCompare(registry, strings.NewReader(expected), "scaler_pinger_endpoints"); err != nil { + t.Fatalf("unexpected metrics output:\n%v", err) + } +} diff --git a/scaler/metrics/provider.go b/scaler/metrics/provider.go new file mode 100644 index 000000000..8d0034e41 --- /dev/null +++ b/scaler/metrics/provider.go @@ -0,0 +1,13 @@ +package metrics + +import ( + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + + "github.com/kedacore/http-add-on/pkg/observability" +) + +// NewMeterProvider creates a MeterProvider with Prometheus and/or OTLP readers. +// Without readers, all instrument operations become no-ops. +func NewMeterProvider(cfg observability.MetricsConfig, opts ...sdkmetric.Option) (*sdkmetric.MeterProvider, error) { + return observability.NewMeterProvider(ServiceName, cfg, opts...) +} diff --git a/scaler/queue_pinger.go b/scaler/queue_pinger.go index 0f9e6e6d3..5f97db300 100644 --- a/scaler/queue_pinger.go +++ b/scaler/queue_pinger.go @@ -16,6 +16,7 @@ import ( "github.com/kedacore/http-add-on/pkg/k8s" "github.com/kedacore/http-add-on/pkg/queue" + "github.com/kedacore/http-add-on/scaler/metrics" ) type PingerStatus int32 @@ -45,7 +46,7 @@ type aggregatedCount struct { // // Sample usage: // -// pinger := newQueuePinger(lggr, getEndpointsFn, ns, svcName, deplName, adminPort) +// pinger := newQueuePinger(lggr, getEndpointsFn, ns, svcName, deplName, adminPort, instruments) // go pinger.start(ctx, ticker) type queuePinger struct { getEndpointsFn k8s.GetEndpointsFunc @@ -58,6 +59,7 @@ type queuePinger struct { allCounts map[string]aggregatedCount lggr logr.Logger status PingerStatus + instruments *metrics.Instruments // prevPodCounts tracks the previous RequestCount per pod per key // so we can compute deltas between consecutive polls. 
@@ -68,7 +70,7 @@ type queuePinger struct { rateBuckets map[string]*queue.RequestsBuckets } -func newQueuePinger(lggr logr.Logger, getEndpointsFn k8s.GetEndpointsFunc, ns, svcName, deplName, adminPort string) *queuePinger { +func newQueuePinger(lggr logr.Logger, getEndpointsFn k8s.GetEndpointsFunc, ns, svcName, deplName, adminPort string, instruments *metrics.Instruments) *queuePinger { return &queuePinger{ getEndpointsFn: getEndpointsFn, interceptorNS: ns, @@ -76,6 +78,7 @@ func newQueuePinger(lggr logr.Logger, getEndpointsFn k8s.GetEndpointsFunc, ns, s interceptorServiceName: deplName, adminPort: adminPort, lggr: lggr, + instruments: instruments, allCounts: map[string]aggregatedCount{}, prevPodCounts: map[string]map[string]int64{}, rateBuckets: map[string]*queue.RequestsBuckets{}, @@ -152,7 +155,9 @@ func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error { q.pingMut.Lock() defer q.pingMut.Unlock() - perPod, err := fetchCountsPerPod(ctx, q.lggr, q.getEndpointsFn, q.interceptorNS, q.interceptorSvcName, q.adminPort) + fetchStart := time.Now() + result, err := fetchCountsPerPod(ctx, q.lggr, q.getEndpointsFn, q.interceptorNS, q.interceptorSvcName, q.adminPort) + q.instruments.RecordFetch(time.Since(fetchStart), result.endpointCount, err) if err != nil { q.lggr.Error(err, "getting request counts") q.status = PingerERROR @@ -160,6 +165,8 @@ func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error { } q.status = PingerACTIVE + perPod := result.perPod + now := time.Now() // Per-key aggregated concurrency and request-count delta. @@ -229,18 +236,25 @@ func (q *queuePinger) fetchAndSaveCounts(ctx context.Context) error { return nil } +// fetchResult holds the outcome of a fetch cycle. +type fetchResult struct { + perPod map[string]queue.Counts + endpointCount int +} + // fetchCountsPerPod fetches counts from every interceptor pod endpoint -// and returns the raw per-pod results keyed by pod URL string. 
-func fetchCountsPerPod(ctx context.Context, lggr logr.Logger, endpointsFn k8s.GetEndpointsFunc, ns, svcName, adminPort string) (map[string]queue.Counts, error) { +// and returns the raw per-pod results keyed by pod URL string along with +// the total number of endpoints that were polled. +func fetchCountsPerPod(ctx context.Context, lggr logr.Logger, endpointsFn k8s.GetEndpointsFunc, ns, svcName, adminPort string) (fetchResult, error) { lggr = lggr.WithName("queuePinger.requestCounts") endpointURLs, err := k8s.EndpointsForService(ctx, ns, svcName, adminPort, endpointsFn) if err != nil { - return nil, err + return fetchResult{}, err } if len(endpointURLs) == 0 { - return nil, fmt.Errorf("there isn't any valid interceptor endpoint") + return fetchResult{}, fmt.Errorf("there isn't any valid interceptor endpoint") } type podResult struct { @@ -266,7 +280,7 @@ func fetchCountsPerPod(ctx context.Context, lggr logr.Logger, endpointsFn k8s.Ge if err := fetchGrp.Wait(); err != nil { lggr.Error(err, "fetching all counts failed") - return nil, err + return fetchResult{endpointCount: len(endpointURLs)}, err } close(resultCh) @@ -274,7 +288,7 @@ func fetchCountsPerPod(ctx context.Context, lggr logr.Logger, endpointsFn k8s.Ge for r := range resultCh { perPod[r.key] = r.counts } - return perPod, nil + return fetchResult{perPod: perPod, endpointCount: len(endpointURLs)}, nil } func podKey(u url.URL) string { diff --git a/scaler/queue_pinger_test.go b/scaler/queue_pinger_test.go index 8d9e2d6c9..2349a3c4e 100644 --- a/scaler/queue_pinger_test.go +++ b/scaler/queue_pinger_test.go @@ -18,6 +18,7 @@ import ( "github.com/kedacore/http-add-on/pkg/k8s" kedanet "github.com/kedacore/http-add-on/pkg/net" "github.com/kedacore/http-add-on/pkg/queue" + "github.com/kedacore/http-add-on/scaler/metrics" ) func TestCounts(t *testing.T) { @@ -55,6 +56,7 @@ func TestCounts(t *testing.T) { svcName, deplName, srvURL.Port(), + metrics.NewNoopInstruments(), ) ticker := time.NewTicker(tickDur) @@ -106,6 
+108,7 @@ func TestFetchAndSaveCounts(t *testing.T) { svcName, deplName, srvURL.Port(), + metrics.NewNoopInstruments(), ) r.NoError(pinger.fetchAndSaveCounts(ctx)) @@ -143,7 +146,7 @@ func TestFetchCountsPerPod(t *testing.T) { return endpoints, nil } - perPod, err := fetchCountsPerPod( + result, err := fetchCountsPerPod( ctx, logr.Discard(), endpointsFn, @@ -152,9 +155,10 @@ func TestFetchCountsPerPod(t *testing.T) { fmt.Sprintf("%v", srvURL.Port()), ) r.NoError(err) - r.Len(perPod, 1) + r.Len(result.perPod, 1) + r.Equal(1, result.endpointCount) - for _, counts := range perPod { + for _, counts := range result.perPod { for host, n := range hosts { c, ok := counts[host] r.True(ok, "host %s missing from pod result", host) @@ -219,6 +223,7 @@ func TestFetchAndSaveCounts_MultiPodLifecycle(t *testing.T) { svcName, deplName, adminPort, + metrics.NewNoopInstruments(), ) // Baseline with one pod. @@ -286,6 +291,7 @@ func TestRateComputation(t *testing.T) { svcName, deplName, srvURL.Port(), + metrics.NewNoopInstruments(), ) // First poll: establishes baseline (no delta, RequestRate=0) @@ -335,6 +341,7 @@ func TestRateComputationCounterReset(t *testing.T) { svcName, deplName, srvURL.Port(), + metrics.NewNoopInstruments(), ) // First poll: baseline @@ -407,6 +414,7 @@ func newFakeQueuePinger(lggr logr.Logger, optsFuncs ...optsFunc) (*time.Ticker, "testsvc", "testdepl", opts.port, + metrics.NewNoopInstruments(), ) return ticker, pinger, nil }