Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions internal/xds/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ import (
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/balancer/clustermanager"
"google.golang.org/grpc/internal/xds/clusterspecifier"
"google.golang.org/grpc/internal/xds/httpfilter"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
)

func init() {
Expand Down Expand Up @@ -337,3 +340,137 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
}`
verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
}

// TestResolverClusterSpecifierPlugin_WithFilters tests the case where a route
// configuration containing cluster specifier plugins is sent by the management
// server, and HTTP filters are configured. The test verifies that the
// interceptor chain is built for routes matching cluster specifier plugins.
func (s) TestResolverClusterSpecifierPlugin_WithFilters(t *testing.T) {
// Register a custom httpFilter builder for the test.
Comment thread
Pranjali-2501 marked this conversation as resolved.
Outdated
testFilterTypeURL1 := "test-filter-type-url-1" + uuid.New().String()
testFilterTypeURL2 := "test-filter-type-url-2" + uuid.New().String()
newStreamChan := testutils.NewChannelWithSize(2)
fb1 := &testHTTPFilterWithRPCMetadata{
logger: t,
typeURL: testFilterTypeURL1,
newStreamChan: newStreamChan,
}
fb2 := &testHTTPFilterWithRPCMetadata{
logger: t,
typeURL: testFilterTypeURL2,
newStreamChan: newStreamChan,
}
httpfilter.Register(fb1)
httpfilter.Register(fb2)
defer httpfilter.UnregisterForTesting(fb1.typeURL)
defer httpfilter.UnregisterForTesting(fb2.typeURL)

// Spin up an xDS management server.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
nodeID := uuid.New().String()
mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID)

// Configure resources on the management server.
// We need a listener with the filter, and a route with ClusterSpecifierPlugin.
listeners := []*v3listenerpb.Listener{{
Name: defaultTestServiceName,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: defaultTestRouteConfigName,
}},
HttpFilters: []*v3httppb.HttpFilter{
newHTTPFilter(t, "test-filter-1", testFilterTypeURL1, "filter-path-1", ""),
newHTTPFilter(t, "test-filter-2", testFilterTypeURL2, "filter-path-2", ""),
e2e.RouterHTTPFilter,
},
}),
},
}}

routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})}
// Override the configuration for "test-filter-1" in the route.
routes[0].VirtualHosts[0].Routes[0].TypedPerFilterConfig = map[string]*anypb.Any{
"test-filter-1": newHTTPFilter(t, "test-filter-1", testFilterTypeURL1, "override-path-1", "").GetTypedConfig(),
}
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

// Wait for an update from the resolver, and verify the service config.
wantSC := `
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster_specifier_plugin:cspA": {
"childPolicy": [
{
"csp_experimental": {
"arbitrary_field": "anything"
}
}
]
}
}
}
}
]
}`
Comment thread
Pranjali-2501 marked this conversation as resolved.
cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}

// Verify that the interceptor is not nil.
if res.Interceptor == nil {
t.Fatal("RPCInfo does not contain interceptors list")
}

newStream := func(context.Context, func()) (iresolver.ClientStream, error) {
return nil, nil
}
Comment thread
Pranjali-2501 marked this conversation as resolved.

_, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{Method: "/service/method", Context: ctx}, func() {}, newStream)
if err != nil {
Comment thread
Pranjali-2501 marked this conversation as resolved.
Outdated
t.Fatalf("NewStream() failed with error: %v", err)
}

// Verify that first filter receives the config.
cfg, err := newStreamChan.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for filter 1 to receive config: %v", err)
}
ofc := cfg.(overallFilterConfig)
if ofc.BasePath != "filter-path-1" {
t.Fatalf("Unexpected filter 1 base path, got: %q, want: %q", ofc.BasePath, "filter-path-1")
}
if ofc.OverridePath != "override-path-1" {
t.Fatalf("Unexpected filter 1 override path, got: %q, want: %q", ofc.OverridePath, "override-path-1")
}

// Verify that second filter receives the base path.
cfg, err = newStreamChan.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for filter 2 to receive config: %v", err)
}
ofc = cfg.(overallFilterConfig)
if ofc.BasePath != "filter-path-2" {
t.Fatalf("Unexpected filter 2 base path, got: %q, want: %q", ofc.BasePath, "filter-path-2")
}
if ofc.OverridePath != "" {
t.Fatalf("Unexpected filter 2 override path, got: %q, want: %q", ofc.OverridePath, "")
}
Comment thread
Pranjali-2501 marked this conversation as resolved.
}
17 changes: 16 additions & 1 deletion internal/xds/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,22 @@ func (r *xdsResolver) newConfigSelector() (_ *configSelector, err error) {
interceptors := []iresolver.ClientInterceptor{}
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{name: clusterName}, 1)
interceptor, err := r.newInterceptor(r.xdsConfig.Listener.APIListener.HTTPFilters, nil, rt.HTTPFilterConfigOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride)
if err != nil {
// Clean up any interceptors that were successfully built
// for the current route before this error occurred. Note
// that this is not handled by the call to cs.stop() in the
// deferred function.
for _, i := range interceptors {
i.Close()
}
return nil, err
}
clusters.Add(&routeCluster{
name: clusterName,
interceptor: interceptor,
}, 1)
interceptors = append(interceptors, interceptor)
ci := r.addOrGetActiveClusterInfo(clusterName, "")
ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.xdsConfig.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])}
Comment thread
Pranjali-2501 marked this conversation as resolved.
cs.plugins[clusterName] = ci
Expand Down
Loading