diff --git a/docs/otap-spec.md b/docs/otap-spec.md index 4fc418db36..0d34a4a0fc 100644 --- a/docs/otap-spec.md +++ b/docs/otap-spec.md @@ -444,8 +444,8 @@ properties for the `resource` column's `id` field. | resource.dropped_attributes_count | UInt32 | - | No | - | - | Number of dropped resource attributes | | scope | Struct | - | No | - | - | Instrumentation scope | | scope.id | UInt16 | - | No | [DELTA](#642-delta-encoding) | encoding | Foreign key to SCOPE_ATTRS | -| scope.name | Utf8 | - | No | - | - | Instrumentation scope name | -| scope.version | Utf8 | - | No | - | - | Instrumentation scope version | +| scope.name | Utf8 | Dict(u8), Dict(u16) | No | - | - | Instrumentation scope name | +| scope.version | Utf8 | Dict(u8), Dict(u16) | No | - | - | Instrumentation scope version | | scope.dropped_attributes_count | UInt32 | - | No | - | - | Number of dropped scope attributes | | schema_url | Utf8 | - | No | - | - | Span schema URL | | start_time_unix_nano | Timestamp(Nanosecond) | - | Yes | - | - | Span start time in Unix nanoseconds | @@ -495,12 +495,12 @@ properties for the `resource` column's `id` field. 
| id | UInt16 | - | Yes | [DELTA](#642-delta-encoding) | encoding | Metric identifier (primary key) | | resource | Struct | - | No | - | - | Resource information | | resource.id | UInt16 | - | No | [DELTA](#642-delta-encoding) | encoding | Foreign key to RESOURCE_ATTRS | -| resource.schema_url | Utf8 | - | No | - | - | Resource schema URL | +| resource.schema_url | Utf8 | Dict(u8), Dict(u16) | No | - | - | Resource schema URL | | resource.dropped_attributes_count | UInt32 | - | No | - | - | Number of dropped resource attributes | | scope | Struct | - | No | - | - | Instrumentation scope information | | scope.id | UInt16 | - | No | [DELTA](#642-delta-encoding) | encoding | Foreign key to SCOPE_ATTRS | -| scope.name | Utf8 | - | No | - | - | Instrumentation scope name | -| scope.version | Utf8 | - | No | - | - | Instrumentation scope version | +| scope.name | Utf8 | Dict(u8), Dict(u16) | No | - | - | Instrumentation scope name | +| scope.version | Utf8 | Dict(u8), Dict(u16) | No | - | - | Instrumentation scope version | | scope.dropped_attributes_count | UInt32 | - | No | - | - | Number of dropped scope attributes | | schema_url | Utf8 | - | No | - | - | Metric schema URL | | metric_type | UInt8 | - | Yes | - | - | Metric type enum (Gauge, Sum, Histogram, etc.) 
| diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs index 2a8560f442..515dd2f034 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/assign.rs @@ -18,14 +18,14 @@ use std::rc::Rc; use std::sync::Arc; use arrow::array::{ - Array, ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, NullArray, + Array, ArrayRef, AsArray, BooleanArray, DictionaryArray, Float64Array, Int64Array, NullArray, RecordBatch, StringArray, StructArray, UInt8Array, UInt16Array, }; use arrow::buffer::BooleanBuffer; use arrow::compute::kernels::cmp::{eq, neq}; use arrow::compute::kernels::merge::merge; use arrow::compute::{and_not, cast, filter, max, take}; -use arrow::datatypes::{DataType, Field, Schema, UInt16Type}; +use arrow::datatypes::{DataType, Field, Fields, Schema, UInt16Type}; use async_trait::async_trait; use data_engine_expressions::QueryLocation; use datafusion::config::ConfigOptions; @@ -53,11 +53,12 @@ use crate::pipeline::expr::join::{ RootToAttributesJoin, }; use crate::pipeline::expr::types::{ - ExprLogicalType, root_field_supports_dict_encoding, root_field_type, + ExprLogicalType, nested_struct_field_type, root_field_supports_dict_encoding, root_field_type, }; use crate::pipeline::expr::{ DataScope, ExprPhysicalPlanner, LogicalExprDataSource, PhysicalExprEvalResult, - SCALAR_RECORD_BATCH_INPUT, ScopedLogicalExpr, ScopedPhysicalExpr, VALUE_COLUMN_NAME, + RootParentStruct, SCALAR_RECORD_BATCH_INPUT, ScopedLogicalExpr, ScopedPhysicalExpr, + VALUE_COLUMN_NAME, }; use crate::pipeline::planner::{AttributesIdentifier, ColumnAccessor}; use crate::pipeline::project::anyval::{ @@ -419,6 +420,161 @@ impl AssignPipelineStage { Ok(otap_batch) } + /// try to assign an all-null value to a field within a struct column in the root record batch. + /// In practice, this just means removing the field from the struct. 
This will return an error + /// if it turns out the field is not nullable. If the struct column or the field does not exist, + /// this is a no-op. If the field was the struct's only child, the struct column itself is removed. + fn assign_null_struct_field( + &self, + mut otap_batch: OtapArrowRecords, + struct_col_name: &str, + field_name: &str, + ) -> Result<OtapArrowRecords> { + let root_batch = match otap_batch.root_record_batch() { + Some(rb) => rb, + None => { + // no root record batch, so there is nothing to update + return Ok(otap_batch); + } + }; + + let schema = root_batch.schema_ref(); + let maybe_struct_col = schema.fields().find(struct_col_name); + if let Some((struct_col_index, struct_field)) = maybe_struct_col { + // the column should always be a struct, but guard against schema corruption + let DataType::Struct(struct_fields) = struct_field.data_type() else { + return Err(Error::ExecutionError { + cause: format!( + "expected struct column '{struct_col_name}' to have DataType::Struct, \ + found {:?}", + struct_field.data_type() + ), + }); + }; + + let maybe_found_field = struct_fields.find(field_name); + if let Some((field_index, field)) = maybe_found_field { + if field.is_nullable() { + let struct_array = root_batch.column(struct_col_index).as_struct(); + let mut new_fields = struct_fields.to_vec(); + _ = new_fields.remove(field_index); + let mut new_columns = struct_array.columns().to_vec(); + _ = new_columns.remove(field_index); + + let root_payload_type = otap_batch.root_payload_type(); + if new_columns.is_empty() { + let mut new_root_batch = root_batch.clone(); + _ = new_root_batch.remove_column(struct_col_index); + otap_batch.set(root_payload_type, new_root_batch)?; + } else { + let new_struct = Arc::new(StructArray::new( + Fields::from(new_fields), + new_columns, + struct_array.nulls().cloned(), + )); + otap_batch.set( + root_payload_type, + try_upsert_column(struct_col_name, new_struct, root_batch)?, + )?; + } + } else { + return Err(Error::ExecutionError { + cause: format!( + "cannot assign null to non-nullable field '{field_name}' \ + on struct column '{struct_col_name}'"
), + }); + } + } + } + + Ok(otap_batch) + } + + fn assign_to_struct_column( + &self, + mut otap_batch: OtapArrowRecords, + mut eval_result: PhysicalExprEvalResult, + dest_scope: &Rc<DataScope>, + dest_column_name: &str, + dest_field_name: &str, + ) -> Result<OtapArrowRecords> { + let root_batch = match otap_batch.root_record_batch() { + Some(rb) => rb, + None => { + // no root record batch, so there is nothing to update + return Ok(otap_batch); + } + }; + + let column_supports_dict_encoding = + nested_struct_field_supports_dict_encoding(dest_column_name, dest_field_name); + + if let ColumnarValue::Array(values) = &eval_result.values { + if is_any_value_data_type(values.data_type()) { + let coerced = attempt_coerce_value_column_from_any_value_struct_column(values)?; + eval_result.values = ColumnarValue::Array(coerced); + } + } + + // Coerce static scalar integers to the destination field type (e.g. AnyInt literal → UInt32). + // Mirrors the same cast done in assign_to_root. + if let Some(dest_logical_type) = nested_struct_field_type(dest_field_name) { + if let Some(dest_arrow_type) = dest_logical_type.datatype() { + if eval_result.data_scope.as_ref() == &DataScope::StaticScalar + && eval_result.values.data_type().is_integer() + && dest_arrow_type.is_integer() + { + eval_result.values = eval_result.values.cast_to(&dest_arrow_type, None)?; + } + } + } + + let mut values = eval_result_to_array( + &eval_result.values, + column_supports_dict_encoding, + root_batch.num_rows(), + )?; + + // Check if the source rows are already aligned with the root batch rows. + // Scalars broadcast to any row count; Root and RootParent both live in the root batch + // so their row order matches. If the source is an Attributes batch, it has fewer rows + // (one per scope/resource) than the root batch (one per log/span/metric), so we need + // a join to expand and reorder the values to match the root batch row count.
+ let already_aligned = eval_result.data_scope.is_scalar() + || eval_result.data_scope.as_ref() == dest_scope.as_ref() + || matches!( + eval_result.data_scope.as_ref(), + DataScope::Root | DataScope::RootParent(_) + ); + + if !already_aligned { + let DataScope::Attributes(attrs_id, _) = eval_result.data_scope.as_ref() else { + unreachable!("unexpected data_scope") + }; + + let join_exec = RootToAttributesJoin::new(*attrs_id); + let vals_take_indices = join_exec.rows_to_take( + &PhysicalExprEvalResult::new( + ColumnarValue::Scalar(ScalarValue::Null), + Rc::clone(dest_scope), + root_batch, + ), + &eval_result, + &OtapArrowRecords::Logs(Logs::default()), + )?; + + values = take(&values, &vals_take_indices, None)?; + } + + otap_batch.set( + otap_batch.root_payload_type(), + try_upsert_struct_col(dest_column_name, dest_field_name, values, root_batch)?, + )?; + + Ok(otap_batch) + } + fn assign_to_attributes( &mut self, mut otap_batch: OtapArrowRecords, @@ -607,11 +763,8 @@ impl AssignPipelineStage { .rows_to_take(left_join_input, &eval_result, &otap_batch)?
} } - DataScope::Root => RootAttrsToRootJoin::new().rows_to_take( - left_join_input, - &eval_result, - &otap_batch, - )?, + DataScope::Root | DataScope::RootParent(_) => RootAttrsToRootJoin::new() + .rows_to_take(left_join_input, &eval_result, &otap_batch)?, DataScope::StaticScalar => { // safety: if the data scope was scalar, the result would have also been a // Scalar which would have been handled above where we checked the @@ -792,26 +945,37 @@ impl PipelineStage for AssignPipelineStage { // support bulk assignment so we just evaluate the expressions and update the columns // one at a time for i in 0..self.sources.len() { - let dest_col_name = match &self.dest_columns[i] { - ColumnAccessor::ColumnName(col_name) => col_name, - other_dest => { - return Err(Error::NotYetSupportedError { - message: format!( - "assignment to column destination {:?} not yet supported", - other_dest - ), - }); - } - }; - let eval_result = self.sources[i].execute(&otap_batch, session_context)?; let dest_scope = &self.dest_scopes[i]; - otap_batch = match eval_result { - Some(eval_result) => { - self.assign_to_root(otap_batch, eval_result, dest_scope, dest_col_name) + match &self.dest_columns[i] { + ColumnAccessor::ColumnName(dest_col_name) => { + otap_batch = match eval_result { + Some(eval_result) => { + self.assign_to_root(otap_batch, eval_result, dest_scope, dest_col_name) + } + None => self.assign_null_root_column(otap_batch, dest_col_name), + }?; + } + ColumnAccessor::StructCol(struct_col_name, field_name) => { + otap_batch = match eval_result { + Some(eval_result) => self.assign_to_struct_column( + otap_batch, + eval_result, + dest_scope, + struct_col_name, + field_name, + ), + None => { + self.assign_null_struct_field(otap_batch, struct_col_name, field_name) + } + }?; } - None => self.assign_null_root_column(otap_batch, dest_col_name), - }?; + ColumnAccessor::Attributes(_, _) => { + unreachable!( + "attributes assignment should be handled separately before this loop" + ) + } + }; 
} Ok(otap_batch) @@ -1410,6 +1574,39 @@ fn validate_assign( }); } } + ColumnAccessor::StructCol(struct_name, field_name) => { + if !is_valid_struct_field(struct_name, field_name) { + return Err(Error::InvalidPipelineError { + cause: format!( + "cannot assign to field '{field_name}' on struct column '{struct_name}'" + ), + query_location: dest_query_location.cloned(), + }); + } + + let dest_type = nested_struct_field_type(field_name).ok_or_else(|| { + Error::InvalidPipelineError { + cause: format!("cannot assign to non-existent struct field '{field_name}'"), + query_location: dest_query_location.cloned(), + } + })?; + + let source_type = &source_logical_plan.expr_type; + if !can_assign_type(&dest_type, source_type) { + return Err(Error::InvalidPipelineError { + cause: format!( + "cannot assign expression of type {source_type:?} to type {dest_type:?}" + ), + query_location: dest_query_location.cloned(), + }); + } + + validate_struct_col_assign_cardinality( + struct_name, + dest_query_location, + source_logical_plan, + )?; + } ColumnAccessor::Attributes(dest_attrs_id, _) => { if !can_assign_type(&ExprLogicalType::AnyValue, &source_logical_plan.expr_type) { return Err(Error::InvalidPipelineError { @@ -1427,15 +1624,6 @@ fn validate_assign( source_logical_plan, )?; } - other_dest => { - // TODO other assignment destinations will be supported soon - return Err(Error::NotYetSupportedError { - message: format!( - "assignment to column destination {:?} not yet supported", - other_dest - ), - }); - } } Ok(()) @@ -1466,7 +1654,7 @@ fn validate_attribute_assign_cardinality( // we've already determined we're not assigning to a root attribute, so the // destination must be something that has a one:many relationship with root like // resource or scope - DataScope::Root => false, + DataScope::Root | DataScope::RootParent(_) => false, DataScope::Attributes(source_attrs_id, _) => { dest_attrs_id == *source_attrs_id @@ -1505,6 +1693,99 @@ fn validate_attribute_assign_cardinality( 
Ok(()) } +/// Returns true if the given field is a valid assignable field on the given struct column. +/// +/// `id` fields are excluded as they are internal OTAP identifiers and should not be +/// modified by users. Any struct/field pair not listed here is rejected. +fn is_valid_struct_field(struct_name: &str, field_name: &str) -> bool { + matches!( + (struct_name, field_name), + (consts::RESOURCE, consts::SCHEMA_URL) + | (consts::RESOURCE, consts::DROPPED_ATTRIBUTES_COUNT) + | (consts::SCOPE, consts::NAME) + | (consts::SCOPE, consts::VERSION) + | (consts::SCOPE, consts::DROPPED_ATTRIBUTES_COUNT) + ) +} + +/// Returns true if the given field of the given struct column supports dictionary encoding. +fn nested_struct_field_supports_dict_encoding(struct_name: &str, field_name: &str) -> bool { + matches!( + (struct_name, field_name), + (consts::RESOURCE, consts::SCHEMA_URL) + | (consts::SCOPE, consts::NAME) + | (consts::SCOPE, consts::VERSION) + ) +} + +/// Validates that assigning to a struct column field does not involve a 1:many relationship. +/// +/// The hierarchy is RESOURCE (highest) > SCOPE > LOG/SPAN/METRIC (lowest). Source data from a +/// lower level cannot be assigned to a higher-level struct field because one resource/scope +/// row maps to many lower-level rows, making the value ambiguous. Join inputs are each checked.
+fn validate_struct_col_assign_cardinality( + dest_struct_name: &str, + dest_query_location: Option<&QueryLocation>, + source_logical_plan: &ScopedLogicalExpr, +) -> Result<()> { + match &source_logical_plan.source { + LogicalExprDataSource::DataSource(data_scope) => { + let is_valid = match data_scope { + DataScope::StaticScalar => true, + // root rows (log/span/metric level) always sit below resource and scope + DataScope::Root => false, + DataScope::RootParent(source_parent) => match dest_struct_name { + consts::RESOURCE => { + matches!(source_parent, RootParentStruct::Resource) + } + consts::SCOPE => matches!( + source_parent, + RootParentStruct::Resource | RootParentStruct::Scope + ), + _ => false, + }, + DataScope::Attributes(source_attrs_id, _) => match dest_struct_name { + consts::RESOURCE => matches!( + source_attrs_id, + AttributesIdentifier::NonRoot(ArrowPayloadType::ResourceAttrs) + ), + consts::SCOPE => matches!( + source_attrs_id, + AttributesIdentifier::NonRoot(ArrowPayloadType::ResourceAttrs) + | AttributesIdentifier::NonRoot(ArrowPayloadType::ScopeAttrs) + ), + _ => false, + }, + }; + + if !is_valid { + return Err(Error::InvalidPipelineError { + cause: format!( + "cannot assign data scope {data_scope:?} to \ + struct column {dest_struct_name}" + ), + query_location: dest_query_location.cloned(), + }); + } + } + LogicalExprDataSource::Join(left, right) => { + validate_struct_col_assign_cardinality(dest_struct_name, dest_query_location, left)?; + validate_struct_col_assign_cardinality(dest_struct_name, dest_query_location, right)?; + } + LogicalExprDataSource::MultiJoin(children) => { + for child in children { + validate_struct_col_assign_cardinality( + dest_struct_name, + dest_query_location, + child, + )?; + } + } + } + + Ok(()) +} + /// Determine if the source type can be assigned to the destination.
/// /// See comments on [`validate_assign`] for more details about what types are considered compatible @@ -1670,12 +1951,71 @@ fn try_upsert_column( new_column: ArrayRef, record_batch: &RecordBatch, ) -> Result<RecordBatch> { - let mut columns = record_batch.columns().to_vec(); - let schema = record_batch.schema(); - let fields = schema.fields(); + let (fields, columns) = try_upsert_array_in_columns( + column_name, + new_column, + record_batch.schema().fields(), + record_batch.columns().to_vec(), + )?; + + Ok(RecordBatch::try_new( + Arc::new(Schema::new(fields)), + columns, + )?) +} + +fn try_upsert_struct_col( + struct_column_name: &str, + field_name: &str, + new_column: ArrayRef, + record_batch: &RecordBatch, +) -> Result<RecordBatch> { + let rb_schema = record_batch.schema_ref(); + let rb_fields = rb_schema.fields(); + let maybe_found_column = rb_fields.find(struct_column_name); + let new_struct_col = if let Some((rb_col_index, current_field)) = maybe_found_column { + // upsert the column on the existing struct field + let struct_col = record_batch.column(rb_col_index); + let (new_struct_fields, new_struct_columns) = + if let DataType::Struct(struct_fields) = current_field.data_type() { + try_upsert_array_in_columns( + field_name, + new_column, + struct_fields, + struct_col.as_struct().columns().to_vec(), + )?
+ } else { + return Err(Error::ExecutionError { + cause: format!( + "expected struct column '{struct_column_name}' to have DataType::Struct, \ + found {:?}", + current_field.data_type() + ), + }); + }; + + StructArray::new( + new_struct_fields, + new_struct_columns, + struct_col.nulls().cloned(), + ) + } else { + // struct column doesn't exist yet - create it with just this one field + let new_field = Arc::new(Field::new(field_name, new_column.data_type().clone(), true)); + StructArray::new(Fields::from(vec![new_field]), vec![new_column], None) + }; + + try_upsert_column(struct_column_name, Arc::new(new_struct_col), record_batch) +} + +fn try_upsert_array_in_columns( + column_name: &str, + new_column: ArrayRef, + fields: &Fields, + mut columns: Vec<ArrayRef>, +) -> Result<(Fields, Vec<ArrayRef>)> { let maybe_found_column = fields.find(column_name); let mut fields = fields.to_vec(); - if let Some((target_col_index, current_field)) = maybe_found_column { // check that we're not assigning a column with nulls to a non-nullable column if !current_field.is_nullable() && new_column.null_count() != 0 { @@ -1723,10 +2063,7 @@ fn try_upsert_column( columns.push(new_column) } - Ok(RecordBatch::try_new( - Arc::new(Schema::new(fields)), - columns, - )?)
+ Ok((fields.into(), columns)) } #[cfg(test)] @@ -2968,6 +3305,715 @@ mod test { } } + async fn test_insert_scalar_to_struct_col() { + let logs_data = LogsData { + resource_logs: vec![ResourceLogs::new( + Resource::default(), + vec![ + ScopeLogs::new( + InstrumentationScope::build().name("scope1").finish(), + vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("scope2").finish(), + vec![LogRecord::build().event_name("event2").finish()], + ), + ], + )], + }; + + let input_batch = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let pipeline_expr = P::parse("logs | extend instrumentation_scope.name = \"new_name\"") + .unwrap() + .pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let result = pipeline.execute(input_batch).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let resource_0 = &result_logs_data.resource_logs[0]; + let expected = vec![ + ScopeLogs::new( + InstrumentationScope::build().name("new_name").finish(), + vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("new_name").finish(), + vec![LogRecord::build().event_name("event2").finish()], + ), + ]; + assert_eq!(resource_0.scope_logs, expected); + } + + #[tokio::test] + async fn test_insert_scalar_to_struct_col_opl_parser() { + test_insert_scalar_to_struct_col::().await + } + + #[tokio::test] + async fn test_insert_scalar_to_struct_col_kql_parser() { + test_insert_scalar_to_struct_col::().await + } + + async fn test_struct_str_assign_from_actual_bigger_struct_str() { + let logs_data = LogsData::new(vec![ + ResourceLogs::new( + Resource::default(), + vec![ + ScopeLogs::new( + InstrumentationScope::build().name("scope1").finish(), + vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("scope2").finish(), + 
vec![LogRecord::build().event_name("event2").finish()], + ), + ], + ) + .set_schema_url("schema_url_1"), + ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::build().name("scope3").finish(), + vec![LogRecord::build().event_name("event3").finish()], + )], + ) + .set_schema_url("schema_url_2"), + ]); + + let query = "logs | extend instrumentation_scope.name = resource.schema_url"; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let resource_0 = &result_logs_data.resource_logs[0]; + let expected = vec![ + ScopeLogs::new( + InstrumentationScope::build().name("schema_url_1").finish(), + vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("schema_url_1").finish(), + vec![LogRecord::build().event_name("event2").finish()], + ), + ]; + assert_eq!(resource_0.scope_logs, expected); + + let resource_1 = &result_logs_data.resource_logs[1]; + let expected = vec![ScopeLogs::new( + InstrumentationScope::build().name("schema_url_2").finish(), + vec![LogRecord::build().event_name("event3").finish()], + )]; + assert_eq!(resource_1.scope_logs, expected); + } + + #[tokio::test] + async fn test_struct_str_assign_from_actual_bigger_struct_str_opl_parser() { + test_struct_str_assign_from_actual_bigger_struct_str::().await + } + + #[tokio::test] + async fn test_struct_str_assign_from_actual_bigger_struct_str_kql_parser() { + test_struct_str_assign_from_actual_bigger_struct_str::().await + } + + #[tokio::test] + async fn test_struct_str_assign_fails_from_smaller_struct_str() { + let logs_data = to_logs_data(vec![LogRecord::build().event_name("event1").finish()]); + let input = 
otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend resource.schema_url = instrumentation_scope.name"; + let pipeline_expr = OplParser::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let err = pipeline.execute(input).await.unwrap_err(); + assert!( + err.to_string() + .contains("cannot assign data scope RootParent(Scope) to struct column resource"), + "unexpected error: {}", + err + ); + } + + #[tokio::test] + async fn test_struct_str_assign_fails_from_root_str() { + let logs_data = to_logs_data(vec![LogRecord::build().event_name("event1").finish()]); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend instrumentation_scope.name = event_name"; + let pipeline_expr = OplParser::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let err = pipeline.execute(input).await.unwrap_err(); + assert!( + err.to_string() + .contains("cannot assign data scope Root to struct column scope"), + "unexpected error: {}", + err + ); + } + + async fn test_struct_str_assign_from_self_attribute() { + let logs_data = LogsData::new(vec![ResourceLogs::new( + Resource::default(), + vec![ + ScopeLogs::new( + InstrumentationScope::build() + .name("scope1") + .attributes(vec![KeyValue::new("key", AnyValue::new_string("val1"))]) + .finish(), + vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build() + .name("scope2") + .attributes(vec![KeyValue::new("key", AnyValue::new_string("val2"))]) + .finish(), + vec![LogRecord::build().event_name("event2").finish()], + ), + ], + )]); + + let query = + "logs | extend instrumentation_scope.name = instrumentation_scope.attributes[\"key\"]"; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + 
+ let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let resource_0 = &result_logs_data.resource_logs[0]; + let expected = vec![ + ScopeLogs::new( + InstrumentationScope::build() + .name("val1") + .attributes(vec![KeyValue::new("key", AnyValue::new_string("val1"))]) + .finish(), + vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build() + .name("val2") + .attributes(vec![KeyValue::new("key", AnyValue::new_string("val2"))]) + .finish(), + vec![LogRecord::build().event_name("event2").finish()], + ), + ]; + assert_eq!(resource_0.scope_logs, expected); + } + + #[tokio::test] + async fn test_struct_str_assign_from_self_attribute_opl_parser() { + test_struct_str_assign_from_self_attribute::().await + } + + #[tokio::test] + async fn test_struct_str_assign_from_self_attribute_kql_parser() { + test_struct_str_assign_from_self_attribute::().await + } + + #[tokio::test] + async fn test_struct_str_assign_fails_from_smaller_attribute() { + let logs_data = to_logs_data(vec![LogRecord::build().event_name("event1").finish()]); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend resource.schema_url = instrumentation_scope.attributes[\"key\"]"; + let pipeline_expr = OplParser::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let err = pipeline.execute(input).await.unwrap_err(); + assert!( + err.to_string().contains( + "cannot assign data scope Attributes(NonRoot(ScopeAttrs), \"key\") to struct column resource" + ), + "unexpected error: {}", + err + ); + } + + async fn test_struct_str_assign_from_bigger_struct_str() { + let logs_data = LogsData::new(vec![ + ResourceLogs::new( + Resource::build() + .attributes(vec![KeyValue::new( + "name", + AnyValue::new_string("resource1"), + )]) + .finish(), + vec![ + ScopeLogs::new( + InstrumentationScope::build().name("scope1").finish(), + 
vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("scope2").finish(), + vec![LogRecord::build().event_name("event2").finish()], + ), + ], + ), + ResourceLogs::new( + Resource::build() + .attributes(vec![KeyValue::new( + "name", + AnyValue::new_string("resource2"), + )]) + .finish(), + vec![ScopeLogs::new( + InstrumentationScope::build().name("scope3").finish(), + vec![LogRecord::build().event_name("event3").finish()], + )], + ), + ]); + + let query = "logs | extend instrumentation_scope.name = resource.attributes[\"name\"]"; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let resource_0 = &result_logs_data.resource_logs[0]; + let expected = vec![ + ScopeLogs::new( + InstrumentationScope::build().name("resource1").finish(), + vec![LogRecord::build().event_name("event1").finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("resource1").finish(), + vec![LogRecord::build().event_name("event2").finish()], + ), + ]; + assert_eq!(resource_0.scope_logs, expected); + + let resource_1 = &result_logs_data.resource_logs[1]; + let expected = vec![ScopeLogs::new( + InstrumentationScope::build().name("resource2").finish(), + vec![LogRecord::build().event_name("event3").finish()], + )]; + assert_eq!(resource_1.scope_logs, expected); + } + + #[tokio::test] + async fn test_struct_str_assign_from_bigger_struct_str_opl_parser() { + test_struct_str_assign_from_bigger_struct_str::().await + } + + #[tokio::test] + async fn test_struct_str_assign_from_bigger_struct_str_kql_parser() { + test_struct_str_assign_from_bigger_struct_str::().await + } + + async fn test_struct_col_assign_fails_on_wrong_type() { + let 
logs_data = to_logs_data(vec![LogRecord::build().finish()]); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend instrumentation_scope.name = 42"; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let err = pipeline.execute(input).await.unwrap_err(); + assert!( + err.to_string() + .contains("cannot assign expression of type AnyInt to type String"), + "unexpected error: {}", + err + ); + } + + #[tokio::test] + async fn test_struct_col_assign_fails_on_wrong_type_opl_parser() { + test_struct_col_assign_fails_on_wrong_type::().await + } + + #[tokio::test] + async fn test_struct_col_assign_fails_on_wrong_type_kql_parser() { + test_struct_col_assign_fails_on_wrong_type::().await + } + + async fn test_struct_col_assign_uint32_field() { + let logs_data = LogsData::new(vec![ResourceLogs::new( + Resource::default(), + vec![ + ScopeLogs::new( + InstrumentationScope::build().name("scope1").finish(), + vec![LogRecord::build().finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("scope2").finish(), + vec![LogRecord::build().finish()], + ), + ], + )]); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend instrumentation_scope.dropped_attributes_count = 42"; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let resource_0 = &result_logs_data.resource_logs[0]; + let expected = vec![ + ScopeLogs::new( + InstrumentationScope::build() + .name("scope1") + .dropped_attributes_count(42u32) + .finish(), + vec![LogRecord::build().finish()], + ), + ScopeLogs::new( + InstrumentationScope::build() + .name("scope2") + .dropped_attributes_count(42u32) + .finish(), + vec![LogRecord::build().finish()], + ), 
+ ]; + assert_eq!(resource_0.scope_logs, expected); + } + + #[tokio::test] + async fn test_struct_col_assign_uint32_field_opl_parser() { + test_struct_col_assign_uint32_field::().await + } + + #[tokio::test] + async fn test_struct_col_assign_uint32_field_kql_parser() { + test_struct_col_assign_uint32_field::().await + } + + #[tokio::test] + async fn test_struct_col_assign_null_removes_nullable_field() { + let logs_data = LogsData::new(vec![ + ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::default(), + vec![LogRecord::build().finish()], + )], + ) + .set_schema_url("existing_url"), + ]); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + // resource has no attributes so resource.attributes["x"] evaluates to None, + // which triggers assign_null_struct_field for the nullable schema_url field + let query = "logs | extend resource.schema_url = resource.attributes[\"x\"]"; + let pipeline_expr = OplParser::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + // null assigned to nullable schema_url removes the field "" in OTLP + assert_eq!(result_logs_data.resource_logs[0].schema_url, ""); + } + + async fn test_struct_col_assign_from_func_call_expr() { + let logs_data = LogsData::new(vec![ResourceLogs::new( + Resource::default(), + vec![ + ScopeLogs::new( + InstrumentationScope::build().name("scope1_full").finish(), + vec![LogRecord::build().finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("scope2_full").finish(), + vec![LogRecord::build().finish()], + ), + ], + )]); + + let query = "logs | extend instrumentation_scope.name = substring(instrumentation_scope.name, 0, 6)"; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input 
= otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let resource_0 = &result_logs_data.resource_logs[0]; + let expected = vec![ + ScopeLogs::new( + InstrumentationScope::build().name("scope1").finish(), + vec![LogRecord::build().finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("scope2").finish(), + vec![LogRecord::build().finish()], + ), + ]; + assert_eq!(resource_0.scope_logs, expected); + } + + #[tokio::test] + async fn test_struct_col_assign_from_func_call_expr_opl_parser() { + test_struct_col_assign_from_func_call_expr::().await + } + + #[tokio::test] + async fn test_struct_col_assign_from_func_call_expr_kql_parser() { + test_struct_col_assign_from_func_call_expr::().await + } + + #[test] + fn test_try_upsert_struct_col_when_struct_col_absent() { + use std::sync::Arc; + + use arrow::array::RecordBatch; + use arrow::datatypes::{Field, Schema}; + + let schema = Arc::new(Schema::new(vec![Field::new( + "event_name", + DataType::Utf8, + true, + )])); + let event_names: Arc = Arc::new(StringArray::from(vec!["event1", "event2"])); + let rb = RecordBatch::try_new(schema, vec![event_names]).unwrap(); + + let new_col: Arc = Arc::new(StringArray::from(vec!["url1", "url2"])); + let result = super::try_upsert_struct_col("resource", "schema_url", new_col, &rb).unwrap(); + + let resource_col = result + .column_by_name("resource") + .expect("resource struct column should have been created"); + let DataType::Struct(struct_fields) = resource_col.data_type() else { + panic!( + "expected Struct data type, got {:?}", + resource_col.data_type() + ); + }; + assert!( + struct_fields.find("schema_url").is_some(), + "schema_url field should exist in the newly created resource struct" + ); + let resource_struct = resource_col.as_any().downcast_ref::().unwrap(); + let url_values = 
resource_struct + .column_by_name("schema_url") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(url_values.value(0), "url1"); + assert_eq!(url_values.value(1), "url2"); + } + + async fn test_struct_col_assign_when_field_absent() { + // default ResourceLogs has no schema_url, so the resource struct will not have a + // schema_url field; assigning to it should add the field + let logs_data = to_logs_data(vec![ + LogRecord::build().event_name("event1").finish(), + LogRecord::build().event_name("event2").finish(), + ]); + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend resource.schema_url = \"new_url\""; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + assert_eq!( + result_logs_data.resource_logs[0].schema_url, "new_url", + "schema_url should have been added to the existing resource struct" + ); + } + + #[tokio::test] + async fn test_struct_col_assign_when_field_absent_opl_parser() { + test_struct_col_assign_when_field_absent::().await + } + + #[tokio::test] + async fn test_struct_col_assign_when_field_absent_kql_parser() { + test_struct_col_assign_when_field_absent::().await + } + + async fn test_assign_to_schema_url() { + let logs_data = LogsData::new(vec![ + ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::default(), + vec![LogRecord::build().finish()], + )], + ) + .set_schema_url("old_url"), + ]); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend resource.schema_url = \"new_url\""; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let result = pipeline.execute(input).await.unwrap(); + + let 
OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + assert_eq!(result_logs_data.resource_logs[0].schema_url, "new_url"); + } + + #[tokio::test] + async fn test_assign_to_schema_url_opl_parser() { + test_assign_to_schema_url::().await + } + + #[tokio::test] + async fn test_assign_to_schema_url_kql_parser() { + test_assign_to_schema_url::().await + } + + async fn test_assign_multiple_struct_cols() { + let logs_data = LogsData::new(vec![ + ResourceLogs::new( + Resource::default(), + vec![ScopeLogs::new( + InstrumentationScope::build().name("old_name").finish(), + vec![LogRecord::build().finish()], + )], + ) + .set_schema_url("old_url"), + ]); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + + let query = "logs | extend instrumentation_scope.name = \"new_name\", resource.schema_url = \"new_url\""; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + let resource_0 = &result_logs_data.resource_logs[0]; + let expected_schema_url = "new_url"; + assert_eq!(resource_0.schema_url, expected_schema_url); + let expected = vec![ScopeLogs::new( + InstrumentationScope::build().name("new_name").finish(), + vec![LogRecord::build().finish()], + )]; + assert_eq!(resource_0.scope_logs, expected); + } + + #[tokio::test] + async fn test_assign_multiple_struct_cols_opl_parser() { + test_assign_multiple_struct_cols::().await + } + + #[tokio::test] + async fn test_assign_multiple_struct_cols_kql_parser() { + test_assign_multiple_struct_cols::().await + } + + async fn test_struct_col_assign_from_anyvalue() { + let logs_data = LogsData::new(vec![ + ResourceLogs::new( + Resource::build() + .attributes(vec![KeyValue::new( + "url", + AnyValue::new_string("resource_url_1"), + )]) + .finish(), 
+ vec![ + ScopeLogs::new( + InstrumentationScope::build().name("scope1").finish(), + vec![LogRecord::build().finish()], + ), + ScopeLogs::new( + InstrumentationScope::build().name("scope2").finish(), + vec![LogRecord::build().finish()], + ), + ], + ), + ResourceLogs::new( + Resource::build() + .attributes(vec![KeyValue::new( + "url", + AnyValue::new_string("resource_url_2"), + )]) + .finish(), + vec![ScopeLogs::new( + InstrumentationScope::build().name("scope3").finish(), + vec![LogRecord::build().finish()], + )], + ), + ]); + + let query = "logs | extend instrumentation_scope.name = resource.attributes[\"url\"]"; + let pipeline_expr = P::parse(query).unwrap().pipeline; + let mut pipeline = Pipeline::new(pipeline_expr); + + let input = otlp_to_otap(&OtlpProtoMessage::Logs(logs_data)); + let result = pipeline.execute(input).await.unwrap(); + + let OtlpProtoMessage::Logs(result_logs_data) = otap_to_otlp(&result) else { + panic!("invalid signal type"); + }; + + let resource_0 = &result_logs_data.resource_logs[0]; + let expected = vec![ + ScopeLogs::new( + InstrumentationScope::build() + .name("resource_url_1") + .finish(), + vec![LogRecord::build().finish()], + ), + ScopeLogs::new( + InstrumentationScope::build() + .name("resource_url_1") + .finish(), + vec![LogRecord::build().finish()], + ), + ]; + assert_eq!(resource_0.scope_logs, expected); + + let resource_1 = &result_logs_data.resource_logs[1]; + let expected = vec![ScopeLogs::new( + InstrumentationScope::build() + .name("resource_url_2") + .finish(), + vec![LogRecord::build().finish()], + )]; + assert_eq!(resource_1.scope_logs, expected); + } + + #[tokio::test] + async fn test_struct_col_assign_from_anyvalue_opl_parser() { + test_struct_col_assign_from_anyvalue::().await + } + + #[tokio::test] + async fn test_struct_col_assign_from_anyvalue_kql_parser() { + test_struct_col_assign_from_anyvalue::().await + } + + #[tokio::test] + async fn test_struct_col_assign_on_empty_batch() { + let query = "logs | extend 
instrumentation_scope.name = \"new_name\""; + let pipeline_expr = OplParser::parse(query).unwrap().pipeline; + let input = OtapArrowRecords::Logs(Logs::default()); + let mut pipeline = Pipeline::new(pipeline_expr); + let result = pipeline.execute(input.clone()).await.unwrap(); + assert_eq!(result, input); + } + async fn test_upserts_attribute_computed_from_root() { let logs_data = to_logs_data(vec![ LogRecord::build() diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs index 0ddeef9001..bde84e6d6e 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/expr.rs @@ -125,6 +125,13 @@ pub(crate) fn arg_column_name(index: usize) -> String { format!("arg_{index}") } +/// Identifies which root-level parent struct column a [`DataScope::RootParent`] belongs to. +#[derive(Clone, Debug, PartialEq)] +pub(crate) enum RootParentStruct { + Resource, + Scope, +} + /// Identifies OTAP data either consumed or produced by some expression. /// /// OTAP batches contain multiple [`RecordBatch`]s, and within a given record batch, some expression @@ -144,6 +151,11 @@ pub(crate) enum DataScope { /// A special data scope indicating the data is produced from a static scalar value defined /// in the input expression tree, rather than data from the OTAP batch. StaticScalar, + + /// A field read from a resource or scope struct column in the root record batch (e.g., + /// resource.schema_url or scope.name). Physically the data lives in the root batch (same + /// as Root), but the parent struct records the hierarchy level for cardinality validation. + RootParent(RootParentStruct), } impl DataScope { @@ -153,8 +165,14 @@ impl DataScope { /// Rules: /// - Any scope can combine with StaticScalar (constants) /// - Same scopes can combine (e.g., Root + Root), because the row order is the same. 
+ /// - Root and RootParent can combine because both live in the root record batch. pub(crate) fn can_combine(&self, other: &Self) -> bool { - self.is_scalar() || other.is_scalar() || (self == other) + if self.is_scalar() || other.is_scalar() { + return true; + } + let self_in_root = matches!(self, Self::Root | Self::RootParent(_)); + let other_in_root = matches!(other, Self::Root | Self::RootParent(_)); + (self_in_root && other_in_root) || self == other } /// Returns true if this scope represents a static scalar value. @@ -166,7 +184,12 @@ impl DataScope { impl From<&ColumnAccessor> for DataScope { fn from(value: &ColumnAccessor) -> Self { match value { - ColumnAccessor::ColumnName(_) | ColumnAccessor::StructCol(_, _) => Self::Root, + ColumnAccessor::ColumnName(_) => Self::Root, + ColumnAccessor::StructCol(struct_name, _) => match *struct_name { + consts::RESOURCE => Self::RootParent(RootParentStruct::Resource), + consts::SCOPE => Self::RootParent(RootParentStruct::Scope), + _ => Self::Root, + }, ColumnAccessor::Attributes(attrs_id, attrs_key) => { Self::Attributes(*attrs_id, attrs_key.clone()) } @@ -310,10 +333,15 @@ impl ExprLogicalPlanner { source_scalar_expr.get_query_location().clone(), ), })?; + let data_scope = match column_name { + consts::RESOURCE => DataScope::RootParent(RootParentStruct::Resource), + consts::SCOPE => DataScope::RootParent(RootParentStruct::Scope), + _ => DataScope::Root, + }; Ok(ScopedLogicalExpr { logical_expr: col(column_name).field(struct_field_name), requires_dict_downcast: false, - source: LogicalExprDataSource::DataSource(DataScope::Root), + source: LogicalExprDataSource::DataSource(data_scope), expr_type: field_type, }) } @@ -1010,7 +1038,9 @@ impl ScopedPhysicalExpr { let (source_rb, result_data_scope) = match &mut self.source { PhysicalExprDataSource::DataSource(data_scope_id) => { let input_rb = match data_scope_id.as_ref() { - DataScope::Root => otap_batch.root_record_batch().map(Cow::Borrowed), + DataScope::Root | 
DataScope::RootParent(_) => { + otap_batch.root_record_batch().map(Cow::Borrowed) + } DataScope::Attributes(attrs_id, key) => { let attrs_payload_type = match *attrs_id { AttributesIdentifier::Root => match otap_batch.root_payload_type() { @@ -1317,7 +1347,7 @@ pub(crate) struct PhysicalExprEvalResult { impl PhysicalExprEvalResult { pub fn new(values: ColumnarValue, data_scope: Rc, source: &RecordBatch) -> Self { - let is_root = *data_scope == DataScope::Root; + let is_root = matches!(*data_scope, DataScope::Root | DataScope::RootParent(_)); let mut result = Self { values, diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs index 1c41de82ed..711933d581 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/filter.rs @@ -1519,7 +1519,7 @@ impl FilterExec { // check if the result is already aligned to the root batch match eval_result.data_scope.as_ref() { - DataScope::Root | DataScope::StaticScalar => { + DataScope::Root | DataScope::StaticScalar | DataScope::RootParent(_) => { // result is already aligned to root batch rows Ok(boolean_arr) } diff --git a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs index 89f377fdc7..e9710d5381 100644 --- a/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs +++ b/rust/otap-dataflow/crates/query-engine/src/pipeline/planner.rs @@ -669,7 +669,7 @@ impl PipelinePlanner { DataScope::StaticScalar => false, // visit the expression applied to the root and search for any column exprs - DataScope::Root => { + DataScope::Root | DataScope::RootParent(_) => { let mut source_contains_refed_column = false; _ = source_expr.logical_expr.apply(|expr| { if let Expr::Column(column) = expr {