Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 104 additions & 20 deletions rust/otap-dataflow/crates/pdata/src/encode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -719,16 +719,17 @@ where
let mut curr_metric_id: u16 = 0;

for resource_metric in metrics_view.resources() {
let metric_count: usize = resource_metric
.scopes()
.map(|scope| scope.metrics().count())
.sum();
let schema_url = resource_metric.schema_url();

metrics
.resource
.append_schema_url_n(schema_url, metric_count);
metrics.resource.append_id_n(curr_resource_id, metric_count);
if let Some(resource) = resource_metric.resource() {
let metric_count = resource_metric
.scopes()
.map(|scope| scope.metrics().count())
.sum();
let schema_url = resource_metric.schema_url();
metrics
.resource
.append_schema_url_n(schema_url, metric_count);
metrics.resource.append_id_n(curr_resource_id, metric_count);
metrics.resource.append_dropped_attributes_count_n(
resource.dropped_attributes_count(),
metric_count,
Expand All @@ -737,34 +738,43 @@ where
resource_attrs.append_parent_id(&curr_resource_id);
append_attribute_value(&mut resource_attrs, &kv)?;
}
} else {
// OTLP may omit `resource`; same as an empty Resource. Still emit one resource row
// per metric-definition row so the `resource` struct column matches `id` length.
metrics
.resource
.append_dropped_attributes_count_n(0, metric_count);
}

for scope_metric in resource_metric.scopes() {
let metric_count = scope_metric.metrics().count();
// Note: do not call `scope_metric.metrics().count()` (or otherwise iterate `metrics()`)
// before the loop below. The OTLP-bytes view shares one `ProtoBytesParser` cursor
// across `metrics()` / `scope()` / `schema_url()`; draining metrics for a count leaves
// the cursor at EOF so a second `metrics()` pass yields too few rows, while this
// pre-loop would still have appended `scope_schema_url` / scope columns using the
// full count — causing column length mismatch in `MetricsRecordBatchBuilder::finish`.
let scope_schema_url = scope_metric.schema_url();
metrics.append_scope_schema_url_n(scope_schema_url, metric_count);

let scope = scope_metric.scope();
let scope_name = scope.as_ref().and_then(|s| s.name());
let scope_version = scope.as_ref().and_then(|s| s.version());
let scope_dropped_attributes_count = scope
.as_ref()
.map(|s| s.dropped_attributes_count())
.unwrap_or(0);
metrics.scope.append_id_n(curr_scope_id, metric_count);
metrics.scope.append_name_n(scope_name, metric_count);
metrics.scope.append_version_n(scope_version, metric_count);
metrics
.scope
.append_dropped_attributes_count_n(scope_dropped_attributes_count, metric_count);
if let Some(scope) = scope {
for kv in scope.attributes() {
if let Some(ref scope_ref) = scope {
for kv in scope_ref.attributes() {
scope_attrs.append_parent_id(&curr_scope_id);
append_attribute_value(&mut scope_attrs, &kv)?;
}
}

for metric in scope_metric.metrics() {
metrics.append_scope_schema_url(scope_schema_url);
metrics.scope.append_id(Some(curr_scope_id));
metrics.scope.append_name(scope_name);
metrics.scope.append_version(scope_version);
metrics.scope.append_dropped_attributes_count(scope_dropped_attributes_count);

metrics.append_id(curr_metric_id);
let data_obj = metric.data();
let data = data_obj.as_ref();
Expand Down Expand Up @@ -1039,6 +1049,7 @@ mod test {
};
use crate::schema::{FieldExt, SpanId, TraceId, consts, no_nulls};
use crate::views::otlp::bytes::logs::RawLogsData;
use crate::views::otlp::bytes::metrics::RawMetricsData;
use crate::views::otlp::bytes::traces::RawTraceData;
use pretty_assertions::assert_eq;
use prost::Message;
Expand Down Expand Up @@ -3266,6 +3277,79 @@ mod test {
assert!(neg_counts.is_empty(), "expected no negative bucket counts");
}

/// Regression: OTLP allows `ResourceMetrics` with no `resource` message. The encoder must still
/// append one resource row per metric-definition row (treat as empty resource), matching
/// `ResourceAttrs` / struct column lengths.
#[test]
fn test_encode_metrics_otlp_resource_field_absent() {
use crate::proto::opentelemetry::arrow::v1::ArrowPayloadType;
use crate::proto::opentelemetry::metrics::v1::{
Gauge, Metric, MetricsData, NumberDataPoint, ResourceMetrics, ScopeMetrics,
};

let metrics_data = MetricsData::new(vec![ResourceMetrics {
resource: None,
schema_url: "https://opentelemetry.io/schemas/1.37.0".into(),
scope_metrics: vec![ScopeMetrics {
scope: None,
schema_url: "".into(),
metrics: vec![
Metric::build()
.name("gauge.metric")
.data_gauge(Gauge::new(vec![
NumberDataPoint::build()
.time_unix_nano(99u64)
.value_double(1.0)
.finish(),
]))
.finish(),
],
}],
}]);

let otap_batch = encode_metrics_otap_batch(&metrics_data).expect("encode metrics");
let rb = otap_batch
.get(ArrowPayloadType::UnivariateMetrics)
.or_else(|| otap_batch.get(ArrowPayloadType::MultivariateMetrics))
.expect("metrics definition batch");
assert_eq!(rb.num_rows(), 1);
assert!(
rb.column_by_name(consts::RESOURCE).is_some(),
"resource column must exist when OTLP omits ResourceMetrics.resource"
);
}

/// Regression: real OTLP/collector `ExportMetricsServiceRequest` bytes must encode without
/// panicking. Historically, pre-counting `metrics()` on the shared proto-bytes cursor caused
/// column length mismatch in `MetricsRecordBatchBuilder::finish`.
#[test]
fn test_encode_metrics_otap_batch_real_export_request_does_not_panic() {
use std::panic::{AssertUnwindSafe, catch_unwind};

use crate::proto::opentelemetry::arrow::v1::ArrowPayloadType;

static EXPORT_METRICS_REQUEST: &[u8] =
include_bytes!("../../testdata/export_metrics_service_request_realistic.bin");

let view = RawMetricsData::new(EXPORT_METRICS_REQUEST);
let outcome = catch_unwind(AssertUnwindSafe(|| encode_metrics_otap_batch(&view)));
let otap_batch = match outcome {
Ok(Ok(batch)) => batch,
Ok(Err(err)) => panic!("encode_metrics_otap_batch returned error: {err:?}"),
Err(payload) => panic!(
"encode_metrics_otap_batch panicked (column mismatch / shared OTLP cursor): {payload:?}"
),
};
let rb = otap_batch
.get(ArrowPayloadType::UnivariateMetrics)
.or_else(|| otap_batch.get(ArrowPayloadType::MultivariateMetrics))
.expect("metrics definition batch");
assert!(
rb.num_rows() > 0,
"fixture should contain at least one metric definition row"
);
}

#[test]
fn test_metrics_all_fields_proto_struct() {
let metrics_view = _generate_metrics_data_all_fields();
Expand Down
Binary file not shown.
Loading