Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion runners/google-cloud-dataflow-java/arm/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ task printrunnerV2PipelineOptionsARM {
dependsOn buildAndPushDockerJavaMultiarchContainer

doLast {
println "To run a Dataflow job with runner V2 on ARM, add the following pipeline options to your command-line:"
println "To run a Dataflow job with Portable Runner on ARM, add the following pipeline options to your command-line:"
println runnerV2PipelineOptionsARM.join(' ')
}
}
Expand Down
10 changes: 5 additions & 5 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ task printRunnerV2PipelineOptions {
dependsOn buildAndPushDockerJavaContainer

doLast {
println "To run a Dataflow job with runner V2, add the following pipeline options to your command-line:"
println "To run a Dataflow job with Portable Runner, add the following pipeline options to your command-line:"
println runnerV2PipelineOptions.join(' ')
println "Please delete your image upon completion with the following command:"
println "docker rmi ${dockerJavaImageName}; gcloud container images delete --force-delete-tags ${dockerJavaImageName}"
Expand Down Expand Up @@ -471,7 +471,7 @@ def validatesRunnerStreamingConfig = [
excludedTests: [
// TODO(https://github.com/apache/beam/issues/21472)
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
// GroupIntoBatches.withShardedKey not supported on streaming runner v1
// GroupIntoBatches.withShardedKey not supported on streaming Streaming Java Runner
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The word "streaming" is redundant here as "Streaming Java Runner" already includes it.

    // GroupIntoBatches.withShardedKey not supported on Streaming Java Runner

// https://github.com/apache/beam/issues/22592
'org.apache.beam.sdk.transforms.GroupIntoBatchesTest.testWithShardedKeyInGlobalWindow',

Expand Down Expand Up @@ -570,7 +570,7 @@ createCrossLanguageValidatesRunnerTask(

task validatesRunnerV2 {
group = "Verification"
description = "Runs the ValidatesRunner tests on Dataflow Runner V2"
description = "Runs the ValidatesRunner tests on Dataflow Portable Runner"
dependsOn(createRunnerV2ValidatesRunnerTest(
name: 'validatesRunnerV2Test',
pipelineOptions: runnerV2PipelineOptions,
Expand Down Expand Up @@ -610,7 +610,7 @@ task validatesRunnerV2 {

task validatesRunnerV2Streaming {
group = "Verification"
description = "Runs the ValidatesRunner tests on Dataflow Runner V2 forcing streaming mode"
description = "Runs the ValidatesRunner tests on Dataflow Portable Runner forcing streaming mode"
dependsOn(createRunnerV2ValidatesRunnerTest(
name: 'validatesRunnerV2TestStreaming',
pipelineOptions: runnerV2PipelineOptions + ['--streaming', '--experiments=enable_streaming_engine'],
Expand Down Expand Up @@ -880,7 +880,7 @@ task postCommit {

task postCommitRunnerV2 {
group = "Verification"
description = "Various integration tests using the Dataflow runner V2."
description = "Various integration tests using the Dataflow Portable Runner."
dependsOn googleCloudPlatformRunnerV2IntegrationTest
dependsOn coreSDKJavaRunnerV2IntegrationTest
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,25 +1242,25 @@ private static boolean includesTransformUpgrades(Pipeline pipeline) {
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
// Multi-language pipelines and pipelines that include upgrades should automatically be upgraded
// to Runner v2.
// to Portable Runner.
if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) {
if (!useUnifiedWorker(options)) {
List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
LOG.info(
"Automatically enabling Dataflow Runner v2 since the pipeline used cross-language"
"Automatically enabling Dataflow Portable Runner since the pipeline used cross-language"
+ " transforms or pipeline needed a transform upgrade.");
options.setExperiments(
ImmutableList.<String>builder().addAll(experiments).add("use_runner_v2").build());
}
}
if (useUnifiedWorker(options)) {
if (hasExperiment(options, "disable_runner_v2")
if (hasExperiment(options, "disable_runner_v2")
|| hasExperiment(options, "disable_runner_v2_until_2023")
|| hasExperiment(options, "disable_prime_runner_v2")
|| hasExperiment(options, "disable_portable_runner")
|| hasExperiment(options, "enable_streaming_java_runner")) {
throw new IllegalArgumentException(
"Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
"Portable Runner both disabled and enabled: at least one of ['enable_portable_runner', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['enable_streaming_java_runner', 'disable_portable_runner', 'disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
}
List<String> experiments =
new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
Expand Down Expand Up @@ -1374,10 +1374,10 @@ public DataflowPipelineJob run(Pipeline pipeline) {
options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);

if (useUnifiedWorker(options)) {
LOG.info("Skipping v1 transform replacements since job will run on v2.");
LOG.info("Skipping Streaming Java Runner transform replacements since job will run on Portable Runner.");
} else {
// Now rewrite things to be as needed for v1 (mutates the pipeline)
// This way the job submitted is valid for v1 and v2, simultaneously
// Now rewrite things to be as needed for Streaming Java Runner (mutates the pipeline)
// This way the job submitted is valid for Streaming Java Runner and Portable Runner, simultaneously
replaceV1Transforms(pipeline);
}
// Capture the SdkComponents for look up during step translations
Expand All @@ -1388,7 +1388,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
// No need to perform transform upgrading for the Runner v1 proto.
// No need to perform transform upgrading for the Streaming Java Runner proto.
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);

Expand Down Expand Up @@ -1544,7 +1544,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
options.setExperiments(experiments);
LOG.warn(
"The upload_graph experiment was specified, but it does not apply "
+ "to runner v2 jobs. Option has been automatically removed.");
+ "to Portable Runner jobs. Option has been automatically removed.");
}

// Upload the job to GCS and remove the graph object from the API call. The graph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1847,7 +1847,7 @@ public void testSettingConflictingEnableAndDisableExperimentsThrowsException() t
ExperimentalOptions.addExperiment(options, disabledExperiment);
Pipeline p = Pipeline.create(options);
p.apply(Create.of("A"));
assertThrows("Runner V2 both disabled and enabled", IllegalArgumentException.class, p::run);
assertThrows("Portable Runner both disabled and enabled", IllegalArgumentException.class, p::run);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ enum ConnectivityType {
CONNECTIVITY_TYPE_DIRECTPATH = 2;
}

// Settings to control runtime behavior of the java runner v1 user worker.
// Settings to control runtime behavior of the Streaming Java Runner user worker.
message UserWorkerRunnerV1Settings {
optional UserWorkerGrpcFlowControlSettings flow_control_settings = 3;

Expand Down
9 changes: 5 additions & 4 deletions sdks/go/pkg/beam/runners/dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions

experiments := jobopts.GetExperiments()
// Ensure that we enable the same set of experiments across all SDKs
// for runner v2.
// for Portable Runner.
var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
for _, e := range experiments {
if strings.Contains(e, "beam_fn_api") {
Expand All @@ -349,8 +349,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
if strings.Contains(e, "use_portable_job_submission") {
portaSubmission = true
}
if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") {
return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
// enable_portable_runner is not documented and hence won't be set by default. This will be fixed in later versions.
if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") {
return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For consistency with the rest of the changes in this PR and the Python SDK implementation, "runner v2" in the error message should be updated to "Portable Runner".

Suggested change
return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2 | disable_portable_runner | enable_streaming_java_runner. Disabling Portable Runner is no longer supported as of Beam version 2.45.0+")

}
}
// Enable default experiments.
Expand All @@ -368,7 +369,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
}

// Ensure that streaming specific experiments are set for streaming pipelines
// since runner v2 only supports using streaming engine.
// since Portable Runner only supports using streaming engine.
if streaming {
if !seSet {
experiments = append(experiments, "enable_streaming_engine")
Expand Down
34 changes: 34 additions & 0 deletions sdks/go/pkg/beam/runners/dataflow/dataflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,40 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) {
}
}

func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) {
resetGlobals()
*stagingLocation = "gs://testStagingLocation"
*gcpopts.Project = "testProject"
*gcpopts.Region = "testRegion"
*jobopts.Experiments = "disable_portable_runner"

opts, err := getJobOptions(context.Background(), false)

if err == nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these testing? They seem to verify errors, but the test names don't indicate that we expect a failure. If we do, it would be better to verify the error more specifically too.

ditto for the other test

t.Error("getJobOptions() returned error nil, want an error")
}
if opts != nil {
t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
}
}

// TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet verifies that
// getJobOptions fails when the enable_streaming_java_runner experiment is set:
// the call must yield a non-nil error and nil JobOptions.
func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) {
	// Reset shared state, then configure the project, region, staging
	// location and the experiment under test.
	resetGlobals()
	*gcpopts.Project = "testProject"
	*gcpopts.Region = "testRegion"
	*stagingLocation = "gs://testStagingLocation"
	*jobopts.Experiments = "enable_streaming_java_runner"

	ctx := context.Background()
	got, gotErr := getJobOptions(ctx, false)

	// Both conditions are reported with t.Error/t.Errorf so that a single run
	// surfaces a missing error and an unexpected result together.
	if gotErr == nil {
		t.Error("getJobOptions() returned error nil, want an error")
	}
	if got != nil {
		t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", got)
	}
}

func TestGetJobOptions_NoStagingLocation(t *testing.T) {
resetGlobals()
*stagingLocation = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def _state_read_range(self, buffer_state, range_lo, range_hi):

def _state_clear_range(self, buffer_state, range_lo, range_hi):
"""Clears a specified range of elements from the buffer state."""
# TODO: Dataflow runner v2 gets stuck when MIN_TIMESTAMP is used
# TODO: Dataflow Portable Runner gets stuck when MIN_TIMESTAMP is used
# as the lower bound for clear_range. Investigate this further.
buffer_state.clear_range(range_lo, range_hi)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/examples/kafkataxi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Perform Beam runner specific setup.

ℹ️ Note that cross-language transforms require
portable implementations of Spark/Flink/Direct runners. Dataflow requires
[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2).
[Portable Runner](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about
Dataflow requires using the [Dataflow Portable Runner]

See [here](https://beam.apache.org/documentation/runners/dataflow/) for
instructions for setting up Dataflow.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ def test_streaming_with_fixed_num_streams(self):

@unittest.skip(
"Streaming to the Storage Write API sink with autosharding is broken "
"with Dataflow Runner V2.")
"with Dataflow Portable Runner.")
def test_streaming_with_auto_sharding(self):
self.skip_if_not_dataflow_runner()
table = 'streaming_with_auto_sharding'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _translate_step_name(self, internal_name):
user_step_name = None
if (self._job_graph and internal_name
in self._job_graph.proto_pipeline.components.transforms.keys()):
# Dataflow Runner v2 with portable job submission uses proto transform map
# Dataflow Portable Runner with portable job submission uses proto transform map
# IDs for step names. Also PTransform.unique_name maps to user step names.
# Hence we lookup user step names based on the proto.
user_step_name = self._job_graph.proto_pipeline.components.transforms[
Expand Down
16 changes: 10 additions & 6 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@

BQ_SOURCE_UW_ERROR = (
'The Read(BigQuerySource(...)) transform is not supported with newer stack '
'features (Fn API, Dataflow Runner V2, etc). Please use the transform '
'features (Fn API, Dataflow Portable Runner, etc). Please use the transform '
'apache_beam.io.gcp.bigquery.ReadFromBigQuery instead.')


Expand Down Expand Up @@ -320,7 +320,7 @@ def visit_transform(self, applied_transform):
raise ValueError(
'CombineFn.setup and CombineFn.teardown are '
'not supported with non-portable Dataflow '
'runner. Please use Dataflow Runner V2 instead.')
'runner. Please use Dataflow Portable Runner instead.')

@staticmethod
def _overrides_setup_or_teardown(combinefn):
Expand All @@ -342,7 +342,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None):
"""Remotely executes entire pipeline or parts reachable from node."""
if _is_runner_v2_disabled(options):
raise ValueError(
'Disabling Runner V2 no longer supported '
'Disabling Portable Runner no longer supported '
'using Beam Python %s.' % beam.version.__version__)

# Label goog-dataflow-notebook if job is started from notebook.
Expand Down Expand Up @@ -591,6 +591,8 @@ def _add_runner_v2_missing_options(options):
debug_options.add_experiment('use_unified_worker')
debug_options.add_experiment('use_runner_v2')
debug_options.add_experiment('use_portable_job_submission')
# enable_portable_runner is not added by default as it is not documented.
# This behavior will be fixed in later versions.


def _check_and_add_missing_options(options):
Expand Down Expand Up @@ -648,8 +650,8 @@ def _check_and_add_missing_streaming_options(options):

:param options: PipelineOptions for this pipeline.
"""
# Streaming only supports using runner v2 (aka unified worker).
# Runner v2 only supports using streaming engine (aka windmill service)
# Streaming only supports using Portable Runner (aka unified worker).
# Portable Runner only supports using streaming engine (aka windmill service)
if options.view_as(StandardOptions).streaming:
debug_options = options.view_as(DebugOptions)
debug_options.add_experiment('enable_streaming_engine')
Expand All @@ -659,9 +661,11 @@ def _check_and_add_missing_streaming_options(options):
def _is_runner_v2_disabled(options):
# Type: (PipelineOptions) -> bool

"""Returns true if runner v2 is disabled."""
"""Returns true if Portable Runner is disabled."""
debug_options = options.view_as(DebugOptions)
return (
debug_options.lookup_experiment('disable_portable_runner') or
debug_options.lookup_experiment('enable_streaming_java_runner') or
debug_options.lookup_experiment('disable_runner_v2') or
debug_options.lookup_experiment('disable_runner_v2_until_2023') or
debug_options.lookup_experiment('disable_runner_v2_until_v2.50') or
Expand Down
19 changes: 19 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options
from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_streaming_options
from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled
from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
from apache_beam.runners.internal import names
from apache_beam.runners.runner import PipelineState
Expand Down Expand Up @@ -733,6 +734,24 @@ def test_explicit_streaming_no_unbounded(self):
p.result.job.proto.type,
apiclient.dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING)

def test_runner_v2_disabled_experiments_raise(self):
  """Checks that every Portable Runner opt-out experiment is rejected.

  For each experiment name, ``_is_runner_v2_disabled`` must return a truthy
  value and ``DataflowRunner.run_pipeline`` must raise ``ValueError`` with
  the "no longer supported" message.
  """
  disable_experiments = [
      'disable_portable_runner',
      'enable_streaming_java_runner',
      'disable_runner_v2',
      'disable_runner_v2_until_2023',
      'disable_runner_v2_until_v2.50',
      'disable_prime_runner_v2',
  ]
  for experiment in disable_experiments:
    # subTest keeps iterating after a failure, so each experiment is reported
    # on its own instead of the loop stopping at the first failing one.
    with self.subTest(experiment=experiment):
      options = PipelineOptions([f'--experiments={experiment}'])
      self.assertTrue(
          _is_runner_v2_disabled(options),
          f'Expected {experiment} to disable Portable Runner')
      # Match the message text literally. A '.*' surrounded by spaces would
      # demand two spaces between "Runner" and "no", but the raised message
      # reads 'Disabling Portable Runner no longer supported ...' with one.
      with self.assertRaisesRegex(
          ValueError, 'Disabling Portable Runner no longer supported'):
        DataflowRunner().run_pipeline(None, options)


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,7 @@ def to_split_int(n):


# TODO: Used in legacy batch worker. Move under MetricUpdateTranslators
# after Runner V2 transition.
# after Portable Runner transition.
def translate_distribution(distribution_update, metric_update_proto):
"""Translate metrics DistributionUpdate to dataflow distribution update.

Expand All @@ -1174,7 +1174,7 @@ def translate_distribution(distribution_update, metric_update_proto):
metric_update_proto.distribution = dist_update_proto


# TODO: Used in legacy batch worker. Delete after Runner V2 transition.
# TODO: Used in legacy batch worker. Delete after Portable Runner transition.
def translate_value(value, metric_update_proto):
metric_update_proto.integer = to_split_int(value)

Expand Down Expand Up @@ -1203,8 +1203,8 @@ def get_container_image_from_options(pipeline_options):
if worker_options.sdk_container_image:
return worker_options.sdk_container_image

# Legacy and runner v2 exist in different repositories.
# Set to legacy format, override if runner v2
# Legacy and Portable Runner exist in different repositories.
# Set to legacy format, override if Portable Runner
container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY
image_name = '{repository}/beam_python{major}.{minor}_sdk'.format(
repository=container_repo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6764,9 +6764,9 @@ class StreamingConfigTask(_messages.Message):
format version for streaming engine jobs.
userStepToStateFamilyNameMap: Map from user step names to state families.
userWorkerRunnerV1Settings: Binary encoded proto to control runtime
behavior of the java runner v1 user worker.
behavior of the Streaming Java Runner user worker.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

userWorkerRunnerV2Settings: Binary encoded proto to control runtime
behavior of the runner v2 user worker.
behavior of the Portable Runner user worker.
windmillServiceEndpoint: If present, the worker must use this endpoint to
communicate with Windmill Service dispatchers, otherwise the worker must
continue to use whatever endpoint it had been using.
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@ def setup(self, *args, **kwargs):
before executing any of the other methods. The resources can then be
disposed of in ``CombineFn.teardown``.

If you are using Dataflow, you need to enable Dataflow Runner V2
If you are using Dataflow, you need to enable Dataflow Portable Runner
before using this feature.

Args:
Expand Down Expand Up @@ -1194,7 +1194,7 @@ def extract_output(self, accumulator, *args, **kwargs):
def teardown(self, *args, **kwargs):
"""Called to clean up an instance before it is discarded.

If you are using Dataflow, you need to enable Dataflow Runner V2
If you are using Dataflow, you need to enable Dataflow Portable Runner
before using this feature.

Args:
Expand Down
Loading