diff --git a/settings.json.template b/settings.json.template index e88e82a36af..cee476a9b1e 100644 --- a/settings.json.template +++ b/settings.json.template @@ -733,6 +733,14 @@ */ "loadTest": false, + /* + * Coalesce NEW_CHANGES fan-out within a debounce window of N milliseconds + * (#7756 lever 3). 0 (default) preserves legacy synchronous behaviour. + * Increase (typical: 25-100) to trade a little latency for a quadratic + * reduction in socket.io emit volume under high pad concurrency. + */ + "fanoutDebounceMs": 0, + /** * Disable dump of objects preventing a clean exit */ diff --git a/src/node/handler/FanoutScheduler.ts b/src/node/handler/FanoutScheduler.ts new file mode 100644 index 00000000000..8254aff3ac8 --- /dev/null +++ b/src/node/handler/FanoutScheduler.ts @@ -0,0 +1,94 @@ +// Per-pad fan-out scheduler — debounce wrapper for #7756 lever 3. +// +// When `settings.fanoutDebounceMs <= 0` the scheduler is bypassed entirely; +// PadMessageHandler.handleUserChanges awaits updatePadClients directly. When +// > 0, rapid scheduleFanout(padId, fn) calls coalesce into a single fanout +// invocation per pad per debounce window. +// +// Concurrency contract: a single pad has at most one in-flight fan-out at a +// time. If a new commit arrives while a fan-out is running, a "dirty" flag +// is set; when the running fan-out finishes it triggers a follow-up schedule +// so the late commit isn't missed. +// +// Lives in its own module rather than inside PadMessageHandler so the +// scheduling logic can be unit-tested without pulling in the full pad / DB +// / socket.io stack. + +import settings from '../utils/Settings'; + +export type FanoutCallback = (padId: string) => Promise; + +type PadState = { + /** setTimeout handle for the debounce window — set iff timer is pending. */ + timer?: NodeJS.Timeout; + /** True while updatePadClients (or the test stub) is running for this pad. */ + running: boolean; + /** A schedule arrived while running — re-schedule after the current run finishes. */ + dirty: boolean; +}; + +const state = new Map(); + +const defaultErrorHandler = (_padId: string, err: unknown): void => { + // Re-throw on the next tick so the error surfaces in the log stream. + setImmediate(() => { throw err; }); +}; +let onError: (padId: string, err: unknown) => void = defaultErrorHandler; + +/** Override the error handler. PadMessageHandler installs one that uses messageLogger. */ +export const setErrorHandler = (fn: (padId: string, err: unknown) => void): void => { + onError = fn; +}; + +/** Restore the default error handler (used by tests to avoid leaking state across files). */ +export const resetErrorHandler = (): void => { onError = defaultErrorHandler; }; + +const fireWindow = async (padId: string, fanout: FanoutCallback): Promise => { + const s = state.get(padId); + if (!s) return; + s.timer = undefined; + s.running = true; + s.dirty = false; + try { + await fanout(padId); + } catch (err) { + onError(padId, err); + } finally { + s.running = false; + if (s.dirty) { + // A schedule arrived during the run; do another pass so that + // late commits don't sit until the next user action. + s.dirty = false; + const debounceMs = settings.fanoutDebounceMs ?? 0; + s.timer = setTimeout(() => { void fireWindow(padId, fanout); }, debounceMs); + if (typeof (s.timer as {unref?: () => void}).unref === 'function') { + (s.timer as {unref: () => void}).unref(); + } + } else { + state.delete(padId); + } + } +}; + +/** Schedule a fan-out for the given pad. Caller guarantees fanoutDebounceMs > 0. */ +export const scheduleFanout = (padId: string, fanout: FanoutCallback): void => { + const debounceMs = settings.fanoutDebounceMs ?? 0; + let s = state.get(padId); + if (!s) { s = {running: false, dirty: false}; state.set(padId, s); } + if (s.running) { + // Defer to follow-up after the current run. + s.dirty = true; + return; + } + if (s.timer !== undefined) { + // Already scheduled in the current window. + return; + } + s.timer = setTimeout(() => { void fireWindow(padId, fanout); }, debounceMs); + if (typeof (s.timer as {unref?: () => void}).unref === 'function') { + (s.timer as {unref: () => void}).unref(); + } +}; + +/** Test helper. */ +export const _state = state; diff --git a/src/node/handler/PadMessageHandler.ts b/src/node/handler/PadMessageHandler.ts index 3a27a7ac7be..3ae02e76a8c 100644 --- a/src/node/handler/PadMessageHandler.ts +++ b/src/node/handler/PadMessageHandler.ts @@ -48,6 +48,20 @@ const hooks = require('../../static/js/pluginfw/hooks'); const stats = require('../stats') const assert = require('assert').strict; import {recordChangesetApply, recordSocketEmit} from '../prom-instruments'; +import {scheduleFanout, setErrorHandler as setFanoutErrorHandler} from './FanoutScheduler'; + +// Route fan-out scheduler errors through the existing messageLogger so they +// land in the regular log stream. +setFanoutErrorHandler((padId, err) => { + messageLogger.error(`fan-out for pad ${padId} failed: ${(err as Error).stack ?? err}`); +}); + +// Cached helper that the scheduler calls back into. Goes through exports so +// tests can still monkey-patch updatePadClients. +const runFanout = async (padId: string): Promise => { + const pad = await padManager.getPad(padId); + await exports.updatePadClients(pad); +}; import {RateLimiterMemory} from 'rate-limiter-flexible'; import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest"; import {APool, AText, PadAuthor, PadType} from "../types/PadType"; @@ -936,7 +950,13 @@ const handleUserChanges = async (socket:any, message: { socket.emit('message', {type: 'COLLABROOM', data: {type: 'ACCEPT_COMMIT', newRev}}); thisSession.rev = newRev; if (newRev !== r) thisSession.time = await pad.getRevisionDate(newRev); - await exports.updatePadClients(pad); + if ((settings.fanoutDebounceMs ?? 0) > 0) { + // Debounce window enabled: defer + coalesce fan-out. + scheduleFanout(pad.id, runFanout); + } else { + // Default / legacy: synchronous, awaited, errors flow into this try/catch. + await exports.updatePadClients(pad); + } } catch (err:any) { socket.emit('message', {disconnect: 'badChangeset'}); stats.meter('failedChangesets').mark(); diff --git a/src/node/utils/Settings.ts b/src/node/utils/Settings.ts index 97413004100..f45fe29e766 100644 --- a/src/node/utils/Settings.ts +++ b/src/node/utils/Settings.ts @@ -272,6 +272,7 @@ export type SettingsType = { automaticReconnectionTimeout: number, loadTest: boolean, scalingDiveMetrics: boolean, + fanoutDebounceMs: number, dumpOnUncleanExit: boolean, indentationOnNewLine: boolean, logconfig: any | null, @@ -658,6 +659,20 @@ const settings: SettingsType = { * production deployments aren't paying for instrumentation they don't use. */ scalingDiveMetrics: false, + /** + * Coalesce NEW_CHANGES fan-out within a debounce window of N milliseconds + * (#7756 lever 3). When > 0, after a USER_CHANGES is accepted the pad-wide + * broadcast is deferred by this many ms; further commits arriving during + * the window are batched into the same fan-out pass. + * + * 0 (default) = legacy behaviour: fan-out fires synchronously, awaited, + * inside the existing handleUserChanges try/catch. The scheduler is + * bypassed entirely so disabling the feature is a strict no-op. + * + * Increase to trade a few ms of latency for a quadratic reduction in + * socket.io emit volume under high concurrency. + */ + fanoutDebounceMs: 0, /** * Disable dump of objects preventing a clean exit */ diff --git a/src/tests/backend-new/specs/fanout-debounce.test.ts b/src/tests/backend-new/specs/fanout-debounce.test.ts new file mode 100644 index 00000000000..fe00a20fcfb --- /dev/null +++ b/src/tests/backend-new/specs/fanout-debounce.test.ts @@ -0,0 +1,98 @@ +// Unit coverage for the per-pad fan-out debounce (#7756 lever 3). +// With debounce > 0, rapid scheduleFanout calls coalesce into a single +// fanout invocation per pad per debounce window. Concurrent fan-outs for +// the same pad are prevented by the running/dirty state. + +import {describe, it, expect, beforeEach, afterEach, vi} from 'vitest'; +import settings from '../../../node/utils/Settings'; +import { + scheduleFanout, + _state, + setErrorHandler, + resetErrorHandler, + type FanoutCallback, +} from '../../../node/handler/FanoutScheduler'; + +describe('fanout debounce', () => { + const originalDebounce = settings.fanoutDebounceMs; + let calls: string[]; + let fanout: FanoutCallback; + + beforeEach(() => { + vi.useFakeTimers(); + _state.clear(); + calls = []; + fanout = async (padId) => { calls.push(padId); }; + setErrorHandler(() => {/* swallow in tests */}); + settings.fanoutDebounceMs = 50; + }); + + afterEach(() => { + vi.useRealTimers(); + settings.fanoutDebounceMs = originalDebounce; + _state.clear(); + resetErrorHandler(); + }); + + it('coalesces N rapid calls into a single fanout call within one window', async () => { + for (let i = 0; i < 10; i++) scheduleFanout('pad-a', fanout); + expect(calls).toEqual([]); + expect(_state.size).toBe(1); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['pad-a']); + expect(_state.size).toBe(0); + }); + + it('debounces independently per pad', async () => { + scheduleFanout('pad-a', fanout); + scheduleFanout('pad-b', fanout); + scheduleFanout('pad-a', fanout); + expect(_state.size).toBe(2); + await vi.advanceTimersByTimeAsync(60); + expect(calls.sort()).toEqual(['pad-a', 'pad-b']); + expect(_state.size).toBe(0); + }); + + it('after the window fires, a new schedule starts a fresh window', async () => { + scheduleFanout('pad-a', fanout); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['pad-a']); + scheduleFanout('pad-a', fanout); + expect(_state.size).toBe(1); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['pad-a', 'pad-a']); + }); + + it('schedules arriving during an in-flight fan-out get a follow-up pass', async () => { + // Slow fanout that yields so we can interleave a schedule mid-flight. + let release: () => void; + const gate = new Promise((r) => { release = r; }); + fanout = async (padId) => { + calls.push(`start-${padId}`); + await gate; + calls.push(`end-${padId}`); + }; + scheduleFanout('pad-a', fanout); + await vi.advanceTimersByTimeAsync(60); // window fires, callback awaits gate + expect(calls).toEqual(['start-pad-a']); + // A new commit lands while the fan-out is awaiting. + scheduleFanout('pad-a', fanout); + expect(_state.get('pad-a')?.dirty).toBe(true); + // Release the running fan-out; the dirty flag should trigger another window. + release!(); + await vi.runOnlyPendingTimersAsync(); + await vi.advanceTimersByTimeAsync(60); + expect(calls).toEqual(['start-pad-a', 'end-pad-a', 'start-pad-a', 'end-pad-a']); + expect(_state.size).toBe(0); + }); + + it('routes fanout errors through setErrorHandler', async () => { + const errors: Array<{padId: string; err: unknown}> = []; + setErrorHandler((padId, err) => { errors.push({padId, err}); }); + scheduleFanout('pad-x', async () => { throw new Error('boom'); }); + await vi.advanceTimersByTimeAsync(60); + expect(errors).toHaveLength(1); + expect(errors[0]!.padId).toBe('pad-x'); + expect((errors[0]!.err as Error).message).toBe('boom'); + }); +});