Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions packages/next/src/server/pipe-readable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ServerResponse } from 'node:http'
import type { Readable } from 'node:stream'

import {
ResponseAborted,
ResponseAbortedName,
createAbortController,
} from './web/spec-extension/adapters/next-request'
Expand All @@ -10,8 +11,9 @@ import { getTracer } from './lib/trace/tracer'
import { NextNodeServerSpan } from './lib/trace/constants'
import { getClientComponentLoaderMetrics } from './client-component-renderer-logger'

export function isAbortError(e: any): e is Error & { name: 'AbortError' } {
return e?.name === 'AbortError' || e?.name === ResponseAbortedName
export function isAbortError(e: any): boolean {
if(e?.name === 'AbortError' || e?.name === ResponseAbortedName) return true;
return false;
}

const HAS_CLIENT_COMPONENT_METRICS_ENABLED =
Expand Down Expand Up @@ -135,9 +137,28 @@ export async function pipeToNodeResponse(
// client disconnects.
const controller = createAbortController(res)

const writer = createWriterFromResponse(res, waitUntilForEnd)

await readable.pipeTo(writer, { signal: controller.signal })
const writable = createWriterFromResponse(res, waitUntilForEnd)
const reader = readable.getReader()
const writer = writable.getWriter()
const abortReason = controller.signal.reason ?? new ResponseAborted()

controller.signal.addEventListener(
'abort',
Comment thread
coffeeispower marked this conversation as resolved.
() => {
void reader.cancel(abortReason).catch(() => {})
void writer.abort(abortReason).catch(() => {})
},
{ once: true }
)

while (true) {
const { done, value } = await reader.read()
if (done) break
await writer.write(value)
}
try {
await writer.close()
} catch(e) {}
} catch (err: any) {
// If this isn't related to an abort error, re-throw it.
if (isAbortError(err)) return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,26 @@ export class ResponseAborted extends Error {
public readonly name = ResponseAbortedName
}

function attachAbortListener(
target: Writable | NodeJS.EventEmitter | undefined,
event: string,
onAbort: () => void
): () => void {
if (!target || typeof target.once !== 'function') {
return () => {}
}

target.once(event, onAbort)

return () => {
if (typeof target.off === 'function') {
target.off(event, onAbort)
} else if (typeof target.removeListener === 'function') {
target.removeListener(event, onAbort)
}
}
}

/**
* Creates an AbortController tied to the closing of a ServerResponse (or other
* appropriate Writable).
Expand All @@ -22,15 +42,43 @@ export class ResponseAborted extends Error {
*/
export function createAbortController(response: Writable): AbortController {
const controller = new AbortController()
const cleanup: Array<() => void> = []
const responseWithSocket = response as Writable & {
socket?: NodeJS.WritableStream &
NodeJS.EventEmitter & { destroyed?: boolean }
}

const abortIfDisconnected = () => {
if (response.writableFinished || controller.signal.aborted) return

controller.abort(new ResponseAborted())
}

// If `finish` fires first, then `res.end()` has been called and the close is
// just us finishing the stream on our side. If `close` fires first, then we
// know the client disconnected before we finished.
response.once('close', () => {
if (response.writableFinished) return
cleanup.push(attachAbortListener(response, 'close', abortIfDisconnected))

controller.abort(new ResponseAborted())
})
const socket = responseWithSocket.socket
if (socket) {
cleanup.push(attachAbortListener(socket, 'close', abortIfDisconnected))
cleanup.push(attachAbortListener(socket, 'end', abortIfDisconnected))
cleanup.push(attachAbortListener(socket, 'error', abortIfDisconnected))

if (socket.destroyed) {
abortIfDisconnected()
}
}

controller.signal.addEventListener(
'abort',
() => {
for (const dispose of cleanup) {
dispose()
}
},
{ once: true }
)

return controller
}
Expand All @@ -45,7 +93,14 @@ export function createAbortController(response: Writable): AbortController {
*/
export function signalFromNodeResponse(response: Writable): AbortSignal {
const { errored, destroyed } = response
if (errored || destroyed) {
const responseWithSocket = response as Writable & {
socket?: { destroyed?: boolean }
}
if (
errored ||
destroyed ||
(responseWithSocket.socket?.destroyed && !response.writableFinished)
) {
return AbortSignal.abort(errored ?? new ResponseAborted())
}

Expand Down