-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Updated All Externally Visible Logs/Docs from Runner V2 to Portable Runner #38532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c9445ef
dbb9826
ac3716d
1069559
50b4120
0af5af7
b846cec
4013e3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -334,7 +334,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions | |||||
|
|
||||||
| experiments := jobopts.GetExperiments() | ||||||
| // Ensure that we enable the same set of experiments across all SDKs | ||||||
| // for runner v2. | ||||||
| // for Portable Runner. | ||||||
| var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool | ||||||
| for _, e := range experiments { | ||||||
| if strings.Contains(e, "beam_fn_api") { | ||||||
|
|
@@ -349,8 +349,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions | |||||
| if strings.Contains(e, "use_portable_job_submission") { | ||||||
| portaSubmission = true | ||||||
| } | ||||||
| if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") { | ||||||
| return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") | ||||||
| // enable_portable_runner is not documented and hence wont be set by default. This will be fixed in later versions. | ||||||
| if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") { | ||||||
| return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. Disabling runner v2 is no longer supported as of Beam version 2.45.0+") | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For consistency with the rest of the changes in this PR and the Python SDK implementation, "runner v2" in the error message should be updated to "Portable Runner".
Suggested change
|
||||||
| } | ||||||
| } | ||||||
| // Enable default experiments. | ||||||
|
|
@@ -368,7 +369,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions | |||||
| } | ||||||
|
|
||||||
| // Ensure that streaming specific experiments are set for streaming pipelines | ||||||
| // since runner v2 only supports using streaming engine. | ||||||
| // since Portable Runner only supports using streaming engine. | ||||||
| if streaming { | ||||||
| if !seSet { | ||||||
| experiments = append(experiments, "enable_streaming_engine") | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -244,6 +244,40 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) { | |
| } | ||
| } | ||
|
|
||
| func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) { | ||
| resetGlobals() | ||
| *stagingLocation = "gs://testStagingLocation" | ||
| *gcpopts.Project = "testProject" | ||
| *gcpopts.Region = "testRegion" | ||
| *jobopts.Experiments = "disable_portable_runner" | ||
|
|
||
| opts, err := getJobOptions(context.Background(), false) | ||
|
|
||
| if err == nil { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what are these testing? It seems it is verifying errors, but the test name doesn't indicate we expect a failure. If we do it woudl be better to verify the error more specifically too ditto for the other test |
||
| t.Error("getJobOptions() returned error nil, want an error") | ||
| } | ||
| if opts != nil { | ||
| t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) | ||
| } | ||
| } | ||
|
|
||
| func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) { | ||
| resetGlobals() | ||
| *stagingLocation = "gs://testStagingLocation" | ||
| *gcpopts.Project = "testProject" | ||
| *gcpopts.Region = "testRegion" | ||
| *jobopts.Experiments = "enable_streaming_java_runner" | ||
|
|
||
| opts, err := getJobOptions(context.Background(), false) | ||
|
|
||
| if err == nil { | ||
| t.Error("getJobOptions() returned error nil, want an error") | ||
| } | ||
| if opts != nil { | ||
| t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts) | ||
| } | ||
| } | ||
|
|
||
| func TestGetJobOptions_NoStagingLocation(t *testing.T) { | ||
| resetGlobals() | ||
| *stagingLocation = "" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,7 +67,7 @@ Perform Beam runner specific setup. | |
|
|
||
| ℹ️ Note that cross-language transforms require | ||
| portable implementations of Spark/Flink/Direct runners. Dataflow requires | ||
| [runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). | ||
| [Portable Runner](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2). | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about |
||
| See [here](https://beam.apache.org/documentation/runners/dataflow/) for | ||
| instructions for setting up Dataflow. | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6764,9 +6764,9 @@ class StreamingConfigTask(_messages.Message): | |
| format version for streaming engine jobs. | ||
| userStepToStateFamilyNameMap: Map from user step names to state families. | ||
| userWorkerRunnerV1Settings: Binary encoded proto to control runtime | ||
| behavior of the java runner v1 user worker. | ||
| behavior of the Streaming Java Runner user worker. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. think this file is generated |
||
| userWorkerRunnerV2Settings: Binary encoded proto to control runtime | ||
| behavior of the runner v2 user worker. | ||
| behavior of the Portable Runner user worker. | ||
| windmillServiceEndpoint: If present, the worker must use this endpoint to | ||
| communicate with Windmill Service dispatchers, otherwise the worker must | ||
| continue to use whatever endpoint it had been using. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The word "streaming" is redundant here as "Streaming Java Runner" already includes it.