diff --git a/internal/envconfig/xds.go b/internal/envconfig/xds.go index c20842078722..a2312f8eacf1 100644 --- a/internal/envconfig/xds.go +++ b/internal/envconfig/xds.go @@ -94,6 +94,7 @@ var ( // the client side. For more details, see: // https://github.com/grpc/proposal/blob/master/A93-xds-ext-proc.md XDSClientExtProcEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_EXT_PROC_ON_CLIENT", false) + // GCPAuthenticationFilterEnabled enables the xDS GCP Authentication // filter. For more details, see: // https://github.com/grpc/proposal/blob/master/A83-xds-gcp-authn-filter.md diff --git a/internal/optional/optional.go b/internal/optional/optional.go new file mode 100644 index 000000000000..218794af6ace --- /dev/null +++ b/internal/optional/optional.go @@ -0,0 +1,47 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package optional implements a generic optional type. +package optional + +// Optional represents an optional value of type T. The zero value is usable and +// indicates that no value is set. This type is not safe for concurrent access. +type Optional[T any] struct { + val T + set bool +} + +// New creates a new Optional type with the provided value. +func New[T any](value T) Optional[T] { + return Optional[T]{ + val: value, + set: true, + } +} + +// Value returns the underlying value and a boolean indicating if the value is +// set. If the value is not set, it returns the zero value of T and false. +func (o *Optional[T]) Value() (T, bool) { + return o.val, o.set +} + +// SetValue updates or adds the value to Optional. +func (o *Optional[T]) SetValue(v T) { + o.val = v + o.set = true +} diff --git a/internal/optional/optional_test.go b/internal/optional/optional_test.go new file mode 100644 index 000000000000..8df4190a9a20 --- /dev/null +++ b/internal/optional/optional_test.go @@ -0,0 +1,130 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package optional_test + +import ( + "slices" + "testing" + + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/optional" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// TestOptionalInt tests the scenario of using integer optional values and +// verifies that zero value, constructors, and mutation methods return correct +// outputs. +func (s) TestOptionalInt(t *testing.T) { + var opt optional.Optional[int] + + // Test unset value. + if v, set := opt.Value(); set || v != 0 { + t.Fatalf("Zero-value Option[int] = (%v, %v); want (0, false)", v, set) + } + + opt = optional.New(42) + if v, set := opt.Value(); !set || v != 42 { + t.Fatalf("NewValue(42) = (%v, %v); want (42, true)", v, set) + } + + opt.SetValue(100) + if v, set := opt.Value(); !set || v != 100 { + t.Fatalf("Set(100) = (%v, %v); want (100, true)", v, set) + } +} + +// TestOptionalString tests the scenario of using string optional values and +// verifies that zero value, constructors, and mutation methods return correct +// outputs. +func (s) TestOptionalString(t *testing.T) { + var opt optional.Optional[string] + + // Test unset value. + if v, set := opt.Value(); set || v != "" { + t.Fatalf("Zero-value Option[string] = (%q, %v); want (%q, false)", v, set, "") + } + + wantString := "test-string" + opt = optional.New(wantString) + if v, set := opt.Value(); !set || v != wantString { + t.Fatalf("NewValue(%q) = (%q, %v); want (%q, true)", wantString, v, set, wantString) + } + + wantStringNew := "world" + opt.SetValue(wantStringNew) + if v, set := opt.Value(); !set || v != wantStringNew { + t.Fatalf("Set(%q) = (%q, %v); want (%q, true)", wantStringNew, v, set, wantStringNew) + } +} + +// TestOptionalStruct tests the scenario of using a custom struct optional value +// and verifies that custom struct field values are preserved, modified, and +// cleared correctly. +func (s) TestOptionalStruct(t *testing.T) { + type testStruct struct { + name string + age int + } + + var opt optional.Optional[testStruct] + if v, set := opt.Value(); set || v != (testStruct{}) { + t.Fatalf("Zero-value Option[struct] = (%v, %v); want (empty, false)", v, set) + } + + want := testStruct{name: "Alice", age: 30} + opt = optional.New(want) + if v, set := opt.Value(); !set || v != want { + t.Fatalf("NewValue(%v) = (%v, %v); want (%v, true)", want, v, set, want) + } + + want2 := testStruct{name: "Bob", age: 40} + opt.SetValue(want2) + if v, set := opt.Value(); !set || v != want2 { + t.Fatalf("Set(%v) = (%v, %v); want (%v, true)", want2, v, set, want2) + } +} + +// TestOptionalSlice tests the scenario of using a slice optional value and +// verifies that zero value, constructors, and mutation methods return correct +// outputs. +func (s) TestOptionalSlice(t *testing.T) { + var opt optional.Optional[[]int] + if v, set := opt.Value(); set || v != nil { + t.Fatalf("Zero-value Option[[]int] = (%v, %v); want (nil, false)", v, set) + } + + want := []int{1, 2, 3} + opt = optional.New(want) + if v, set := opt.Value(); !set || !slices.Equal(v, want) { + t.Fatalf("NewValue(%v) = (%v, %v); want (%v, true)", want, v, set, want) + } + + want2 := []int{4, 5, 6} + opt.SetValue(want2) + if v, set := opt.Value(); !set || !slices.Equal(v, want2) { + t.Fatalf("Set(%v) = (%v, %v); want (%v, true)", &want2, v, set, &want2) + } +} diff --git a/internal/xds/httpfilter/extconfig.go b/internal/xds/httpfilter/extconfig.go new file mode 100644 index 000000000000..83bcea49a2d3 --- /dev/null +++ b/internal/xds/httpfilter/extconfig.go @@ -0,0 +1,88 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package httpfilter + +import ( + "fmt" + "regexp" + + "google.golang.org/grpc/internal/xds/matcher" + + v3mutationpb "github.com/envoyproxy/go-control-plane/envoy/config/common/mutation_rules/v3" + v3matcherpb "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" +) + +// HeaderMutationRules specifies the rules for what modifications an external +// processing server may make to headers sent on the data plane RPC. +type HeaderMutationRules struct { + // AllowExpr specifies a regular expression that matches the headers that can + // be mutated. + AllowExpr *regexp.Regexp + // DisallowExpr specifies a regular expression that matches the headers that + // cannot be mutated. This overrides the above allowExpr if a header matches + // both. + DisallowExpr *regexp.Regexp + // DisallowAll specifies that no header mutations are allowed. This overrides + // all other settings. + DisallowAll bool + // DisallowIsError specifies whether to return an error if a header mutation + // is disallowed. If true, the data plane RPC will be failed with a grpc + // status code of Unknown. + DisallowIsError bool +} + +// ConvertStringMatchers converts a slice of protobuf StringMatcher messages to +// a slice of matcher.StringMatcher. +func ConvertStringMatchers(patterns []*v3matcherpb.StringMatcher) ([]matcher.StringMatcher, error) { + matchers := make([]matcher.StringMatcher, 0, len(patterns)) + for _, p := range patterns { + sm, err := matcher.StringMatcherFromProto(p) + if err != nil { + return nil, err + } + matchers = append(matchers, sm) + } + return matchers, nil +} + +// HeaderMutationRulesFromProto converts a protobuf HeaderMutationRules proto +// message to a HeaderMutationRules struct. +func HeaderMutationRulesFromProto(mr *v3mutationpb.HeaderMutationRules) (HeaderMutationRules, error) { + var rules HeaderMutationRules + if mr == nil { + return rules, nil + } + if allowExpr := mr.GetAllowExpression(); allowExpr != nil { + re, err := regexp.Compile(allowExpr.GetRegex()) + if err != nil { + return rules, fmt.Errorf("httpfilter: %v", err) + } + rules.AllowExpr = re + } + if disallowExpr := mr.GetDisallowExpression(); disallowExpr != nil { + re, err := regexp.Compile(disallowExpr.GetRegex()) + if err != nil { + return rules, fmt.Errorf("httpfilter: %v", err) + } + rules.DisallowExpr = re + } + rules.DisallowAll = mr.GetDisallowAll().GetValue() + rules.DisallowIsError = mr.GetDisallowIsError().GetValue() + return rules, nil +} diff --git a/internal/xds/httpfilter/extconfig_test.go b/internal/xds/httpfilter/extconfig_test.go new file mode 100644 index 000000000000..077a7ceacd00 --- /dev/null +++ b/internal/xds/httpfilter/extconfig_test.go @@ -0,0 +1,102 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package httpfilter + +import ( + "regexp" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/protobuf/types/known/wrapperspb" + + v3mutationpb "github.com/envoyproxy/go-control-plane/envoy/config/common/mutation_rules/v3" + v3matcherpb "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestHeaderMutationRulesFromProto_HappyPath(t *testing.T) { + mr := &v3mutationpb.HeaderMutationRules{ + AllowExpression: &v3matcherpb.RegexMatcher{Regex: "^allow$"}, + DisallowExpression: &v3matcherpb.RegexMatcher{Regex: "^disallow$"}, + DisallowAll: wrapperspb.Bool(true), + DisallowIsError: wrapperspb.Bool(false), + } + want := HeaderMutationRules{ + AllowExpr: regexp.MustCompile("^allow$"), + DisallowExpr: regexp.MustCompile("^disallow$"), + DisallowAll: true, + DisallowIsError: false, + } + + got, err := HeaderMutationRulesFromProto(mr) + if err != nil { + t.Fatalf("HeaderMutationRulesFromProto() unexpected error: %v", err) + } + if diff := cmp.Diff(got, want, + cmp.Transformer("RegexpToString", func(r *regexp.Regexp) string { + if r == nil { + return "" + } + return r.String() + }), + ); diff != "" { + t.Fatalf("HeaderMutationRulesFromProto() mismatch (-got +want):\n%s", diff) + } +} + +func (s) TestHeaderMutationRulesFromProto_Errors(t *testing.T) { + tests := []struct { + name string + mutationRules *v3mutationpb.HeaderMutationRules + wantErr string + }{ + { + name: "invalid allow expression", + mutationRules: &v3mutationpb.HeaderMutationRules{ + AllowExpression: &v3matcherpb.RegexMatcher{Regex: "["}, + }, + wantErr: "httpfilter: error parsing regexp", + }, + { + name: "invalid disallow expression", + mutationRules: &v3mutationpb.HeaderMutationRules{ + DisallowExpression: &v3matcherpb.RegexMatcher{Regex: "["}, + }, + wantErr: "httpfilter: error parsing regexp", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := HeaderMutationRulesFromProto(tt.mutationRules) + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("HeaderMutationRulesFromProto() error = %v, wantErr %q", err, tt.wantErr) + } + }) + } +} diff --git a/internal/xds/httpfilter/extproc/config.go b/internal/xds/httpfilter/extproc/config.go new file mode 100644 index 000000000000..4a64ada313ac --- /dev/null +++ b/internal/xds/httpfilter/extproc/config.go @@ -0,0 +1,158 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package extproc + +import ( + "time" + + "google.golang.org/grpc/internal/optional" + "google.golang.org/grpc/internal/xds/httpfilter" + "google.golang.org/grpc/internal/xds/matcher" + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + + v3procfilterpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" +) + +// baseConfig contains the configuration for the external processing +// interceptor. +type baseConfig struct { + httpfilter.FilterConfig + + // The following fields can be set either in the filter config or the override + // config. If both are set, the override config will be used. + + // server is the configuration for the external processing server. + server xdsresource.GRPCServiceConfig + // processingModes specifies the processing mode for each dataplane event. + processingModes processingModes + // failureModeAllow specifies the behavior when the RPC to the external + // processing server fails. If true, the dataplane RPC will be allowed to + // continue. If false, the data plane RPC will be failed with a grpc status + // code of UNAVAILABLE. + failureModeAllow bool + // Attributes to be sent to the external processing server along with the + // request and response dataplane events. + requestAttributes []string + responseAttributes []string + + // The following fields can only be set in the base config. + + // mutationRules specifies the rules for what modifications an external + // processing server may make to headers/trailers sent to it. + mutationRules httpfilter.HeaderMutationRules + // allowedHeaders specifies the headers that are allowed to be sent to the + // external processing server. If unset, all headers are allowed. + allowedHeaders []matcher.StringMatcher + // disallowedHeaders specifies the headers that will not be sent to the + // external processing server. This overrides the above allowedHeaders if a + // header matches both. + disallowedHeaders []matcher.StringMatcher + // disableImmediateResponse specifies whether to disable immediate response + // from the external processing server. When true, if the response from + // external processing server has the `immediate_response` field set, the + // dataplane RPC will be failed with `UNAVAILABLE` status code. When false, + // the `immediate_response` field in the response from external processing + // server will be ignored. + disableImmediateResponse bool + // observabilityMode determines if the filter waits for the external + // processing server. If true, events are sent to the server in + // "observation-only" mode; the filter does not wait for a response. If false, + // the filter waits for a response, allowing the server to modify events + // before they reach the dataplane. + observabilityMode bool + // deferredCloseTimeout is the duration the filter waits before closing the + // external processing stream after the dataplane RPC completes. This is only + // applicable when observabilityMode is true; otherwise, it is ignored. The + // default value is 5 seconds. + deferredCloseTimeout time.Duration +} + +// overrideConfig contains the configuration for the external processing +// interceptor used for overriding the base config. If a particular field is +// set, that will be used instead of the base config. The fields are similar to +// base config. +type overrideConfig struct { + httpfilter.FilterConfig + server optional.Optional[xdsresource.GRPCServiceConfig] + processingModes optional.Optional[processingModes] + failureModeAllow optional.Optional[bool] + requestAttributes []string + responseAttributes []string +} + +// processingMode defines how headers, trailers, and bodies are handled in +// relation to the external processing server. +type processingMode int + +const ( + // modeSkip indicates that the header/trailer/body should not be sent. + modeSkip processingMode = iota + // modeSend indicates that the header/trailer/body should be sent. + modeSend +) + +type processingModes struct { + requestHeaderMode processingMode + requestBodyMode processingMode + responseHeaderMode processingMode + responseTrailerMode processingMode + responseBodyMode processingMode +} + +// resolveHeaderMode resolves the processing mode for headers based on the +// protobuf enum value. If the mode is not set or set to Default processing +// mode, it returns the provided defaultMode. +func resolveHeaderMode(mode v3procfilterpb.ProcessingMode_HeaderSendMode, defaultMode processingMode) processingMode { + switch mode { + case v3procfilterpb.ProcessingMode_SEND: + return modeSend + case v3procfilterpb.ProcessingMode_SKIP: + return modeSkip + case v3procfilterpb.ProcessingMode_DEFAULT: + return defaultMode + default: + return defaultMode + } +} + +// resolveBodyMode resolves the processing mode for body based on the protobuf +// enum value. If the mode is not set (i.e., default), it returns modeSkip, as +// the default for body is to skip. +func resolveBodyMode(mode v3procfilterpb.ProcessingMode_BodySendMode) processingMode { + switch mode { + case v3procfilterpb.ProcessingMode_GRPC: + return modeSend + case v3procfilterpb.ProcessingMode_NONE: + return modeSkip + default: + return modeSkip + } +} + +// processingModesFromProto converts a protobuf ProcessingMode message +// to a processingModes struct. +func processingModesFromProto(pm *v3procfilterpb.ProcessingMode) processingModes { + return processingModes{ + requestHeaderMode: resolveHeaderMode(pm.GetRequestHeaderMode(), modeSend), + requestBodyMode: resolveBodyMode(pm.GetRequestBodyMode()), + responseHeaderMode: resolveHeaderMode(pm.GetResponseHeaderMode(), modeSend), + responseBodyMode: resolveBodyMode(pm.GetResponseBodyMode()), + responseTrailerMode: resolveHeaderMode(pm.GetResponseTrailerMode(), modeSkip), + } +} diff --git a/internal/xds/httpfilter/extproc/config_test.go b/internal/xds/httpfilter/extproc/config_test.go new file mode 100644 index 000000000000..3cd60dda0b84 --- /dev/null +++ b/internal/xds/httpfilter/extproc/config_test.go @@ -0,0 +1,517 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package extproc + +import ( + "fmt" + "regexp" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/optional" + "google.golang.org/grpc/internal/xds/httpfilter" + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/wrapperspb" + + mutationpb "github.com/envoyproxy/go-control-plane/envoy/config/common/mutation_rules/v3" + corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + fpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" + matcherpb "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// testParseGRPCServiceConfig is a helper function that parses a GrpcService +// proto message into a GRPCServiceConfig. This is a temporary test +// implementation that will be removed once gRFC A102 is implemented. +func testParseGRPCServiceConfig(grpcService *corepb.GrpcService) (xdsresource.GRPCServiceConfig, error) { + if grpcService == nil { + return xdsresource.GRPCServiceConfig{}, nil + } + if grpcService.GetGoogleGrpc() == nil { + return xdsresource.GRPCServiceConfig{}, fmt.Errorf("only google_grpc grpc_service is supported") + } + if grpcService.GetGoogleGrpc().GetTargetUri() == "" { + return xdsresource.GRPCServiceConfig{}, fmt.Errorf("targetURI must be a non-empty string") + } + + sc := xdsresource.GRPCServiceConfig{ + TargetURI: grpcService.GetGoogleGrpc().GetTargetUri(), + } + return sc, nil +} + +var cmpOpts = []cmp.Option{ + cmp.AllowUnexported( + baseConfig{}, + overrideConfig{}, + xdsresource.GRPCServiceConfig{}, + processingModes{}, + httpfilter.HeaderMutationRules{}, + optional.Optional[xdsresource.GRPCServiceConfig]{}, + optional.Optional[processingModes]{}, + optional.Optional[bool]{}, + ), + protocmp.Transform(), + cmp.Transformer("RegexpToString", func(r *regexp.Regexp) string { + if r == nil { + return "" + } + return r.String() + }), +} + +func (s) TestParseFilterConfig_Success(t *testing.T) { + origParseGRPCServiceConfig := parseGRPCServiceConfig + defer func() { parseGRPCServiceConfig = origParseGRPCServiceConfig }() + parseGRPCServiceConfig = testParseGRPCServiceConfig + + b := builder{} + + tests := []struct { + name string + cfg proto.Message + wantCfg httpfilter.FilterConfig + }{ + { + name: "DefaultConfig", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{}, + }) + return m + }(), + wantCfg: baseConfig{ + server: xdsresource.GRPCServiceConfig{ + TargetURI: "localhost:1234", + ChannelCredentials: "", + }, + processingModes: processingModes{ + requestHeaderMode: modeSend, + responseHeaderMode: modeSend, + responseTrailerMode: modeSkip, + requestBodyMode: modeSkip, + responseBodyMode: modeSkip, + }, + failureModeAllow: false, + deferredCloseTimeout: defaultDeferredCloseTimeout, + }, + }, + { + name: "ConfigWithGrpcMode", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{ + RequestBodyMode: fpb.ProcessingMode_GRPC, + ResponseBodyMode: fpb.ProcessingMode_GRPC, + }, + }) + return m + }(), + wantCfg: baseConfig{ + server: xdsresource.GRPCServiceConfig{ + TargetURI: "localhost:1234", + ChannelCredentials: "", + }, + processingModes: processingModes{ + requestHeaderMode: modeSend, + responseHeaderMode: modeSend, + responseTrailerMode: modeSkip, + requestBodyMode: modeSend, + responseBodyMode: modeSend, + }, + failureModeAllow: false, + deferredCloseTimeout: defaultDeferredCloseTimeout, + }, + }, + { + name: "ConfigWithMutationRules", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{}, + MutationRules: &mutationpb.HeaderMutationRules{ + AllowExpression: &matcherpb.RegexMatcher{Regex: ".*"}, + DisallowExpression: &matcherpb.RegexMatcher{Regex: "a"}, + }, + }) + return m + }(), + wantCfg: baseConfig{ + server: xdsresource.GRPCServiceConfig{ + TargetURI: "localhost:1234", + ChannelCredentials: "", + }, + processingModes: processingModes{ + requestHeaderMode: modeSend, + responseHeaderMode: modeSend, + responseTrailerMode: modeSkip, + requestBodyMode: modeSkip, + responseBodyMode: modeSkip, + }, + failureModeAllow: false, + mutationRules: httpfilter.HeaderMutationRules{ + AllowExpr: regexp.MustCompile(".*"), + DisallowExpr: regexp.MustCompile("a"), + }, + deferredCloseTimeout: defaultDeferredCloseTimeout, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := b.ParseFilterConfig(tt.cfg) + if err != nil { + t.Fatalf("ParseFilterConfig() returned unexpected error: %v", err) + } + if diff := cmp.Diff(got, tt.wantCfg, cmpOpts...); diff != "" { + t.Fatalf("ParseFilterConfig() returned unexpected config (-got +want):\n%s", diff) + } + }) + } +} + +func (s) TestParseFilterConfig_Errors(t *testing.T) { + origParseGRPCServiceConfig := parseGRPCServiceConfig + defer func() { parseGRPCServiceConfig = origParseGRPCServiceConfig }() + parseGRPCServiceConfig = testParseGRPCServiceConfig + + b := builder{} + + tests := []struct { + name string + cfg proto.Message + wantErr string + }{ + { + name: "MissingGrpcService", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ProcessingMode: &fpb.ProcessingMode{}}) + return m + }(), + wantErr: "extproc: empty grpc_service provided", + }, + { + name: "UnsupportedGrpcService_EnvoyGrpc", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_EnvoyGrpc_{ + EnvoyGrpc: &corepb.GrpcService_EnvoyGrpc{ + ClusterName: "cluster", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{}, + }) + return m + }(), + wantErr: "extproc: failed to parse grpc_service only google_grpc grpc_service is supported", + }, + { + name: "MissingProcessingMode", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + }) + return m + }(), + wantErr: "extproc: missing processing_mode", + }, + { + name: "InvalidProcessingMode_RequestBodyStreamed", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{RequestBodyMode: fpb.ProcessingMode_STREAMED}, + }) + return m + }(), + wantErr: "extproc: invalid request body mode STREAMED", + }, + { + name: "InvalidProcessingMode_ResponseBodyStreamed", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{ResponseBodyMode: fpb.ProcessingMode_STREAMED}, + }) + return m + }(), + wantErr: "extproc: invalid response body mode STREAMED", + }, + { + name: "InvalidMutationRules", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{}, + MutationRules: &mutationpb.HeaderMutationRules{ + AllowExpression: &matcherpb.RegexMatcher{Regex: "["}, + }, + }) + return m + }(), + wantErr: "httpfilter: error parsing regexp", + }, + { + name: "InvalidAllowedHeaders_EmptyPrefix", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "localhost:1234", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{}, + ForwardRules: &fpb.HeaderForwardingRules{ + AllowedHeaders: &matcherpb.ListStringMatcher{ + Patterns: []*matcherpb.StringMatcher{ + { + MatchPattern: &matcherpb.StringMatcher_Prefix{Prefix: ""}, + }, + }, + }, + }, + }) + return m + }(), + wantErr: "empty prefix is not allowed", + }, + { + name: "InvalidServerConfig_EmptyTargetURI", + cfg: func() proto.Message { + m, _ := anypb.New(&fpb.ExternalProcessor{ + GrpcService: &corepb.GrpcService{ + TargetSpecifier: &corepb.GrpcService_GoogleGrpc_{ + GoogleGrpc: &corepb.GrpcService_GoogleGrpc{ + TargetUri: "", + }, + }, + }, + ProcessingMode: &fpb.ProcessingMode{}, + }) + return m + }(), + wantErr: "extproc: failed to parse grpc_service targetURI must be a non-empty string", + }, + { + name: "InvalidConfigType", + cfg: &fpb.ExternalProcessor{}, // Not Any + wantErr: "extproc: error parsing config", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := b.ParseFilterConfig(tt.cfg) + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("ParseFilterConfig() returned error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func (s) TestParseFilterConfigOverride_Success(t *testing.T) { + b := builder{} + + tests := []struct { + name string + override proto.Message + wantOverrideCfg httpfilter.FilterConfig + }{ + { + name: "EmptyOverride", + override: func() proto.Message { + m, _ := anypb.New(&fpb.ExtProcPerRoute{}) + return m + }(), + wantOverrideCfg: overrideConfig{}, + }, + { + name: "GrpcProcessingMode", + override: func() proto.Message { + m, _ := anypb.New( + &fpb.ExtProcPerRoute{ + Override: &fpb.ExtProcPerRoute_Overrides{ + Overrides: &fpb.ExtProcOverrides{ + ProcessingMode: &fpb.ProcessingMode{ + RequestBodyMode: fpb.ProcessingMode_GRPC, + ResponseBodyMode: fpb.ProcessingMode_GRPC, + }, + }, + }, + }) + return m + }(), + wantOverrideCfg: overrideConfig{ + processingModes: optional.New(processingModes{ + requestHeaderMode: modeSend, + responseHeaderMode: modeSend, + responseTrailerMode: modeSkip, + requestBodyMode: modeSend, + responseBodyMode: modeSend, + }), + }, + }, + { + name: "FailureModeAllow", + override: func() proto.Message { + m, _ := anypb.New( + &fpb.ExtProcPerRoute{ + Override: &fpb.ExtProcPerRoute_Overrides{ + Overrides: &fpb.ExtProcOverrides{ + FailureModeAllow: wrapperspb.Bool(true), + }, + }, + }) + return m + }(), + wantOverrideCfg: overrideConfig{ + failureModeAllow: optional.New(true), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := b.ParseFilterConfigOverride(tt.override) + if err != nil { + t.Fatalf("ParseFilterConfigOverride() returned unexpected error: %v", err) + } + if diff := cmp.Diff(got, tt.wantOverrideCfg, cmpOpts...); diff != "" { + t.Fatalf("ParseFilterConfigOverride() returned unexpected config (-got +want):\n%s", diff) + } + }) + } +} + +func (s) TestParseFilterConfigOverride_Errors(t *testing.T) { + b := builder{} + + tests := []struct { + name string + override proto.Message + wantErr string + }{ + { + name: "ProcessingMode_RequestBodyStreamed", + override: func() proto.Message { + m, _ := anypb.New(&fpb.ExtProcPerRoute{ + Override: &fpb.ExtProcPerRoute_Overrides{ + Overrides: &fpb.ExtProcOverrides{ + ProcessingMode: &fpb.ProcessingMode{ + RequestBodyMode: fpb.ProcessingMode_STREAMED, + }, + }, + }, + }) + return m + }(), + wantErr: "extproc: invalid request body mode STREAMED", + }, + { + name: "ProcessingMode_ResponseBodyStreamed", + override: func() proto.Message { + m, _ := anypb.New(&fpb.ExtProcPerRoute{ + Override: &fpb.ExtProcPerRoute_Overrides{ + Overrides: &fpb.ExtProcOverrides{ + ProcessingMode: &fpb.ProcessingMode{ + ResponseBodyMode: fpb.ProcessingMode_STREAMED, + }, + }, + }, + }) + return m + }(), + wantErr: "extproc: invalid response body mode STREAMED", + }, + { + name: "InvalidOverrideType", + override: &fpb.ExtProcOverrides{}, // Not Any + wantErr: "extproc: error parsing override", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := b.ParseFilterConfigOverride(tt.override) + if err == nil || !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("ParseFilterConfigOverride() returned error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/internal/xds/httpfilter/extproc/ext_proc.go b/internal/xds/httpfilter/extproc/ext_proc.go new file mode 100644 index 000000000000..45b5a51fb63c --- /dev/null +++ b/internal/xds/httpfilter/extproc/ext_proc.go @@ -0,0 +1,179 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package extproc implements the Envoy external processing HTTP filter. +package extproc + +import ( + "fmt" + "time" + + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/optional" + "google.golang.org/grpc/internal/xds/httpfilter" + "google.golang.org/grpc/internal/xds/matcher" + "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/anypb" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3procfilterpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" +) + +func init() { + if envconfig.XDSClientExtProcEnabled { + httpfilter.Register(builder{}) + } +} + +var parseGRPCServiceConfig = func(*v3corepb.GrpcService) (xdsresource.GRPCServiceConfig, error) { + return xdsresource.GRPCServiceConfig{}, fmt.Errorf("parseGRPCServiceConfig not implemented") +} + +const defaultDeferredCloseTimeout = 5 * time.Second + +type builder struct{} + +func (builder) TypeURLs() []string { + return []string{ + "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor", + "type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExtProcPerRoute", + } +} + +// validateBodyProcessingMode ensures that the body processing mode is either +// NONE or GRPC. +func validateBodyProcessingMode(mode *v3procfilterpb.ProcessingMode) error { + if m := mode.GetRequestBodyMode(); m != v3procfilterpb.ProcessingMode_NONE && m != v3procfilterpb.ProcessingMode_GRPC { + return fmt.Errorf("extproc: invalid request body mode %v: want %q or %q", m, "NONE", "GRPC") + } + if m := mode.GetResponseBodyMode(); m != v3procfilterpb.ProcessingMode_NONE && m != v3procfilterpb.ProcessingMode_GRPC { + return fmt.Errorf("extproc: invalid response body mode %v: want %q or %q", m, "NONE", "GRPC") + } + return nil +} + +func (builder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) { + m, ok := cfg.(*anypb.Any) + if !ok { + return nil, fmt.Errorf("extproc: error parsing config %v: unknown type %T, want *anypb.Any", cfg, cfg) + } + msg := new(v3procfilterpb.ExternalProcessor) + if err := m.UnmarshalTo(msg); err != nil { + return nil, fmt.Errorf("extproc: failed to unmarshal config %v: %v", cfg, err) + } + if msg.GetProcessingMode() == nil { + return nil, fmt.Errorf("extproc: missing processing_mode in config %v", cfg) + } + if err := validateBodyProcessingMode(msg.GetProcessingMode()); err != nil { + return nil, err + } + + if msg.GetGrpcService() == nil { + return nil, fmt.Errorf("extproc: empty grpc_service provided in config %v", cfg) + } + server, err := parseGRPCServiceConfig(msg.GetGrpcService()) + if err != nil { + return nil, fmt.Errorf("extproc: failed to parse grpc_service %v", err) + } + + mutationRules, err := httpfilter.HeaderMutationRulesFromProto(msg.GetMutationRules()) + if err != nil { + return nil, err + } + + var allowedHeaders, disallowedHeaders []matcher.StringMatcher + if allowed := msg.GetForwardRules().GetAllowedHeaders(); allowed != nil { + allowedHeaders, err = httpfilter.ConvertStringMatchers(allowed.GetPatterns()) + if err != nil { + return nil, err + } + } + + if disallowed := msg.GetForwardRules().GetDisallowedHeaders(); disallowed != nil { + disallowedHeaders, err = httpfilter.ConvertStringMatchers(disallowed.GetPatterns()) + if err != nil { + return nil, err + } + } + + deferredCloseTimeout := defaultDeferredCloseTimeout + if msg.GetDeferredCloseTimeout() != nil { + deferredCloseTimeout = msg.GetDeferredCloseTimeout().AsDuration() + } + + return baseConfig{ + processingModes: processingModesFromProto(msg.GetProcessingMode()), + requestAttributes: msg.GetRequestAttributes(), + responseAttributes: msg.GetResponseAttributes(), + disableImmediateResponse: msg.GetDisableImmediateResponse(), + observabilityMode: msg.GetObservabilityMode(), + failureModeAllow: msg.GetFailureModeAllow(), + server: server, + mutationRules: mutationRules, + allowedHeaders: allowedHeaders, + disallowedHeaders: disallowedHeaders, + deferredCloseTimeout: deferredCloseTimeout, + }, nil +} + +func (builder) ParseFilterConfigOverride(ov proto.Message) (httpfilter.FilterConfig, error) { + m, ok := ov.(*anypb.Any) + if !ok { + return nil, fmt.Errorf("extproc: error parsing override %v: unknown type %T, want *anypb.Any", ov, ov) + } + msg := new(v3procfilterpb.ExtProcPerRoute) + if err := m.UnmarshalTo(msg); err != nil { + return nil, fmt.Errorf("extproc: failed to unmarshal override %v: %v", ov, err) + } + override := msg.GetOverrides() + + var processingModesOpt optional.Optional[processingModes] + if pm := override.GetProcessingMode(); pm != nil { + if err := validateBodyProcessingMode(pm); err != nil { + return nil, err + } + processingModesOpt = optional.New(processingModesFromProto(pm)) + } + + var serverOpt optional.Optional[xdsresource.GRPCServiceConfig] + if override.GetGrpcService() != nil { + server, err := parseGRPCServiceConfig(override.GetGrpcService()) + if err != nil { + return nil, fmt.Errorf("extproc: failed to parse grpc_service: %v", err) + } + serverOpt = optional.New(server) + } + + var failureModeAllowOpt optional.Optional[bool] + if override.GetFailureModeAllow() != nil { + failureModeAllowOpt = optional.New(override.GetFailureModeAllow().GetValue()) + } + + return overrideConfig{ + server: serverOpt, + processingModes: processingModesOpt, + failureModeAllow: failureModeAllowOpt, + requestAttributes: override.GetRequestAttributes(), + responseAttributes: override.GetResponseAttributes(), + }, nil +} + +func (builder) IsTerminal() bool { + return false +} diff --git a/internal/xds/xdsclient/xdsresource/grpc_service.go b/internal/xds/xdsclient/xdsresource/grpc_service.go new file mode 100644 index 000000000000..c4571d0900e0 --- /dev/null +++ b/internal/xds/xdsclient/xdsresource/grpc_service.go @@ -0,0 +1,49 @@ +/* + * + * Copyright 2026 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdsresource + +import ( + "time" + + "google.golang.org/grpc/metadata" +) + +// GRPCServiceConfig contains the configuration for an external server. It is +// the parsed configuration for the GrpcService proto message. +// See: https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/core/v3/grpc_service.proto +type GRPCServiceConfig struct { + // TargetURI is the name of the external server. + TargetURI string + // ChannelCredentials specifies the configuration for the transport + // credentials to use to connect to the external server, as a JSON string. + ChannelCredentials string + // CallCredentials specifies the configuration for the per-RPC credentials to + // use when making calls to the external server, as a JSON string. + CallCredentials string + // Timeout is the RPC timeout for the call to the external server. If unset, + // the timeout depends on the usage of this external server. For example, + // cases like ext_authz and ext_proc, where there is a 1:1 mapping between the + // data plane RPC and the external server call, the timeout will be capped by + // the timeout on the data plane RPC. For cases like RLQS where there is a + // side channel to the external server, an unset timeout will result in no + // timeout being applied to the external server call. + Timeout time.Duration + // InitialMetadata is the additional metadata to include in all RPCs sent to + // the external server. + InitialMetadata metadata.MD +}