-
Notifications
You must be signed in to change notification settings - Fork 158
feat(scaler): add observability (metrics + tracing) to the external scaler #1634
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,20 @@ type config struct { | |
| ProfilingAddr string `env:"PROFILING_BIND_ADDRESS" envDefault:""` | ||
| // StreamIntervalMS is the interval in milliseconds between stream ticks | ||
| StreamIntervalMS int `env:"KEDA_HTTP_SCALER_STREAM_INTERVAL_MS" envDefault:"200"` | ||
|
|
||
| Metrics metricsConfig `envPrefix:""` | ||
| Tracing tracingConfig `envPrefix:""` | ||
| } | ||
|
|
||
| type metricsConfig struct { | ||
| OtelPrometheusExporterEnabled bool `env:"OTEL_PROM_EXPORTER_ENABLED" envDefault:"true"` | ||
| OtelPrometheusExporterPort int `env:"OTEL_PROM_EXPORTER_PORT" envDefault:"2224"` | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason not to use the same port we use for the interceptor metrics? |
||
| OtelHTTPExporterEnabled bool `env:"OTEL_EXPORTER_OTLP_METRICS_ENABLED" envDefault:"false"` | ||
| } | ||
|
|
||
// tracingConfig groups the tracing settings that are populated from
// environment variables via the struct tags below.
type tracingConfig struct {
	// Enabled is read from OTEL_EXPORTER_OTLP_TRACES_ENABLED and defaults to
	// false.
	Enabled bool `env:"OTEL_EXPORTER_OTLP_TRACES_ENABLED" envDefault:"false"`

	// Exporter is read from OTEL_EXPORTER_OTLP_TRACES_PROTOCOL and defaults
	// to "console".
	Exporter string `env:"OTEL_EXPORTER_OTLP_TRACES_PROTOCOL" envDefault:"console"`
}
|
|
||
| func mustParseConfig() config { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,10 +13,13 @@ import ( | |
| "net/http" | ||
| _ "net/http/pprof" //nolint:gosec // G108: pprof intentionally exposed, gated by --profiling-addr | ||
| "os" | ||
| "runtime" | ||
| "time" | ||
|
|
||
| "github.com/go-logr/logr" | ||
| "github.com/kedacore/keda/v2/pkg/scalers/externalscaler" | ||
| "github.com/prometheus/client_golang/prometheus/promhttp" | ||
| "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" | ||
| "golang.org/x/sync/errgroup" | ||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/health" | ||
|
|
@@ -29,8 +32,11 @@ import ( | |
|
|
||
| "github.com/kedacore/http-add-on/pkg/build" | ||
| kedacache "github.com/kedacore/http-add-on/pkg/cache" | ||
| kedahttp "github.com/kedacore/http-add-on/pkg/http" | ||
| "github.com/kedacore/http-add-on/pkg/k8s" | ||
| "github.com/kedacore/http-add-on/pkg/util" | ||
| "github.com/kedacore/http-add-on/scaler/metrics" | ||
| "github.com/kedacore/http-add-on/scaler/tracing" | ||
| ) | ||
|
|
||
| var setupLog = ctrl.Log.WithName("setup") | ||
|
|
@@ -53,10 +59,36 @@ func main() { | |
|
|
||
| ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) | ||
|
|
||
| setupLog.Info( | ||
| "starting scaler", | ||
| "metricsConfig", cfg.Metrics, | ||
| "tracingConfig", cfg.Tracing, | ||
| ) | ||
|
|
||
| provider, err := metrics.NewMeterProvider( | ||
| cfg.Metrics.OtelPrometheusExporterEnabled, | ||
| cfg.Metrics.OtelHTTPExporterEnabled, | ||
| ) | ||
| if err != nil { | ||
| setupLog.Error(err, "failed to create meter provider") | ||
| os.Exit(1) | ||
| } | ||
| defer func() { | ||
| if err := provider.Shutdown(context.Background()); err != nil { | ||
| setupLog.Error(err, "error shutting down meter provider") | ||
| } | ||
| }() | ||
|
|
||
| instruments, err := metrics.NewInstruments(provider) | ||
| if err != nil { | ||
| setupLog.Error(err, "failed to create metric instruments") | ||
| runtime.Goexit() | ||
| } | ||
|
|
||
| k8sCfg, err := ctrl.GetConfig() | ||
| if err != nil { | ||
| setupLog.Error(err, "Kubernetes client config not found") | ||
| os.Exit(1) | ||
| runtime.Goexit() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason for using runtime.Goexit now? If this is intended we should probably also add the |
||
| } | ||
|
|
||
| ctrlCache, err := cache.New(k8sCfg, cache.Options{ | ||
|
|
@@ -65,16 +97,29 @@ func main() { | |
| }) | ||
| if err != nil { | ||
| setupLog.Error(err, "creating cache") | ||
| os.Exit(1) | ||
| runtime.Goexit() | ||
| } | ||
|
|
||
| pinger := newQueuePinger(ctrl.Log, k8s.EndpointsFuncForControllerClient(ctrlCache), namespace, svcName, deplName, targetPortStr) | ||
| pinger := newQueuePinger(ctrl.Log, k8s.EndpointsFuncForControllerClient(ctrlCache), namespace, svcName, deplName, targetPortStr, instruments) | ||
|
|
||
| ctx := ctrl.SetupSignalHandler() | ||
| ctx = util.ContextWithLogger(ctx, setupLog) | ||
|
|
||
| eg, ctx := errgroup.WithContext(ctx) | ||
|
|
||
| if cfg.Tracing.Enabled { | ||
| shutdown, err := tracing.SetupOTelSDK(ctx, cfg.Tracing.Exporter) | ||
| if err != nil { | ||
| setupLog.Error(err, "error setting up tracer") | ||
| runtime.Goexit() | ||
| } | ||
| defer func() { | ||
| if shutdownErr := shutdown(context.Background()); shutdownErr != nil { | ||
| setupLog.Error(shutdownErr, "error during tracer shutdown") | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| // start the controller-runtime cache | ||
| eg.Go(func() error { | ||
| setupLog.Info("starting the controller-runtime cache") | ||
|
|
@@ -95,7 +140,7 @@ func main() { | |
| // Wait for cache to sync before starting components that depend on it | ||
| if !ctrlCache.WaitForCacheSync(ctx) { | ||
| setupLog.Error(nil, "cache failed to sync") | ||
| os.Exit(1) | ||
| runtime.Goexit() | ||
| } | ||
|
|
||
| eg.Go(func() error { | ||
|
|
@@ -120,11 +165,21 @@ func main() { | |
| return nil | ||
| }) | ||
|
|
||
| if cfg.Metrics.OtelPrometheusExporterEnabled { | ||
| eg.Go(func() error { | ||
| if err := runMetricsServer(ctx, ctrl.Log, cfg.Metrics); !util.IsIgnoredErr(err) { | ||
| setupLog.Error(err, "could not start the Prometheus metrics server") | ||
| return err | ||
| } | ||
| return nil | ||
| }) | ||
| } | ||
|
|
||
| build.PrintComponentInfo(ctrl.Log, "Scaler") | ||
|
|
||
| if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) { | ||
| setupLog.Error(err, "fatal error") | ||
| os.Exit(1) | ||
| runtime.Goexit() | ||
| } | ||
|
|
||
| setupLog.Info("Bye!") | ||
|
|
@@ -139,7 +194,12 @@ func startGrpcServer(ctx context.Context, cfg config, lggr logr.Logger, pinger * | |
| return err | ||
| } | ||
|
|
||
| grpcServer := grpc.NewServer() | ||
| var grpcOpts []grpc.ServerOption | ||
| if cfg.Tracing.Enabled { | ||
| grpcOpts = append(grpcOpts, grpc.StatsHandler(otelgrpc.NewServerHandler())) | ||
| } | ||
|
|
||
| grpcServer := grpc.NewServer(grpcOpts...) | ||
| reflection.Register(grpcServer) | ||
|
|
||
| hs := health.NewServer() | ||
|
|
@@ -180,3 +240,11 @@ func startGrpcServer(ctx context.Context, cfg config, lggr logr.Logger, pinger * | |
|
|
||
| return grpcServer.Serve(lis) | ||
| } | ||
|
|
||
| func runMetricsServer(ctx context.Context, lggr logr.Logger, metricsCfg metricsConfig) error { | ||
| lggr.Info("starting the prometheus metrics server", "port", metricsCfg.OtelPrometheusExporterPort, "path", "/metrics") | ||
| addr := fmt.Sprintf("0.0.0.0:%d", metricsCfg.OtelPrometheusExporterPort) | ||
| mux := http.NewServeMux() | ||
| mux.Handle("/metrics", promhttp.Handler()) | ||
| return kedahttp.ServeContext(ctx, addr, mux, nil) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,81 @@ | ||
| package metrics | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should probably also add a test, like the prometheus_test.go the interceptor has right now. |
||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "time" | ||
|
|
||
| "go.opentelemetry.io/otel/attribute" | ||
| api "go.opentelemetry.io/otel/metric" | ||
| sdkmetric "go.opentelemetry.io/otel/sdk/metric" | ||
| ) | ||
|
|
||
| const ( | ||
| meterName = "keda-external-scaler" | ||
|
|
||
| // ServiceName is the OTEL service.name used for both metrics and tracing. | ||
| ServiceName = "keda-http-external-scaler" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should probably also align the service name of the interceptor to use this naming scheme. |
||
|
|
||
| MetricPingerFetchDuration = "scaler.pinger.fetch.duration" | ||
| MetricPingerFetchErrors = "scaler.pinger.fetch.errors" | ||
| MetricPingerEndpoints = "scaler.pinger.endpoints" | ||
|
|
||
| AttrNamespace = "namespace" | ||
| AttrService = "service" | ||
|
Comment on lines
+23
to
+24
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. These are unused vars. |
||
| ) | ||
|
|
||
| // Instruments holds all metric instruments for the external scaler. | ||
| type Instruments struct { | ||
| pingerFetchDuration api.Float64Histogram | ||
| pingerFetchErrors api.Int64Counter | ||
| pingerEndpoints api.Int64Gauge | ||
| } | ||
|
|
||
| // NewInstruments creates metric instruments from a MeterProvider. | ||
| func NewInstruments(provider *sdkmetric.MeterProvider) (*Instruments, error) { | ||
| meter := provider.Meter(meterName) | ||
|
|
||
| pingerFetchDuration, err := meter.Float64Histogram( | ||
| MetricPingerFetchDuration, | ||
| api.WithDescription("Duration of a queue pinger fetch cycle across all interceptor pods"), | ||
| api.WithUnit("s"), | ||
| api.WithExplicitBucketBoundaries( | ||
| 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, | ||
| ), | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating pinger fetch duration histogram: %w", err) | ||
| } | ||
|
|
||
| pingerFetchErrors, err := meter.Int64Counter( | ||
| MetricPingerFetchErrors, | ||
| api.WithDescription("Total failed queue pinger fetch cycles"), | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating pinger fetch errors counter: %w", err) | ||
| } | ||
|
|
||
| pingerEndpoints, err := meter.Int64Gauge( | ||
| MetricPingerEndpoints, | ||
| api.WithDescription("Number of interceptor endpoints the scaler is polling"), | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating pinger endpoints gauge: %w", err) | ||
| } | ||
|
|
||
| return &Instruments{ | ||
| pingerFetchDuration: pingerFetchDuration, | ||
| pingerFetchErrors: pingerFetchErrors, | ||
| pingerEndpoints: pingerEndpoints, | ||
| }, nil | ||
| } | ||
|
|
||
| // RecordFetch records a completed pinger fetch cycle. | ||
| func (i *Instruments) RecordFetch(duration time.Duration, endpointCount int, fetchErr error) { | ||
| attrs := api.WithAttributeSet(attribute.NewSet()) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This can be removed, as there are no attributes. |
||
| i.pingerFetchDuration.Record(context.Background(), duration.Seconds(), attrs) | ||
| i.pingerEndpoints.Record(context.Background(), int64(endpointCount), attrs) | ||
| if fetchErr != nil { | ||
| i.pingerFetchErrors.Add(context.Background(), 1, attrs) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| package metrics | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This looks like a copy of interceptor/provider/metrics.go; we should deduplicate this code. |
||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "go.opentelemetry.io/otel/attribute" | ||
| "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" | ||
| "go.opentelemetry.io/otel/exporters/prometheus" | ||
| sdkmetric "go.opentelemetry.io/otel/sdk/metric" | ||
| "go.opentelemetry.io/otel/sdk/resource" | ||
|
|
||
| "github.com/kedacore/http-add-on/pkg/build" | ||
| ) | ||
|
|
||
| // NewMeterProvider creates a MeterProvider with Prometheus and/or OTLP readers. | ||
| // Without readers, all instrument operations become no-ops. | ||
| func NewMeterProvider(promEnabled, otlpEnabled bool, opts ...sdkmetric.Option) (*sdkmetric.MeterProvider, error) { | ||
| var options []sdkmetric.Option | ||
|
|
||
| if promEnabled { | ||
| promExporter, err := prometheus.New( | ||
| prometheus.WithoutScopeInfo(), | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating prometheus exporter: %w", err) | ||
| } | ||
| options = append(options, sdkmetric.WithReader(promExporter)) | ||
| } | ||
|
|
||
| if otlpEnabled { | ||
| otlpExporter, err := otlpmetrichttp.New(context.Background()) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("creating OTLP exporter: %w", err) | ||
| } | ||
| options = append(options, sdkmetric.WithReader( | ||
| sdkmetric.NewPeriodicReader(otlpExporter), | ||
| )) | ||
| } | ||
|
|
||
| options = append(options, sdkmetric.WithResource( | ||
| resource.NewSchemaless( | ||
| attribute.String("service.name", ServiceName), | ||
| attribute.String("service.version", build.Version()), | ||
| ), | ||
| )) | ||
|
Fedosin marked this conversation as resolved.
|
||
|
|
||
| options = append(options, opts...) | ||
|
|
||
| return sdkmetric.NewMeterProvider(options...), nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess we could also deduplicate the config to ensure it is consistent across components?