From b00fa47cdef9d5881aa70288c2060caa274a1f2e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Sun, 17 May 2026 02:09:16 +0200 Subject: [PATCH] [SPARK-56896][SQL] Add bulk read paths for timestamp/date Parquet vector updaters --- .../ParquetVectorUpdaterBenchmark-results.txt | 79 ++++++++++--------- .../parquet/ParquetVectorUpdaterFactory.java | 28 ++++--- .../ParquetVectorUpdaterBenchmark.scala | 17 +++- 3 files changed, 75 insertions(+), 49 deletions(-) diff --git a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt index 5918db9f759bb..16b337df1b143 100644 --- a/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt +++ b/sql/core/benchmarks/ParquetVectorUpdaterBenchmark-results.txt @@ -2,83 +2,86 @@ Identity Updaters ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor Identity Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -BooleanUpdater 0 0 0 14617.4 0.1 1.0X -ByteUpdater (INT32 -> Byte) 0 0 0 3667.7 0.3 0.3X -ShortUpdater (INT32 -> Short) 1 1 0 2048.9 0.5 0.1X -IntegerUpdater 0 0 0 10281.9 0.1 0.7X -LongUpdater 0 0 0 5138.0 0.2 0.4X -FloatUpdater 0 0 0 7742.9 0.1 0.5X -DoubleUpdater 0 0 0 3863.4 0.3 0.3X -BinaryUpdater 15 15 0 70.2 14.2 0.0X +BooleanUpdater 0 0 0 38864.9 0.0 1.0X +ByteUpdater (INT32 -> Byte) 0 0 0 7842.7 0.1 0.2X +ShortUpdater (INT32 -> Short) 0 0 0 3017.2 0.3 0.1X +IntegerUpdater 0 0 0 17085.6 0.1 0.4X +LongUpdater 0 0 0 8404.3 0.1 0.2X +FloatUpdater 0 0 0 17584.7 0.1 0.5X +DoubleUpdater 0 0 0 8787.3 0.1 0.2X +BinaryUpdater 8 8 0 135.0 7.4 0.0X ================================================================================================ Type-converting Updaters ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor Type-converting Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------- -IntegerToLongUpdater 1 1 0 1279.7 0.8 1.0X -IntegerToDoubleUpdater 1 1 0 1544.8 0.6 1.2X -FloatToDoubleUpdater 1 1 0 1417.9 0.7 1.1X -DateToTimestampNTZUpdater 36 36 1 29.5 33.9 0.0X -DowncastLongUpdater (INT64 -> Decimal(9,2)) 1 1 0 1287.3 0.8 1.0X +IntegerToLongUpdater 1 1 0 2050.9 0.5 1.0X +IntegerToDoubleUpdater 0 1 0 2535.4 0.4 1.2X +FloatToDoubleUpdater 0 0 0 2548.2 0.4 1.2X +DateToTimestampNTZUpdater 18 19 0 58.8 17.0 0.0X +LongAsNanosUpdater (TimeType) 0 0 0 2253.7 0.4 1.1X +DowncastLongUpdater (INT64 -> Decimal(9,2)) 0 0 0 2276.0 0.4 1.1X ================================================================================================ Rebase Updaters ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure -AMD EPYC 7763 64-Core Processor -Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------------- -IntegerWithRebaseUpdater (DATE legacy) 0 0 0 2599.8 0.4 1.0X -LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 1 1 0 2092.2 0.5 0.8X -LongAsMicrosUpdater (TIMESTAMP_MILLIS) 2 2 0 454.7 2.2 0.2X +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor +Rebase Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +----------------------------------------------------------------------------------------------------------------------------------- +IntegerWithRebaseUpdater (DATE legacy) 0 0 0 4353.8 0.2 1.0X +LongWithRebaseUpdater (TIMESTAMP_MICROS legacy) 0 0 0 3487.0 0.3 0.8X +LongAsMicrosUpdater (TIMESTAMP_MILLIS) 0 0 0 2258.5 0.4 0.5X +DateToTimestampNTZWithRebaseUpdater (DATE legacy) 18 19 1 58.3 17.1 0.0X +LongAsMicrosRebaseUpdater (TIMESTAMP_MILLIS legacy) 1 1 0 1817.7 0.6 0.4X ================================================================================================ Unsigned Updaters ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor Unsigned Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1091.2 0.9 1.0X -UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 18 18 0 59.1 16.9 0.1X +UnsignedIntegerUpdater (UINT32 -> Long) 1 1 0 1979.6 0.5 1.0X +UnsignedLongUpdater (UINT64 -> Decimal(20,0)) 9 10 0 114.5 8.7 0.1X ================================================================================================ Decimal Updaters ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor Decimal Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------ -IntegerToDecimalUpdater 0 0 0 10241.7 0.1 1.0X -LongToDecimalUpdater 0 0 0 5118.1 0.2 0.5X -FixedLenByteArrayToDecimalUpdater 21 21 0 51.1 19.6 0.0X +IntegerToDecimalUpdater 0 0 0 17605.7 0.1 1.0X +LongToDecimalUpdater 0 0 0 8786.5 0.1 0.5X +FixedLenByteArrayToDecimalUpdater 11 12 0 93.0 10.8 0.0X ================================================================================================ FixedLenByteArray Updaters ================================================================================================ -OpenJDK 64-Bit Server VM 17.0.19+10-LTS on Linux 6.17.0-1010-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 17.0.19+10 on Linux 7.0.0-1004-azure +AMD EPYC 9V45 96-Core Processor FixedLenByteArray Updaters: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------------- -FixedLenByteArrayUpdater (len=16 -> Binary) 19 19 0 55.1 18.2 1.0X -FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 7 7 0 160.2 6.2 2.9X -FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 9 9 0 123.1 8.1 2.2X +FixedLenByteArrayUpdater (len=16 -> Binary) 11 11 0 97.4 10.3 1.0X +FixedLenByteArrayAsIntUpdater (len=4 -> Decimal(9,2)) 3 3 0 336.9 3.0 3.5X +FixedLenByteArrayAsLongUpdater (len=8 -> Decimal(18,4)) 4 5 0 234.7 4.3 2.4X diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index 76cedbbef3b44..90e4c3e04f814 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -429,8 +429,10 @@ public void readValues( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - for (int i = 0; i < total; ++i) { - readValue(offset + i, values, valuesReader); + valuesReader.readIntegersAsLongs(total, values, offset); + for (int i = 0; i < total; i++) { + values.putLong(offset + i, + DateTimeUtils.daysToMicros((int) values.getLong(offset + i), ZoneOffset.UTC)); } } @@ -472,8 +474,10 @@ public void readValues( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - for (int i = 0; i < total; ++i) { - readValue(offset + i, values, valuesReader); + valuesReader.readIntegersAsLongs(total, values, offset); + for (int i = 0; i < total; i++) { + int rebasedDays = rebaseDays((int) values.getLong(offset + i), failIfRebase); + values.putLong(offset + i, DateTimeUtils.daysToMicros(rebasedDays, ZoneOffset.UTC)); } } @@ -798,8 +802,9 @@ public void readValues( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - for (int i = 0; i < total; ++i) { - readValue(offset + i, values, valuesReader); + valuesReader.readLongs(total, values, offset); + for (int i = 0; i < total; i++) { + values.putLong(offset + i, DateTimeUtils.millisToMicros(values.getLong(offset + i))); } } @@ -842,8 +847,10 @@ public void readValues( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - for (int i = 0; i < total; ++i) { - readValue(offset + i, values, valuesReader); + valuesReader.readLongs(total, values, offset); + for (int i = 0; i < total; i++) { + long julianMicros = DateTimeUtils.millisToMicros(values.getLong(offset + i)); + values.putLong(offset + i, rebaseMicros(julianMicros, failIfRebase, timeZone)); } } @@ -880,8 +887,9 @@ public void readValues( int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) { - for (int i = 0; i < total; ++i) { - readValue(offset + i, values, valuesReader); + valuesReader.readLongs(total, values, offset); + for (int i = 0; i < total; i++) { + values.putLong(offset + i, DateTimeUtils.microsToNanos(values.getLong(offset + i))); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala index a78593096d5c8..6cbd6ede522f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterBenchmark.scala @@ -48,7 +48,7 @@ import org.apache.spark.sql.types._ * `DowncastLong`. * C. Rebase Updaters -- date/timestamp legacy-calendar rebase variants. * `IntegerWithRebase` (DATE), `LongWithRebase` (TIMESTAMP_MICROS), - * `LongAsMicros`. + * `LongAsMicros`, `DateToTimestampNTZWithRebase`, `LongAsMicrosRebase`. * D. Unsigned Updaters -- `UnsignedInteger`, `UnsignedLong`. * E. Decimal Updaters -- `IntegerToDecimal`, `LongToDecimal`, * `BinaryToDecimal`, `FixedLenByteArrayToDecimal`. @@ -264,6 +264,10 @@ object ParquetVectorUpdaterBenchmark extends BenchmarkBase { TimestampNTZType, descriptor(PrimitiveTypeName.INT32, LogicalTypeAnnotation.dateType()), longVec, intBytes) + addReadValuesCase(benchmark, "LongAsNanosUpdater (TimeType)", + TimeType(), + descriptor(PrimitiveTypeName.INT64), + longVec, longBytes) // 32-bit-decimal target with INT64 source routes via canReadAsLongDecimal + // is32BitDecimalType, both TRUE here, hence DowncastLongUpdater. addReadValuesCase(benchmark, "DowncastLongUpdater (INT64 -> Decimal(9,2))", @@ -303,6 +307,17 @@ object ParquetVectorUpdaterBenchmark extends BenchmarkBase { descriptor(PrimitiveTypeName.INT64, LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)), longVec, longBytes) + addReadValuesCase(benchmark, "DateToTimestampNTZWithRebaseUpdater (DATE legacy)", + TimestampNTZType, + descriptor(PrimitiveTypeName.INT32, LogicalTypeAnnotation.dateType()), + longVec, intBytes, + datetimeRebaseMode = "LEGACY") + addReadValuesCase(benchmark, "LongAsMicrosRebaseUpdater (TIMESTAMP_MILLIS legacy)", + TimestampType, + descriptor(PrimitiveTypeName.INT64, + LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS)), + longVec, longBytes, + datetimeRebaseMode = "LEGACY") benchmark.run() }