Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/node/handler/FanoutScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Per-pad fan-out scheduler — debounce wrapper for #7756 lever 3.
//
// When `settings.fanoutDebounceMs <= 0` (default), fan-out fires immediately
// — legacy behaviour. When > 0, rapid scheduleFanout(pad) calls coalesce
// into a single fan-out per pad per debounce window.
//
// Lives in its own module rather than inside PadMessageHandler so the
// scheduling logic can be unit-tested without pulling in the full pad / DB
// / socket.io stack.

import settings from '../utils/Settings';

export type FanoutCallback = (padId: string) => Promise<void>;

const pendingFanouts = new Map<string, NodeJS.Timeout>();
let onError: (padId: string, err: unknown) => void = (padId, err) => {
// Default error sink: re-throw on the next tick so it shows up in the log.
setImmediate(() => { throw err; });
};

/** Override the error handler. PadMessageHandler installs one that uses messageLogger. */
export const setErrorHandler = (fn: (padId: string, err: unknown) => void): void => {
onError = fn;
};

/** Schedule a fan-out for the given pad. */
export const scheduleFanout = (padId: string, fanout: FanoutCallback): void => {
const debounceMs = settings.fanoutDebounceMs ?? 0;
if (debounceMs <= 0) {
void fanout(padId).catch((err) => onError(padId, err));
return;
}
if (pendingFanouts.has(padId)) return;
const t = setTimeout(() => {
pendingFanouts.delete(padId);
void fanout(padId).catch((err) => onError(padId, err));
}, debounceMs);
Comment thread
qodo-free-for-open-source-projects[bot] marked this conversation as resolved.
Outdated
if (typeof (t as {unref?: () => void}).unref === 'function') (t as {unref: () => void}).unref();
pendingFanouts.set(padId, t);
};

/** Test helper. */
export const _state = {pendingFanouts};
16 changes: 15 additions & 1 deletion src/node/handler/PadMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ const hooks = require('../../static/js/pluginfw/hooks');
const stats = require('../stats')
const assert = require('assert').strict;
import {recordChangesetApply, recordSocketEmit} from '../prom-instruments';
import {scheduleFanout, setErrorHandler as setFanoutErrorHandler} from './FanoutScheduler';

// Route fan-out scheduler errors through the existing messageLogger so they
// land in the regular log stream.
setFanoutErrorHandler((padId, err) => {
messageLogger.error(`fan-out for pad ${padId} failed: ${(err as Error).stack ?? err}`);
});

// Cached helper that the scheduler calls back into. Goes through exports so
// tests can still monkey-patch updatePadClients.
const runFanout = async (padId: string): Promise<void> => {
const pad = await padManager.getPad(padId);
await exports.updatePadClients(pad);
};
import {RateLimiterMemory} from 'rate-limiter-flexible';
import {ChangesetRequest, PadUserInfo, SocketClientRequest} from "../types/SocketClientRequest";
import {APool, AText, PadAuthor, PadType} from "../types/PadType";
Expand Down Expand Up @@ -936,7 +950,7 @@ const handleUserChanges = async (socket:any, message: {
socket.emit('message', {type: 'COLLABROOM', data: {type: 'ACCEPT_COMMIT', newRev}});
thisSession.rev = newRev;
if (newRev !== r) thisSession.time = await pad.getRevisionDate(newRev);
await exports.updatePadClients(pad);
scheduleFanout(pad.id, runFanout);
Comment thread
qodo-free-for-open-source-projects[bot] marked this conversation as resolved.
Outdated
} catch (err:any) {
socket.emit('message', {disconnect: 'badChangeset'});
stats.meter('failedChangesets').mark();
Expand Down
14 changes: 14 additions & 0 deletions src/node/utils/Settings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ export type SettingsType = {
automaticReconnectionTimeout: number,
loadTest: boolean,
scalingDiveMetrics: boolean,
fanoutDebounceMs: number,
dumpOnUncleanExit: boolean,
indentationOnNewLine: boolean,
logconfig: any | null,
Expand Down Expand Up @@ -658,6 +659,19 @@ const settings: SettingsType = {
* production deployments aren't paying for instrumentation they don't use.
*/
scalingDiveMetrics: false,
/**
* Coalesce NEW_CHANGES fan-out within a debounce window of N milliseconds
* (#7756 lever 3). When > 0, after a USER_CHANGES is accepted the pad-wide
* broadcast is deferred by this many ms; further commits arriving during
* the window are batched into the same fan-out pass.
*
* 0 (default) = legacy behaviour: fan-out fires synchronously after every
* accepted commit. Increase to trade a few ms of latency for a quadratic
* reduction in socket.io emit volume under high concurrency. Gated behind
* `loadTest` AND `scalingDiveMetrics` — production deployments are not
* affected by default.
*/
fanoutDebounceMs: 0,
/**
* Disable dump of objects preventing a clean exit
*/
Expand Down
78 changes: 78 additions & 0 deletions src/tests/backend-new/specs/fanout-debounce.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Unit coverage for the per-pad fan-out debounce (#7756 lever 3).
// With debounce > 0, rapid scheduleFanout calls coalesce into a single
// fanout invocation per pad per debounce window.

import {describe, it, expect, beforeEach, afterEach, vi} from 'vitest';
import settings from '../../../node/utils/Settings';
import {scheduleFanout, _state, setErrorHandler, type FanoutCallback} from '../../../node/handler/FanoutScheduler';

describe('fanout debounce', () => {
const originalDebounce = settings.fanoutDebounceMs;
let calls: string[];
let fanout: FanoutCallback;

beforeEach(() => {
vi.useFakeTimers();
_state.pendingFanouts.clear();
calls = [];
fanout = async (padId) => { calls.push(padId); };
setErrorHandler(() => {/* swallow in tests */});
});

afterEach(() => {
vi.useRealTimers();
settings.fanoutDebounceMs = originalDebounce;
});

it('with debounce=0 fires the fanout synchronously per call', async () => {
settings.fanoutDebounceMs = 0;
scheduleFanout('pad-a', fanout);
scheduleFanout('pad-a', fanout);
scheduleFanout('pad-a', fanout);
await vi.runAllTimersAsync();
expect(calls).toEqual(['pad-a', 'pad-a', 'pad-a']);
expect(_state.pendingFanouts.size).toBe(0);
});

it('with debounce>0 coalesces N rapid calls into a single fanout call', async () => {
settings.fanoutDebounceMs = 50;
for (let i = 0; i < 10; i++) scheduleFanout('pad-a', fanout);
expect(calls).toEqual([]);
expect(_state.pendingFanouts.size).toBe(1);
await vi.advanceTimersByTimeAsync(60);
expect(calls).toEqual(['pad-a']);
expect(_state.pendingFanouts.size).toBe(0);
});

it('debounces independently per pad', async () => {
settings.fanoutDebounceMs = 50;
scheduleFanout('pad-a', fanout);
scheduleFanout('pad-b', fanout);
scheduleFanout('pad-a', fanout);
expect(_state.pendingFanouts.size).toBe(2);
await vi.advanceTimersByTimeAsync(60);
expect(calls.sort()).toEqual(['pad-a', 'pad-b']);
});

it('after the window fires, a new schedule starts a fresh window', async () => {
settings.fanoutDebounceMs = 50;
scheduleFanout('pad-a', fanout);
await vi.advanceTimersByTimeAsync(60);
expect(calls).toEqual(['pad-a']);
scheduleFanout('pad-a', fanout);
expect(_state.pendingFanouts.size).toBe(1);
await vi.advanceTimersByTimeAsync(60);
expect(calls).toEqual(['pad-a', 'pad-a']);
});

it('routes fanout errors through setErrorHandler', async () => {
settings.fanoutDebounceMs = 0;
const errors: Array<{padId: string; err: unknown}> = [];
setErrorHandler((padId, err) => { errors.push({padId, err}); });
scheduleFanout('pad-x', async () => { throw new Error('boom'); });
await vi.runAllTimersAsync();
expect(errors).toHaveLength(1);
expect(errors[0]!.padId).toBe('pad-x');
expect((errors[0]!.err as Error).message).toBe('boom');
});
});
Loading