From bdbe3262346462f92b8f5bd18a85882013d37e52 Mon Sep 17 00:00:00 2001 From: Tiago Dinis Date: Sun, 17 May 2026 19:28:29 +0100 Subject: [PATCH 1/2] fix: propagate client disconnects to response body streams --- packages/next/src/server/pipe-readable.ts | 31 +++++++-- .../spec-extension/adapters/next-request.ts | 65 +++++++++++++++++-- 2 files changed, 86 insertions(+), 10 deletions(-) diff --git a/packages/next/src/server/pipe-readable.ts b/packages/next/src/server/pipe-readable.ts index 8b23b46e3cf9..7c8e8f3c8691 100644 --- a/packages/next/src/server/pipe-readable.ts +++ b/packages/next/src/server/pipe-readable.ts @@ -2,6 +2,7 @@ import type { ServerResponse } from 'node:http' import type { Readable } from 'node:stream' import { + ResponseAborted, ResponseAbortedName, createAbortController, } from './web/spec-extension/adapters/next-request' @@ -10,8 +11,9 @@ import { getTracer } from './lib/trace/tracer' import { NextNodeServerSpan } from './lib/trace/constants' import { getClientComponentLoaderMetrics } from './client-component-renderer-logger' -export function isAbortError(e: any): e is Error & { name: 'AbortError' } { - return e?.name === 'AbortError' || e?.name === ResponseAbortedName +export function isAbortError(e: any): boolean { + if(e?.name === 'AbortError' || e?.name === ResponseAbortedName) return true; + return false; } const HAS_CLIENT_COMPONENT_METRICS_ENABLED = @@ -135,9 +137,28 @@ export async function pipeToNodeResponse( // client disconnects. const controller = createAbortController(res) - const writer = createWriterFromResponse(res, waitUntilForEnd) - - await readable.pipeTo(writer, { signal: controller.signal }) + const writable = createWriterFromResponse(res, waitUntilForEnd) + const reader = readable.getReader() + const writer = writable.getWriter() + const abortReason = controller.signal.reason ?? new ResponseAborted() + + controller.signal.addEventListener( + 'abort', + () => { + void reader.cancel(abortReason).catch(() => {}) + void writer.abort(abortReason).catch(() => {}) + }, + { once: true } + ) + + while (true) { + const { done, value } = await reader.read() + if (done) break + await writer.write(value) + } + try { + await writer.close() + } catch(e) {} } catch (err: any) { // If this isn't related to an abort error, re-throw it. if (isAbortError(err)) return diff --git a/packages/next/src/server/web/spec-extension/adapters/next-request.ts b/packages/next/src/server/web/spec-extension/adapters/next-request.ts index 0e0b2df3c33a..a54d0308c3cd 100644 --- a/packages/next/src/server/web/spec-extension/adapters/next-request.ts +++ b/packages/next/src/server/web/spec-extension/adapters/next-request.ts @@ -13,6 +13,26 @@ export class ResponseAborted extends Error { public readonly name = ResponseAbortedName } +function attachAbortListener( + target: Writable | NodeJS.EventEmitter | undefined, + event: string, + onAbort: () => void +): () => void { + if (!target || typeof target.once !== 'function') { + return () => {} + } + + target.once(event, onAbort) + + return () => { + if (typeof target.off === 'function') { + target.off(event, onAbort) + } else if (typeof target.removeListener === 'function') { + target.removeListener(event, onAbort) + } + } +} + /** * Creates an AbortController tied to the closing of a ServerResponse (or other * appropriate Writable). @@ -22,15 +42,43 @@ export class ResponseAborted extends Error { */ export function createAbortController(response: Writable): AbortController { const controller = new AbortController() + const cleanup: Array<() => void> = [] + const responseWithSocket = response as Writable & { + socket?: NodeJS.WritableStream & + NodeJS.EventEmitter & { destroyed?: boolean } + } + + const abortIfDisconnected = () => { + if (response.writableFinished || controller.signal.aborted) return + + controller.abort(new ResponseAborted()) + } // If `finish` fires first, then `res.end()` has been called and the close is // just us finishing the stream on our side. If `close` fires first, then we // know the client disconnected before we finished. - response.once('close', () => { - if (response.writableFinished) return + cleanup.push(attachAbortListener(response, 'close', abortIfDisconnected)) - controller.abort(new ResponseAborted()) - }) + const socket = responseWithSocket.socket + if (socket) { + cleanup.push(attachAbortListener(socket, 'close', abortIfDisconnected)) + cleanup.push(attachAbortListener(socket, 'end', abortIfDisconnected)) + cleanup.push(attachAbortListener(socket, 'error', abortIfDisconnected)) + + if (socket.destroyed) { + abortIfDisconnected() + } + } + + controller.signal.addEventListener( + 'abort', + () => { + for (const dispose of cleanup) { + dispose() + } + }, + { once: true } + ) return controller } @@ -45,7 +93,14 @@ export function createAbortController(response: Writable): AbortController { */ export function signalFromNodeResponse(response: Writable): AbortSignal { const { errored, destroyed } = response - if (errored || destroyed) { + const responseWithSocket = response as Writable & { + socket?: { destroyed?: boolean } + } + if ( + errored || + destroyed || + (responseWithSocket.socket?.destroyed && !response.writableFinished) + ) { return AbortSignal.abort(errored ?? new ResponseAborted()) } From 546f8201af7ad5963245c91ba3d6181962f22b12 Mon Sep 17 00:00:00 2001 From: Tiago Dinis Date: Sun, 17 May 2026 20:32:08 +0100 Subject: [PATCH 2/2] fix: handle the case where the controller was already cancelled --- packages/next/src/server/pipe-readable.ts | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/packages/next/src/server/pipe-readable.ts b/packages/next/src/server/pipe-readable.ts index 7c8e8f3c8691..268c4d0b65c4 100644 --- a/packages/next/src/server/pipe-readable.ts +++ b/packages/next/src/server/pipe-readable.ts @@ -142,14 +142,27 @@ export async function pipeToNodeResponse( const writer = writable.getWriter() const abortReason = controller.signal.reason ?? new ResponseAborted() + const abortStreams = () => { + const reason = controller.signal.reason ?? new ResponseAborted() + + return Promise.allSettled([ + reader.cancel(reason), + writer.abort(reason), + ]) + } + controller.signal.addEventListener( 'abort', () => { - void reader.cancel(abortReason).catch(() => {}) - void writer.abort(abortReason).catch(() => {}) + void abortStreams() }, - { once: true } + { once: true }, ) + + if (controller.signal.aborted) { + await abortStreams() + return + } while (true) { const { done, value } = await reader.read()