Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 280 additions & 10 deletions rust/otap-dataflow/crates/engine/src/flow_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::borrow::Cow;
use std::cell::Cell;
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::Instant;
Expand Down Expand Up @@ -224,6 +224,7 @@ pub(crate) fn build_flow_metric_state(
node_name_to_index: &HashMap<String, usize>,
processor_indices: &HashSet<usize>,
pipeline_context: &PipelineContext,
pipeline_connections: &[(usize, usize)],
) -> Result<PipelineFlowMetricState, crate::error::Error> {
let mut signals_incoming_metrics = Vec::new();
let mut duration_metrics = Vec::new();
Expand All @@ -233,6 +234,10 @@ pub(crate) fn build_flow_metric_state(

let pipeline_attrs = pipeline_context.pipeline_attribute_set();

// Collect resolved (start, end, name) triples for the graph-based
// interleave check that runs after all flow metrics are registered.
let mut resolved_ranges: Vec<(usize, usize, String)> = Vec::new();

for flow_config in &telemetry_policy.flow_metrics {
let start_idx = node_name_to_index
.get(&flow_config.bounds.start_node)
Expand Down Expand Up @@ -296,6 +301,13 @@ pub(crate) fn build_flow_metric_state(
signals_outgoing_metrics.push(signals_outgoing_metric);
let _ = end_nodes.insert(end_idx, id);
let _ = start_nodes.insert(start_idx, id);
resolved_ranges.push((start_idx, end_idx, flow_config.name.clone()));
}

// Validate that flow_metric ranges don't interleave with each other.
if resolved_ranges.len() > 1 {
Comment thread
lalitb marked this conversation as resolved.
let adjacency = build_adjacency(pipeline_connections);
detect_interleaved_ranges(&resolved_ranges, &adjacency)?;
}

Ok(PipelineFlowMetricState {
Expand All @@ -313,6 +325,87 @@ fn invalid_flow_metric_config(error: String) -> crate::error::Error {
}))
}

/// Returns the "active range" of a flow metric.
///
/// The active range is every node index that can be reached from `start`
/// by following forward edges in `adjacency`, where the traversal never
/// expands the successors of `end`. The `end` node itself is not part of
/// the returned set; `start` is, unless `start == end`.
///
/// If `end` cannot be reached from `start`, the result is simply the full
/// set of nodes reachable from `start`.
fn active_range(
    start: usize,
    end: usize,
    adjacency: &HashMap<usize, Vec<usize>>,
) -> HashSet<usize> {
    let mut seen = HashSet::from([start]);
    let mut frontier = VecDeque::from([start]);

    while let Some(current) = frontier.pop_front() {
        // The end node closes the range, so its successors are never explored.
        if current != end {
            let successors = adjacency.get(&current).into_iter().flatten().copied();
            // `insert` returns true only for nodes seen for the first time,
            // so each node is enqueued at most once.
            frontier.extend(successors.filter(|&next| seen.insert(next)));
        }
    }

    // The end node terminates the range and is therefore not part of it.
    let _ = seen.remove(&end);
    seen
}

/// Rejects flow metric configurations whose ranges interleave.
///
/// Each entry of `ranges` is a `(start, end, name)` triple. Two flow
/// metrics are considered interleaved when the start node of one lies in
/// the active range (see `active_range`) of the other. Pairs are examined
/// in declaration order and the first offending pair is reported.
///
/// # Errors
///
/// Returns the error built by `invalid_flow_metric_config`, naming both
/// flow metrics, as soon as an interleaving pair is found.
fn detect_interleaved_ranges(
    ranges: &[(usize, usize, String)],
    adjacency: &HashMap<usize, Vec<usize>>,
) -> Result<(), crate::error::Error> {
    // One reachability set per flow metric, computed once up front.
    let reach: Vec<HashSet<usize>> = ranges
        .iter()
        .map(|(start, end, _)| active_range(*start, *end, adjacency))
        .collect();

    // `inner` is the metric whose start node sits inside `outer`'s range.
    let interleave_error = |inner: &str, outer: &str| {
        invalid_flow_metric_config(format!(
            "flow metric `{}` interleaves with `{}`: \
             start node of `{}` is reachable from start of `{}` \
             before reaching its end node",
            inner, outer, inner, outer,
        ))
    };

    for (pos, (earlier, earlier_reach)) in ranges.iter().zip(&reach).enumerate() {
        // Only look at metrics declared after `earlier`, so each unordered
        // pair is visited exactly once.
        for (later, later_reach) in ranges.iter().zip(&reach).skip(pos + 1) {
            if earlier_reach.contains(&later.0) {
                return Err(interleave_error(&later.2, &earlier.2));
            }
            if later_reach.contains(&earlier.0) {
                return Err(interleave_error(&earlier.2, &later.2));
            }
        }
    }

    Ok(())
}

/// Groups a flat list of `(source, destination)` edges into a forward
/// adjacency map.
///
/// Every source index maps to its destinations in the order they appear in
/// `edges`; duplicate edges are kept. Nodes that never appear as a source
/// have no entry in the map.
fn build_adjacency(edges: &[(usize, usize)]) -> HashMap<usize, Vec<usize>> {
    edges.iter().fold(
        HashMap::<usize, Vec<usize>>::new(),
        |mut successors, &(from, to)| {
            successors.entry(from).or_insert_with(Vec::new).push(to);
            successors
        },
    )
}

/// Saturating cast of `u128` nanoseconds to `u64`.
///
/// Used by EffectHandlers when computing elapsed durations between
Expand Down Expand Up @@ -512,6 +605,22 @@ mod tests {
(name_to_index, processor_indices)
}

/// Helper: translate `(from, to)` node-name pairs into `(from, to)` index
/// pairs using `name_to_index`.
///
/// Pairs are emitted in input order. A pair is silently skipped when either
/// of its names is missing from `name_to_index`.
fn test_edges(
    edges: &[(&str, &str)],
    name_to_index: &HashMap<String, usize>,
) -> Vec<(usize, usize)> {
    let mut resolved = Vec::new();
    for (from, to) in edges {
        let endpoints = (name_to_index.get(*from), name_to_index.get(*to));
        if let (Some(&src), Some(&dst)) = endpoints {
            resolved.push((src, dst));
        }
    }
    resolved
}

#[test]
fn valid_flow_metric_is_registered() {
let (ctx, _) = test_pipeline_ctx();
Expand All @@ -520,7 +629,7 @@ mod tests {
let (names, procs) = test_maps(&["a", "b", "c"], &[]);
let policy = policy_with(vec![sw("sw1", "a", "c")]);

let state = build_flow_metric_state(&policy, &names, &procs, &ctx)
let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.expect("valid config should build");

assert_eq!(state.duration_metrics.len(), 1);
Expand All @@ -536,7 +645,7 @@ mod tests {
flow.metrics = Some(vec![FlowMetric::ComputeDuration]);
let policy = policy_with(vec![flow]);

let state = build_flow_metric_state(&policy, &names, &procs, &ctx)
let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.expect("duration-only config should build");

assert!(state.duration_metrics[0].is_some());
Expand All @@ -550,7 +659,7 @@ mod tests {
let (names, procs) = test_maps(&["a", "b"], &[]);
let policy = policy_with(vec![sw("sw1", "a", "missing")]);

let err = build_flow_metric_state(&policy, &names, &procs, &ctx)
let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.err()
.expect("expected Err");

Expand All @@ -563,7 +672,7 @@ mod tests {
let (names, procs) = test_maps(&["recv", "proc1", "proc2"], &["recv"]);
let policy = policy_with(vec![sw("sw1", "recv", "proc2")]);

let err = build_flow_metric_state(&policy, &names, &procs, &ctx)
let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.err()
.expect("expected Err");

Expand All @@ -576,7 +685,7 @@ mod tests {
let (names, procs) = test_maps(&["proc1", "proc2", "exp"], &["exp"]);
let policy = policy_with(vec![sw("sw1", "proc1", "exp")]);

let err = build_flow_metric_state(&policy, &names, &procs, &ctx)
let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.err()
.expect("expected Err");

Expand All @@ -590,7 +699,7 @@ mod tests {
// Two flow_metrics share "a" as start node.
let policy = policy_with(vec![sw("sw1", "a", "b"), sw("sw2", "a", "d")]);

let err = build_flow_metric_state(&policy, &names, &procs, &ctx)
let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.err()
.expect("expected Err");

Expand All @@ -604,7 +713,7 @@ mod tests {
// Two flow_metrics share "d" as stop node.
let policy = policy_with(vec![sw("sw1", "a", "d"), sw("sw2", "c", "d")]);

let err = build_flow_metric_state(&policy, &names, &procs, &ctx)
let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.err()
.expect("expected Err");

Expand All @@ -617,7 +726,7 @@ mod tests {
let (names, procs) = test_maps(&["a", "b", "c"], &[]);
let policy = policy_with(vec![sw("sw1", "a", "b"), sw("sw2", "b", "c")]);

let err = build_flow_metric_state(&policy, &names, &procs, &ctx)
let err = build_flow_metric_state(&policy, &names, &procs, &ctx, &[])
.err()
.expect("expected Err");

Expand All @@ -631,7 +740,8 @@ mod tests {
// Non-overlapping: a→b and c→d.
let policy = policy_with(vec![sw("sw1", "a", "b"), sw("sw2", "c", "d")]);

let state = build_flow_metric_state(&policy, &names, &procs, &ctx)
let edges = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &names);
let state = build_flow_metric_state(&policy, &names, &procs, &ctx, &edges)
.expect("disjoint config should build");

assert_eq!(state.duration_metrics.len(), 2);
Expand All @@ -640,4 +750,164 @@ mod tests {
assert_eq!(state.end_nodes.get(&1), Some(&0)); // "b" → sw1
assert_eq!(state.end_nodes.get(&3), Some(&1)); // "d" → sw2
}

// Validation of flow metric interleaving detection.

#[test]
fn interleaved_distinct_endpoints_linear_path_is_rejected() {
    // Topology:     a -> b -> c -> d
    // Flow metrics: sw1 = a->c, sw2 = b->d
    // sw2 starts at b, which sits between a and c, so the two ranges
    // interleave on a plain linear path. Validation must fail.
    let (pipeline_ctx, _) = test_pipeline_ctx();
    let (node_index, processors) = test_maps(&["a", "b", "c", "d"], &[]);
    let connections = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &node_index);
    let telemetry = policy_with(vec![sw("sw1", "a", "c"), sw("sw2", "b", "d")]);

    let outcome = build_flow_metric_state(
        &telemetry,
        &node_index,
        &processors,
        &pipeline_ctx,
        &connections,
    );
    let err = outcome.err().expect("expected Err for interleaved ranges");

    assert_invalid_user_config(&err, "sw2");
}

#[test]
fn interleaved_reverse_order_is_rejected() {
    // Topology:     a -> b -> c -> d
    // Flow metrics: sw1 = b->d, sw2 = a->c
    // Same interleaving as the plain linear case, but the two flow metrics
    // are declared in the opposite order. Validation must still fail.
    let (pipeline_ctx, _) = test_pipeline_ctx();
    let (node_index, processors) = test_maps(&["a", "b", "c", "d"], &[]);
    let connections = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &node_index);
    let telemetry = policy_with(vec![sw("sw1", "b", "d"), sw("sw2", "a", "c")]);

    let outcome = build_flow_metric_state(
        &telemetry,
        &node_index,
        &processors,
        &pipeline_ctx,
        &connections,
    );
    let err = outcome.err().expect("expected Err for interleaved ranges");

    assert_invalid_user_config(&err, "sw2");
}

#[test]
fn disjoint_on_linear_path_is_accepted() {
    // Topology:     a -> b -> c -> d -> e -> f
    // Flow metrics: sw1 = a->c, sw2 = d->f
    // The first range ends before the second one starts, so they are
    // fully disjoint. Validation must pass.
    let (pipeline_ctx, _) = test_pipeline_ctx();
    let (node_index, processors) = test_maps(&["a", "b", "c", "d", "e", "f"], &[]);
    let chain = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "e"), ("e", "f")];
    let connections = test_edges(&chain, &node_index);
    let telemetry = policy_with(vec![sw("sw1", "a", "c"), sw("sw2", "d", "f")]);

    let outcome = build_flow_metric_state(
        &telemetry,
        &node_index,
        &processors,
        &pipeline_ctx,
        &connections,
    );
    let state = outcome.expect("disjoint ranges should build");

    assert_eq!(state.duration_metrics.len(), 2);
}

#[test]
fn interleaved_on_branching_path_is_rejected() {
    // Topology (diamond): a -> b -> d, a -> c -> d, d -> e
    // Flow metrics:       sw1 = a->d, sw2 = b->e
    // b is reachable from a before d is reached, so sw2 starts inside
    // sw1's range. Validation must fail.
    let (pipeline_ctx, _) = test_pipeline_ctx();
    let (node_index, processors) = test_maps(&["a", "b", "c", "d", "e"], &[]);
    let diamond = [("a", "b"), ("a", "c"), ("b", "d"), ("c", "d"), ("d", "e")];
    let connections = test_edges(&diamond, &node_index);
    let telemetry = policy_with(vec![sw("sw1", "a", "d"), sw("sw2", "b", "e")]);

    let outcome = build_flow_metric_state(
        &telemetry,
        &node_index,
        &processors,
        &pipeline_ctx,
        &connections,
    );
    let err = outcome
        .err()
        .expect("expected Err for interleaved ranges on diamond");

    assert_invalid_user_config(&err, "sw2");
}

#[test]
fn disjoint_on_separate_branches_is_accepted() {
    // Topology:     a -> b -> c, a -> d -> e
    // Flow metrics: sw1 = b->c, sw2 = d->e
    // Each flow metric lives on its own branch below a, and neither start
    // node is reachable from the other. Validation must pass.
    let (pipeline_ctx, _) = test_pipeline_ctx();
    let (node_index, processors) = test_maps(&["a", "b", "c", "d", "e"], &[]);
    let fork = [("a", "b"), ("b", "c"), ("a", "d"), ("d", "e")];
    let connections = test_edges(&fork, &node_index);
    let telemetry = policy_with(vec![sw("sw1", "b", "c"), sw("sw2", "d", "e")]);

    let outcome = build_flow_metric_state(
        &telemetry,
        &node_index,
        &processors,
        &pipeline_ctx,
        &connections,
    );
    let state = outcome.expect("separate-branch ranges should build");

    assert_eq!(state.duration_metrics.len(), 2);
}

#[test]
fn nested_ranges_are_rejected() {
    // Topology:     a -> b -> c -> d
    // Flow metrics: sw1 = a->d, sw2 = b->c
    // sw2 is fully nested inside sw1. The single-accumulator design does
    // not support nesting, so validation must fail.
    let (pipeline_ctx, _) = test_pipeline_ctx();
    let (node_index, processors) = test_maps(&["a", "b", "c", "d"], &[]);
    let connections = test_edges(&[("a", "b"), ("b", "c"), ("c", "d")], &node_index);
    let telemetry = policy_with(vec![sw("sw1", "a", "d"), sw("sw2", "b", "c")]);

    let outcome = build_flow_metric_state(
        &telemetry,
        &node_index,
        &processors,
        &pipeline_ctx,
        &connections,
    );
    let err = outcome.err().expect("expected Err for nested ranges");

    assert_invalid_user_config(&err, "sw2");
}

#[test]
fn single_flow_metric_accepted() {
    // Topology:     a -> b -> c
    // Flow metrics: sw1 = a->c
    // With only one flow metric there is nothing to interleave with.
    // Validation must pass.
    let (pipeline_ctx, _) = test_pipeline_ctx();
    let (node_index, processors) = test_maps(&["a", "b", "c"], &[]);
    let connections = test_edges(&[("a", "b"), ("b", "c")], &node_index);
    let telemetry = policy_with(vec![sw("sw1", "a", "c")]);

    let outcome = build_flow_metric_state(
        &telemetry,
        &node_index,
        &processors,
        &pipeline_ctx,
        &connections,
    );
    let state = outcome.expect("single flow metric should build");

    assert_eq!(state.duration_metrics.len(), 1);
}

// Validation of active_range helper for interleaved flow metrics detection.

#[test]
fn active_range_linear() {
    // Graph: a(0) -> b(1) -> c(2) -> d(3), range measured from 0 to 2.
    let graph = build_adjacency(&[(0, 1), (1, 2), (2, 3)]);
    let covered = active_range(0, 2, &graph);

    // The start (0) and the node in between (1) belong to the range.
    for inside in [0, 1] {
        assert!(covered.contains(&inside));
    }
    // The end (2) is excluded, and nothing past it (3) is explored.
    for outside in [2, 3] {
        assert!(!covered.contains(&outside));
    }
}

#[test]
fn active_range_diamond() {
    // Graph:
    //   a(0) -> b(1) -> d(3)
    //   a(0) -> c(2) -> d(3)
    let graph = build_adjacency(&[(0, 1), (0, 2), (1, 3), (2, 3)]);
    let covered = active_range(0, 3, &graph);

    // Both branches of the diamond are inside the range.
    for inside in [0, 1, 2] {
        assert!(covered.contains(&inside));
    }
    // The join node is the end of the range and is excluded.
    assert!(!covered.contains(&3));
}

#[test]
fn active_range_end_unreachable() {
    // Graph: a(0) -> b(1) and a separate edge 4 -> 5.
    // The end node 5 cannot be reached from the start 0, so the range is
    // everything reachable from 0.
    let graph = build_adjacency(&[(0, 1), (4, 5)]);
    let covered = active_range(0, 5, &graph);

    for inside in [0, 1] {
        assert!(covered.contains(&inside));
    }
    assert!(!covered.contains(&5));
}
}
Loading
Loading