diff --git a/.cargo/config.toml b/.cargo/config.toml index db328cdb9e..7b99172c2f 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -18,6 +18,7 @@ rustflags = [ "-C", "force-unwind-tables", # Include full unwind tables when aborting on panic "--cfg", "uuid_unstable", # Enable unstable Uuid "--cfg", "tokio_unstable", # Enable unstable tokio + "-C", "force-frame-pointers=yes", # Enable frame pointers for profiling (dial9, perf) ] [target.aarch64-unknown-linux-gnu] @@ -33,6 +34,7 @@ rustflags = [ "-C", "force-unwind-tables", # Include full unwind tables when aborting on panic "--cfg", "uuid_unstable", # Enable unstable Uuid "--cfg", "tokio_unstable", # Enable unstable tokio + "-C", "force-frame-pointers=yes", # Enable frame pointers for profiling (dial9, perf) "-C", "link-self-contained=yes", # Link statically ] diff --git a/Cargo.lock b/Cargo.lock index 2283351645..739abd4fe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,16 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "Inflector" +version = "0.11.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" +dependencies = [ + "lazy_static", + "regex", +] + [[package]] name = "adaptive-timeout" version = "0.0.1-alpha.4" @@ -1020,7 +1030,7 @@ dependencies = [ "addr2line", "cfg-if", "libc", - "miniz_oxide", + "miniz_oxide 0.8.9", "object", "rustc-demangle", "windows-link", @@ -1223,6 +1233,20 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "blazesym" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48ceccc54b9c3e60e5f36b0498908c8c0f87387229cb0e0e5d65a074e00a8ba4" +dependencies = [ + "cpp_demangle", + "gimli", + "libc", + "memmap2", + "miniz_oxide 0.9.1", + "rustc-demangle", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1241,6 +1265,31 @@ dependencies = [ "objc2", ] +[[package]] +name = "bon" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" +dependencies = [ + "bon-macros", + "rustversion", +] + +[[package]] +name = "bon-macros" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" +dependencies = [ + "darling 0.20.11", + "ident_case", + "prettyplease", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.117", +] + [[package]] name = "borrow-or-share" version = "0.2.4" @@ -1365,6 +1414,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "c-enum" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd17eb909a8c6a894926bfcc3400a4bb0e732f5a57d37b1f14e8b29e329bace8" + [[package]] name = "camino" version = "1.2.2" @@ -1961,6 +2016,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2833,6 +2897,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive-where" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d08b3a0bcc0d079199cd476b2cae8435016ec11d1c0986c6901c5ac223041534" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "derive_builder" version = "0.20.2" @@ -2887,6 +2962,67 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "dial9-perf-self-profile" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70fb315fdebb7f7c00c358298b55e0541738ea1819c575d2f45696a568333642" +dependencies = [ + "blazesym", + "dial9-trace-format", + "libc", + "perf-event-data", + "perf-event-open-sys2", + "tracing", +] + +[[package]] +name = "dial9-tokio-telemetry" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35c8348045a09a7dee69d000124d42f773f59f85737d3212ae65aa64ac0b4e3e" +dependencies = [ + "arc-swap", + "bon", + "crossbeam-queue", + "dial9-perf-self-profile", + "dial9-trace-format", + "flate2", + "futures-util", + "hostname", + "libc", + "metrique", + "metrique-writer", + "pin-project-lite", + "serde", + "serde_json", + "smallvec", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "dial9-trace-format" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e0ee560b05f09bf817602d57644947e31e83c521d4e0277f723a6e64d44f92" +dependencies = [ + "dial9-trace-format-derive", + "serde", +] + +[[package]] +name = "dial9-trace-format-derive" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dbbd8126d4d6613931317cfe2a7275c1cd487e41c961e42456ab5f956570030" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + [[package]] name = "dialoguer" version = "0.12.0" @@ -3053,6 +3189,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "enum-as-inner" version = "0.6.1" @@ -3233,6 +3375,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "fallible-iterator" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" + [[package]] name = "fancy-regex" version = "0.17.0" @@ -3314,7 +3462,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843fba2746e448b37e26a819579957415c8cef339bf08564fe8b7ddbd959573c" dependencies = [ "crc32fast", - "miniz_oxide", + "miniz_oxide 0.8.9", "zlib-rs", ] @@ -3609,6 +3757,11 @@ name = "gimli" version = "0.32.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" +dependencies = [ + "fallible-iterator", + "indexmap 2.13.0", + "stable_deref_trait", +] [[package]] name = "glob" @@ -4950,16 +5103,125 @@ version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdfb1365fea27e6dd9dc1dbc19f570198bc86914533ad639dae939635f096be4" dependencies = [ + "aho-corasick", "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.16.1", + "indexmap 2.13.0", "metrics", + "ordered-float", "quanta", + "radix_trie", "rand 0.9.3", "rand_xoshiro", "sketches-ddsketch", ] +[[package]] +name = "metrique" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3e5ecbbefec32dafed0fd98ef23768aaade6de35b8434fc3e44f6346b73cd6" +dependencies = [ + "itoa", + "jiff", + "metrique-core", + "metrique-macro", + "metrique-service-metrics", + "metrique-timesource", + "metrique-writer", + "metrique-writer-core", + "metrique-writer-macro", + "ryu", + "serde_json", + "tokio", +] + +[[package]] +name = "metrique-core" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad6478374c256ffbb0d2de67b7d93e43ac94e35a083f40bd5f72a9770f6110bb" +dependencies = [ + "itertools 0.14.0", + "metrique-writer-core", +] + +[[package]] +name = "metrique-macro" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07c50c41313eaa762e16c251aa3aa7f1eff170ab403fac8fc688840e93f39b18" +dependencies = [ + "Inflector", + "darling 0.23.0", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "metrique-service-metrics" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d01f36f47452cd6e33f66fc8185bb32f320aaa5721b6ad7230776442d3cf180" +dependencies = [ + "metrique-writer", +] + +[[package]] +name = "metrique-timesource" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c60fb3f2836dffc05146f0dfe7bf2e0789909f3fefd72c729491adaef01acc1a" + +[[package]] +name = "metrique-writer" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d9ba4f5a6b5dd821f78315095840e88d244fafbdda3cf1688835cd2a56aec" +dependencies = [ + "ahash", + "crossbeam-queue", + "crossbeam-utils", + "metrics", + "metrics-util", + "metrique-core", + "metrique-writer-core", + "metrique-writer-macro", + "rand 0.9.3", + "smallvec", + "tokio", + "tracing", + "tracing-subscriber", +] + +[[package]] +name = "metrique-writer-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "642989d2c349dfcd705a0b6b63887459f71c8b8deb6dc79e39e12eaa17400aba" +dependencies = [ + "derive-where", + "itertools 0.14.0", + "serde", + "smallvec", +] + +[[package]] +name = "metrique-writer-macro" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12edafee41e67f90ab2efe2b850e10751f0da3da4aeb61b8eb7e6c31666e8da8" +dependencies = [ + "darling 0.23.0", + "proc-macro2", + "quote", + "str_inflector", + "syn 2.0.117", + "synstructure", +] + [[package]] name = "mime" version = "0.3.17" @@ -4992,6 +5254,16 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "miniz_oxide" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b63fbc4a50860e98e7b2aa7804ded1db5cbc3aff9193adaff57a6931bf7c4b4c" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.1" @@ -5126,6 +5398,15 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nix" version = "0.26.4" @@ -5671,6 +5952,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7d950ca161dc355eaf28f82b11345ed76c6e1f6eb1f4f4479e0323b9e2fbd0e" +dependencies = [ + "num-traits", +] + [[package]] name = "os_pipe" version = "1.2.3" @@ -5777,6 +6067,27 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "perf-event-data" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "575828d9d7d205188048eb1508560607a03d21eafdbba47b8cade1736c1c28e1" +dependencies = [ + "bitflags 2.11.0", + "c-enum", + "perf-event-open-sys2", +] + +[[package]] +name = "perf-event-open-sys2" +version = "5.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c25955321465255e437600b54296983fab1feac2cd0c38958adeb26dbae49e" +dependencies = [ + "libc", + "memoffset", +] + [[package]] name = "petgraph" version = "0.8.3" @@ -6435,6 +6746,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "raft" version = "0.7.0" @@ -7105,6 +7426,7 @@ dependencies = [ "bytestring", "dashmap", "derive_more", + "dial9-tokio-telemetry", "enum-map", "enumset", "futures", @@ -8490,6 +8812,7 @@ dependencies = [ "getrandom 0.2.17", "getrandom 0.3.4", "getrandom 0.4.2", + "gimli", "half", "hashbrown 0.14.5", "hashbrown 0.16.1", @@ -8512,7 +8835,8 @@ dependencies = [ "log", "md-5", "memchr", - "miniz_oxide", + "metrics-util", + "miniz_oxide 0.8.9", "mio", "nom", "num", @@ -9475,6 +9799,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "str_inflector" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0b848d5a7695b33ad1be00f84a3c079fe85c9278a325ff9159e6c99cef4ef7" +dependencies = [ + "lazy_static", + "regex", +] + [[package]] name = "str_stack" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4c72c9a893..0b52dc3c91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,6 +139,7 @@ const_format = "0.2.35" criterion = "0.5" crossterm = { version = "0.29.0" } dashmap = { version = "6" } +dial9-tokio-telemetry = { version = "0.2.1", default-features = false } datafusion = { version = "52.1.0", default-features = false, features = [ "crypto_expressions", "encoding_expressions", diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ec387e1e58..6a42aa6ac0 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -21,6 +21,7 @@ test-util = [ "tokio/test-util" ] taskdump = [] +dial9 = ["dep:dial9-tokio-telemetry"] [dependencies] restate-workspace-hack = { workspace = true } @@ -70,6 +71,12 @@ tower = { workspace = true } tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } +[target.'cfg(target_os = "linux")'.dependencies] +dial9-tokio-telemetry = { workspace = true, optional = true, features = ["cpu-profiling"] } + +[target.'cfg(not(target_os = "linux"))'.dependencies] +dial9-tokio-telemetry = { workspace = true, optional = true } + [build-dependencies] tonic-prost-build = { workspace = true } diff --git a/crates/core/src/task_center.rs b/crates/core/src/task_center.rs index ffbb7d9e08..44c45b93a1 100644 --- a/crates/core/src/task_center.rs +++ b/crates/core/src/task_center.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. mod builder; +mod dial9; mod extensions; mod handle; mod monitoring; @@ -333,6 +334,9 @@ struct TaskCenterInner { default_runtime_handle: tokio::runtime::Handle, managed_runtimes: Mutex>, start_time: Instant, + /// dial9 telemetry state (guards for default + per-partition runtimes). + /// Declared before `default_runtime` to ensure guards are flushed first. + dial9: dial9::Dial9State, /// We hold on to the owned Runtime to ensure it's dropped when task center is dropped. If this /// is None, it means that it's the responsibility of the Handle owner to correctly drop /// tokio's runtime after dropping the task center. @@ -359,6 +363,7 @@ impl TaskCenterInner { // used in tests to start all runtimes with clock paused. Note that this only impacts // partition processor runtimes #[cfg(any(test, feature = "test-util"))] pause_time: bool, + dial9: dial9::Dial9State, ) -> Self { crate::metric_definitions::describe_metrics(); let root_task_context = TaskContext { @@ -368,10 +373,12 @@ impl TaskCenterInner { cancellation_token: CancellationToken::new(), partition_id: None, }; + Self { id: rand::random(), start_time: Instant::now(), default_runtime_handle, + dial9, default_runtime, global_cancel_token: CancellationToken::new(), shutdown_requested: AtomicBool::new(false), @@ -736,12 +743,22 @@ impl TaskCenterInner { #[cfg(any(test, feature = "test-util"))] builder.start_paused(self.pause_time); - let rt = builder - .enable_all() - .build() - .expect("runtime builder succeeds"); - let tc = self.clone(); + builder.enable_all(); + // Build the runtime, optionally with dial9 instrumentation. + // The fallback closure recreates the builder if dial9 init fails. + #[cfg(any(test, feature = "test-util"))] + let pause_time = self.pause_time; + let (rt, dial9_thread_handle) = + dial9::build_child_runtime(&runtime_name, builder, &self.dial9, || { + let mut fallback = tokio::runtime::Builder::new_current_thread(); + #[cfg(any(test, feature = "test-util"))] + fallback.start_paused(pause_time); + fallback.enable_all(); + fallback + }); + + let tc = self.clone(); let rt_handle = Arc::new(rt); runtimes_guard.insert( @@ -768,6 +785,11 @@ impl TaskCenterInner { .spawn({ let runtime_name = runtime_name.clone(); move || { + // Set the thread-local dial9 handle so that spawn_on_runtime() + // can use it for wake-tracked spawning on this runtime's thread. + // This only works because we are using a current thread tokio runtime. + dial9::set_thread_local(dial9_thread_handle); + let local_set = LocalSet::new(); let result = rt_handle.block_on(local_set.run_until(unmanaged_wrapper( tc.clone(), @@ -790,6 +812,11 @@ impl TaskCenterInner { /// Runs **only** after the inner main thread has completed work and no other owner exists for /// the runtime handle. fn drop_runtime(self: &Arc, name: SharedString) { + // Note: we intentionally do NOT remove the dial9 guard here. This method + // runs on the partition runtime thread, and TelemetryGuard::drop() accesses + // a dial9-internal thread-local (BUFFER) that may already be destroyed at + // this point. Instead, the guards remain in Dial9State and are dropped + // with TaskCenterInner on the main thread. let mut runtimes_guard = self.managed_runtimes.lock(); if let Some(runtime) = runtimes_guard.remove(&name) { // We must be the only owner of runtime at this point. @@ -938,13 +965,23 @@ impl TaskCenterInner { let runtime_name: &'static str = kind.runtime().into(); counter!(TC_SPAWN, "kind" => kind_str, "runtime" => runtime_name).increment(1); } - let runtime = match kind.runtime() { - crate::AsyncRuntime::Inherit => &tokio::runtime::Handle::current(), - crate::AsyncRuntime::Default => &self.default_runtime_handle, + + // Select the dial9 handle based on which runtime the task targets: + // - Default runtime: use the handle from the guard in TaskCenterInner + // - Inherited runtime: prefer the thread-local handle set by + // start_runtime(), fall back to the default handle (e.g. when + // spawning on the default runtime with AsyncRuntime::Inherit) + let (runtime, dial9_handle) = match kind.runtime() { + crate::AsyncRuntime::Inherit => ( + &tokio::runtime::Handle::current(), + dial9::thread_local_handle().or(self.dial9.default_handle()), + ), + crate::AsyncRuntime::Default => { + (&self.default_runtime_handle, self.dial9.default_handle()) + } }; - let inner_handle = tokio_task - .spawn_on(fut, runtime) - .expect("runtime can spawn tasks"); + + let inner_handle = dial9::spawn_or_fallback(dial9_handle, tokio_task, fut, runtime); TaskHandle { cancellation_token, diff --git a/crates/core/src/task_center/builder.rs b/crates/core/src/task_center/builder.rs index 7940d85115..5c78c679b3 100644 --- a/crates/core/src/task_center/builder.rs +++ b/crates/core/src/task_center/builder.rs @@ -66,12 +66,18 @@ impl TaskCenterBuilder { pub fn build(mut self) -> Result { let options = self.options.unwrap_or_default(); - if self.default_runtime_handle.is_none() { - let mut default_runtime_builder = tokio_builder("worker", &options); - let default_runtime = default_runtime_builder.build()?; + let dial9_state = if self.default_runtime_handle.is_none() { + let (default_runtime, dial9_state) = + super::dial9::build_default_runtime(tokio_builder("worker", &options), || { + tokio_builder("worker", &options) + })?; + self.default_runtime_handle = Some(default_runtime.handle().clone()); self.default_runtime = Some(default_runtime); - } + dial9_state + } else { + super::dial9::Dial9State::empty() + }; if cfg!(any(test, feature = "test-util")) { eprintln!("!!!! Running with test-util enabled !!!!"); @@ -81,6 +87,7 @@ impl TaskCenterBuilder { self.default_runtime, #[cfg(any(test, feature = "test-util"))] self.pause_time, + dial9_state, ))) } } diff --git a/crates/core/src/task_center/dial9.rs b/crates/core/src/task_center/dial9.rs new file mode 100644 index 0000000000..aaae5918d4 --- /dev/null +++ b/crates/core/src/task_center/dial9.rs @@ -0,0 +1,309 @@ +// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Dial9 tokio runtime telemetry support. +//! +//! When the `dial9` feature is enabled, this module provides wake-tracked +//! spawning, per-runtime trace files, and CPU profiling (on Linux). When the +//! feature is disabled, every type is a zero-sized no-op so callers never need +//! `#[cfg]` annotations. + +#[cfg(feature = "dial9")] +mod inner { + use std::collections::HashMap; + + use parking_lot::Mutex; + + use restate_types::SharedString; + use restate_types::config::{Configuration, Dial9Options}; + + use super::super::TaskCenterBuildError; + + // Per-thread telemetry handle for partition runtimes. Each partition + // runtime thread sets this when it starts so that `spawn_on_runtime()` + // can use the correct handle for wake-tracked spawning. + thread_local! { + static DIAL9_HANDLE: std::cell::RefCell> = const { std::cell::RefCell::new(None) }; + } + + /// Holds the default-runtime telemetry guard and per-runtime guards map. + /// Must be declared before `default_runtime` in [`TaskCenterInner`] so that + /// the guard is flushed before the runtime is dropped. + pub struct Dial9State { + guard: Option, + runtime_guards: + Mutex>, + } + + impl Dial9State { + /// State with an active default-runtime guard. + fn new(guard: dial9_tokio_telemetry::telemetry::TelemetryGuard) -> Self { + Self { + guard: Some(guard), + runtime_guards: Mutex::new(HashMap::new()), + } + } + + /// State without an active guard (e.g. when the runtime handle was + /// provided externally or dial9 initialisation failed). + pub fn empty() -> Self { + Self { + guard: None, + runtime_guards: Mutex::new(HashMap::new()), + } + } + + /// Whether dial9 telemetry is active for the default runtime. + pub fn is_active(&self) -> bool { + self.guard.is_some() + } + + /// Extract a cloneable handle from the default-runtime guard. + pub fn default_handle(&self) -> Dial9Handle { + Dial9Handle(self.guard.as_ref().map(|g| g.handle())) + } + } + + /// A cloneable handle for wake-tracked spawning. Wraps an optional + /// `TelemetryHandle` — `None` means we fall back to plain tokio spawning. + #[derive(Clone)] + pub struct Dial9Handle(Option); + + impl Dial9Handle { + /// Return `self` if it has a handle, otherwise `other`. + pub fn or(self, other: Self) -> Self { + if self.0.is_some() { self } else { other } + } + } + + /// Build the default runtime with dial9 tracing. On failure, logs a warning + /// and falls back to an uninstrumented runtime. + pub fn build_default_runtime( + builder: tokio::runtime::Builder, + fallback_builder: impl FnOnce() -> tokio::runtime::Builder, + ) -> Result<(tokio::runtime::Runtime, Dial9State), TaskCenterBuildError> { + match create_traced_runtime("default", builder) { + Ok((rt, guard)) => Ok((rt, Dial9State::new(guard))), + Err(e) => { + tracing::warn!( + "Failed to initialize dial9 telemetry: {e}, \ + running without instrumentation" + ); + let rt = fallback_builder().build()?; + Ok((rt, Dial9State::empty())) + } + } + } + + /// Build a child (partition) runtime with dial9 tracing, storing the guard + /// in `state` so it stays alive while the runtime runs. + /// + /// Guards are dropped with `TaskCenterInner`, not in `drop_runtime()`, + /// because `TelemetryGuard::drop()` accesses a dial9-internal thread-local + /// that may already be destroyed on the runtime thread. + /// + /// If `state.is_active()` is false, this just builds a plain runtime. + pub fn build_child_runtime( + runtime_name: &SharedString, + mut builder: tokio::runtime::Builder, + state: &Dial9State, + fallback_builder: impl FnOnce() -> tokio::runtime::Builder, + ) -> (tokio::runtime::Runtime, Dial9Handle) { + if !state.is_active() { + let rt = builder.build().expect("runtime builder succeeds"); + return (rt, Dial9Handle(None)); + } + + match create_traced_runtime(runtime_name, builder) { + Ok((rt, guard)) => { + let handle = Dial9Handle(Some(guard.handle())); + state + .runtime_guards + .lock() + .insert(runtime_name.clone(), guard); + (rt, handle) + } + Err(e) => { + tracing::warn!( + "Failed to instrument runtime {runtime_name} with dial9: {e}, \ + falling back to uninstrumented runtime" + ); + let rt = fallback_builder() + .build() + .expect("runtime builder succeeds"); + (rt, Dial9Handle(None)) + } + } + } + + /// Set the per-thread dial9 handle so that [`spawn_or_fallback`] can use it + /// for wake-tracked spawning on inherited runtimes. + pub fn set_thread_local(handle: Dial9Handle) { + if let Some(h) = handle.0 { + DIAL9_HANDLE.with_borrow_mut(|slot| *slot = Some(h)); + } + } + + /// Get the thread-local dial9 handle (set by [`set_thread_local`]). + pub fn thread_local_handle() -> Dial9Handle { + Dial9Handle(DIAL9_HANDLE.with_borrow(|h| h.clone())) + } + + /// Spawn a future with wake-tracked telemetry if a handle is available, + /// otherwise fall back to plain tokio spawning. + /// + /// Always spawns on the explicit `runtime` handle via `spawn_on` so that + /// tasks targeting `AsyncRuntime::Default` land on the default runtime even + /// when the caller is on a partition-processor runtime. + pub fn spawn_or_fallback( + handle: Dial9Handle, + task_builder: tokio::task::Builder<'_>, + fut: F, + runtime: &tokio::runtime::Handle, + ) -> tokio::task::JoinHandle + where + F: std::future::Future + Send + 'static, + T: Send + 'static, + { + if let Some(dh) = handle.0 { + // Wrap in dial9 tracing but spawn on the correct target runtime. + // `TelemetryHandle::spawn()` uses `tokio::spawn()` which would + // place the task on the *current* runtime instead of `runtime`. + let traced_handle = dh.traced_handle(); + task_builder + .spawn_on( + async move { + let task_id = tokio::task::try_id() + .map(dial9_tokio_telemetry::telemetry::TaskId::from) + .unwrap_or_default(); + dial9_tokio_telemetry::traced::Traced::new(fut, traced_handle, task_id) + .await + }, + runtime, + ) + .expect("runtime can spawn tasks") + } else { + task_builder + .spawn_on(fut, runtime) + .expect("runtime can spawn tasks") + } + } + + /// Create a `TracedRuntime` wrapping the given builder, with its own trace + /// files under `{trace_dir}/{runtime_name}/`. + fn create_traced_runtime( + runtime_name: &str, + builder: tokio::runtime::Builder, + ) -> std::io::Result<( + tokio::runtime::Runtime, + dial9_tokio_telemetry::telemetry::TelemetryGuard, + )> { + let opts = Configuration::pinned().common.dial9.clone(); + let trace_path = opts.trace_dir().join(runtime_name).join("trace"); + let writer = create_dial9_writer(&trace_path, &opts)?; + let traced = dial9_tokio_telemetry::telemetry::TracedRuntime::builder() + .with_task_tracking(true) + // The trace path is needed for the background symbolization worker to + // find sealed segments, resolve addresses to symbols, and write back + // SymbolTableEntry events into the trace files. + .with_trace_path(&trace_path); + + // cpu-profiling feature is automatically enabled on Linux via Cargo.toml + #[cfg(target_os = "linux")] + let traced = traced + .with_cpu_profiling(dial9_tokio_telemetry::telemetry::CpuProfilingConfig::default()) + .with_sched_events(dial9_tokio_telemetry::telemetry::SchedEventConfig { + include_kernel: true, + }); + + traced.build_and_start(builder, writer) + } + + fn create_dial9_writer( + trace_path: &std::path::Path, + opts: &Dial9Options, + ) -> std::io::Result { + dial9_tokio_telemetry::telemetry::RotatingWriter::builder() + .base_path(trace_path) + .max_file_size(opts.max_file_size.as_u64()) + .max_total_size(opts.max_total_size.as_u64()) + .build() + } +} + +#[cfg(not(feature = "dial9"))] +mod inner { + use super::super::TaskCenterBuildError; + + /// Zero-sized no-op replacement for [`Dial9State`] when the feature is off. + pub struct Dial9State; + + impl Dial9State { + pub fn empty() -> Self { + Self + } + + pub fn default_handle(&self) -> Dial9Handle { + Dial9Handle + } + } + + /// Zero-sized no-op replacement for [`Dial9Handle`] when the feature is off. + #[derive(Clone)] + pub struct Dial9Handle; + + impl Dial9Handle { + pub fn or(self, _other: Self) -> Self { + self + } + } + + pub fn build_default_runtime( + mut builder: tokio::runtime::Builder, + _fallback_builder: impl FnOnce() -> tokio::runtime::Builder, + ) -> Result<(tokio::runtime::Runtime, Dial9State), TaskCenterBuildError> { + Ok((builder.build()?, Dial9State)) + } + + pub fn build_child_runtime( + _runtime_name: &restate_types::SharedString, + mut builder: tokio::runtime::Builder, + _state: &Dial9State, + _fallback_builder: impl FnOnce() -> tokio::runtime::Builder, + ) -> (tokio::runtime::Runtime, Dial9Handle) { + ( + builder.build().expect("runtime builder succeeds"), + Dial9Handle, + ) + } + + pub fn set_thread_local(_handle: Dial9Handle) {} + + pub fn thread_local_handle() -> Dial9Handle { + Dial9Handle + } + + pub fn spawn_or_fallback( + _handle: Dial9Handle, + task_builder: tokio::task::Builder<'_>, + fut: F, + runtime: &tokio::runtime::Handle, + ) -> tokio::task::JoinHandle + where + F: std::future::Future + Send + 'static, + T: Send + 'static, + { + task_builder + .spawn_on(fut, runtime) + .expect("runtime can spawn tasks") + } +} + +pub(super) use inner::*; diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index a0f4b11216..222254587d 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -516,6 +516,56 @@ pub struct CommonOptions { /// /// Defaults to `false` in v1.6. pub experimental_shuffler_batch_ingestion: bool, + + /// # dial9 Tokio runtime telemetry + /// + /// Configuration for dial9 runtime telemetry. Only effective when the server + /// is compiled with the `dial9` feature flag. + /// Since v1.6.3 + #[serde(default)] + pub dial9: Dial9Options, +} + +/// # dial9 Tokio runtime telemetry options +/// +/// Configuration for dial9 runtime telemetry trace files. +/// Only effective when the server is compiled with the `dial9` feature flag. +/// Since v1.6.3 +#[derive(Debug, Clone, Serialize, Deserialize)] +#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schemars", schemars(default))] +#[serde(rename_all = "kebab-case")] +pub struct Dial9Options { + /// Base directory for trace files. Each runtime writes to a subdirectory. + /// Defaults to `{data-dir}/dial9-traces/`. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub trace_dir: Option, + + /// Maximum size per trace file before rotation. + /// Default: 10 MiB. + pub max_file_size: NonZeroByteCount, + + /// Maximum total disk budget for trace files per runtime. + /// Default: 100 MiB. + pub max_total_size: NonZeroByteCount, +} + +impl Dial9Options { + pub fn trace_dir(&self) -> PathBuf { + self.trace_dir + .clone() + .unwrap_or_else(|| super::node_filepath("dial9-traces")) + } +} + +impl Default for Dial9Options { + fn default() -> Self { + Self { + trace_dir: None, + max_file_size: NonZeroByteCount::try_from(10 * 1024 * 1024).unwrap(), + max_total_size: NonZeroByteCount::try_from(100 * 1024 * 1024).unwrap(), + } + } } serde_with::with_prefix!(pub prefix_tokio_console "tokio_console_"); @@ -755,6 +805,7 @@ impl Default for CommonOptions { hlc_max_drift: FriendlyDuration::from_millis(5000), experimental_kafka_batch_ingestion: false, experimental_shuffler_batch_ingestion: false, + dial9: Dial9Options::default(), } } } diff --git a/lite/Cargo.toml b/lite/Cargo.toml index ddac1112cb..1215e5b330 100644 --- a/lite/Cargo.toml +++ b/lite/Cargo.toml @@ -17,6 +17,7 @@ dist = true [features] default = ["no-trace-logging"] no-trace-logging = ["tracing/max_level_trace", "tracing/release_max_level_debug"] +dial9 = ["restate-core/dial9"] [dependencies] restate-workspace-hack = { workspace = true } diff --git a/server/Cargo.toml b/server/Cargo.toml index bf1f79793b..ae9987e819 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -33,6 +33,7 @@ no-trace-logging = ["tracing/max_level_trace", "tracing/release_max_level_debug" metadata-api = ["restate-admin/metadata-api"] kafka-oidc = ["restate-node/kafka-oidc"] taskdump = ["restate-core/taskdump"] +dial9 = ["restate-core/dial9"] [dependencies] restate-workspace-hack = { workspace = true } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index f66bfdeeb2..284f0616be 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -284,9 +284,11 @@ bitflags = { version = "2", default-features = false, features = ["std"] } clap = { version = "4" } crossterm = { version = "0.29" } getrandom-9fbad63c4bcf4a8f = { package = "getrandom", version = "0.4", default-features = false, features = ["std"] } +gimli = { version = "0.32" } hyper-rustls = { version = "0.27", default-features = false, features = ["webpki-tokio"] } jemalloc_pprof = { version = "0.8", default-features = false, features = ["flamegraph", "symbolize"] } libc = { version = "0.2", default-features = false, features = ["use_std"] } +metrics-util = { version = "0.20" } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } mio = { version = "1", features = ["net", "os-ext"] } num = { version = "0.4" } @@ -303,9 +305,11 @@ bitflags = { version = "2", default-features = false, features = ["std"] } clap = { version = "4" } crossterm = { version = "0.29" } getrandom-9fbad63c4bcf4a8f = { package = "getrandom", version = "0.4", default-features = false, features = ["std"] } +gimli = { version = "0.32" } hyper-rustls = { version = "0.27", default-features = false, features = ["webpki-tokio"] } jemalloc_pprof = { version = "0.8", default-features = false, features = ["flamegraph", "symbolize"] } libc = { version = "0.2", default-features = false, features = ["use_std"] } +metrics-util = { version = "0.20" } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } mio = { version = "1", features = ["net", "os-ext"] } num = { version = "0.4" } @@ -326,6 +330,7 @@ getrandom-9fbad63c4bcf4a8f = { package = "getrandom", version = "0.4", default-f hyper-rustls = { version = "0.27", default-features = false, features = ["webpki-tokio"] } jemalloc_pprof = { version = "0.8", default-features = false, features = ["flamegraph", "symbolize"] } libc = { version = "0.2", default-features = false, features = ["use_std"] } +metrics-util = { version = "0.20" } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } num = { version = "0.4" } object = { version = "0.37", default-features = false, features = ["read", "std"] } @@ -345,6 +350,7 @@ getrandom-9fbad63c4bcf4a8f = { package = "getrandom", version = "0.4", default-f hyper-rustls = { version = "0.27", default-features = false, features = ["webpki-tokio"] } jemalloc_pprof = { version = "0.8", default-features = false, features = ["flamegraph", "symbolize"] } libc = { version = "0.2", default-features = false, features = ["use_std"] } +metrics-util = { version = "0.20" } miniz_oxide = { version = "0.8", default-features = false, features = ["simd", "with-alloc"] } num = { version = "0.4" } object = { version = "0.37", default-features = false, features = ["read", "std"] }