Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions internal/xds/resolver/cluster_specifier_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ import (
"google.golang.org/grpc/internal/testutils/xds/e2e"
"google.golang.org/grpc/internal/xds/balancer/clustermanager"
"google.golang.org/grpc/internal/xds/clusterspecifier"
"google.golang.org/grpc/internal/xds/httpfilter"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/wrapperspb"

v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
)

func init() {
Expand Down Expand Up @@ -337,3 +340,108 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
}`
verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
}

// TestResolverClusterSpecifierPlugin_WithFilters tests the case where a route
// configuration containing cluster specifier plugins is sent by the management
// server, and HTTP filters are configured. The test verifies that the
// interceptor chain is built for routes matching cluster specifier plugins.
func (s) TestResolverClusterSpecifierPlugin_WithFilters(t *testing.T) {
// Register a custom httpFilter builder for the test.
Comment thread
Pranjali-2501 marked this conversation as resolved.
Outdated
testFilterTypeURL := "test-filter-type-url-" + uuid.New().String()
Comment thread
Pranjali-2501 marked this conversation as resolved.
Outdated
newStreamChan := testutils.NewChannel()
fb := &testHTTPFilterWithRPCMetadata{
logger: t,
typeURL: testFilterTypeURL,
newStreamChan: newStreamChan,
}
httpfilter.Register(fb)
defer httpfilter.UnregisterForTesting(fb.typeURL)

// Spin up an xDS management server.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
nodeID := uuid.New().String()
mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID)

// Configure resources on the management server.
// We need a listener with the filter, and a route with ClusterSpecifierPlugin.
listeners := []*v3listenerpb.Listener{{
Name: defaultTestServiceName,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: defaultTestRouteConfigName,
}},
HttpFilters: []*v3httppb.HttpFilter{
newHTTPFilter(t, "test-filter", testFilterTypeURL, "filter-path", ""),
e2e.RouterHTTPFilter,
},
}),
},
}}

routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
RouteConfigName: defaultTestRouteConfigName,
ListenerName: defaultTestServiceName,
ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
ClusterSpecifierPluginName: "cspA",
ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
})}
configureResources(ctx, t, mgmtServer, nodeID, listeners, routes, nil, nil)

stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc)

// Wait for an update from the resolver, and verify the service config.
wantSC := `
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster_specifier_plugin:cspA": {
"childPolicy": [
{
"csp_experimental": {
"arbitrary_field": "anything"
}
}
]
}
}
}
}
]
}`
Comment on lines +412 to +430
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we have this whole block indented one tabspace to the left. That way, it will align with the code instead of sticking out.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
if err != nil {
t.Fatalf("cs.SelectConfig(): %v", err)
}

// Verify that the interceptor is not nil.
if res.Interceptor == nil {
t.Fatal("res.Interceptor is nil, want non-nil")
Comment thread
Pranjali-2501 marked this conversation as resolved.
Outdated
}

newStream := func(context.Context, func()) (iresolver.ClientStream, error) {
return nil, nil
}
Comment on lines +442 to +444
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why significance does this empty function have? Why does it return nil, nil? Can a nil function be passed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we cannot pass nil here. The documentation for the NewStream method in the ClientInterceptor interface explicitly states: The caller must ensure done is non-nil. Passing nil would cause a panic.


_, err = res.Interceptor.NewStream(ctx, iresolver.RPCInfo{Method: "/service/method", Context: ctx}, func() {}, newStream)
if err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Single line

t.Fatalf("res.Interceptor.NewStream() failed: %v", err)
}

// Verify that the filter received the config.
cfg, err := newStreamChan.Receive(ctx)
if err != nil {
t.Fatalf("Timeout waiting for filter to receive config: %v", err)
}
ofc := cfg.(overallFilterConfig)
if ofc.BasePath != "filter-path" {
t.Fatalf("Unexpected filter config path, got: %q, want: %q", ofc.BasePath, "filter-path")
}
Comment on lines +463 to +474
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the point of having two filters? What extra test coverage does it provide over having a single filter?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It verifies that the system can correctly build and execute a chain of multiple interceptors. A single filter would only test that an interceptor works in isolation, not that the chaining mechanism itself is functional. It also ensures that configuration overrides are applied correctly on a per-filter basis.
In this test, test-filter-1 is configured with an override, while test-filter-2 uses its base configuration. This allows us to verify that the override for one filter does not affect the configuration of other filters in the chain.

}
17 changes: 16 additions & 1 deletion internal/xds/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,22 @@ func (r *xdsResolver) newConfigSelector() (_ *configSelector, err error) {
interceptors := []iresolver.ClientInterceptor{}
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{name: clusterName}, 1)
interceptor, err := r.newInterceptor(r.xdsConfig.Listener.APIListener.HTTPFilters, nil, rt.HTTPFilterConfigOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride)
if err != nil {
// Clean up any interceptors that were successfully built
// for the current route before this error occurred. Note
// that this is not handled by the call to cs.stop() in the
// deferred function.
for _, i := range interceptors {
i.Close()
}
return nil, err
}
clusters.Add(&routeCluster{
name: clusterName,
interceptor: interceptor,
}, 1)
interceptors = append(interceptors, interceptor)
ci := r.addOrGetActiveClusterInfo(clusterName, "")
ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.xdsConfig.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lines 420-437 can be shared with the weighted cluster case if we pull it out into a method/helper function. So, let's do that please.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have refactored it and move common logic in a seperate method.

cs.plugins[clusterName] = ci
Expand Down
Loading