From 883d90051543f82befd79c584ca629e1dc7b8ea4 Mon Sep 17 00:00:00 2001 From: John McLear Date: Fri, 15 May 2026 21:16:51 +0100 Subject: [PATCH 1/2] =?UTF-8?q?feat(scaling):=20fan-out=20debounce=20?= =?UTF-8?q?=E2=80=94=20#7756=20lever=203=20prototype?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per the scaling-dive doc (docs/scaling-dive-2026-05.md): - etherpad_changeset_apply_duration_seconds stays under 5ms even at 200 concurrent authors — server-side apply is not the bottleneck. - etherpad_socket_emits_total{type=NEW_CHANGES} scales O(N^2): 1160 emits/dwell at 20 authors -> 66032 at 200 authors. - Cliff in baseline appears at ~250 authors (errorRate flag). Fan-out is the dominant cost. This prototype debounces the per-pad fan-out so rapid commits coalesce into a single updatePadClients invocation per pad per window. Implementation: - New `settings.fanoutDebounceMs` (default 0 = legacy behaviour). When > 0, scheduleFanout queues a setTimeout per pad; further commits arriving inside the window don't reschedule, they just ride the existing timer. - Extracted scheduling into src/node/handler/FanoutScheduler.ts rather than nesting it in PadMessageHandler. The wrapper has no pad/DB dependency so it's unit-testable in isolation. - handleUserChanges replaces its direct `await exports.updatePadClients(pad)` with `scheduleFanout(pad.id, runFanout)`. Errors during the deferred fan-out are surfaced via the existing messageLogger. - Initial-join catch-up (line ~1377) keeps the synchronous call — a joining client must not miss revisions, no batching there. Tests: 5/5 in src/tests/backend-new/specs/fanout-debounce.test.ts — covers debounce=0 passthrough, N-into-1 coalescing, per-pad independence, post-flush rescheduling, and error routing. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/node/handler/FanoutScheduler.ts | 43 ++++++++++ src/node/handler/PadMessageHandler.ts | 16 +++- src/node/utils/Settings.ts | 14 ++++ .../backend-new/specs/fanout-debounce.test.ts | 78 +++++++++++++++++++ 4 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 src/node/handler/FanoutScheduler.ts create mode 100644 src/tests/backend-new/specs/fanout-debounce.test.ts diff --git a/src/node/handler/FanoutScheduler.ts b/src/node/handler/FanoutScheduler.ts new file mode 100644 index 00000000000..5056f41aec4 --- /dev/null +++ b/src/node/handler/FanoutScheduler.ts @@ -0,0 +1,43 @@ +// Per-pad fan-out scheduler — debounce wrapper for #7756 lever 3. +// +// When `settings.fanoutDebounceMs <= 0` (default), fan-out fires immediately +// — legacy behaviour. When > 0, rapid scheduleFanout(pad) calls coalesce +// into a single fan-out per pad per debounce window. +// +// Lives in its own module rather than inside PadMessageHandler so the +// scheduling logic can be unit-tested without pulling in the full pad / DB +// / socket.io stack. + +import settings from '../utils/Settings'; + +export type FanoutCallback = (padId: string) => Promise; + +const pendingFanouts = new Map(); +let onError: (padId: string, err: unknown) => void = (padId, err) => { + // Default error sink: re-throw on the next tick so it shows up in the log. + setImmediate(() => { throw err; }); +}; + +/** Override the error handler. PadMessageHandler installs one that uses messageLogger. */ +export const setErrorHandler = (fn: (padId: string, err: unknown) => void): void => { + onError = fn; +}; + +/** Schedule a fan-out for the given pad. */ +export const scheduleFanout = (padId: string, fanout: FanoutCallback): void => { + const debounceMs = settings.fanoutDebounceMs ?? 0; + if (debounceMs <= 0) { + void fanout(padId).catch((err) => onError(padId, err)); + return; + } + if (pendingFanouts.has(padId)) return; + const t = setTimeout(() => { + pendingFanouts.delete(padId); + void fanout(padId).catch((err) => onError(padId, err)); + }, debounceMs); + if (typeof (t as {unref?: () => void}).unref === 'function') (t as {unref: () => void}).unref(); + pendingFanouts.set(padId, t); +}; + +/** Test helper. */ +export const _state = {pendingFanouts}; diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 3a27a7ac7be..f74446870a8 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -48,6 +48,20 @@ const hooks = require('../../static/js/pluginfw/hooks'); const stats = require('../stats') const assert = require('assert').strict; import {recordChangesetApply, recordSocketEmit} from '../prom-instruments'; +import {scheduleFanout, setErrorHandler as setFanoutErrorHandler} from './FanoutScheduler'; + +// Route fan-out scheduler errors through the existing messageLogger so they +// land in the regular log stream. +setFanoutErrorHandler((padId, err) => { + messageLogger.error(`fan-out for pad ${padId} failed: ${(err as Error).stack ?? err}`); +}); + +// Cached helper that the scheduler calls back into. Goes through exports so +// tests can still monkey-patch updatePadClients. +const runFanout = async (padId: string): Promise => { + const pad = await padManager.getPad(padId); + await exports.updatePadClients(pad); +}; import {RateLimiterMemory} from 'rate-limiter-flexible'; import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest"; import {APool, AText, PadAuthor, PadType} from "../types/PadType"; @@ -936,7 +950,7 @@ const handleUserChanges = async (socket:any, message: { socket.emit('message', {type: 'COLLABROOM', data: {type: 'ACCEPT_COMMIT', newRev}}); thisSession.rev = newRev; if (newRev !== r) thisSession.time = await pad.getRevisionDate(newRev); - await exports.updatePadClients(pad); + scheduleFanout(pad.id, runFanout); } catch (err:any) { socket.emit('message', {disconnect: 'badChangeset'}); stats.meter('failedChangesets').mark(); diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 97413004100..0ed820f9698 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -272,6 +272,7 @@ export type SettingsType = { automaticReconnectionTimeout: number, loadTest: boolean, scalingDiveMetrics: boolean, + fanoutDebounceMs: number, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, logconfig: any | null, @@ -658,6 +659,19 @@ const settings: SettingsType = { * production deployments aren't paying for instrumentation they don't use. */ scalingDiveMetrics: false, + /** + * Coalesce NEW_CHANGES fan-out within a debounce window of N milliseconds + * (#7756 lever 3). When > 0, after a USER_CHANGES is accepted the pad-wide + * broadcast is deferred by this many ms; further commits arriving during + * the window are batched into the same fan-out pass. + * + * 0 (default) = legacy behaviour: fan-out fires synchronously after every + * accepted commit. Increase to trade a few ms of latency for a quadratic + * reduction in socket.io emit volume under high concurrency. Gated behind + * `loadTest` AND `scalingDiveMetrics` — production deployments are not + * affected by default. + */ + fanoutDebounceMs: 0, /** * Disable dump of objects preventing a clean exit */ diff --git a/src/tests/backend-new/specs/fanout-debounce.test.ts b/src/tests/backend-new/specs/fanout-debounce.test.ts new file mode 100644 index 00000000000..455db5db23b --- /dev/null +++ b/src/tests/backend-new/specs/fanout-debounce.test.ts @@ -0,0 +1,78 @@ +// Unit coverage for the per-pad fan-out debounce (#7756 lever 3). +// With debounce > 0, rapid scheduleFanout calls coalesce into a single +// fanout invocation per pad per debounce window. + +import {describe, it, expect, beforeEach, afterEach, vi} from 'vitest'; +import settings from '../../../node/utils/Settings'; +import {scheduleFanout, _state, setErrorHandler, type FanoutCallback} from '../../../node/handler/FanoutScheduler'; + +describe('fanout debounce', () => { + const originalDebounce = settings.fanoutDebounceMs; + let calls: string[]; + let fanout: FanoutCallback; + + beforeEach(() => { + vi.useFakeTimers(); + _state.pendingFanouts.clear(); + calls = []; + fanout = async (padId) => { calls.push(padId); }; + setErrorHandler(() => {/* swallow in tests */}); + }); + + afterEach(() => { + vi.useRealTimers(); + settings.fanoutDebounceMs = originalDebounce; + }); + + it('with debounce=0 fires the fanout synchronously per call', async () => { + settings.fanoutDebounceMs = 0; + scheduleFanout('pad-a', fanout); + scheduleFanout('pad-a', fanout); + scheduleFanout('pad-a', fanout); + await vi.runAllTimersAsync(); + expect(calls).toEqual(['pad-a', 'pad-a', 'pad-a']); + expect(_state.pendingFanouts.size).toBe(0); + }); + + it('with debounce>0 coalesces N rapid calls into a single fanout call', async () => { + settings.fanoutDebounceMs = 50; + for (let i = 0; i < 10; i++) scheduleFanout('pad-a', fanout); + expect(calls).toEqual([]); + expect(_state.pendingFanouts.size).toBe(1); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['pad-a']); + expect(_state.pendingFanouts.size).toBe(0); + }); + + it('debounces independently per pad', async () => { + settings.fanoutDebounceMs = 50; + scheduleFanout('pad-a', fanout); + scheduleFanout('pad-b', fanout); + scheduleFanout('pad-a', fanout); + expect(_state.pendingFanouts.size).toBe(2); + await vi.advanceTimersByTimeAsync(60); + expect(calls.sort()).toEqual(['pad-a', 'pad-b']); + }); + + it('after the window fires, a new schedule starts a fresh window', async () => { + settings.fanoutDebounceMs = 50; + scheduleFanout('pad-a', fanout); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['pad-a']); + scheduleFanout('pad-a', fanout); + expect(_state.pendingFanouts.size).toBe(1); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['pad-a', 'pad-a']); + }); + + it('routes fanout errors through setErrorHandler', async () => { + settings.fanoutDebounceMs = 0; + const errors: Array<{padId: string; err: unknown}> = []; + setErrorHandler((padId, err) => { errors.push({padId, err}); }); + scheduleFanout('pad-x', async () => { throw new Error('boom'); }); + await vi.runAllTimersAsync(); + expect(errors).toHaveLength(1); + expect(errors[0]!.padId).toBe('pad-x'); + expect((errors[0]!.err as Error).message).toBe('boom'); + }); +}); From af9f5ad7773fe3418cdf7d0d70877034c0eb5236 Mon Sep 17 00:00:00 2001 From: John McLear Date: Fri, 15 May 2026 21:26:54 +0100 Subject: [PATCH 2/2] fix(fanout): address Qodo review on #7766 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Five issues raised on the initial PR: 1. **Disabled state regressed to async.** Caller went through the scheduler even when debounceMs<=0, dropping the legacy `await updatePadClients(...)` semantics — errors no longer flowed through handleUserChanges' try/catch. Fix: only call scheduleFanout when debounceMs>0; otherwise the existing synchronous await path runs unchanged. 2. **Overlapping fan-outs per pad.** The scheduler deleted the pad's entry before the async callback resolved, so a schedule arriving mid-fanout could spawn a concurrent updatePadClients. Now tracked via running/dirty state: schedules during a run set `dirty`, and the running fan-out re-arms itself in its finally block to catch up. Late commits aren't missed. 3. **fanoutDebounceMs undocumented.** Added to settings.json.template alongside loadTest with the same prose as Settings.ts. 4. **Test leaked error handler.** beforeEach overrode the module- level handler but afterEach didn't restore it, polluting later test files. Added a resetErrorHandler() export and an afterEach that calls it. 5. **Misleading gating comment.** Settings.ts claimed the debounce was gated behind loadTest + scalingDiveMetrics, but the scheduler applied it unconditionally. Updated comment to reflect actual behaviour. 10/10 tests green (5 fanout + 5 prom-instruments). Co-Authored-By: Claude Opus 4.7 (1M context) --- settings.json.template | 8 ++ src/node/handler/FanoutScheduler.ts | 85 +++++++++++++++---- src/node/handler/PadMessageHandler.ts | 8 +- src/node/utils/Settings.ts | 11 +-- .../backend-new/specs/fanout-debounce.test.ts | 66 +++++++++----- 5 files changed, 132 insertions(+), 46 deletions(-) diff --git a/settings.json.template b/settings.json.template index e88e82a36af..cee476a9b1e 100644 --- a/settings.json.template +++ b/settings.json.template @@ -733,6 +733,14 @@ */ "loadTest": false, + /* + * Coalesce NEW_CHANGES fan-out within a debounce window of N milliseconds + * (#7756 lever 3). 0 (default) preserves legacy synchronous behaviour. + * Increase (typical: 25-100) to trade a little latency for a quadratic + * reduction in socket.io emit volume under high pad concurrency. + */ + "fanoutDebounceMs": 0, + /** * Disable dump of objects preventing a clean exit */ diff --git a/src/node/handler/FanoutScheduler.ts b/src/node/handler/FanoutScheduler.ts index 5056f41aec4..8254aff3ac8 100644 --- a/src/node/handler/FanoutScheduler.ts +++ b/src/node/handler/FanoutScheduler.ts @@ -1,8 +1,14 @@ // Per-pad fan-out scheduler — debounce wrapper for #7756 lever 3. // -// When `settings.fanoutDebounceMs <= 0` (default), fan-out fires immediately -// — legacy behaviour. When > 0, rapid scheduleFanout(pad) calls coalesce -// into a single fan-out per pad per debounce window. +// When `settings.fanoutDebounceMs <= 0` the scheduler is bypassed entirely; +// PadMessageHandler.handleUserChanges awaits updatePadClients directly. When +// > 0, rapid scheduleFanout(padId, fn) calls coalesce into a single fanout +// invocation per pad per debounce window. +// +// Concurrency contract: a single pad has at most one in-flight fan-out at a +// time. If a new commit arrives while a fan-out is running, a "dirty" flag +// is set; when the running fan-out finishes it triggers a follow-up schedule +// so the late commit isn't missed. // // Lives in its own module rather than inside PadMessageHandler so the // scheduling logic can be unit-tested without pulling in the full pad / DB @@ -12,32 +18,77 @@ import settings from '../utils/Settings'; export type FanoutCallback = (padId: string) => Promise; -const pendingFanouts = new Map(); -let onError: (padId: string, err: unknown) => void = (padId, err) => { - // Default error sink: re-throw on the next tick so it shows up in the log. +type PadState = { + /** setTimeout handle for the debounce window — set iff timer is pending. */ + timer?: NodeJS.Timeout; + /** True while updatePadClients (or the test stub) is running for this pad. */ + running: boolean; + /** A schedule arrived while running — re-schedule after the current run finishes. */ + dirty: boolean; +}; + +const state = new Map(); + +const defaultErrorHandler = (_padId: string, err: unknown): void => { + // Re-throw on the next tick so the error surfaces in the log stream. setImmediate(() => { throw err; }); }; +let onError: (padId: string, err: unknown) => void = defaultErrorHandler; /** Override the error handler. PadMessageHandler installs one that uses messageLogger. */ export const setErrorHandler = (fn: (padId: string, err: unknown) => void): void => { onError = fn; }; -/** Schedule a fan-out for the given pad. */ +/** Restore the default error handler (used by tests to avoid leaking state across files). */ +export const resetErrorHandler = (): void => { onError = defaultErrorHandler; }; + +const fireWindow = async (padId: string, fanout: FanoutCallback): Promise => { + const s = state.get(padId); + if (!s) return; + s.timer = undefined; + s.running = true; + s.dirty = false; + try { + await fanout(padId); + } catch (err) { + onError(padId, err); + } finally { + s.running = false; + if (s.dirty) { + // A schedule arrived during the run; do another pass so that + // late commits don't sit until the next user action. + s.dirty = false; + const debounceMs = settings.fanoutDebounceMs ?? 0; + s.timer = setTimeout(() => { void fireWindow(padId, fanout); }, debounceMs); + if (typeof (s.timer as {unref?: () => void}).unref === 'function') { + (s.timer as {unref: () => void}).unref(); + } + } else { + state.delete(padId); + } + } +}; + +/** Schedule a fan-out for the given pad. Caller guarantees fanoutDebounceMs > 0. */ export const scheduleFanout = (padId: string, fanout: FanoutCallback): void => { const debounceMs = settings.fanoutDebounceMs ?? 0; - if (debounceMs <= 0) { - void fanout(padId).catch((err) => onError(padId, err)); + let s = state.get(padId); + if (!s) { s = {running: false, dirty: false}; state.set(padId, s); } + if (s.running) { + // Defer to follow-up after the current run. + s.dirty = true; return; } - if (pendingFanouts.has(padId)) return; - const t = setTimeout(() => { - pendingFanouts.delete(padId); - void fanout(padId).catch((err) => onError(padId, err)); - }, debounceMs); - if (typeof (t as {unref?: () => void}).unref === 'function') (t as {unref: () => void}).unref(); - pendingFanouts.set(padId, t); + if (s.timer !== undefined) { + // Already scheduled in the current window. + return; + } + s.timer = setTimeout(() => { void fireWindow(padId, fanout); }, debounceMs); + if (typeof (s.timer as {unref?: () => void}).unref === 'function') { + (s.timer as {unref: () => void}).unref(); + } }; /** Test helper. */ -export const _state = {pendingFanouts}; +export const _state = state; diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index f74446870a8..3ae02e76a8c 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -950,7 +950,13 @@ const handleUserChanges = async (socket:any, message: { socket.emit('message', {type: 'COLLABROOM', data: {type: 'ACCEPT_COMMIT', newRev}}); thisSession.rev = newRev; if (newRev !== r) thisSession.time = await pad.getRevisionDate(newRev); - scheduleFanout(pad.id, runFanout); + if ((settings.fanoutDebounceMs ?? 0) > 0) { + // Debounce window enabled: defer + coalesce fan-out. + scheduleFanout(pad.id, runFanout); + } else { + // Default / legacy: synchronous, awaited, errors flow into this try/catch. + await exports.updatePadClients(pad); + } } catch (err:any) { socket.emit('message', {disconnect: 'badChangeset'}); stats.meter('failedChangesets').mark(); diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 0ed820f9698..f45fe29e766 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -665,11 +665,12 @@ const settings: SettingsType = { * broadcast is deferred by this many ms; further commits arriving during * the window are batched into the same fan-out pass. * - * 0 (default) = legacy behaviour: fan-out fires synchronously after every - * accepted commit. Increase to trade a few ms of latency for a quadratic - * reduction in socket.io emit volume under high concurrency. Gated behind - * `loadTest` AND `scalingDiveMetrics` — production deployments are not - * affected by default. + * 0 (default) = legacy behaviour: fan-out fires synchronously, awaited, + * inside the existing handleUserChanges try/catch. The scheduler is + * bypassed entirely so disabling the feature is a strict no-op. + * + * Increase to trade a few ms of latency for a quadratic reduction in + * socket.io emit volume under high concurrency. */ fanoutDebounceMs: 0, /** diff --git a/src/tests/backend-new/specs/fanout-debounce.test.ts b/src/tests/backend-new/specs/fanout-debounce.test.ts index 455db5db23b..fe00a20fcfb 100644 --- a/src/tests/backend-new/specs/fanout-debounce.test.ts +++ b/src/tests/backend-new/specs/fanout-debounce.test.ts @@ -1,10 +1,17 @@ // Unit coverage for the per-pad fan-out debounce (#7756 lever 3). // With debounce > 0, rapid scheduleFanout calls coalesce into a single -// fanout invocation per pad per debounce window. +// fanout invocation per pad per debounce window. Concurrent fan-outs for +// the same pad are prevented by the running/dirty state. import {describe, it, expect, beforeEach, afterEach, vi} from 'vitest'; import settings from '../../../node/utils/Settings'; -import {scheduleFanout, _state, setErrorHandler, type FanoutCallback} from '../../../node/handler/FanoutScheduler'; +import { + scheduleFanout, + _state, + setErrorHandler, + resetErrorHandler, + type FanoutCallback, +} from '../../../node/handler/FanoutScheduler'; describe('fanout debounce', () => { const originalDebounce = settings.fanoutDebounceMs; @@ -13,64 +20,77 @@ describe('fanout debounce', () => { beforeEach(() => { vi.useFakeTimers(); - _state.pendingFanouts.clear(); + _state.clear(); calls = []; fanout = async (padId) => { calls.push(padId); }; setErrorHandler(() => {/* swallow in tests */}); + settings.fanoutDebounceMs = 50; }); afterEach(() => { vi.useRealTimers(); settings.fanoutDebounceMs = originalDebounce; + _state.clear(); + resetErrorHandler(); }); - it('with debounce=0 fires the fanout synchronously per call', async () => { - settings.fanoutDebounceMs = 0; - scheduleFanout('pad-a', fanout); - scheduleFanout('pad-a', fanout); - scheduleFanout('pad-a', fanout); - await vi.runAllTimersAsync(); - expect(calls).toEqual(['pad-a', 'pad-a', 'pad-a']); - expect(_state.pendingFanouts.size).toBe(0); - }); - - it('with debounce>0 coalesces N rapid calls into a single fanout call', async () => { - settings.fanoutDebounceMs = 50; + it('coalesces N rapid calls into a single fanout call within one window', async () => { for (let i = 0; i < 10; i++) scheduleFanout('pad-a', fanout); expect(calls).toEqual([]); - expect(_state.pendingFanouts.size).toBe(1); + expect(_state.size).toBe(1); await vi.advanceTimersByTimeAsync(60); expect(calls).toEqual(['pad-a']); - expect(_state.pendingFanouts.size).toBe(0); + expect(_state.size).toBe(0); }); it('debounces independently per pad', async () => { - settings.fanoutDebounceMs = 50; scheduleFanout('pad-a', fanout); scheduleFanout('pad-b', fanout); scheduleFanout('pad-a', fanout); - expect(_state.pendingFanouts.size).toBe(2); + expect(_state.size).toBe(2); await vi.advanceTimersByTimeAsync(60); expect(calls.sort()).toEqual(['pad-a', 'pad-b']); + expect(_state.size).toBe(0); }); it('after the window fires, a new schedule starts a fresh window', async () => { - settings.fanoutDebounceMs = 50; scheduleFanout('pad-a', fanout); await vi.advanceTimersByTimeAsync(60); expect(calls).toEqual(['pad-a']); scheduleFanout('pad-a', fanout); - expect(_state.pendingFanouts.size).toBe(1); + expect(_state.size).toBe(1); await vi.advanceTimersByTimeAsync(60); expect(calls).toEqual(['pad-a', 'pad-a']); }); + it('schedules arriving during an in-flight fan-out get a follow-up pass', async () => { + // Slow fanout that yields so we can interleave a schedule mid-flight. + let release: () => void; + const gate = new Promise((r) => { release = r; }); + fanout = async (padId) => { + calls.push(`start-${padId}`); + await gate; + calls.push(`end-${padId}`); + }; + scheduleFanout('pad-a', fanout); + await vi.advanceTimersByTimeAsync(60); // window fires, callback awaits gate + expect(calls).toEqual(['start-pad-a']); + // A new commit lands while the fan-out is awaiting. + scheduleFanout('pad-a', fanout); + expect(_state.get('pad-a')?.dirty).toBe(true); + // Release the running fan-out; the dirty flag should trigger another window. + release!(); + await vi.runOnlyPendingTimersAsync(); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['start-pad-a', 'end-pad-a', 'start-pad-a', 'end-pad-a']); + expect(_state.size).toBe(0); + }); + it('routes fanout errors through setErrorHandler', async () => { - settings.fanoutDebounceMs = 0; const errors: Array<{padId: string; err: unknown}> = []; setErrorHandler((padId, err) => { errors.push({padId, err}); }); scheduleFanout('pad-x', async () => { throw new Error('boom'); }); - await vi.runAllTimersAsync(); + await vi.advanceTimersByTimeAsync(60); expect(errors).toHaveLength(1); expect(errors[0]!.padId).toBe('pad-x'); expect((errors[0]!.err as Error).message).toBe('boom');