diff --git a/rust/otap-dataflow/crates/engine/src/flow_metrics.rs b/rust/otap-dataflow/crates/engine/src/flow_metrics.rs index b23ea08cd8..056b8c83c8 100644 --- a/rust/otap-dataflow/crates/engine/src/flow_metrics.rs +++ b/rust/otap-dataflow/crates/engine/src/flow_metrics.rs @@ -14,7 +14,7 @@ use std::borrow::Cow; use std::cell::Cell; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::time::Instant; @@ -224,6 +224,7 @@ pub(crate) fn build_flow_metric_state( node_name_to_index: &HashMap, processor_indices: &HashSet, pipeline_context: &PipelineContext, + pipeline_connections: &[(usize, usize)], ) -> Result { let mut signals_incoming_metrics = Vec::new(); let mut duration_metrics = Vec::new(); @@ -233,6 +234,10 @@ pub(crate) fn build_flow_metric_state( let pipeline_attrs = pipeline_context.pipeline_attribute_set(); + // Collect resolved (start, end, name) triples for the graph-based + // interleave check that runs after all flow metrics are registered. + let mut resolved_ranges: Vec<(usize, usize, String)> = Vec::new(); + for flow_config in &telemetry_policy.flow_metrics { let start_idx = node_name_to_index .get(&flow_config.bounds.start_node) @@ -296,6 +301,13 @@ pub(crate) fn build_flow_metric_state( signals_outgoing_metrics.push(signals_outgoing_metric); let _ = end_nodes.insert(end_idx, id); let _ = start_nodes.insert(start_idx, id); + resolved_ranges.push((start_idx, end_idx, flow_config.name.clone())); + } + + // Validate that flow_metric ranges don't interleave with each other. + if resolved_ranges.len() > 1 { + let adjacency = build_adjacency(pipeline_connections); + detect_interleaved_ranges(&resolved_ranges, &adjacency)?; } Ok(PipelineFlowMetricState { @@ -313,6 +325,87 @@ fn invalid_flow_metric_config(error: String) -> crate::error::Error { })) } +/// Compute the "active range" of a flow metric: the set of node indices +/// reachable from "start" via the forward adjacency list, stopping +/// expansion at "end". +fn active_range( + start: usize, + end: usize, + adjacency: &HashMap>, +) -> HashSet { + let mut visited = HashSet::new(); + let mut queue = VecDeque::new(); + let _ = visited.insert(start); + queue.push_back(start); + + while let Some(node) = queue.pop_front() { + if node == end { + continue; + } + if let Some(neighbors) = adjacency.get(&node) { + for &next in neighbors { + if visited.insert(next) { + queue.push_back(next); + } + } + } + } + + // The end node does not alter the accumulator for this + // flow metric range. + let _ = visited.remove(&end); + visited +} + +/// Detect interleaved flow metric ranges using graph reachability. +/// Two flow metrics interleave when one's start node falls inside the +/// other's active range. +fn detect_interleaved_ranges( + ranges: &[(usize, usize, String)], + adjacency: &HashMap>, +) -> Result<(), crate::error::Error> { + // Pre-compute the active range for each flow metric. + let active_ranges: Vec> = ranges + .iter() + .map(|&(start, end, _)| active_range(start, end, adjacency)) + .collect(); + + // Pairwise check: does any flow metric's start fall inside another's + // active range? + for i in 0..ranges.len() { + for j in (i + 1)..ranges.len() { + let (start_j, _, ref name_j) = ranges[j]; + if active_ranges[i].contains(&start_j) { + return Err(invalid_flow_metric_config(format!( + "flow metric `{}` interleaves with `{}`: \ + start node of `{}` is reachable from start of `{}` \ + before reaching its end node", + name_j, ranges[i].2, name_j, ranges[i].2, + ))); + } + let (start_i, _, ref name_i) = ranges[i]; + if active_ranges[j].contains(&start_i) { + return Err(invalid_flow_metric_config(format!( + "flow metric `{}` interleaves with `{}`: \ + start node of `{}` is reachable from start of `{}` \ + before reaching its end node", + name_i, name_j, name_i, name_j, + ))); + } + } + } + Ok(()) +} + +/// Build a forward-edge adjacency list from a flat edge list. +fn build_adjacency(edges: &[(usize, usize)]) -> HashMap> { + let mut adj: HashMap> = HashMap::new(); + for &(src, dst) in edges { + adj.entry(src).or_default().push(dst); + } + adj +} + /// Saturating cast of `u128` nanoseconds to `u64`. /// /// Used by EffectHandlers when computing elapsed durations between @@ -512,6 +605,22 @@ mod tests { (name_to_index, processor_indices) } + /// Helper: build a flat edge list from `(from, to)` string pairs, + /// resolving names through the provided `name_to_index` map. + fn test_edges( + edges: &[(&str, &str)], + name_to_index: &HashMap, + ) -> Vec<(usize, usize)> { + edges + .iter() + .filter_map(|&(from, to)| { + let src = *name_to_index.get(from)?; + let dst = *name_to_index.get(to)?; + Some((src, dst)) + }) + .collect() + } + #[test] fn valid_flow_metric_is_registered() { let (ctx, _) = test_pipeline_ctx(); @@ -520,7 +629,7 @@ mod tests { let (names, procs) = test_maps(&["a", "b", "c"], &[]); let policy = policy_with(vec![sw("sw1", "a", "c")]); - let state = build_flow_metric_state(&policy, &names, &procs, &ctx) + let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .expect("valid config should build"); assert_eq!(state.duration_metrics.len(), 1); @@ -536,7 +645,7 @@ mod tests { flow.metrics = Some(vec![FlowMetric::ComputeDuration]); let policy = policy_with(vec![flow]); - let state = build_flow_metric_state(&policy, &names, &procs, &ctx) + let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .expect("duration-only config should build"); assert!(state.duration_metrics[0].is_some()); @@ -550,7 +659,7 @@ mod tests { let (names, procs) = test_maps(&["a", "b"], &[]); let policy = policy_with(vec![sw("sw1", "a", "missing")]); - let err = build_flow_metric_state(&policy, &names, &procs, &ctx) + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .err() .expect("expected Err"); @@ -563,7 +672,7 @@ mod tests { let (names, procs) = test_maps(&["recv", "proc1", "proc2"], &["recv"]); let policy = policy_with(vec![sw("sw1", "recv", "proc2")]); - let err = build_flow_metric_state(&policy, &names, &procs, &ctx) + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .err() .expect("expected Err"); @@ -576,7 +685,7 @@ mod tests { let (names, procs) = test_maps(&["proc1", "proc2", "exp"], &["exp"]); let policy = policy_with(vec![sw("sw1", "proc1", "exp")]); - let err = build_flow_metric_state(&policy, &names, &procs, &ctx) + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .err() .expect("expected Err"); @@ -590,7 +699,7 @@ mod tests { // Two flow_metrics share "a" as start node. let policy = policy_with(vec![sw("sw1", "a", "b"), sw("sw2", "a", "d")]); - let err = build_flow_metric_state(&policy, &names, &procs, &ctx) + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .err() .expect("expected Err"); @@ -604,7 +713,7 @@ mod tests { // Two flow_metrics share "d" as stop node. let policy = policy_with(vec![sw("sw1", "a", "d"), sw("sw2", "c", "d")]); - let err = build_flow_metric_state(&policy, &names, &procs, &ctx) + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .err() .expect("expected Err"); @@ -617,7 +726,7 @@ mod tests { let (names, procs) = test_maps(&["a", "b", "c"], &[]); let policy = policy_with(vec![sw("sw1", "a", "b"), sw("sw2", "b", "c")]); - let err = build_flow_metric_state(&policy, &names, &procs, &ctx) + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[]) .err() .expect("expected Err"); @@ -631,7 +740,8 @@ mod tests { // Non-overlapping: a→b and c→d. let policy = policy_with(vec![sw("sw1", "a", "b"), sw("sw2", "c", "d")]); - let state = build_flow_metric_state(&policy, &names, &procs, &ctx) + let edges = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &names); + let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) .expect("disjoint config should build"); assert_eq!(state.duration_metrics.len(), 2); @@ -640,4 +750,164 @@ mod tests { assert_eq!(state.end_nodes.get(&1), Some(&0)); // "b" → sw1 assert_eq!(state.end_nodes.get(&3), Some(&1)); // "d" → sw2 } + + // Validation of flow metric interleaving detection. + + #[test] + fn interleaved_distinct_endpoints_linear_path_is_rejected() { + // Toplogy: a -> b -> c -> d + // Flow metrics: a->c and b->d + // Vanilla interleaving on linear path. Should fail validation. + let (ctx, _) = test_pipeline_ctx(); + let (names, procs) = test_maps(&["a", "b", "c", "d"], &[]); + let edges = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &names); + let policy = policy_with(vec![sw("sw1", "a", "c"), sw("sw2", "b", "d")]); + + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) + .err() + .expect("expected Err for interleaved ranges"); + + assert_invalid_user_config(&err, "sw2"); + } + + #[test] + fn interleaved_reverse_order_is_rejected() { + // Toplogy: a -> b -> c -> d + // Flow metrics: a->c and b->d + // Vanilla interleaving on linear path, but with reverse declaration order. Should fail validation. + let (ctx, _) = test_pipeline_ctx(); + let (names, procs) = test_maps(&["a", "b", "c", "d"], &[]); + let edges = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &names); + let policy = policy_with(vec![sw("sw1", "b", "d"), sw("sw2", "a", "c")]); + + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) + .err() + .expect("expected Err for interleaved ranges"); + + assert_invalid_user_config(&err, "sw2"); + } + + #[test] + fn disjoint_on_linear_path_is_accepted() { + // Topology: a -> b -> c -> d -> e -> f + // Flow metrics: a->c and d->f + // Fully disjoint. Should pass validation. + let (ctx, _) = test_pipeline_ctx(); + let (names, procs) = test_maps(&["a", "b", "c", "d", "e", "f"], &[]); + let edges = test_edges( + &[("a", "b"), ("b", "c"), ("c", "d"), ("d", "e"), ("e", "f")], + &names, + ); + let policy = policy_with(vec![sw("sw1", "a", "c"), sw("sw2", "d", "f")]); + + let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) + .expect("disjoint ranges should build"); + + assert_eq!(state.duration_metrics.len(), 2); + } + + #[test] + fn interleaved_on_branching_path_is_rejected() { + // Topology: a -> b -> d, a -> c -> d, d -> e (Diamond pattern) + // Flow metrics: a->d and b->e. + // b is inside a->d's range, so these interleave. Should fail validation. + let (ctx, _) = test_pipeline_ctx(); + let (names, procs) = test_maps(&["a", "b", "c", "d", "e"], &[]); + let edges = test_edges( + &[("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")], + &names, + ); + let policy = policy_with(vec![sw("sw1", "a", "d"), sw("sw2", "b", "e")]); + + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) + .err() + .expect("expected Err for interleaved ranges on diamond"); + + assert_invalid_user_config(&err, "sw2"); + } + + #[test] + fn disjoint_on_separate_branches_is_accepted() { + // Topology: a -> b -> c, a -> d -> e + // Flow metrics: b->c and d->e + // Separate branches, should have no overlap. Should pass validation. + let (ctx, _) = test_pipeline_ctx(); + let (names, procs) = test_maps(&["a", "b", "c", "d", "e"], &[]); + let edges = test_edges(&[("a", "b"), ("b", "c"), ("a", "d"), ("d", "e")], &names); + let policy = policy_with(vec![sw("sw1", "b", "c"), sw("sw2", "d", "e")]); + + let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) + .expect("separate-branch ranges should build"); + + assert_eq!(state.duration_metrics.len(), 2); + } + + #[test] + fn nested_ranges_are_rejected() { + // Topology: a -> b -> c -> d + // Flow metrics: a->d and b->c + // The single-accumulator design does not support nesting. Should fail validation. + let (ctx, _) = test_pipeline_ctx(); + let (names, procs) = test_maps(&["a", "b", "c", "d"], &[]); + let edges = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &names); + let policy = policy_with(vec![sw("sw1", "a", "d"), sw("sw2", "b", "c")]); + + let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) + .err() + .expect("expected Err for nested ranges"); + + assert_invalid_user_config(&err, "sw2"); + } + + #[test] + fn single_flow_metric_accepted() { + // Topology: a -> b -> c + // Flow metrics: a->b and b->c + // Only one flow metric, so no interleaving possible. Should pass validation. + let (ctx, _) = test_pipeline_ctx(); + let (names, procs) = test_maps(&["a", "b", "c"], &[]); + let edges = test_edges(&[("a", "b"), ("b", "c")], &names); + let policy = policy_with(vec![sw("sw1", "a", "c")]); + + let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges) + .expect("single flow metric should build"); + + assert_eq!(state.duration_metrics.len(), 1); + } + + // Validation of active_range helper for interleaved flow metrics detection. + + #[test] + fn active_range_linear() { + // a(0) -> b(1) -> c(2) -> d(3) + let adj = build_adjacency(&[(0, 1), (1, 2), (2, 3)]); + let range = active_range(0, 2, &adj); + // Start=0 included, 1 is between, end=2 excluded. + assert!(range.contains(&0)); + assert!(range.contains(&1)); + assert!(!range.contains(&2)); + assert!(!range.contains(&3)); + } + + #[test] + fn active_range_diamond() { + // a(0) -> b(1) -> d(3) + // a(0) -> c(2) -> d(3) + let adj = build_adjacency(&[(0, 1), (0, 2), (1, 3), (2, 3)]); + let range = active_range(0, 3, &adj); + assert!(range.contains(&0)); + assert!(range.contains(&1)); + assert!(range.contains(&2)); + assert!(!range.contains(&3)); + } + + #[test] + fn active_range_end_unreachable() { + // a(0) -> b(1), end=5 not in graph, so range includes everything reachable. + let adj = build_adjacency(&[(0, 1), (4, 5)]); + let range = active_range(0, 5, &adj); + assert!(range.contains(&0)); + assert!(range.contains(&1)); + assert!(!range.contains(&5)); + } } diff --git a/rust/otap-dataflow/crates/engine/src/runtime_pipeline.rs b/rust/otap-dataflow/crates/engine/src/runtime_pipeline.rs index 78290b5745..beea99897a 100644 --- a/rust/otap-dataflow/crates/engine/src/runtime_pipeline.rs +++ b/rust/otap-dataflow/crates/engine/src/runtime_pipeline.rs @@ -126,6 +126,33 @@ fn report_terminal_metrics(metrics_reporter: &MetricsReporter, terminal_state: T } } +/// Build a flat edge list from a pipeline's connections, resolving node +/// names to indices. Names not found in the map are silently skipped. +fn connection_edges<'a>( + connections: impl Iterator, + node_name_to_index: &HashMap, +) -> Vec<(usize, usize)> { + let mut edges = Vec::new(); + for conn in connections { + let from_indices: Vec = conn + .from_nodes() + .into_iter() + .filter_map(|name| node_name_to_index.get(name.as_ref()).copied()) + .collect(); + let to_indices: Vec = conn + .to_nodes() + .into_iter() + .filter_map(|name| node_name_to_index.get(name.as_ref()).copied()) + .collect(); + for &src in &from_indices { + for &dst in &to_indices { + edges.push((src, dst)); + } + } + } + edges +} + /// PipeNode contains runtime-specific info. pub(crate) struct PipeNode { index: usize, // NodeIndex into the appropriate vector w/ offset precomputed @@ -238,12 +265,17 @@ impl