Skip to content

[WIP] ETW receiver with control path and session#2930

Open
swashtek wants to merge 8 commits into
open-telemetry:mainfrom
swashtek:Add/etw_data_path
Open

[WIP] ETW receiver with control path and session#2930
swashtek wants to merge 8 commits into
open-telemetry:mainfrom
swashtek:Add/etw_data_path

Conversation

@swashtek
Copy link
Copy Markdown
Contributor

@swashtek swashtek commented May 11, 2026

Change Summary

ETW receiver initial PR

  • Control path hanges
  • ETW session to consume using one-collect git commit

What issue does this PR close?

  • Closes #NNN

How are these changes tested?

Validation using cargo check, run etc.
Local validation with ETW console yaml

cargo run --features etw-receiver -- -c configs/etw-console.yaml

Are there any user-facing changes?

N/A

@swashtek swashtek requested a review from a team as a code owner May 11, 2026 20:25
@github-actions github-actions Bot added the rust Pull requests that update Rust code label May 11, 2026
Comment thread rust/otap-dataflow/Cargo.toml Outdated
@swashtek swashtek changed the title ETW receiver with control path and session [WIP] ETW receiver with control path and session May 11, 2026
Comment thread rust/otap-dataflow/configs/etw-console.yaml Outdated
Comment thread rust/otap-dataflow/configs/etw-console.yaml Outdated
Comment thread rust/otap-dataflow/crates/core-nodes/src/receivers/etw_receiver/session.rs Outdated
@swashtek swashtek force-pushed the Add/etw_data_path branch from a67d25d to ce46830 Compare May 11, 2026 21:36
Comment thread rust/otap-dataflow/crates/core-nodes/src/receivers/mod.rs Outdated
@codecov
Copy link
Copy Markdown

codecov Bot commented May 12, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 86.02%. Comparing base (b645a26) to head (1ed3e22).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2930      +/-   ##
==========================================
- Coverage   86.03%   86.02%   -0.01%     
==========================================
  Files         720      720              
  Lines      273264   273264              
==========================================
- Hits       235095   235080      -15     
- Misses      37645    37660      +15     
  Partials      524      524              
Components Coverage Δ
otap-dataflow 87.18% <ø> (-0.01%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 89.57% <ø> (ø)
otel-arrow-go 52.45% <ø> (ø)
quiver 92.25% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/mod.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/mod.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/mod.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
Comment thread rust/otap-dataflow/crates/contrib-nodes/src/receivers/etw_receiver/session.rs Outdated
*ext.keyword_mut() = keywords.unwrap_or(0);
}

wide_event.add_callback({
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you have changed the callback method registration code in the last commit. I'm okay with the refactor from set_raw_event_callback to per-provider wide_event registrations.

However, these changes replaced the single round-robin counter with one counter per provider, which could underutilize cores in startup. Each callback closure now captures its own local_next. In long-run steady state both schemes give uniform per-core load. The regression is at startup: every provider's counter is 0, so the first event of every provider piles on core 0, the second on core 1, and so on. With M providers all stepping in lock-step from 0, the early cores are oversubscribed and the late cores stay idle until each provider's counter has walked past them.

Example: 4 cores, 2 providers each emitting 3 events (A1, B1, A2, B2, A3, B3):

per-provider counters:           shared counter:
  A1 -> A=0 -> core 0              A1 -> 0 -> core 0
  B1 -> B=0 -> core 0              B1 -> 1 -> core 1
  A2 -> A=1 -> core 1              A2 -> 2 -> core 2
  B2 -> B=1 -> core 1              B2 -> 3 -> core 3
  A3 -> A=2 -> core 2              A3 -> 4 -> core 0
  B3 -> B=2 -> core 2              B3 -> 5 -> core 1
  result: 2, 2, 2, 0               result: 2, 2, 1, 1

Same pattern recurs any time multiple providers' counters happen to be equal mod num_txs and they all burst together — most reliably at session startup, when every counter is exactly 0. The original set_raw_event_callback design had one shared counter so the distribution was uniform.

We can fix this by sharing the counter across callbacks. one_collect requires one Event registration per provider, so we can't collapse to a single closure. We can share the counter (and the Vec<Sender>) across closures via Rc<Cell<usize>> / Rc<Vec<...>>. All callbacks dispatch on the single ProcessTrace thread, so Cell is safe — no atomics, no locking.

use std::cell::Cell;
use std::rc::Rc;

let next: Rc<Cell<usize>> = Rc::new(Cell::new(0));
let txs:  Rc<Vec<mpsc::Sender<EtwEventData>>> = Rc::new(txs);

for (guid, level, keywords) in &resolved_providers {
    let mut wide_event = one_collect::event::Event::new(0, "otap_wide".into());
    {
        let ext = wide_event.extension_mut();
        *ext.provider_mut() = *guid;
        *ext.level_mut()    = *level;
        *ext.keyword_mut()  = keywords.unwrap_or(0);
    }

    let ancillary = ancillary.clone();
    let next      = Rc::clone(&next);
    let txs       = Rc::clone(&txs);

    wide_event.add_callback(move |_event_data| {
        let anc = ancillary.borrow();
        let data = EtwEventData {
            provider_id: anc.provider().to_bytes(),
            timestamp:   anc.time(),
            process_id:  anc.pid(),
            thread_id:   anc.tid(),
            // TODO: populate event_id/opcode/level/keywords/version once
            // WindowsEventExtension exposes EVENT_DESCRIPTOR.
            event_id: 0, opcode: 0, version: 0, level: 0, keywords: 0,
        };
        drop(anc);

        let i = next.get();
        next.set(i.wrapping_add(1));
        let _ = txs[i % txs.len()].try_send(data);
        Ok(())
    });

    session.add_event(wide_event, None);
}

Also collapses N Vec<Sender> clones to N refcount handles, as a side benefit.

//!
//! Instead of using the low-level `set_raw_event_callback` (which bypasses
//! `one_collect`'s event routing), we use the **provider-wide event** mechanism
//! via [`EtwSession::add_wide_event`]. This registers a catch-all handler for
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is not being called in the code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rust Pull requests that update Rust code

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

3 participants