Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions settings.json.template
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,14 @@
*/
"loadTest": false,

/*
* Coalesce NEW_CHANGES fan-out within a debounce window of N milliseconds
* (#7756 lever 3). 0 (default) preserves legacy synchronous behaviour.
* Increase (typical: 25-100) to trade a little latency for a quadratic
* reduction in socket.io emit volume under high pad concurrency.
*/
"fanoutDebounceMs": 0,

/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
94 changes: 94 additions & 0 deletions src/node/handler/FanoutScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Per-pad fan-out scheduler — debounce wrapper for #7756 lever 3.
//
// When `settings.fanoutDebounceMs <= 0` the scheduler is bypassed entirely;
// PadMessageHandler.handleUserChanges awaits updatePadClients directly. When
// > 0, rapid scheduleFanout(padId, fn) calls coalesce into a single fanout
// invocation per pad per debounce window.
//
// Concurrency contract: a single pad has at most one in-flight fan-out at a
// time. If a new commit arrives while a fan-out is running, a "dirty" flag
// is set; when the running fan-out finishes it triggers a follow-up schedule
// so the late commit isn't missed.
//
// Lives in its own module rather than inside PadMessageHandler so the
// scheduling logic can be unit-tested without pulling in the full pad / DB
// / socket.io stack.

import settings from '../utils/Settings';

export type FanoutCallback = (padId: string) => Promise<void>;

/**
 * Bookkeeping kept for each pad that currently has a fan-out pending or in
 * flight. Entries live in the module-level `state` map, keyed by pad id.
 */
interface PadState {
  /** Handle of the pending debounce timer; defined only while a window is waiting to fire. */
  timer?: NodeJS.Timeout;
  /** Set for the whole duration of a fan-out callback invocation for this pad. */
  running: boolean;
  /** Records that a schedule request arrived mid-run, so a follow-up pass is owed. */
  dirty: boolean;
}

const state = new Map<string, PadState>();

/** Shape of the callback that receives errors thrown by a fan-out run. */
type FanoutErrorHandler = (padId: string, err: unknown) => void;

/**
 * Fallback handler used until a caller installs its own.
 *
 * The error is re-thrown from a `setImmediate` callback, i.e. outside of any
 * caller's try/catch, so it surfaces as an uncaught exception instead of
 * being silently dropped. The pad id is not used.
 */
function defaultErrorHandler(_padId: string, err: unknown): void {
  setImmediate(() => {
    throw err;
  });
}

/** Currently active handler; swapped by setErrorHandler / resetErrorHandler. */
let onError: FanoutErrorHandler = defaultErrorHandler;

/**
 * Install a custom error handler, replacing whichever one is active.
 * PadMessageHandler installs one that uses messageLogger.
 *
 * @param fn - Receives the pad id and the value thrown by the fan-out callback.
 */
export function setErrorHandler(fn: FanoutErrorHandler): void {
  onError = fn;
}

/**
 * Re-install the default error handler (used by tests to avoid leaking state
 * across files).
 */
export function resetErrorHandler(): void {
  onError = defaultErrorHandler;
}

/**
 * Arm the debounce timer for a pad and remember its handle on the pad state.
 *
 * Shared by scheduleFanout (first schedule of a window) and fireWindow
 * (follow-up pass after a dirty run) so both paths use the same delay and
 * the same `unref` treatment.
 *
 * @param padId - Pad the timer belongs to.
 * @param s - State entry that receives the timer handle.
 * @param fanout - Callback to run when the window fires.
 */
const armTimer = (padId: string, s: PadState, fanout: FanoutCallback): void => {
  const debounceMs = settings.fanoutDebounceMs ?? 0;
  const timer = setTimeout(() => { void fireWindow(padId, fanout); }, debounceMs);
  // Call unref() when the handle offers it; the guard tolerates timer
  // implementations whose handles have no such method.
  const {unref} = timer as {unref?: () => void};
  if (typeof unref === 'function') unref.call(timer);
  s.timer = timer;
};

/**
 * Run one fan-out pass for a pad once its debounce window has elapsed.
 *
 * Marks the pad as running for the duration of the callback, reports any
 * error through the installed error handler, and afterwards either arms a
 * follow-up window (if a schedule arrived mid-run) or drops the pad's state.
 * The returned promise never rejects, which matters because every call site
 * discards it with `void`.
 *
 * @param padId - Pad whose window fired.
 * @param fanout - Callback that performs the actual fan-out.
 */
const fireWindow = async (padId: string, fanout: FanoutCallback): Promise<void> => {
  const s = state.get(padId);
  // The entry may have been removed (e.g. the map was cleared) before the
  // timer fired; there is nothing to do in that case.
  if (!s) return;
  s.timer = undefined;
  s.running = true;
  s.dirty = false;
  try {
    await fanout(padId);
  } catch (err) {
    try {
      onError(padId, err);
    } catch (handlerErr) {
      // A throwing handler would otherwise turn into an unhandled rejection
      // of this fire-and-forget promise. Log both errors instead.
      console.error(
          `fan-out error handler for pad ${padId} threw:`, handlerErr,
          'while reporting:', err);
    }
  } finally {
    s.running = false;
    if (state.get(padId) !== s) {
      // The entry was dropped or replaced while the callback was running.
      // Leave the map alone so a newer entry (and its pending timer) is not
      // deleted by this stale run.
    } else if (s.dirty) {
      // A schedule arrived during the run; do another pass so that
      // late commits don't sit until the next user action.
      s.dirty = false;
      armTimer(padId, s, fanout);
    } else {
      state.delete(padId);
    }
  }
};

/**
 * Schedule a fan-out for the given pad. Caller guarantees fanoutDebounceMs > 0.
 *
 * Calls made while a window is already pending are coalesced into it. Calls
 * made while a fan-out is running only mark the pad dirty; the follow-up pass
 * then reuses the callback of the run in progress, so the `fanout` argument
 * of a coalesced or deferred call is not used.
 *
 * @param padId - Pad to fan out for.
 * @param fanout - Callback invoked with `padId` when the window fires.
 */
export const scheduleFanout = (padId: string, fanout: FanoutCallback): void => {
  let s = state.get(padId);
  if (!s) {
    s = {running: false, dirty: false};
    state.set(padId, s);
  }
  if (s.running) {
    // Defer to a follow-up pass after the current run.
    s.dirty = true;
    return;
  }
  // Already scheduled in the current window.
  if (s.timer !== undefined) return;
  armTimer(padId, s, fanout);
};

/** Test helper. */
export const _state = state;
22 changes: 21 additions & 1 deletion src/node/handler/PadMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ const hooks = require('../../static/js/pluginfw/hooks');
const stats = require('../stats')
const assert = require('assert').strict;
import {recordChangesetApply, recordSocketEmit} from '../prom-instruments';
import {scheduleFanout, setErrorHandler as setFanoutErrorHandler} from './FanoutScheduler';

// Route fan-out scheduler errors through the existing messageLogger so they
// land in the regular log stream. The handler receives whatever value the
// fan-out threw or rejected with, which may be null/undefined or a non-Error,
// so the stack lookup is null-safe: this handler must not throw itself.
setFanoutErrorHandler((padId, err) => {
  const stack = (err as {stack?: unknown} | null | undefined)?.stack;
  // Prefer the stack trace when there is one, otherwise log the value itself.
  messageLogger.error(`fan-out for pad ${padId} failed: ${String(stack ?? err)}`);
});

/**
 * Fan-out callback handed to the scheduler: looks the pad up by id and
 * pushes pending changes to its clients.
 *
 * updatePadClients is reached through `exports` (not a direct reference) so
 * tests can still monkey-patch it.
 *
 * NOTE(review): the pad is fetched again when the window fires instead of
 * reusing the object from the commit. Confirm that getPad does not create a
 * missing pad; otherwise a pad removed during the debounce window could be
 * re-created here.
 *
 * @param padId - Id of the pad to fan out for.
 */
const runFanout = async (padId: string): Promise<void> => {
  const targetPad = await padManager.getPad(padId);
  await exports.updatePadClients(targetPad);
};
import {RateLimiterMemory} from 'rate-limiter-flexible';
import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest";
import {APool, AText, PadAuthor, PadType} from "../types/PadType";
Expand Down Expand Up @@ -936,7 +950,13 @@ const handleUserChanges = async (socket:any, message: {
socket.emit('message', {type: 'COLLABROOM', data: {type: 'ACCEPT_COMMIT', newRev}});
thisSession.rev = newRev;
if (newRev !== r) thisSession.time = await pad.getRevisionDate(newRev);
await exports.updatePadClients(pad);
if ((settings.fanoutDebounceMs ?? 0) > 0) {
// Debounce window enabled: defer + coalesce fan-out.
scheduleFanout(pad.id, runFanout);
} else {
// Default / legacy: synchronous, awaited, errors flow into this try/catch.
await exports.updatePadClients(pad);
}
} catch (err:any) {
socket.emit('message', {disconnect: 'badChangeset'});
stats.meter('failedChangesets').mark();
Expand Down
15 changes: 15 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ export type SettingsType = {
automaticReconnectionTimeout: number,
loadTest: boolean,
scalingDiveMetrics: boolean,
fanoutDebounceMs: number,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
logconfig: any | null,
Expand Down Expand Up @@ -658,6 +659,20 @@ const settings: SettingsType = {
* production deployments aren't paying for instrumentation they don't use.
*/
scalingDiveMetrics: false,
/**
* Coalesce NEW_CHANGES fan-out within a debounce window of N milliseconds
* (#7756 lever 3). When > 0, after a USER_CHANGES is accepted the pad-wide
* broadcast is deferred by this many ms; further commits arriving during
* the window are batched into the same fan-out pass.
*
* 0 (default) = legacy behaviour: fan-out fires synchronously, awaited,
* inside the existing handleUserChanges try/catch. The scheduler is
* bypassed entirely so disabling the feature is a strict no-op.
*
* Increase to trade a few ms of latency for a quadratic reduction in
* socket.io emit volume under high concurrency.
*/
fanoutDebounceMs: 0,
/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
98 changes: 98 additions & 0 deletions src/tests/backend-new/specs/fanout-debounce.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Unit coverage for the per-pad fan-out debounce (#7756 lever 3).
// With debounce > 0, rapid scheduleFanout calls coalesce into a single
// fanout invocation per pad per debounce window. Concurrent fan-outs for
// the same pad are prevented by the running/dirty state.

import {describe, it, expect, beforeEach, afterEach, vi} from 'vitest';
import settings from '../../../node/utils/Settings';
import {
scheduleFanout,
_state,
setErrorHandler,
resetErrorHandler,
type FanoutCallback,
} from '../../../node/handler/FanoutScheduler';

// Suite for scheduleFanout. Every test runs under vitest fake timers with a
// 50 ms debounce window and a cleared scheduler state map.
describe('fanout debounce', () => {
// Remember the configured value so afterEach can restore it.
const originalDebounce = settings.fanoutDebounceMs;
// Pad ids (or start/end markers) recorded by the fan-out stub, in call order.
let calls: string[];
let fanout: FanoutCallback;

beforeEach(() => {
vi.useFakeTimers();
_state.clear();
calls = [];
// Default stub: just records which pad it was invoked for.
fanout = async (padId) => { calls.push(padId); };
setErrorHandler(() => {/* swallow in tests */});
settings.fanoutDebounceMs = 50;
});

afterEach(() => {
vi.useRealTimers();
settings.fanoutDebounceMs = originalDebounce;
_state.clear();
resetErrorHandler();
});

it('coalesces N rapid calls into a single fanout call within one window', async () => {
for (let i = 0; i < 10; i++) scheduleFanout('pad-a', fanout);
// Nothing runs before the window elapses; only one state entry exists.
expect(calls).toEqual([]);
expect(_state.size).toBe(1);
// 60 ms is past the 50 ms window, so the single pending timer fires.
await vi.advanceTimersByTimeAsync(60);
expect(calls).toEqual(['pad-a']);
// The entry is removed once the run finishes without being marked dirty.
expect(_state.size).toBe(0);
});

it('debounces independently per pad', async () => {
scheduleFanout('pad-a', fanout);
scheduleFanout('pad-b', fanout);
scheduleFanout('pad-a', fanout);
expect(_state.size).toBe(2);
await vi.advanceTimersByTimeAsync(60);
// Sorted so the assertion does not depend on the order the timers fired in.
expect(calls.sort()).toEqual(['pad-a', 'pad-b']);
expect(_state.size).toBe(0);
});

it('after the window fires, a new schedule starts a fresh window', async () => {
scheduleFanout('pad-a', fanout);
await vi.advanceTimersByTimeAsync(60);
expect(calls).toEqual(['pad-a']);
// The first window is done, so this schedule creates a new entry and timer.
scheduleFanout('pad-a', fanout);
expect(_state.size).toBe(1);
await vi.advanceTimersByTimeAsync(60);
expect(calls).toEqual(['pad-a', 'pad-a']);
});

it('schedules arriving during an in-flight fan-out get a follow-up pass', async () => {
// Slow fanout that yields so we can interleave a schedule mid-flight.
// `release` is assigned synchronously by the Promise executor below.
let release: () => void;
const gate = new Promise<void>((r) => { release = r; });
fanout = async (padId) => {
calls.push(`start-${padId}`);
await gate;
calls.push(`end-${padId}`);
};
scheduleFanout('pad-a', fanout);
await vi.advanceTimersByTimeAsync(60); // window fires, callback awaits gate
expect(calls).toEqual(['start-pad-a']);
// A new commit lands while the fan-out is awaiting.
scheduleFanout('pad-a', fanout);
expect(_state.get('pad-a')?.dirty).toBe(true);
// Release the running fan-out; the dirty flag should trigger another window.
release!();
await vi.runOnlyPendingTimersAsync();
await vi.advanceTimersByTimeAsync(60);
// The gate is already resolved, so the follow-up pass starts and ends at once.
expect(calls).toEqual(['start-pad-a', 'end-pad-a', 'start-pad-a', 'end-pad-a']);
expect(_state.size).toBe(0);
});

it('routes fanout errors through setErrorHandler', async () => {
const errors: Array<{padId: string; err: unknown}> = [];
// Replace the swallowing handler from beforeEach with a recording one.
setErrorHandler((padId, err) => { errors.push({padId, err}); });
scheduleFanout('pad-x', async () => { throw new Error('boom'); });
await vi.advanceTimersByTimeAsync(60);
expect(errors).toHaveLength(1);
expect(errors[0]!.padId).toBe('pad-x');
expect((errors[0]!.err as Error).message).toBe('boom');
});
});
Loading