Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,26 @@ public void pipe(final BsonReader reader) {
pipeDocument(reader, null);
}

/**
* Pipes a raw BSON document from the given byte array to this writer, writing the bytes directly to the
* output without intermediate object allocation.
*
* @param bytes the byte array containing the BSON document
* @param offset the offset into the byte array
* @param length the length of the BSON document
* @since 5.8
*/
public void pipe(final byte[] bytes, final int offset, final int length) {
checkMinDocumentSize(length);
if (getState() == State.VALUE) {
bsonOutput.writeByte(BsonType.DOCUMENT.getValue());
writeCurrentName();
}
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeBytes(bytes, offset, length);
completePipeDocument(pipedDocumentStartPosition);
Comment thread
vbabanin marked this conversation as resolved.
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void pipe(final BsonReader reader, final List<BsonElement> extraElements) {
notNull("reader", reader);
Expand All @@ -350,14 +370,10 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
}
BsonInput bsonInput = binaryReader.getBsonInput();
int size = bsonInput.readInt32();
if (size < 5) {
throw new BsonSerializationException("Document size must be at least 5");
}
checkMinDocumentSize(size);
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeInt32(size);
byte[] bytes = new byte[size - 4];
bsonInput.readBytes(bytes);
bsonOutput.writeBytes(bytes);
Comment on lines -358 to -360
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids an extra temporary byte[] allocation by reading directly into the target buffer, reducing both allocation pressure and byte-copy/processing overhead.

bsonInput.pipe(bsonOutput, size - 4);

binaryReader.setState(AbstractBsonReader.State.TYPE);

Expand All @@ -371,24 +387,27 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
setContext(getContext().getParentContext());
}

if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}

validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
completePipeDocument(pipedDocumentStartPosition);
} else if (extraElements != null) {
super.pipe(reader, extraElements);
} else {
super.pipe(reader);
}
}

private void completePipeDocument(final int pipedDocumentStartPosition) {
if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}
validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
}

/**
* Sets a maximum size for documents from this point.
*
Expand Down Expand Up @@ -426,6 +445,12 @@ public void reset() {
mark = null;
}

private static void checkMinDocumentSize(final int size) {
if (size < 5) {
throw new BsonSerializationException("Document size must be at least 5");
}
}

private void writeCurrentName() {
if (getContext().getContextType() == BsonContextType.ARRAY) {
int index = getContext().index++;
Expand Down
1 change: 0 additions & 1 deletion bson/src/main/org/bson/BsonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,5 +356,4 @@ public interface BsonWriter {
* @param reader The source.
*/
void pipe(BsonReader reader);

}
30 changes: 30 additions & 0 deletions bson/src/main/org/bson/RawBsonDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,36 @@ public ByteBuf getByteBuffer() {
return new ByteBufNIO(buffer);
}

/**
* Returns the byte array backing this document. Changes to the returned array will be reflected in this document.
*
* @return the backing byte array
* @since 5.8
*/
public byte[] getBackingArray() {
return bytes;
}
Comment thread
vbabanin marked this conversation as resolved.

/**
* Returns the offset into the {@linkplain #getBackingArray() backing byte array} where this document starts.
*
* @return the offset
* @since 5.8
*/
public int getByteOffset() {
return offset;
}

/**
* Returns the length of this document within the {@linkplain #getBackingArray() backing byte array}.
*
* @return the length
* @since 5.8
*/
public int getByteLength() {
return length;
}

/**
* Decode this into a document.
*
Expand Down
13 changes: 11 additions & 2 deletions bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,17 @@ public RawBsonDocumentCodec() {

@Override
public void encode(final BsonWriter writer, final RawBsonDocument value, final EncoderContext encoderContext) {
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
if (writer instanceof BsonBinaryWriter) {
// Fast path. The pipe method should ideally exist on BsonWriter, but adding it as
// abstract would be a breaking change, and adding it as a default method would force
// BsonWriter to depend on BsonBinaryReader/ByteBufferBsonInput, violating the
// interface's abstraction.
// TODO JAVA-6211 move pipe(byte[], int, int) to BsonWriter to remove this instanceof.
((BsonBinaryWriter) writer).pipe(value.getBackingArray(), value.getByteOffset(), value.getByteLength());
} else {
Comment on lines 42 to +50
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions bson/src/main/org/bson/io/BsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ public interface BsonInput extends Closeable {
*/
boolean hasRemaining();

/**
* Pipes the specified number of bytes from {@linkplain BsonInput this} input to the given {@linkplain BsonOutput output}.
*
* @param output the output to pipe to
* @param numBytes the number of bytes to pipe
* @since 5.8
*/
default void pipe(BsonOutput output, int numBytes) {
byte[] bytes = new byte[numBytes];
readBytes(bytes);
output.writeBytes(bytes);
}

@Override
void close();
}
18 changes: 18 additions & 0 deletions bson/src/main/org/bson/io/ByteBufferBsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public boolean hasRemaining() {
return buffer.hasRemaining();
}

@Override
public void pipe(final BsonOutput output, final int numBytes) {
ensureOpen();
Comment thread
vbabanin marked this conversation as resolved.
ensureAvailable(numBytes);

if (buffer.isBackedByArray()) {
int position = buffer.position();
int arrayOffset = buffer.arrayOffset();
output.writeBytes(buffer.array(), arrayOffset + position, numBytes);
buffer.position(position + numBytes);
} else {
// Fallback: use temporary buffer for non-array-backed buffers
byte[] temp = new byte[numBytes];
buffer.get(temp);
output.writeBytes(temp);
}
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void close() {
buffer.release();
Expand Down
27 changes: 27 additions & 0 deletions bson/src/test/unit/org/bson/BsonBinaryWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,34 @@ public void testPipeOfDocumentWithInvalidSize() {
// expected
}
}
}

@Test
public void testPipeOfRawBytes() {
BasicOutputBuffer sourceBuffer = new BasicOutputBuffer();
try (BsonBinaryWriter sourceWriter = new BsonBinaryWriter(sourceBuffer)) {
sourceWriter.writeStartDocument();
sourceWriter.writeBoolean("a", true);
sourceWriter.writeEndDocument();
}
byte[] documentBytes = sourceBuffer.toByteArray();

BasicOutputBuffer destBuffer = new BasicOutputBuffer();
try (BsonBinaryWriter destWriter = new BsonBinaryWriter(destBuffer)) {
destWriter.pipe(documentBytes, 0, documentBytes.length);
}

assertArrayEquals(documentBytes, destBuffer.toByteArray());
}

@Test
public void testPipeOfRawBytesWithInvalidSize() {
byte[] bytes = {4, 0, 0, 0}; // minimum document size is 5
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw the validation for size of 5, just curious why is it 5 ?


BasicOutputBuffer newBuffer = new BasicOutputBuffer();
try (BsonBinaryWriter newWriter = new BsonBinaryWriter(newBuffer)) {
assertThrows(BsonSerializationException.class, () -> newWriter.pipe(bytes, 0, bytes.length));
}
}

// CHECKSTYLE:OFF
Expand Down
17 changes: 17 additions & 0 deletions bson/src/test/unit/org/bson/RawBsonDocumentSpecification.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,23 @@ class RawBsonDocumentSpecification extends Specification {
rawDocument << createRawDocumentVariants()
}


def 'getBackingArray, getByteOffset and getByteLength should expose the document range'() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be mistaken but I saw a lot of PRs that also remove groovy spec class, can we move this test case to java test instead ?

expect:
rawDocument.getByteOffset() == expectedOffset
rawDocument.getByteLength() == expectedLength
Arrays.copyOfRange(
rawDocument.getBackingArray(),
rawDocument.getByteOffset(),
rawDocument.getByteOffset() + rawDocument.getByteLength()) == getBytesFromDocument()

where:
rawDocument | expectedOffset | expectedLength
createRawDocumenFromDocument() | 0 | 66
createRawDocumentFromByteArray() | 0 | 66
createRawDocumentFromByteArrayOffsetLength()| 1 | 66
}

def 'parse should through if parameter is invalid'() {
when:
RawBsonDocument.parse(null)
Expand Down
122 changes: 122 additions & 0 deletions bson/src/test/unit/org/bson/io/BsonInputTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.bson.io;

import org.bson.ByteBufNIO;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.Test;

import java.nio.ByteBuffer;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

class BsonInputTest {

@Test
void defaultPipeShouldCopyBytesFromInputToOutput() {
// given
byte[] inputBytes = {0x4a, 0x61, 0x76, 0x61, 0x21};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do

"Java!".getBytes(StandardCharsets.UTF_8);

instead ? Same thing but for me personally it's really hard to reason about raw bytes , what do you think ?


try (BsonInput bsonInput = new ForwardingBsonInput(
new ByteBufferBsonInput(new ByteBufNIO(ByteBuffer.wrap(inputBytes))));
BasicOutputBuffer output = new BasicOutputBuffer()) {
// when
bsonInput.pipe(output, inputBytes.length);

// then
assertEquals(inputBytes.length, bsonInput.getPosition());
assertEquals(inputBytes.length, output.getPosition());
assertArrayEquals(inputBytes, output.toByteArray());
}
}

@Test
void defaultPipeShouldCopyPartialBytesFromInputToOutput() {
// given
byte[] inputBytes = {0x4a, 0x61, 0x76, 0x61, 0x21};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


try (BsonInput bsonInput = new ForwardingBsonInput(
new ByteBufferBsonInput(new ByteBufNIO(ByteBuffer.wrap(inputBytes))));
BasicOutputBuffer output = new BasicOutputBuffer()) {
// when
bsonInput.pipe(output, 3);

// then
assertEquals(3, bsonInput.getPosition());
assertEquals(3, output.getPosition());
assertArrayEquals(new byte[]{0x4a, 0x61, 0x76}, output.toByteArray());
}
}

/**
* Delegates all abstract methods but does NOT override pipe,
* so the default implementation is exercised.
*/
private static class ForwardingBsonInput implements BsonInput {
private final ByteBufferBsonInput delegate;

ForwardingBsonInput(final ByteBufferBsonInput delegate) {
this.delegate = delegate;
}

@Override
public int getPosition() { return delegate.getPosition(); }

@Override
public byte readByte() { return delegate.readByte(); }

@Override
public void readBytes(final byte[] bytes) { delegate.readBytes(bytes); }

@Override
public void readBytes(final byte[] bytes, final int offset, final int length) { delegate.readBytes(bytes, offset, length); }

@Override
public long readInt64() { return delegate.readInt64(); }

@Override
public double readDouble() { return delegate.readDouble(); }

@Override
public int readInt32() { return delegate.readInt32(); }

@Override
public String readString() { return delegate.readString(); }

@Override
public ObjectId readObjectId() { return delegate.readObjectId(); }

@Override
public String readCString() { return delegate.readCString(); }

@Override
public void skipCString() { delegate.skipCString(); }

@Override
public void skip(final int numBytes) { delegate.skip(numBytes); }

@Override
public BsonInputMark getMark(final int readLimit) { return delegate.getMark(readLimit); }

@Override
public boolean hasRemaining() { return delegate.hasRemaining(); }

@Override
public void close() { delegate.close(); }
}
}
Loading