Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"comment": "Trigger file for PostCommit Python Portable Flink tests"
}
"comment": "Trigger file for PostCommit Python Portable Flink tests",
"modification": "2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -1797,15 +1797,35 @@ private static JsonDeserializer<Object> getDeserializerForMethod(Method method)
}

static Object deserializeNode(JsonNode node, Method method) throws IOException {
if (node.isNull()) {
if (node == null || node.isNull() || node.isMissingNode()) {
return null;
}
if (node.isTextual() && node.asText().isEmpty()) {
return null;
}

JavaType targetType = MAPPER.constructType(method.getGenericReturnType());

// Portable pipeline options from protobuf Struct are usually string-encoded scalars.
// convertValue coerces them reliably; TreeTraversingParser + StdDeserializer can NPE on
// Jackson 2.18+ (e.g. LongDeserializer._parseLong when parsing string values).
if (node.isValueNode()) {
return MAPPER.convertValue(node, targetType);
}
Comment thread
aIbrahiim marked this conversation as resolved.
Outdated

JsonDeserializer<Object> jsonDeserializer = getDeserializerForMethod(method);
if (jsonDeserializer == null) {
return MAPPER.convertValue(node, targetType);
}

JsonParser parser = new TreeTraversingParser(node, MAPPER);
parser.nextToken();

JsonDeserializer<Object> jsonDeserializer = getDeserializerForMethod(method);
return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.copy());
try {
return jsonDeserializer.deserialize(parser, DESERIALIZATION_CONTEXT.copy());
} catch (RuntimeException e) {
return MAPPER.convertValue(node, targetType);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,8 @@ public void populateDisplayData(DisplayData.Builder builder) {
continue;
}

JsonNode jsonNode = jsonOption.getValue();

for (PipelineOptionSpec spec : specs) {
if (!spec.shouldSerialize()) {
continue;
Expand All @@ -443,7 +445,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
continue;
}

Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod());
Object value = getValueFromJson(jsonNode, spec.getGetterMethod());
DisplayDataValue resolved = DisplayDataValue.resolve(value);
builder.add(
DisplayData.item(jsonOption.getKey(), resolved.getType(), resolved.getValue())
Expand Down Expand Up @@ -600,11 +602,13 @@ public String toString() {
* @return An object matching the return type of the method passed in.
*/
private Object getValueFromJson(String propertyName, Method method) {
JsonNode jsonNode = jsonOptions.get(propertyName);
return getValueFromJson(jsonNode, method);
return getValueFromJson(jsonOptions.get(propertyName), method);
}

private static Object getValueFromJson(JsonNode node, Method method) {
private static Object getValueFromJson(@Nullable JsonNode node, Method method) {
if (node == null || node.isNull() || node.isMissingNode()) {
return null;
}
try {
return PipelineOptionsFactory.deserializeNode(node, method);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,85 @@ public void structWithNullOptionsDeserializes() throws Exception {

assertThat(deserialized, notNullValue());
}

@Test
public void nullKnownOptionSerializesToProto() {
PipelineOptionsFactory.register(TestBoxedOptions.class);
Struct serialized =
Struct.newBuilder()
.putFields(
"beam:option:example:v1",
Value.newBuilder().setNullValue(NullValue.NULL_VALUE).build())
.build();
PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized);
PipelineOptionsTranslation.toProto(deserialized);
}

@Test
public void stringEncodedStreamingOptionsRoundTripToProto() {
PipelineOptionsFactory.register(TestStreamingLikeOptions.class);
Struct serialized =
Struct.newBuilder()
.putFields(
"beam:option:streaming:v1", Value.newBuilder().setStringValue("true").build())
.putFields(
"beam:option:parallelism:v1", Value.newBuilder().setStringValue("2").build())
.putFields(
"beam:option:checkpointing_interval:v1",
Value.newBuilder().setStringValue("3000").build())
.putFields(
"beam:option:shutdown_sources_after_idle_ms:v1",
Value.newBuilder().setStringValue("60000").build())
.putFields(
"beam:option:number_of_execution_retries:v1",
Value.newBuilder().setStringValue("1").build())
.build();
PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized);
PipelineOptionsTranslation.toProto(deserialized);
}

@Test
public void emptyStringLongOptionSerializesToProto() {
PipelineOptionsFactory.register(TestStreamingLikeOptions.class);
Struct serialized =
Struct.newBuilder()
.putFields(
"beam:option:checkpointing_interval:v1",
Value.newBuilder().setStringValue("").build())
.build();
PipelineOptions deserialized = PipelineOptionsTranslation.fromProto(serialized);
PipelineOptionsTranslation.toProto(deserialized);
}
}

/** {@link PipelineOptions} with a nullable boxed option for null struct value tests. */
public interface TestBoxedOptions extends PipelineOptions {
Integer getExample();

void setExample(Integer example);
}

/** Options with types matching Flink portable Kafka test pipeline settings. */
public interface TestStreamingLikeOptions extends PipelineOptions {
boolean isStreaming();

void setStreaming(boolean streaming);

Integer getParallelism();

void setParallelism(Integer parallelism);

Long getCheckpointingInterval();

void setCheckpointingInterval(Long interval);

Long getShutdownSourcesAfterIdleMs();

void setShutdownSourcesAfterIdleMs(Long timeoutMs);

Integer getNumberOfExecutionRetries();

void setNumberOfExecutionRetries(Integer retries);
}

/** {@link PipelineOptions} with an unserializable option. */
Expand Down
Loading