diff --git a/sql/core/benchmarks/VectorizedDeltaReaderBenchmark-results.txt b/sql/core/benchmarks/VectorizedDeltaReaderBenchmark-results.txt index 9ff1678901e9f..cca0a0bb534d2 100644 --- a/sql/core/benchmarks/VectorizedDeltaReaderBenchmark-results.txt +++ b/sql/core/benchmarks/VectorizedDeltaReaderBenchmark-results.txt @@ -2,92 +2,92 @@ DELTA_BINARY_PACKED INT32 ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1011-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor DELTA_BINARY_PACKED INT32: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -readIntegers, constant 2 2 0 482.3 2.1 1.0X -skipIntegers, constant 3 3 0 335.4 3.0 0.7X -readIntegers, monotonic 3 3 0 314.9 3.2 0.7X -skipIntegers, monotonic 3 3 0 335.1 3.0 0.7X -readIntegers, small-delta random 4 4 0 257.5 3.9 0.5X -skipIntegers, small-delta random 4 4 0 269.5 3.7 0.6X -readIntegers, wide random 5 5 0 209.3 4.8 0.4X -skipIntegers, wide random 5 5 0 218.2 4.6 0.5X +readIntegers, constant 1 1 0 886.6 1.1 1.0X +skipIntegers, constant 2 2 0 693.6 1.4 0.8X +readIntegers, monotonic 2 2 0 658.2 1.5 0.7X +skipIntegers, monotonic 2 2 0 692.5 1.4 0.8X +readIntegers, small-delta random 2 2 0 468.8 2.1 0.5X +skipIntegers, small-delta random 2 2 0 492.3 2.0 0.6X +readIntegers, wide random 3 3 0 354.0 2.8 0.4X +skipIntegers, wide random 3 3 0 368.6 2.7 0.4X ================================================================================================ DELTA_BINARY_PACKED INT64 ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1011-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor DELTA_BINARY_PACKED INT64: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -readLongs, constant 5 6 1 191.0 5.2 1.0X -skipLongs, constant 6 6 0 170.1 5.9 0.9X -readLongs, monotonic 7 7 0 144.9 6.9 0.8X -skipLongs, monotonic 6 6 0 170.1 5.9 0.9X -readLongs, small-delta random 8 8 0 134.8 7.4 0.7X -skipLongs, small-delta random 7 7 0 152.8 6.5 0.8X -readLongs, wide random 11 11 0 97.8 10.2 0.5X -skipLongs, wide random 10 10 0 106.4 9.4 0.6X +readLongs, constant 4 4 0 276.6 3.6 1.0X +skipLongs, constant 4 4 0 249.0 4.0 0.9X +readLongs, monotonic 5 5 0 222.6 4.5 0.8X +skipLongs, monotonic 4 5 0 249.3 4.0 0.9X +readLongs, small-delta random 5 6 0 200.5 5.0 0.7X +skipLongs, small-delta random 5 5 0 220.4 4.5 0.8X +readLongs, wide random 7 7 0 149.5 6.7 0.5X +skipLongs, wide random 7 7 0 160.4 6.2 0.6X ================================================================================================ DELTA_BYTE_ARRAY ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1011-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor DELTA_BYTE_ARRAY: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -readBinary, no overlap, len=16 36 37 1 29.2 34.2 1.0X -skipBinary, no overlap, len=16 41 41 1 25.8 38.7 0.9X -readBinary, half overlap, len=16 42 43 1 25.0 39.9 0.9X -skipBinary, half overlap, len=16 48 50 2 21.6 46.2 0.7X -readBinary, full overlap, len=16 42 44 2 24.8 40.3 0.8X -skipBinary, full overlap, len=16 48 49 1 21.8 45.9 0.7X -readBinary, half overlap, len=64 42 44 1 24.8 40.4 0.8X -skipBinary, half overlap, len=64 47 49 1 22.2 45.1 0.8X +readBinary, no overlap, len=16 24 25 1 43.9 22.8 1.0X +skipBinary, no overlap, len=16 24 25 1 44.0 22.7 1.0X +readBinary, half overlap, len=16 27 28 1 39.0 25.6 0.9X +skipBinary, half overlap, len=16 27 28 1 39.3 25.4 0.9X +readBinary, full overlap, len=16 27 29 1 38.5 26.0 0.9X +skipBinary, full overlap, len=16 27 28 1 39.5 25.3 0.9X +readBinary, half overlap, len=64 26 28 1 40.4 24.8 0.9X +skipBinary, half overlap, len=64 26 28 1 40.7 24.6 0.9X ================================================================================================ DELTA_LENGTH_BYTE_ARRAY ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1011-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor DELTA_LENGTH_BYTE_ARRAY: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -readBinary, payloadLen=8 22 25 2 48.1 20.8 1.0X -skipBinary, payloadLen=8 11 12 1 95.8 10.4 2.0X -readBinary, payloadLen=32 22 26 3 48.0 20.9 1.0X -skipBinary, payloadLen=32 11 12 1 97.6 10.2 2.0X -readBinary, payloadLen=128 26 29 2 40.0 25.0 0.8X -skipBinary, payloadLen=128 11 12 1 96.0 10.4 2.0X -readBinary, payloadLen=512 54 62 4 19.3 51.9 0.4X -skipBinary, payloadLen=512 11 13 2 95.7 10.4 2.0X +readBinary, payloadLen=8 11 12 1 93.7 10.7 1.0X +skipBinary, payloadLen=8 5 5 0 207.0 4.8 2.2X +readBinary, payloadLen=32 15 15 0 72.3 13.8 0.8X +skipBinary, payloadLen=32 5 5 0 207.0 4.8 2.2X +readBinary, payloadLen=128 12 13 1 86.8 11.5 0.9X +skipBinary, payloadLen=128 5 5 0 207.2 4.8 2.2X +readBinary, payloadLen=512 32 33 1 33.2 30.2 0.4X +skipBinary, payloadLen=512 5 5 0 207.5 4.8 2.2X ================================================================================================ Variant reads ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1011-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor Variant reads: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------------- -readBytes (INT32) 7 7 0 145.8 6.9 1.0X -readShorts (INT32) 7 8 1 140.7 7.1 1.0X -readUnsignedIntegers (INT32 -> Long) 7 7 0 143.3 7.0 1.0X -readUnsignedLongs (INT64 -> Decimal(20,0)) 227 239 25 4.6 216.4 0.0X -skipBytes 8 8 0 136.8 7.3 0.9X -skipShorts 8 8 0 136.6 7.3 0.9X -readByte (INT32 single-value) 12 12 1 85.8 11.7 0.6X -readShort (INT32 single-value) 12 12 0 85.9 11.6 0.6X -readInteger (INT32 single-value) 12 12 0 86.1 11.6 0.6X -readLong (INT64 single-value) 14 15 1 73.4 13.6 0.5X -readBinary(len) (DELTA_BYTE_ARRAY single-value) 71 77 5 14.7 68.2 0.1X +readBytes (INT32) 5 5 0 216.9 4.6 1.0X +readShorts (INT32) 5 5 0 217.2 4.6 1.0X +readUnsignedIntegers (INT32 -> Long) 5 5 0 214.1 4.7 1.0X +readUnsignedLongs (INT64 -> Decimal(20,0)) 130 151 64 8.1 123.8 0.0X +skipBytes 5 6 0 195.6 5.1 0.9X +skipShorts 5 6 0 196.7 5.1 0.9X +readByte (INT32 single-value) 7 7 0 149.8 6.7 0.7X +readShort (INT32 single-value) 7 8 1 149.7 6.7 0.7X +readInteger (INT32 single-value) 7 8 0 149.2 6.7 0.7X +readLong (INT64 single-value) 8 9 1 130.3 7.7 0.6X +readBinary(len) (DELTA_BYTE_ARRAY single-value) 45 52 6 23.3 42.9 0.1X diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java index 1cfaa59f18ea7..7f2c616960e0c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedDeltaLengthByteArrayReader.java @@ -54,17 +54,26 @@ public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOExce @Override public void readBinary(int total, WritableColumnVector c, int rowId) { - ByteBuffer buffer; - ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer; - int length; + // Compute total data length across all values so we can read everything in a single + // bulk slice instead of allocating one ByteBuffer per value. + int totalDataLen = 0; for (int i = 0; i < total; i++) { - length = lengthsVector.getInt(currentRow + i); - try { - buffer = in.slice(length); - } catch (EOFException e) { - throw new ParquetDecodingException("Failed to read " + length + " bytes"); - } - outputWriter.write(c, rowId + i, buffer, length); + totalDataLen += lengthsVector.getInt(currentRow + i); + } + + ByteBuffer allData; + try { + allData = in.slice(totalDataLen); + } catch (EOFException e) { + throw new ParquetDecodingException("Failed to read " + totalDataLen + " bytes"); + } + byte[] dataArray = allData.array(); + int dataPos = allData.arrayOffset() + allData.position(); + + for (int i = 0; i < total; i++) { + int length = lengthsVector.getInt(currentRow + i); + c.putByteArray(rowId + i, dataArray, dataPos, length); + dataPos += length; } currentRow += total; } @@ -85,10 +94,8 @@ public void readGeography(int total, WritableColumnVector c, int rowId) { private void readGeoData(int total, WritableColumnVector c, int rowId, int srid, WKBConverterStrategy converter) { - ByteBufferOutputWriter outputWriter = ByteBufferOutputWriter::writeArrayByteBuffer; - int length; for (int i = 0; i < total; i++) { - length = lengthsVector.getInt(currentRow + i); + int length = lengthsVector.getInt(currentRow + i); byte[] physicalValue; try { // Converts WKB into a physical representation of geometry/geography. @@ -96,8 +103,7 @@ private void readGeoData(int total, WritableColumnVector c, int rowId, int srid, } catch (IOException e) { throw new ParquetDecodingException("Failed to read " + length + " bytes"); } - - outputWriter.write(c, rowId + i, ByteBuffer.wrap(physicalValue), physicalValue.length); + c.putByteArray(rowId + i, physicalValue, 0, physicalValue.length); } currentRow += total; } @@ -113,11 +119,12 @@ public ByteBuffer getBytes(int rowId) { @Override public void skipBinary(int total) { + long totalSkip = 0; for (int i = 0; i < total; i++) { - int remaining = lengthsVector.getInt(currentRow + i); - while (remaining > 0) { - remaining -= in.skip(remaining); - } + totalSkip += lengthsVector.getInt(currentRow + i); + } + while (totalSkip > 0) { + totalSkip -= in.skip(totalSkip); } currentRow += total; }