diff --git a/internal/xds/resolver/cluster_specifier_plugin_test.go b/internal/xds/resolver/cluster_specifier_plugin_test.go index bd878b3b1773..7df61cc765dd 100644 --- a/internal/xds/resolver/cluster_specifier_plugin_test.go +++ b/internal/xds/resolver/cluster_specifier_plugin_test.go @@ -31,14 +31,17 @@ import ( "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/internal/xds/balancer/clustermanager" "google.golang.org/grpc/internal/xds/clusterspecifier" + "google.golang.org/grpc/internal/xds/httpfilter" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/wrapperspb" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3" ) func init() { @@ -337,3 +340,136 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { }` verifyUpdateFromResolver(ctx, t, stateCh, wantSC) } + +// TestResolverClusterSpecifierPlugin_WithFilters tests the case where a route +// configuration containing cluster specifier plugins is sent by the management +// server, and HTTP filters are configured. The test verifies that the +// interceptor chain is built for routes matching cluster specifier plugins. +func (s) TestResolverClusterSpecifierPlugin_WithFilters(t *testing.T) { + // Register custom httpFilter builders for the test. + testFilterTypeURL1 := "test-filter-type-url-1" + uuid.New().String() + testFilterTypeURL2 := "test-filter-type-url-2" + uuid.New().String() + newStreamChan := testutils.NewChannelWithSize(2) + fb1 := &testHTTPFilterWithRPCMetadata{ + logger: t, + typeURL: testFilterTypeURL1, + newStreamChan: newStreamChan, + } + fb2 := &testHTTPFilterWithRPCMetadata{ + logger: t, + typeURL: testFilterTypeURL2, + newStreamChan: newStreamChan, + } + httpfilter.Register(fb1) + httpfilter.Register(fb2) + defer httpfilter.UnregisterForTesting(fb1.typeURL) + defer httpfilter.UnregisterForTesting(fb2.typeURL) + + // Spin up an xDS management server. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) + + // Configure resources on the management server. + // We need a listener with the filter, and a route with ClusterSpecifierPlugin. + listeners := []*v3listenerpb.Listener{{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ + ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{ + ConfigSource: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}}, + }, + RouteConfigName: defaultTestRouteConfigName, + }}, + HttpFilters: []*v3httppb.HttpFilter{ + newHTTPFilter(t, "test-filter-1", testFilterTypeURL1, "filter-path-1", ""), + newHTTPFilter(t, "test-filter-2", testFilterTypeURL2, "filter-path-2", ""), + e2e.RouterHTTPFilter, + }, + }), + }, + }} + + routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{ + RouteConfigName: defaultTestRouteConfigName, + ListenerName: defaultTestServiceName, + ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin, + ClusterSpecifierPluginName: "cspA", + ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}), + })} + // Override the configuration for "test-filter-1" in the route. + routes[0].VirtualHosts[0].Routes[0].TypedPerFilterConfig = map[string]*anypb.Any{ + "test-filter-1": newHTTPFilter(t, "test-filter-1", testFilterTypeURL1, "override-path-1", "").GetTypedConfig(), + } + configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) + + // Wait for an update from the resolver, and verify the service config. + wantSC := ` + { + "loadBalancingConfig": [ + { + "xds_cluster_manager_experimental": { + "children": { + "cluster_specifier_plugin:cspA": { + "childPolicy": [ + { + "csp_experimental": { + "arbitrary_field": "anything" + } + } + ] + } + } + } + } + ] + }` + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC) + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) + if err != nil { + t.Fatalf("cs.SelectConfig(): %v", err) + } + + // Verify that the interceptor is not nil. + if res.Interceptor == nil { + t.Fatal("RPCInfo does not contain interceptors list") + } + + newStream := func(context.Context, func()) (iresolver.ClientStream, error) { + return nil, nil + } + + if _, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{Method: "/service/method", Context: ctx}, func() {}, newStream); err != nil { + t.Fatalf("NewStream() failed with error: %v", err) + } + + // Verify that first filter receives the config. + cfg, err := newStreamChan.Receive(ctx) + if err != nil { + t.Fatalf("Timeout waiting for first filter to receive config: %v", err) + } + ofc := cfg.(overallFilterConfig) + if ofc.BasePath != "filter-path-1" { + t.Fatalf("Unexpected base path for first filter, got: %q, want: %q", ofc.BasePath, "filter-path-1") + } + if ofc.OverridePath != "override-path-1" { + t.Fatalf("Unexpected override path for first filter, got: %q, want: %q", ofc.OverridePath, "override-path-1") + } + + // Verify that second filter receives the base path. + cfg, err = newStreamChan.Receive(ctx) + if err != nil { + t.Fatalf("Timeout waiting for second filter to receive config: %v", err) + } + ofc = cfg.(overallFilterConfig) + if ofc.BasePath != "filter-path-2" { + t.Fatalf("Unexpected base path for second filter, got: %q, want: %q", ofc.BasePath, "filter-path-2") + } + if ofc.OverridePath != "" { + t.Fatalf("Unexpected override path for second filter, got: %q, want: %q", ofc.OverridePath, "") + } +} diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 70ba351ca9e5..7b181feaa277 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -379,6 +379,30 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool { return true } +// addClusterToRoute creates a client interceptor for the given cluster +// configuration, adds the cluster to the provided WRR picker, and appends the +// interceptor to the list of interceptors for the current route. +func (r *xdsResolver) addClusterToRoute(clusters wrr.WRR, clusterName string, weight int64, interceptors *[]iresolver.ClientInterceptor, + clusterOverride map[string]httpfilter.FilterConfig, routeOverride map[string]httpfilter.FilterConfig) error { + interceptor, err := r.newInterceptor(r.xdsConfig.Listener.APIListener.HTTPFilters, clusterOverride, routeOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride) + if err != nil { + // Clean up any interceptors that were successfully built + // for the current route before this error occurred. Note + // that this is not handled by the call to cs.stop() in the + // deferred function. + for _, i := range *interceptors { + i.Close() + } + return err + } + clusters.Add(&routeCluster{ + name: clusterName, + interceptor: interceptor, + }, weight) + *interceptors = append(*interceptors, interceptor) + return nil +} + // newConfigSelector creates a new config selector using the most recently // received listener and route config updates. May add entries to // r.activeClusters for previously-unseen clusters. @@ -417,29 +441,18 @@ func (r *xdsResolver) newConfigSelector() (_ *configSelector, err error) { interceptors := []iresolver.ClientInterceptor{} if rt.ClusterSpecifierPlugin != "" { clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin - clusters.Add(&routeCluster{name: clusterName}, 1) + if err := r.addClusterToRoute(clusters, clusterName, 1, &interceptors, nil, rt.HTTPFilterConfigOverride); err != nil { + return nil, err + } ci := r.addOrGetActiveClusterInfo(clusterName, "") ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.xdsConfig.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} cs.plugins[clusterName] = ci } else { for _, wc := range rt.WeightedClusters { clusterName := clusterPrefix + wc.Name - interceptor, err := r.newInterceptor(r.xdsConfig.Listener.APIListener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride) - if err != nil { - // Clean up any interceptors that were successfully built - // for the current route before this error occurred. Note - // that this is not handled by the call to cs.stop() in the - // deferred function. - for _, i := range interceptors { - i.Close() - } + if err := r.addClusterToRoute(clusters, clusterName, int64(wc.Weight), &interceptors, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride); err != nil { return nil, err } - clusters.Add(&routeCluster{ - name: clusterName, - interceptor: interceptor, - }, int64(wc.Weight)) - interceptors = append(interceptors, interceptor) ci := r.addOrGetActiveClusterInfo(clusterName, wc.Name) ci.cfg = xdsChildConfig{ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: wc.Name})} cs.clusters[clusterName] = ci