diff --git a/src/js/internal/fs/streams.ts b/src/js/internal/fs/streams.ts index 939ce1153ea..b6c2a618107 100644 --- a/src/js/internal/fs/streams.ts +++ b/src/js/internal/fs/streams.ts @@ -36,11 +36,6 @@ type FD = number; const { validateInteger, validateInt32, validateFunction } = require("internal/validators"); -// Bun supports a fast path for `createReadStream("path.txt")` with `.pipe(res)`, -// where the entire stream implementation can be bypassed, effectively making it -// `new Response(Bun.file("path.txt"))`. -// This makes an idomatic Node.js pattern much faster. -const kReadStreamFastPath = Symbol("kReadStreamFastPath"); // Bun supports a fast path for `createWriteStream("path.txt")` where instead of // using `node:fs`, `Bun.file(...).writer()` is used instead. const kWriteStreamFastPath = Symbol("kWriteStreamFastPath"); @@ -141,7 +136,7 @@ function ReadStream(this: FSStream, path, options): void { // Only buffers are supported. options.decodeStrings = true; - let { fd, autoClose, fs: customFs, start, end = Infinity, encoding } = options; + let { fd, autoClose, fs: customFs, start, end = Infinity } = options; if (fd == null) { this[kFs] = customFs || fs; @@ -206,13 +201,6 @@ function ReadStream(this: FSStream, path, options): void { } } - this[kReadStreamFastPath] = - start === 0 && - end === Infinity && - autoClose && - !customFs && - // is it an encoding which we don't need to decode? - (encoding === "buffer" || encoding === "binary" || encoding == null || encoding === "utf-8" || encoding === "utf8"); Readable.$call(this, options); return this as unknown as void; } @@ -309,6 +297,16 @@ readStreamPrototype._read = function (n) { const buf = Buffer.allocUnsafeSlow(n); this[kFs].read(this.fd, buf, 0, n, this.pos, (er, bytesRead, buf) => { + // If the stream was destroyed while the read was in flight, ignore the + // result. The fd has been (or is about to be) closed by _destroy, and + // pushing more data or EOF here would clobber the destroyed state — + // in particular, push(null) would mark the readable side as ended and + // suppress the ERR_STREAM_PREMATURE_CLOSE that end-of-stream/finished/ + // async iteration relies on. Matches Node.js behavior. + if (this.destroyed) { + return; + } + if (er) { require("internal/streams/destroy").errorOrDestroy(this, er); } else if (bytesRead > 0) { @@ -335,17 +333,12 @@ readStreamPrototype._read = function (n) { }; readStreamPrototype._destroy = function (this: FSStream, err, cb) { - // Usually for async IO it is safe to close a file descriptor - // even when there are pending operations. However, due to platform - // differences file IO is implemented using synchronous operations - // running in a thread pool. Therefore, file descriptors are not safe - // to close while used in a pending read or write operation. Wait for - // any pending IO (kIsPerformingIO) to complete (kIoDone). - if (this[kReadStreamFastPath]) { - this.once(kReadStreamFastPath, er => close(this, err || er, cb)); - } else { - close(this, err, cb); - } + // If a read happens to complete after destroy() has run, the _read + // callback above short-circuits on `this.destroyed`, so the stream's + // readable state can't be clobbered. Close the fd immediately, matching + // the behavior the common `createReadStream(path)` path has had since + // #16754. + close(this, err, cb); }; readStreamPrototype.close = function (cb) { @@ -389,13 +382,6 @@ function closeAfterSync(stream, err, cb) { stream.fd = null; } -ReadStream.prototype.pipe = function (this: FSStream, dest, pipeOpts) { - // Fast path for streaming files: - // if (this[kReadStreamFastPath]) { - // } - return Readable.prototype.pipe.$call(this, dest, pipeOpts); -}; - function WriteStream(this: FSStream, path: string | null, options?: any): void { if (!(this instanceof WriteStream)) { return new WriteStream(path, options); diff --git a/test/js/node/fs/fs.test.ts b/test/js/node/fs/fs.test.ts index ee5c0af2ea1..fef7018740d 100644 --- a/test/js/node/fs/fs.test.ts +++ b/test/js/node/fs/fs.test.ts @@ -2221,6 +2221,42 @@ describe("createReadStream", () => { }, { timeout: 100 }, ); + + // https://github.com/oven-sh/bun/issues/30919 + it("async iterator rejects with ERR_STREAM_PREMATURE_CLOSE when destroy() is called during iteration", async () => { + const stream = createReadStream(join(import.meta.dir, "readFileSync.txt")); + + let chunks = 0; + let caught: any = undefined; + try { + for await (const _ of stream) { + chunks++; + stream.destroy(); + } + } catch (err) { + caught = err; + } + + expect(chunks).toBe(1); + expect(caught).toBeDefined(); + expect(caught?.code).toBe("ERR_STREAM_PREMATURE_CLOSE"); + }); + + // Regression: _destroy used to register `once(kReadStreamFastPath, ...)` for an + // event that is never emitted when { start, autoClose } were both truthy, so + // 'close' never fired and the fd was leaked. + it("emits 'close' and releases fd with { start: 0, autoClose: true }", async () => { + const stream = createReadStream(join(import.meta.dir, "readFileSync.txt"), { start: 0, autoClose: true }); + const { promise, resolve } = Promise.withResolvers(); + + stream.on("data", () => {}); + stream.on("close", () => resolve()); + + await promise; + expect(stream.destroyed).toBe(true); + expect(stream.closed).toBe(true); + expect(stream.fd).toBeNull(); + }); }); describe("fs.WriteStream", () => {