-
Notifications
You must be signed in to change notification settings - Fork 4.7k
extproc: implement ClientFilter and ClientFilterBuilder interface #9086
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
eshitachandwani
wants to merge
14
commits into
grpc:master
Choose a base branch
from
eshitachandwani:build_client_interceptor
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+363
−3
Open
Changes from 6 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
0542270
BuildClientInterceptor
eshitachandwani d31b4c9
minor fixes
eshitachandwani d0d83d3
minor
eshitachandwani 02e1edb
remove mode override and fix test
eshitachandwani f36a295
vet
eshitachandwani 328deca
test fix
eshitachandwani 3319dae
Update internal/xds/httpfilter/extproc/ext_proc.go
eshitachandwani 7ada0ba
gemini review
eshitachandwani f3e7083
minor fixes
eshitachandwani 87fb4c7
change the parameters to be complete interceptorCfg not proto
eshitachandwani 76f6f0a
change to use override
eshitachandwani a1ebc59
Merge remote-tracking branch 'upstream/master' into build_client_inte…
eshitachandwani 1dbeeed
Merge branch 'master' into build_client_interceptor
eshitachandwani 5b914d3
merge upstream and resolve conflict
eshitachandwani File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,287 @@ | ||
| /* | ||
| * | ||
| * Copyright 2026 gRPC authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| * | ||
| */ | ||
|
|
||
| package extproc | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "regexp" | ||
| "time" | ||
|
|
||
| "google.golang.org/grpc/credentials" | ||
| "google.golang.org/grpc/internal/xds/httpfilter" | ||
| "google.golang.org/grpc/internal/xds/matcher" | ||
| "google.golang.org/grpc/metadata" | ||
|
|
||
| v3procfilterpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/ext_proc/v3" | ||
| v3matcherpb "github.com/envoyproxy/go-control-plane/envoy/type/matcher/v3" | ||
| ) | ||
|
|
||
| const defaultDeferredCloseTimeout = 5 * time.Second | ||
|
|
||
| type baseConfig struct { | ||
| httpfilter.FilterConfig | ||
| config *v3procfilterpb.ExternalProcessor | ||
| } | ||
|
|
||
| type overrideConfig struct { | ||
| httpfilter.FilterConfig | ||
| config *v3procfilterpb.ExtProcOverrides | ||
| } | ||
|
|
||
| // interceptorConfig contains the configuration for the external processing | ||
| // client interceptor. | ||
| type interceptorConfig struct { | ||
| // The following fields can be set either in the filter config or the override | ||
| // config. If both are set, the override config will be used. | ||
| // | ||
| // server is the configuration for the external processing server. | ||
| server serverConfig | ||
| // failureModeAllow specifies the behavior when the RPC to the external | ||
| // processing server fails. If true, the dataplane PRC will be allowed to | ||
| // continue. If false, the data plane RPC will be failed with a grpc status | ||
| // code of UNAVAILABLE. | ||
| failureModeAllow bool | ||
| // processingMode specifies the processing mode for each dataplane event. | ||
| processingMode processingModes | ||
| // Attributes to be sent to the external processing server along with the | ||
| // request and response dataplane events. | ||
| requestAttributes []string | ||
| responseAttributes []string | ||
|
|
||
| // The following fields can only be set in the base config. | ||
| // | ||
| // mutationRules specifies the rules for what modifications an external | ||
| // processing server may make to headers/trailers sent to it. | ||
| mutationRules headerMutationRules | ||
| // allowedHeaders specifies the headers that are allowed to be sent to the | ||
| // external processing server. If unset, all headers are allowed. | ||
| allowedHeaders []matcher.StringMatcher | ||
| // disallowedHeaders specifies the headers that will not be sent to the | ||
| // external processing server. This overrides the above AllowedHeaders if a | ||
| // header matches both. | ||
| disallowedHeaders []matcher.StringMatcher | ||
| // disableImmediateResponse specifies whether to disable immediate response | ||
| // from the external processing server. When true, if the response from | ||
| // external processing server has the `immediate_response` field set, the | ||
| // dataplane RPC will be failed with `UNAVAILABLE` status code. When false, | ||
| // the `immediate_response` field in the response from external processing | ||
| // server will be ignored. | ||
| disableImmediateResponse bool | ||
| // observabilityMode determines if the filter waits for the external | ||
| // processing server. If true, events are sent to the server in | ||
| // "observation-only" mode; the filter does not wait for a response. If false, | ||
| // the filter waits for a response, allowing the server to modify events | ||
| // before they reach the dataplane. | ||
| observabilityMode bool | ||
| // deferredCloseTimeout is the duration the filter waits before closing the | ||
| // external processing stream after the dataplane RPC completes. This is only | ||
| // applicable when observabilityMode is true; otherwise, it is ignored. The | ||
| // default value is 5 seconds. | ||
| deferredCloseTimeout time.Duration | ||
| } | ||
|
|
||
| // processingMode defines how headers, trailers, and bodies are handled in | ||
| // relation to the external processing server. | ||
| type processingMode int | ||
|
|
||
| const ( | ||
| // modeSkip indicates that the header/trailer/body should not be sent. | ||
| modeSkip processingMode = iota | ||
| // modeSend indicates that the header/trailer/body should be sent. | ||
| modeSend | ||
| ) | ||
|
|
||
| type processingModes struct { | ||
| requestHeaderMode processingMode | ||
| responseHeaderMode processingMode | ||
| responseTrailerMode processingMode | ||
| requestBodyMode processingMode | ||
| responseBodyMode processingMode | ||
| } | ||
|
|
||
| // headerMutationRules specifies the rules for what modifications an external | ||
| // processing server may make to headers sent on the data plane RPC. | ||
| type headerMutationRules struct { | ||
| // allowExpr specifies a regular expression that matches the headers that can | ||
| // be mutated. | ||
| allowExpr *regexp.Regexp | ||
| // disallowExpr specifies a regular expression that matches the headers that | ||
| // cannot be mutated. This overrides the above allowExpr if a header matches | ||
| // both. | ||
| disallowExpr *regexp.Regexp | ||
| // disallowAll specifies that no header mutations are allowed. This overrides | ||
| // all other settings. | ||
| disallowAll bool | ||
| // disallowIsError specifies whether to return an error if a header mutation | ||
| // is disallowed. If true, the data plane RPC will be failed with a grpc | ||
| // status code of Unknown. | ||
| disallowIsError bool | ||
| } | ||
|
|
||
| // serverConfig contains the configuration for an external server. | ||
| type serverConfig struct { | ||
| // targetURI is the name of the external server. | ||
| targetURI string | ||
| // channelCredentials specifies the transport credentials to use to connect to | ||
| // the external server. Must not be nil. | ||
| channelCredentials credentials.TransportCredentials | ||
| // callCredentials specifies the per-RPC credentials to use when making calls | ||
| // to the external server. | ||
| callCredentials []credentials.PerRPCCredentials | ||
| // timeout is the RPC timeout for the call to the external server. If unset, | ||
| // the timeout depends on the usage of this external server. For example, | ||
| // cases like ext_authz and ext_proc, where there is a 1:1 mapping between the | ||
| // data plane RPC and the external server call, the timeout will be capped by | ||
| // the timeout on the data plane RPC. For cases like RLQS where there is a | ||
| // side channel to the external server, an unset timeout will result in no | ||
| // timeout being applied to the external server call. | ||
| timeout time.Duration | ||
| // initialMetadata is the additional metadata to include in all RPCs sent to | ||
| // the external server. | ||
| initialMetadata metadata.MD | ||
| } | ||
|
|
||
| // newInterceptorConfig creates the interceptor config from the base and | ||
| // override filter configs. The base config is required and the override config | ||
| // is optional. If a field is set in both the base and override configs, the | ||
| // value from the override config will be used. | ||
| func newInterceptorConfig(base *v3procfilterpb.ExternalProcessor, override *v3procfilterpb.ExtProcOverrides) (*interceptorConfig, error) { | ||
| ic := &interceptorConfig{ | ||
| failureModeAllow: base.GetFailureModeAllow(), | ||
| requestAttributes: base.GetRequestAttributes(), | ||
| responseAttributes: base.GetResponseAttributes(), | ||
| observabilityMode: base.GetObservabilityMode(), | ||
| disableImmediateResponse: base.GetDisableImmediateResponse(), | ||
| } | ||
| if base.GetDeferredCloseTimeout() != nil { | ||
| ic.deferredCloseTimeout = base.GetDeferredCloseTimeout().AsDuration() | ||
| } else { | ||
| ic.deferredCloseTimeout = defaultDeferredCloseTimeout | ||
| } | ||
|
|
||
| var err error | ||
| if allowed := base.GetForwardRules().GetAllowedHeaders(); allowed != nil { | ||
| if ic.allowedHeaders, err = convertStringMatchers(allowed.GetPatterns()); err != nil { | ||
| return nil, fmt.Errorf("invalid allowed header matcher: %v", err) | ||
| } | ||
| } | ||
| if disallowed := base.GetForwardRules().GetDisallowedHeaders(); disallowed != nil { | ||
| if ic.disallowedHeaders, err = convertStringMatchers(disallowed.GetPatterns()); err != nil { | ||
| return nil, fmt.Errorf("invalid disallowed header matcher: %v", err) | ||
| } | ||
| } | ||
|
|
||
| if mr := base.GetMutationRules(); mr != nil { | ||
| if allowexp := mr.GetAllowExpression(); allowexp != nil { | ||
| if ic.mutationRules.allowExpr, err = regexp.Compile(allowexp.GetRegex()); err != nil { | ||
| return nil, fmt.Errorf("invalid allow expression: %v", err) | ||
| } | ||
| } | ||
| if disallowexp := mr.GetDisallowExpression(); disallowexp != nil { | ||
| if ic.mutationRules.disallowExpr, err = regexp.Compile(disallowexp.GetRegex()); err != nil { | ||
| return nil, fmt.Errorf("invalid disallow expression: %v", err) | ||
| } | ||
| } | ||
| ic.mutationRules.disallowAll = mr.GetDisallowAll().GetValue() | ||
| ic.mutationRules.disallowIsError = mr.GetDisallowIsError().GetValue() | ||
| } | ||
| if ic.server, err = serverConfigFromGrpcService(base.GetGrpcService()); err != nil { | ||
| return nil, fmt.Errorf("failed to parse gRPC service config: %v", err) | ||
| } | ||
|
|
||
| pm := base.GetProcessingMode() | ||
| // The default processing mode is to send headers and skip body and | ||
| // trailers. | ||
| ic.processingMode.requestHeaderMode = resolveHeaderMode(pm.GetRequestHeaderMode(), modeSend) | ||
| ic.processingMode.responseHeaderMode = resolveHeaderMode(pm.GetResponseHeaderMode(), modeSend) | ||
| ic.processingMode.responseTrailerMode = resolveHeaderMode(pm.GetResponseTrailerMode(), modeSkip) | ||
| ic.processingMode.requestBodyMode = resolveBodyMode(pm.GetRequestBodyMode()) | ||
| ic.processingMode.responseBodyMode = resolveBodyMode(pm.GetResponseBodyMode()) | ||
|
|
||
| if override == nil { | ||
| return ic, nil | ||
| } | ||
| // Apply overrides if present. | ||
| if gs := override.GetGrpcService(); gs != nil { | ||
| sc, err := serverConfigFromGrpcService(gs) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| ic.server = sc | ||
| } | ||
| if override.GetFailureModeAllow() != nil { | ||
| ic.failureModeAllow = override.GetFailureModeAllow().GetValue() | ||
| } | ||
| if override.GetRequestAttributes() != nil { | ||
| ic.requestAttributes = override.GetRequestAttributes() | ||
| } | ||
| if override.GetResponseAttributes() != nil { | ||
| ic.responseAttributes = override.GetResponseAttributes() | ||
| } | ||
| if pm := override.GetProcessingMode(); pm != nil { | ||
| ic.processingMode.requestHeaderMode = resolveHeaderMode(pm.GetRequestHeaderMode(), modeSend) | ||
| ic.processingMode.responseHeaderMode = resolveHeaderMode(pm.GetResponseHeaderMode(), modeSend) | ||
| ic.processingMode.responseTrailerMode = resolveHeaderMode(pm.GetResponseTrailerMode(), modeSkip) | ||
| ic.processingMode.requestBodyMode = resolveBodyMode(pm.GetRequestBodyMode()) | ||
| ic.processingMode.responseBodyMode = resolveBodyMode(pm.GetResponseBodyMode()) | ||
| } | ||
| return ic, nil | ||
| } | ||
|
|
||
| // convertStringMatchers converts a slice of protobuf StringMatcher messages to | ||
| // a slice of matcher.StringMatcher. | ||
| func convertStringMatchers(patterns []*v3matcherpb.StringMatcher) ([]matcher.StringMatcher, error) { | ||
| var matchers []matcher.StringMatcher | ||
| for _, m := range patterns { | ||
| sm, err := matcher.StringMatcherFromProto(m) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| matchers = append(matchers, sm) | ||
| } | ||
| return matchers, nil | ||
| } | ||
|
|
||
| // resolveHeaderMode resolves the processing mode for headers based on the | ||
| // protobuf enum value. If the mode is not set or set to Default, it returns the | ||
| // provided defaultMode. | ||
| func resolveHeaderMode(mode v3procfilterpb.ProcessingMode_HeaderSendMode, defaultMode processingMode) processingMode { | ||
| switch mode { | ||
| case v3procfilterpb.ProcessingMode_SEND: | ||
| return modeSend | ||
| case v3procfilterpb.ProcessingMode_SKIP: | ||
| return modeSkip | ||
| default: | ||
| return defaultMode | ||
| } | ||
| } | ||
|
|
||
| // resolveBodyMode resolves the processing mode for body based on the protobuf | ||
| // enum value. If the mode is not set (i.e., default), it returns modeSkip, as | ||
| // the default for body is to skip. | ||
| func resolveBodyMode(mode v3procfilterpb.ProcessingMode_BodySendMode) processingMode { | ||
| switch mode { | ||
| case v3procfilterpb.ProcessingMode_GRPC: | ||
| return modeSend | ||
| case v3procfilterpb.ProcessingMode_NONE: | ||
| return modeSkip | ||
| default: | ||
| return modeSkip | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,97 @@ | ||
| /* | ||
| * | ||
| * Copyright 2026 gRPC authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| * | ||
| */ | ||
|
|
||
| // Package extproc implements the Envoy external processing filter. | ||
| package extproc | ||
|
|
||
| import ( | ||
| "fmt" | ||
|
|
||
| "google.golang.org/grpc" | ||
| "google.golang.org/grpc/internal/resolver" | ||
| "google.golang.org/grpc/internal/xds/httpfilter" | ||
|
|
||
| v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" | ||
| v3procservicepb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" | ||
| ) | ||
|
|
||
| var serverConfigFromGrpcService func(grpcService *v3corepb.GrpcService) (serverConfig, error) | ||
|
eshitachandwani marked this conversation as resolved.
Outdated
|
||
|
|
||
| type builder struct{} | ||
|
|
||
| func (builder) BuildClientFilter() httpfilter.ClientFilter { | ||
| return clientFilter{} | ||
| } | ||
|
|
||
| var _ httpfilter.ClientFilterBuilder = builder{} | ||
|
|
||
| type clientFilter struct{} | ||
|
|
||
| func (clientFilter) Close() {} | ||
|
|
||
| func (clientFilter) BuildClientInterceptor(cfg, override httpfilter.FilterConfig) (resolver.ClientInterceptor, error) { | ||
| if cfg == nil { | ||
| return nil, fmt.Errorf("extproc: nil config provided") | ||
| } | ||
|
|
||
| c, ok := cfg.(baseConfig) | ||
| if !ok { | ||
| return nil, fmt.Errorf("extproc: incorrect config type provided (%T): %v", cfg, cfg) | ||
| } | ||
| var ov overrideConfig | ||
| if override != nil { | ||
| ov, ok = override.(overrideConfig) | ||
| if !ok { | ||
| return nil, fmt.Errorf("extproc: incorrect override config type provided (%T): %v", override, override) | ||
| } | ||
| } | ||
|
|
||
| config, err := newInterceptorConfig(c.config, ov.config) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("extproc: %v", err) | ||
| } | ||
|
|
||
| dOpts := []grpc.DialOption{grpc.WithTransportCredentials(config.server.channelCredentials)} | ||
| for _, creds := range config.server.callCredentials { | ||
| dOpts = append(dOpts, grpc.WithPerRPCCredentials(creds)) | ||
| } | ||
| cc, err := grpc.NewClient(config.server.targetURI, dOpts...) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("extproc: failed to create client: %v", err) | ||
| } | ||
| extClient := v3procservicepb.NewExternalProcessorClient(cc) | ||
|
eshitachandwani marked this conversation as resolved.
|
||
|
|
||
| return &interceptor{ | ||
| config: config, | ||
| extClient: extClient, | ||
| cc: cc, | ||
| }, nil | ||
| } | ||
|
|
||
| type interceptor struct { | ||
| resolver.ClientInterceptor | ||
|
eshitachandwani marked this conversation as resolved.
eshitachandwani marked this conversation as resolved.
|
||
| config *interceptorConfig | ||
| extClient v3procservicepb.ExternalProcessorClient | ||
| cc *grpc.ClientConn | ||
| } | ||
|
|
||
| func (i *interceptor) Close() { | ||
| if i.cc != nil { | ||
| i.cc.Close() | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.