Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,17 @@ public void pipe(final BsonReader reader) {
pipeDocument(reader, null);
}

@Override
public void pipe(final byte[] bytes, final int offset, final int length) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of routing through a BsonReader (which wraps a BsonInput but doesn’t expose the underlying array unless copied), we write the bytes directly to the output.

if (getState() == State.VALUE) {
bsonOutput.writeByte(BsonType.DOCUMENT.getValue());
writeCurrentName();
}
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeBytes(bytes, offset, length);
completePipeDocument(pipedDocumentStartPosition);
Comment thread
vbabanin marked this conversation as resolved.
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void pipe(final BsonReader reader, final List<BsonElement> extraElements) {
notNull("reader", reader);
Expand All @@ -355,9 +366,7 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
}
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeInt32(size);
byte[] bytes = new byte[size - 4];
bsonInput.readBytes(bytes);
bsonOutput.writeBytes(bytes);
Comment on lines -358 to -360
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids an extra temporary byte[] allocation by reading directly into the target buffer, reducing both allocation pressure and byte-copy/processing overhead.

bsonInput.pipe(bsonOutput, size - 4);

binaryReader.setState(AbstractBsonReader.State.TYPE);

Expand All @@ -371,24 +380,27 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
setContext(getContext().getParentContext());
}

if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}

validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
completePipeDocument(pipedDocumentStartPosition);
} else if (extraElements != null) {
super.pipe(reader, extraElements);
} else {
super.pipe(reader);
}
}

private void completePipeDocument(final int pipedDocumentStartPosition) {
if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}
validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
}

/**
* Sets a maximum size for documents from this point.
*
Expand Down
25 changes: 25 additions & 0 deletions bson/src/main/org/bson/BsonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.bson;

import org.bson.io.ByteBufferBsonInput;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
* An interface for writing a logical BSON document using a push-oriented API.
*
Expand Down Expand Up @@ -357,4 +361,25 @@ public interface BsonWriter {
*/
void pipe(BsonReader reader);

/**
* Pipes a raw BSON document from the given byte array to this writer.
*
* <p>The default implementation wraps the bytes in a {@linkplain BsonBinaryReader}
* and calls {@link #pipe(BsonReader)}. Implementations may override this
* to write the raw bytes directly without intermediate object allocation.</p>
*
* @param bytes the byte array containing the BSON document
* @param offset the offset into the byte array
* @param length the length of the BSON document
* @since 5.7
*/
default void pipe(byte[] bytes, int offset, int length) {
try (BsonBinaryReader reader = new BsonBinaryReader(
new ByteBufferBsonInput(
new ByteBufNIO(ByteBuffer.wrap(bytes, offset, length)
.order(ByteOrder.LITTLE_ENDIAN))))) {
pipe(reader);
}
}

}
30 changes: 30 additions & 0 deletions bson/src/main/org/bson/RawBsonDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,36 @@ public ByteBuf getByteBuffer() {
return new ByteBufNIO(buffer);
}

/**
* Returns the byte array backing this document. Changes to the returned array will be reflected in this document.
*
* @return the backing byte array
* @since 5.7
*/
public byte[] getByteBacking() {
return bytes;
}
Comment thread
vbabanin marked this conversation as resolved.

/**
* Returns the offset into the {@linkplain #getByteBacking() backing byte array} where this document starts.
*
* @return the offset
* @since 5.7
*/
public int getByteOffset() {
return offset;
}

/**
* Returns the length of this document within the {@linkplain #getByteBacking() backing byte array}.
*
* @return the length
* @since 5.7
*/
public int getByteLength() {
return length;
}

/**
* Decode this into a document.
*
Expand Down
6 changes: 1 addition & 5 deletions bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

package org.bson.codecs;

import org.bson.BsonBinaryReader;
import org.bson.BsonBinaryWriter;
import org.bson.BsonReader;
import org.bson.BsonWriter;
import org.bson.RawBsonDocument;
import org.bson.io.BasicOutputBuffer;
import org.bson.io.ByteBufferBsonInput;

/**
* A simple BSONDocumentBuffer codec. It does not attempt to validate the contents of the underlying ByteBuffer. It assumes that it
Expand All @@ -40,9 +38,7 @@ public RawBsonDocumentCodec() {

@Override
public void encode(final BsonWriter writer, final RawBsonDocument value, final EncoderContext encoderContext) {
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
}
writer.pipe(value.getByteBacking(), value.getByteOffset(), value.getByteLength());
}

@Override
Expand Down
9 changes: 9 additions & 0 deletions bson/src/main/org/bson/io/BsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ public interface BsonInput extends Closeable {
*/
boolean hasRemaining();

/**
* Pipes the specified number of bytes from {@linkplain BsonInput this} input to the given {@linkplain BsonOutput output}.
*
* @param output the output to pipe to
* @param numBytes the number of bytes to pipe
* @since 5.7
*/
void pipe(BsonOutput output, int numBytes);
Comment thread
vbabanin marked this conversation as resolved.
Outdated
Comment thread
rozza marked this conversation as resolved.
Outdated

@Override
void close();
}
18 changes: 18 additions & 0 deletions bson/src/main/org/bson/io/ByteBufferBsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public boolean hasRemaining() {
return buffer.hasRemaining();
}

@Override
public void pipe(final BsonOutput output, final int numBytes) {
ensureOpen();
Comment thread
vbabanin marked this conversation as resolved.
ensureAvailable(numBytes);

if (buffer.isBackedByArray()) {
int position = buffer.position();
int arrayOffset = buffer.arrayOffset();
output.writeBytes(buffer.array(), arrayOffset + position, numBytes);
buffer.position(position + numBytes);
} else {
// Fallback: use temporary buffer for non-array-backed buffers
byte[] temp = new byte[numBytes];
buffer.get(temp);
output.writeBytes(temp);
}
Comment thread
vbabanin marked this conversation as resolved.
}

@Override
public void close() {
buffer.release();
Expand Down