diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 2dfc9d96540c..ce533d014584 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -114,6 +114,18 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
// Percentage of segments we failed to get size for
TABLE_STORAGE_EST_MISSING_SEGMENT_PERCENT("TableStorageEstMissingSegmentPercent", false),
+ // Forward index compression ratio scaled by 100 (e.g., 4.5x ratio → 450). Divide by 100 to get actual ratio.
+ TABLE_COMPRESSION_RATIO_PERCENT("TableCompressionRatioPercent", false),
+
+ // Raw (uncompressed) forward index size per replica
+ TABLE_RAW_FORWARD_INDEX_SIZE_PER_REPLICA("TableRawForwardIndexSizePerReplica", false),
+
+ // Compressed forward index size per replica
+ TABLE_COMPRESSED_FORWARD_INDEX_SIZE_PER_REPLICA("TableCompressedForwardIndexSizePerReplica", false),
+
+ // Size per replica broken down by storage tier
+ TABLE_TIERED_STORAGE_SIZE("TableTieredStorageSize", false),
+
// Number of scheduled Cron jobs
CRON_SCHEDULER_JOB_SCHEDULED("cronSchedulerJobScheduled", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ColumnCompressionStatsInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ColumnCompressionStatsInfo.java
new file mode 100644
index 000000000000..022fa4ec9a5c
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/ColumnCompressionStatsInfo.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import javax.annotation.Nullable;
+
+
+/**
+ * Per-column forward index compression statistics.
+ *
+ *
Contains the column name, uncompressed and compressed sizes, compression ratio, codec,
+ * whether the column has a dictionary, and the list of indexes present on the column.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ColumnCompressionStatsInfo {
+ private final String _column;
+ private final long _uncompressedSizeInBytes;
+ private final long _compressedSizeInBytes;
+ private final double _compressionRatio;
+ private final String _codec;
+ private final boolean _hasDictionary;
+ private final List _indexes;
+
+ @JsonCreator
+ public ColumnCompressionStatsInfo(
+ @JsonProperty("column") String column,
+ @JsonProperty("uncompressedSizeInBytes") long uncompressedSizeInBytes,
+ @JsonProperty("compressedSizeInBytes") long compressedSizeInBytes,
+ @JsonProperty("compressionRatio") double compressionRatio,
+ @JsonProperty("codec") @Nullable String codec,
+ @JsonProperty("hasDictionary") boolean hasDictionary,
+ @JsonProperty("indexes") @Nullable List indexes) {
+ _column = column;
+ _uncompressedSizeInBytes = uncompressedSizeInBytes;
+ _compressedSizeInBytes = compressedSizeInBytes;
+ _compressionRatio = compressionRatio;
+ _codec = codec;
+ _hasDictionary = hasDictionary;
+ _indexes = indexes;
+ }
+
+ public String getColumn() {
+ return _column;
+ }
+
+ public long getUncompressedSizeInBytes() {
+ return _uncompressedSizeInBytes;
+ }
+
+ public long getCompressedSizeInBytes() {
+ return _compressedSizeInBytes;
+ }
+
+ public double getCompressionRatio() {
+ return _compressionRatio;
+ }
+
+ @Nullable
+ public String getCodec() {
+ return _codec;
+ }
+
+ @JsonProperty("hasDictionary")
+ public boolean hasDictionary() {
+ return _hasDictionary;
+ }
+
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public List getIndexes() {
+ return _indexes;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/CompressionStatsSummary.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/CompressionStatsSummary.java
new file mode 100644
index 000000000000..beb44bb90977
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/CompressionStatsSummary.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * Table-level compression statistics summary, aggregated from per-column data.
+ * Contains total raw and compressed forward index sizes, the overall compression ratio,
+ * and segment coverage information.
+ *
+ * JSON schema is identical to {@code TableSizeReader.CompressionStats} on the size endpoint.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CompressionStatsSummary {
+ private final long _rawForwardIndexSizePerReplicaInBytes;
+ private final long _compressedForwardIndexSizePerReplicaInBytes;
+ private final double _compressionRatio;
+ private final int _segmentsWithStats;
+ private final int _totalSegments;
+ private final boolean _isPartialCoverage;
+
+ @JsonCreator
+ public CompressionStatsSummary(
+ @JsonProperty("rawForwardIndexSizePerReplicaInBytes") long rawForwardIndexSizePerReplicaInBytes,
+ @JsonProperty("compressedForwardIndexSizePerReplicaInBytes") long compressedForwardIndexSizePerReplicaInBytes,
+ @JsonProperty("compressionRatio") double compressionRatio,
+ @JsonProperty("segmentsWithStats") int segmentsWithStats,
+ @JsonProperty("totalSegments") int totalSegments,
+ @JsonProperty("isPartialCoverage") boolean isPartialCoverage) {
+ _rawForwardIndexSizePerReplicaInBytes = rawForwardIndexSizePerReplicaInBytes;
+ _compressedForwardIndexSizePerReplicaInBytes = compressedForwardIndexSizePerReplicaInBytes;
+ _compressionRatio = compressionRatio;
+ _segmentsWithStats = segmentsWithStats;
+ _totalSegments = totalSegments;
+ _isPartialCoverage = isPartialCoverage;
+ }
+
+ public long getRawForwardIndexSizePerReplicaInBytes() {
+ return _rawForwardIndexSizePerReplicaInBytes;
+ }
+
+ public long getCompressedForwardIndexSizePerReplicaInBytes() {
+ return _compressedForwardIndexSizePerReplicaInBytes;
+ }
+
+ public double getCompressionRatio() {
+ return _compressionRatio;
+ }
+
+ public int getSegmentsWithStats() {
+ return _segmentsWithStats;
+ }
+
+ public int getTotalSegments() {
+ return _totalSegments;
+ }
+
+ @JsonProperty("isPartialCoverage")
+ public boolean isPartialCoverage() {
+ return _isPartialCoverage;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentSizeInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentSizeInfo.java
index 6a9fdf59e0c7..e54ac824a870 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentSizeInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentSizeInfo.java
@@ -21,18 +21,42 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import javax.annotation.Nullable;
@JsonIgnoreProperties(ignoreUnknown = true)
public class SegmentSizeInfo {
private final String _segmentName;
private final long _diskSizeInBytes;
+ private final long _rawForwardIndexSizeBytes;
+ private final long _compressedForwardIndexSizeBytes;
+ private final String _tier;
+ private final Map _columnCompressionStats;
+
+ public SegmentSizeInfo(String segmentName, long sizeBytes) {
+ this(segmentName, sizeBytes, -1, -1, null, null);
+ }
+
+ public SegmentSizeInfo(String segmentName, long sizeBytes, long rawForwardIndexSizeBytes,
+ long compressedForwardIndexSizeBytes, @Nullable String tier) {
+ this(segmentName, sizeBytes, rawForwardIndexSizeBytes, compressedForwardIndexSizeBytes, tier, null);
+ }
@JsonCreator
public SegmentSizeInfo(@JsonProperty("segmentName") String segmentName,
- @JsonProperty("diskSizeInBytes") long sizeBytes) {
+ @JsonProperty("diskSizeInBytes") long sizeBytes,
+ @JsonProperty("rawForwardIndexSizeBytes") long rawForwardIndexSizeBytes,
+ @JsonProperty("compressedForwardIndexSizeBytes") long compressedForwardIndexSizeBytes,
+ @JsonProperty("tier") @Nullable String tier,
+ @JsonProperty("columnCompressionStats") @Nullable Map
+ columnCompressionStats) {
_segmentName = segmentName;
_diskSizeInBytes = sizeBytes;
+ _rawForwardIndexSizeBytes = rawForwardIndexSizeBytes;
+ _compressedForwardIndexSizeBytes = compressedForwardIndexSizeBytes;
+ _tier = tier;
+ _columnCompressionStats = columnCompressionStats;
}
public String getSegmentName() {
@@ -43,6 +67,24 @@ public long getDiskSizeInBytes() {
return _diskSizeInBytes;
}
+ public long getRawForwardIndexSizeBytes() {
+ return _rawForwardIndexSizeBytes;
+ }
+
+ public long getCompressedForwardIndexSizeBytes() {
+ return _compressedForwardIndexSizeBytes;
+ }
+
+ @Nullable
+ public String getTier() {
+ return _tier;
+ }
+
+ @Nullable
+ public Map getColumnCompressionStats() {
+ return _columnCompressionStats;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StorageBreakdownInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StorageBreakdownInfo.java
new file mode 100644
index 000000000000..480e411e01dd
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StorageBreakdownInfo.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+
+
+/**
+ * Storage breakdown by tier. Contains a map of tier names to their respective
+ * segment count and per-replica size.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class StorageBreakdownInfo {
+
+ private final Map _tiers;
+
+ @JsonCreator
+ public StorageBreakdownInfo(@JsonProperty("tiers") Map tiers) {
+ _tiers = tiers;
+ }
+
+ public Map getTiers() {
+ return _tiers;
+ }
+
+ /**
+ * Segment count and size for a single storage tier.
+ */
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class TierInfo {
+ private final int _count;
+ private final long _sizePerReplicaInBytes;
+
+ @JsonCreator
+ public TierInfo(@JsonProperty("count") int count,
+ @JsonProperty("sizePerReplicaInBytes") long sizePerReplicaInBytes) {
+ _count = count;
+ _sizePerReplicaInBytes = sizePerReplicaInBytes;
+ }
+
+ public int getCount() {
+ return _count;
+ }
+
+ public long getSizePerReplicaInBytes() {
+ return _sizePerReplicaInBytes;
+ }
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
index 21468d7d426a..86e42745303b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/TableMetadataInfo.java
@@ -20,8 +20,11 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
/**
@@ -46,6 +49,9 @@ public class TableMetadataInfo {
// JSON property name kept as "upsertPartitionToServerPrimaryKeyCountMap" to avoid silent data loss during rolling
// upgrades where servers and controllers may temporarily run different versions of this class.
private final Map> _partitionToServerPrimaryKeyCountMap;
+ private final List _columnCompressionStats;
+ private final CompressionStatsSummary _compressionStats;
+ private final StorageBreakdownInfo _storageBreakdown;
@JsonCreator
public TableMetadataInfo(@JsonProperty("tableName") String tableName,
@@ -55,7 +61,11 @@ public TableMetadataInfo(@JsonProperty("tableName") String tableName,
@JsonProperty("maxNumMultiValuesMap") Map maxNumMultiValuesMap,
@JsonProperty("columnIndexSizeMap") Map> columnIndexSizeMap,
@JsonProperty("upsertPartitionToServerPrimaryKeyCountMap")
- Map> partitionToServerPrimaryKeyCountMap) {
+ Map> partitionToServerPrimaryKeyCountMap,
+ @JsonProperty("columnCompressionStats") @Nullable
+ List columnCompressionStats,
+ @JsonProperty("compressionStats") @Nullable CompressionStatsSummary compressionStats,
+ @JsonProperty("storageBreakdown") @Nullable StorageBreakdownInfo storageBreakdown) {
_tableName = tableName;
_diskSizeInBytes = sizeInBytes;
_numSegments = numSegments;
@@ -65,6 +75,32 @@ public TableMetadataInfo(@JsonProperty("tableName") String tableName,
_maxNumMultiValuesMap = maxNumMultiValuesMap;
_columnIndexSizeMap = columnIndexSizeMap;
_partitionToServerPrimaryKeyCountMap = partitionToServerPrimaryKeyCountMap;
+ _columnCompressionStats = columnCompressionStats;
+ _compressionStats = compressionStats;
+ _storageBreakdown = storageBreakdown;
+ }
+
+ /**
+ * Constructor for callers that provide columnCompressionStats but not compressionStats/storageBreakdown.
+ */
+ public TableMetadataInfo(String tableName, long sizeInBytes, long numSegments, long numRows,
+ Map columnLengthMap, Map columnCardinalityMap,
+ Map maxNumMultiValuesMap, Map> columnIndexSizeMap,
+ Map> partitionToServerPrimaryKeyCountMap,
+ @Nullable List columnCompressionStats) {
+ this(tableName, sizeInBytes, numSegments, numRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap,
+ columnIndexSizeMap, partitionToServerPrimaryKeyCountMap, columnCompressionStats, null, null);
+ }
+
+ /**
+ * Backwards-compatible constructor for callers that don't provide any compression/storage fields.
+ */
+ public TableMetadataInfo(String tableName, long sizeInBytes, long numSegments, long numRows,
+ Map columnLengthMap, Map columnCardinalityMap,
+ Map maxNumMultiValuesMap, Map> columnIndexSizeMap,
+ Map> partitionToServerPrimaryKeyCountMap) {
+ this(tableName, sizeInBytes, numSegments, numRows, columnLengthMap, columnCardinalityMap, maxNumMultiValuesMap,
+ columnIndexSizeMap, partitionToServerPrimaryKeyCountMap, null, null, null);
}
public String getTableName() {
@@ -103,4 +139,22 @@ public Map> getColumnIndexSizeMap() {
public Map> getPartitionToServerPrimaryKeyCountMap() {
return _partitionToServerPrimaryKeyCountMap;
}
+
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public List getColumnCompressionStats() {
+ return _columnCompressionStats;
+ }
+
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public CompressionStatsSummary getCompressionStats() {
+ return _compressionStats;
+ }
+
+ @Nullable
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public StorageBreakdownInfo getStorageBreakdown() {
+ return _storageBreakdown;
+ }
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/ColumnCompressionStatsInfoTest.java b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/ColumnCompressionStatsInfoTest.java
new file mode 100644
index 000000000000..6e0ba1429f8a
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/ColumnCompressionStatsInfoTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ColumnCompressionStatsInfoTest {
+
+ @Test
+ public void testGetters() {
+ List indexes = Arrays.asList("forward_index", "inverted_index");
+ ColumnCompressionStatsInfo info =
+ new ColumnCompressionStatsInfo("myCol", 8000L, 2000L, 4.0, "LZ4", false, indexes);
+
+ assertEquals(info.getColumn(), "myCol");
+ assertEquals(info.getUncompressedSizeInBytes(), 8000L);
+ assertEquals(info.getCompressedSizeInBytes(), 2000L);
+ assertEquals(info.getCompressionRatio(), 4.0, 1e-9);
+ assertEquals(info.getCodec(), "LZ4");
+ assertFalse(info.hasDictionary());
+ assertEquals(info.getIndexes(), indexes);
+ }
+
+ @Test
+ public void testHasDictionaryTrue() {
+ ColumnCompressionStatsInfo info =
+ new ColumnCompressionStatsInfo("dictCol", 5000L, 1000L, 5.0, "SNAPPY", true,
+ List.of("forward_index"));
+
+ assertTrue(info.hasDictionary());
+ assertEquals(info.getCodec(), "SNAPPY");
+ }
+
+ @Test
+ public void testJsonRoundTrip()
+ throws Exception {
+ List indexes = Arrays.asList("forward_index", "range_index");
+ ColumnCompressionStatsInfo original =
+ new ColumnCompressionStatsInfo("col1", 10000L, 2500L, 4.0, "ZSTANDARD", false, indexes);
+
+ String json = JsonUtils.objectToString(original);
+ ColumnCompressionStatsInfo deserialized =
+ JsonUtils.stringToObject(json, ColumnCompressionStatsInfo.class);
+
+ assertEquals(deserialized.getColumn(), "col1");
+ assertEquals(deserialized.getUncompressedSizeInBytes(), 10000L);
+ assertEquals(deserialized.getCompressedSizeInBytes(), 2500L);
+ assertEquals(deserialized.getCompressionRatio(), 4.0, 1e-9);
+ assertEquals(deserialized.getCodec(), "ZSTANDARD");
+ assertFalse(deserialized.hasDictionary());
+ assertNotNull(deserialized.getIndexes());
+ assertEquals(deserialized.getIndexes().size(), 2);
+ assertTrue(deserialized.getIndexes().contains("forward_index"));
+ assertTrue(deserialized.getIndexes().contains("range_index"));
+ }
+
+ @Test
+ public void testNullCodecAndNullIndexesRoundTrip()
+ throws Exception {
+ ColumnCompressionStatsInfo original =
+ new ColumnCompressionStatsInfo("noCodecCol", 3000L, 1500L, 2.0, null, false, null);
+
+ String json = JsonUtils.objectToString(original);
+ ColumnCompressionStatsInfo deserialized =
+ JsonUtils.stringToObject(json, ColumnCompressionStatsInfo.class);
+
+ assertEquals(deserialized.getColumn(), "noCodecCol");
+ assertEquals(deserialized.getUncompressedSizeInBytes(), 3000L);
+ assertEquals(deserialized.getCompressedSizeInBytes(), 1500L);
+ assertEquals(deserialized.getCompressionRatio(), 2.0, 1e-9);
+ assertNull(deserialized.getCodec());
+ assertFalse(deserialized.hasDictionary());
+ assertNull(deserialized.getIndexes());
+ }
+
+ @Test
+ public void testJsonIgnoresUnknownFields()
+ throws Exception {
+ String json = "{\"column\":\"futureCol\",\"uncompressedSizeInBytes\":6000,"
+ + "\"compressedSizeInBytes\":1200,\"compressionRatio\":5.0,"
+ + "\"codec\":\"LZ4\",\"hasDictionary\":false,"
+ + "\"indexes\":[\"forward_index\"],\"unknownField\":\"ignored\"}";
+
+ ColumnCompressionStatsInfo deserialized =
+ JsonUtils.stringToObject(json, ColumnCompressionStatsInfo.class);
+
+ assertEquals(deserialized.getColumn(), "futureCol");
+ assertEquals(deserialized.getUncompressedSizeInBytes(), 6000L);
+ assertEquals(deserialized.getCompressedSizeInBytes(), 1200L);
+ assertEquals(deserialized.getCompressionRatio(), 5.0, 1e-9);
+ assertEquals(deserialized.getCodec(), "LZ4");
+ assertFalse(deserialized.hasDictionary());
+ assertNotNull(deserialized.getIndexes());
+ assertEquals(deserialized.getIndexes(), List.of("forward_index"));
+ }
+
+ @Test
+ public void testHasDictionaryJsonRoundTrip()
+ throws Exception {
+ ColumnCompressionStatsInfo original =
+ new ColumnCompressionStatsInfo("dictRoundTrip", 7000L, 3500L, 2.0, null, true,
+ List.of("forward_index"));
+
+ String json = JsonUtils.objectToString(original);
+ ColumnCompressionStatsInfo deserialized =
+ JsonUtils.stringToObject(json, ColumnCompressionStatsInfo.class);
+
+ assertEquals(deserialized.getColumn(), "dictRoundTrip");
+ assertTrue(deserialized.hasDictionary());
+ assertNull(deserialized.getCodec());
+ }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/CompressionStatsSummaryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/CompressionStatsSummaryTest.java
new file mode 100644
index 000000000000..69bb227bd85a
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/CompressionStatsSummaryTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class CompressionStatsSummaryTest {
+
+ @Test
+ public void testGetters() {
+ CompressionStatsSummary summary = new CompressionStatsSummary(100000, 40000, 2.5, 8, 10, true);
+ assertEquals(summary.getRawForwardIndexSizePerReplicaInBytes(), 100000);
+ assertEquals(summary.getCompressedForwardIndexSizePerReplicaInBytes(), 40000);
+ assertEquals(summary.getCompressionRatio(), 2.5, 0.001);
+ assertEquals(summary.getSegmentsWithStats(), 8);
+ assertEquals(summary.getTotalSegments(), 10);
+ assertTrue(summary.isPartialCoverage());
+ }
+
+ @Test
+ public void testFullCoverage() {
+ CompressionStatsSummary summary = new CompressionStatsSummary(50000, 25000, 2.0, 5, 5, false);
+ assertEquals(summary.getSegmentsWithStats(), 5);
+ assertEquals(summary.getTotalSegments(), 5);
+ assertFalse(summary.isPartialCoverage());
+ }
+
+ @Test
+ public void testJsonRoundTrip()
+ throws Exception {
+ CompressionStatsSummary original = new CompressionStatsSummary(200000, 80000, 2.5, 3, 4, true);
+ String json = JsonUtils.objectToString(original);
+
+ assertTrue(json.contains("rawForwardIndexSizePerReplicaInBytes"));
+ assertTrue(json.contains("compressedForwardIndexSizePerReplicaInBytes"));
+ assertTrue(json.contains("compressionRatio"));
+ assertTrue(json.contains("segmentsWithStats"));
+ assertTrue(json.contains("totalSegments"));
+ assertTrue(json.contains("isPartialCoverage"));
+
+ CompressionStatsSummary deserialized = JsonUtils.stringToObject(json, CompressionStatsSummary.class);
+ assertEquals(deserialized.getRawForwardIndexSizePerReplicaInBytes(), 200000);
+ assertEquals(deserialized.getCompressedForwardIndexSizePerReplicaInBytes(), 80000);
+ assertEquals(deserialized.getCompressionRatio(), 2.5, 0.001);
+ assertEquals(deserialized.getSegmentsWithStats(), 3);
+ assertEquals(deserialized.getTotalSegments(), 4);
+ assertTrue(deserialized.isPartialCoverage());
+ }
+
+ @Test
+ public void testJsonIgnoresUnknownFields()
+ throws Exception {
+ String json = "{\"rawForwardIndexSizePerReplicaInBytes\":1000,\"compressedForwardIndexSizePerReplicaInBytes\":500,"
+ + "\"compressionRatio\":2.0,\"segmentsWithStats\":1,\"totalSegments\":1,\"isPartialCoverage\":false,"
+ + "\"unknownFutureField\":\"ignored\"}";
+ CompressionStatsSummary summary = JsonUtils.stringToObject(json, CompressionStatsSummary.class);
+ assertEquals(summary.getRawForwardIndexSizePerReplicaInBytes(), 1000);
+ assertFalse(summary.isPartialCoverage());
+ }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/SegmentSizeInfoTest.java b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/SegmentSizeInfoTest.java
new file mode 100644
index 000000000000..7c48e3e7a753
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/SegmentSizeInfoTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class SegmentSizeInfoTest {
+
+ @Test
+ public void testJsonRoundTripWithCompressionStats()
+ throws Exception {
+ Map columnStats = new HashMap<>();
+ columnStats.put("col1", new ColumnCompressionStatsInfo("col1", 10000, 2500, 4.0, "LZ4", false,
+ List.of("forward_index")));
+ columnStats.put("col2", new ColumnCompressionStatsInfo("col2", 20000, 4000, 5.0, "ZSTANDARD", false,
+ List.of("forward_index")));
+
+ SegmentSizeInfo original = new SegmentSizeInfo("seg1", 50000, 30000, 6500, "tier1", columnStats);
+
+ String json = JsonUtils.objectToString(original);
+ SegmentSizeInfo deserialized = JsonUtils.stringToObject(json, SegmentSizeInfo.class);
+
+ assertEquals(deserialized.getSegmentName(), "seg1");
+ assertEquals(deserialized.getDiskSizeInBytes(), 50000);
+ assertEquals(deserialized.getRawForwardIndexSizeBytes(), 30000);
+ assertEquals(deserialized.getCompressedForwardIndexSizeBytes(), 6500);
+ assertEquals(deserialized.getTier(), "tier1");
+ assertNotNull(deserialized.getColumnCompressionStats());
+ assertEquals(deserialized.getColumnCompressionStats().size(), 2);
+
+ ColumnCompressionStatsInfo col1Stats = deserialized.getColumnCompressionStats().get("col1");
+ assertNotNull(col1Stats);
+ assertEquals(col1Stats.getColumn(), "col1");
+ assertEquals(col1Stats.getUncompressedSizeInBytes(), 10000);
+ assertEquals(col1Stats.getCompressedSizeInBytes(), 2500);
+ assertEquals(col1Stats.getCompressionRatio(), 4.0, 0.01);
+ assertEquals(col1Stats.getCodec(), "LZ4");
+ assertFalse(col1Stats.hasDictionary());
+ }
+
+ @Test
+ public void testJsonRoundTripBackwardCompatible()
+ throws Exception {
+ // Simulate old server response without compression fields
+ String oldJson = "{\"segmentName\":\"seg1\",\"diskSizeInBytes\":50000}";
+ SegmentSizeInfo deserialized = JsonUtils.stringToObject(oldJson, SegmentSizeInfo.class);
+
+ assertEquals(deserialized.getSegmentName(), "seg1");
+ assertEquals(deserialized.getDiskSizeInBytes(), 50000);
+ assertEquals(deserialized.getRawForwardIndexSizeBytes(), 0);
+ assertEquals(deserialized.getCompressedForwardIndexSizeBytes(), 0);
+ assertNull(deserialized.getTier());
+ assertNull(deserialized.getColumnCompressionStats());
+ }
+
+ @Test
+ public void testJsonRoundTripWithoutColumnStats()
+ throws Exception {
+ SegmentSizeInfo original = new SegmentSizeInfo("seg1", 50000, 30000, 6500, "default");
+
+ String json = JsonUtils.objectToString(original);
+ SegmentSizeInfo deserialized = JsonUtils.stringToObject(json, SegmentSizeInfo.class);
+
+ assertEquals(deserialized.getSegmentName(), "seg1");
+ assertEquals(deserialized.getDiskSizeInBytes(), 50000);
+ assertEquals(deserialized.getRawForwardIndexSizeBytes(), 30000);
+ assertEquals(deserialized.getCompressedForwardIndexSizeBytes(), 6500);
+ assertEquals(deserialized.getTier(), "default");
+ assertNull(deserialized.getColumnCompressionStats());
+ }
+
+ @Test
+ public void testLegacyTwoArgConstructor() {
+ SegmentSizeInfo info = new SegmentSizeInfo("seg1", 1000);
+ assertEquals(info.getSegmentName(), "seg1");
+ assertEquals(info.getDiskSizeInBytes(), 1000);
+ assertEquals(info.getRawForwardIndexSizeBytes(), -1);
+ assertEquals(info.getCompressedForwardIndexSizeBytes(), -1);
+ assertNull(info.getTier());
+ assertNull(info.getColumnCompressionStats());
+ }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/StorageBreakdownInfoTest.java b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/StorageBreakdownInfoTest.java
new file mode 100644
index 000000000000..92870a4cdb60
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/StorageBreakdownInfoTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class StorageBreakdownInfoTest {
+
+ @Test
+ public void testTierInfoGetters() {
+ StorageBreakdownInfo.TierInfo tierInfo = new StorageBreakdownInfo.TierInfo(5, 1048576L);
+
+ assertEquals(tierInfo.getCount(), 5);
+ assertEquals(tierInfo.getSizePerReplicaInBytes(), 1048576L);
+ }
+
+ @Test
+ public void testGetTiersMap() {
+ Map tiers = new HashMap<>();
+ tiers.put("hotTier", new StorageBreakdownInfo.TierInfo(3, 2000000L));
+ tiers.put("coldTier", new StorageBreakdownInfo.TierInfo(7, 8000000L));
+
+ StorageBreakdownInfo info = new StorageBreakdownInfo(tiers);
+
+ assertNotNull(info.getTiers());
+ assertEquals(info.getTiers().size(), 2);
+ assertEquals(info.getTiers().get("hotTier").getCount(), 3);
+ assertEquals(info.getTiers().get("hotTier").getSizePerReplicaInBytes(), 2000000L);
+ assertEquals(info.getTiers().get("coldTier").getCount(), 7);
+ assertEquals(info.getTiers().get("coldTier").getSizePerReplicaInBytes(), 8000000L);
+ }
+
+ @Test
+ public void testJsonRoundTripWithMultipleTiers()
+ throws Exception {
+ Map tiers = new HashMap<>();
+ tiers.put("tier1", new StorageBreakdownInfo.TierInfo(10, 5000000L));
+ tiers.put("tier2", new StorageBreakdownInfo.TierInfo(4, 1500000L));
+
+ StorageBreakdownInfo original = new StorageBreakdownInfo(tiers);
+
+ String json = JsonUtils.objectToString(original);
+ StorageBreakdownInfo deserialized = JsonUtils.stringToObject(json, StorageBreakdownInfo.class);
+
+ assertNotNull(deserialized.getTiers());
+ assertEquals(deserialized.getTiers().size(), 2);
+
+ StorageBreakdownInfo.TierInfo tier1 = deserialized.getTiers().get("tier1");
+ assertNotNull(tier1);
+ assertEquals(tier1.getCount(), 10);
+ assertEquals(tier1.getSizePerReplicaInBytes(), 5000000L);
+
+ StorageBreakdownInfo.TierInfo tier2 = deserialized.getTiers().get("tier2");
+ assertNotNull(tier2);
+ assertEquals(tier2.getCount(), 4);
+ assertEquals(tier2.getSizePerReplicaInBytes(), 1500000L);
+ }
+
+ @Test
+ public void testJsonRoundTripEmptyTiers()
+ throws Exception {
+ StorageBreakdownInfo original = new StorageBreakdownInfo(Collections.emptyMap());
+
+ String json = JsonUtils.objectToString(original);
+ StorageBreakdownInfo deserialized = JsonUtils.stringToObject(json, StorageBreakdownInfo.class);
+
+ assertNotNull(deserialized.getTiers());
+ assertTrue(deserialized.getTiers().isEmpty());
+ }
+
+ @Test
+ public void testJsonIgnoresUnknownFieldsOnStorageBreakdownInfo()
+ throws Exception {
+ String json = "{\"tiers\":{\"hotTier\":{\"count\":2,\"sizePerReplicaInBytes\":900000}},"
+ + "\"unknownTopField\":\"ignored\"}";
+
+ StorageBreakdownInfo deserialized = JsonUtils.stringToObject(json, StorageBreakdownInfo.class);
+
+ assertNotNull(deserialized.getTiers());
+ assertEquals(deserialized.getTiers().size(), 1);
+ assertEquals(deserialized.getTiers().get("hotTier").getCount(), 2);
+ assertEquals(deserialized.getTiers().get("hotTier").getSizePerReplicaInBytes(), 900000L);
+ }
+
+ @Test
+ public void testJsonIgnoresUnknownFieldsOnTierInfo()
+ throws Exception {
+ String json = "{\"tiers\":{\"tier1\":{\"count\":3,\"sizePerReplicaInBytes\":4000000,"
+ + "\"futureField\":\"ignored\"}}}";
+
+ StorageBreakdownInfo deserialized = JsonUtils.stringToObject(json, StorageBreakdownInfo.class);
+
+ assertNotNull(deserialized.getTiers());
+ StorageBreakdownInfo.TierInfo tierInfo = deserialized.getTiers().get("tier1");
+ assertNotNull(tierInfo);
+ assertEquals(tierInfo.getCount(), 3);
+ assertEquals(tierInfo.getSizePerReplicaInBytes(), 4000000L);
+ }
+
+ @Test
+ public void testNullTiersMap()
+ throws Exception {
+ StorageBreakdownInfo original = new StorageBreakdownInfo(null);
+
+ String json = JsonUtils.objectToString(original);
+ StorageBreakdownInfo deserialized = JsonUtils.stringToObject(json, StorageBreakdownInfo.class);
+
+ assertNull(deserialized.getTiers());
+ }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/TableMetadataInfoCompressionTest.java b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/TableMetadataInfoCompressionTest.java
new file mode 100644
index 000000000000..1459e43b5b4d
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/restlet/resources/TableMetadataInfoCompressionTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests the TableMetadataInfo response schema for compression stats (T056/T057).
+ * Validates server-side response includes columnCompressionStats array when present
+ * and suppresses it (via NON_NULL) when absent.
+ */
+public class TableMetadataInfoCompressionTest {
+
+ @Test
+ public void testSerializationWithCompressionStats()
+ throws Exception {
+ List colStats = new ArrayList<>();
+ colStats.add(new ColumnCompressionStatsInfo("col_a", 10000, 2000, 5.0, "LZ4", false, List.of("forward_index")));
+ colStats.add(new ColumnCompressionStatsInfo("col_b", 20000, 5000, 4.0, "ZSTANDARD", false,
+ List.of("forward_index", "inverted_index")));
+
+ TableMetadataInfo info = new TableMetadataInfo("testTable", 50000, 3, 1000,
+ Map.of("col_a", 4.0), Map.of("col_a", 50.0), Map.of(), Map.of(), Map.of(), colStats);
+
+ String json = JsonUtils.objectToString(info);
+ JsonNode node = JsonUtils.stringToJsonNode(json);
+
+ // columnCompressionStats should be present as an array
+ assertTrue(node.has("columnCompressionStats"));
+ JsonNode colStatsNode = node.get("columnCompressionStats");
+ assertTrue(colStatsNode.isArray(), "columnCompressionStats should be a JSON array");
+ assertEquals(colStatsNode.size(), 2);
+
+ // Validate col_a values (first element)
+ JsonNode colA = colStatsNode.get(0);
+ assertEquals(colA.get("column").asText(), "col_a");
+ assertEquals(colA.get("uncompressedSizeInBytes").asLong(), 10000);
+ assertEquals(colA.get("compressedSizeInBytes").asLong(), 2000);
+ assertEquals(colA.get("compressionRatio").asDouble(), 5.0, 0.01);
+ assertEquals(colA.get("codec").asText(), "LZ4");
+ assertFalse(colA.get("hasDictionary").asBoolean());
+ assertTrue(colA.has("indexes"));
+
+ // Validate col_b values (second element)
+ JsonNode colB = colStatsNode.get(1);
+ assertEquals(colB.get("column").asText(), "col_b");
+ assertEquals(colB.get("uncompressedSizeInBytes").asLong(), 20000);
+ assertEquals(colB.get("compressedSizeInBytes").asLong(), 5000);
+ assertEquals(colB.get("compressionRatio").asDouble(), 4.0, 0.01);
+ assertEquals(colB.get("codec").asText(), "ZSTANDARD");
+ assertFalse(colB.get("hasDictionary").asBoolean());
+ assertEquals(colB.get("indexes").size(), 2);
+ }
+
+ @Test
+ public void testSerializationWithoutCompressionStats()
+ throws Exception {
+ // Use backwards-compatible constructor (no compression stats)
+ TableMetadataInfo info = new TableMetadataInfo("testTable", 50000, 3, 1000,
+ Map.of("col_a", 4.0), Map.of("col_a", 50.0), Map.of(), Map.of(), Map.of());
+
+ String json = JsonUtils.objectToString(info);
+ JsonNode node = JsonUtils.stringToJsonNode(json);
+
+ // columnCompressionStats should be absent (suppressed by NON_NULL)
+ assertFalse(node.has("columnCompressionStats"),
+ "columnCompressionStats should be suppressed from JSON when null");
+ }
+
+ @Test
+ public void testDeserializationRoundTrip()
+ throws Exception {
+ List colStats = new ArrayList<>();
+ colStats.add(new ColumnCompressionStatsInfo("metric_col", 50000, 8000, 6.25, "SNAPPY", false,
+ List.of("forward_index")));
+
+ TableMetadataInfo original = new TableMetadataInfo("roundTripTable", 100000, 5, 5000,
+ Map.of("metric_col", 8.0), Map.of("metric_col", 100.0), Map.of(), Map.of(), Map.of(), colStats);
+
+ String json = JsonUtils.objectToString(original);
+ TableMetadataInfo deserialized = JsonUtils.stringToObject(json, TableMetadataInfo.class);
+
+ assertEquals(deserialized.getTableName(), "roundTripTable");
+ assertEquals(deserialized.getDiskSizeInBytes(), 100000);
+ assertNotNull(deserialized.getColumnCompressionStats());
+ assertEquals(deserialized.getColumnCompressionStats().size(), 1);
+
+ ColumnCompressionStatsInfo stats = deserialized.getColumnCompressionStats().get(0);
+ assertNotNull(stats);
+ assertEquals(stats.getColumn(), "metric_col");
+ assertEquals(stats.getUncompressedSizeInBytes(), 50000);
+ assertEquals(stats.getCompressedSizeInBytes(), 8000);
+ assertEquals(stats.getCompressionRatio(), 6.25, 0.01);
+ assertEquals(stats.getCodec(), "SNAPPY");
+ assertFalse(stats.hasDictionary());
+ assertNotNull(stats.getIndexes());
+ }
+
+ @Test
+ public void testBackwardCompatDeserialization()
+ throws Exception {
+ // Simulate JSON from an old server that doesn't include columnCompressionStats
+ String oldJson = "{\"tableName\":\"oldTable\",\"diskSizeInBytes\":30000,"
+ + "\"numSegments\":2,\"numRows\":500,"
+ + "\"columnLengthMap\":{\"col\":4.0},"
+ + "\"columnCardinalityMap\":{\"col\":10.0},"
+ + "\"maxNumMultiValuesMap\":{},"
+ + "\"columnIndexSizeMap\":{},"
+ + "\"upsertPartitionToServerPrimaryKeyCountMap\":{}}";
+
+ TableMetadataInfo info = JsonUtils.stringToObject(oldJson, TableMetadataInfo.class);
+ assertNotNull(info);
+ assertEquals(info.getTableName(), "oldTable");
+ assertEquals(info.getDiskSizeInBytes(), 30000);
+ // columnCompressionStats should be null (not present in old JSON)
+ assertNull(info.getColumnCompressionStats());
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 91985f5997ea..38f92894d76f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -1270,9 +1270,14 @@ public String getTableAggregateMetadata(
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
int numReplica = tableConfig == null ? 1 : tableConfig.getReplication();
+ // Check feature flag — suppress columnCompressionStats when disabled
+ boolean compressionStatsEnabled = tableConfig != null && tableConfig.getIndexingConfig() != null
+ && tableConfig.getIndexingConfig().isCompressionStatsEnabled();
+
String segmentsMetadata;
try {
- JsonNode segmentsMetadataJson = getAggregateMetadataFromServer(tableNameWithType, columns, numReplica);
+ JsonNode segmentsMetadataJson =
+ getAggregateMetadataFromServer(tableNameWithType, columns, numReplica, compressionStatsEnabled);
segmentsMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST);
@@ -1322,9 +1327,14 @@ public String getTableAggregateMetadataDeprecated(
}
}
+ // Check feature flag — suppress columnCompressionStats when disabled
+ boolean compressionStatsEnabled = tableConfig != null && tableConfig.getIndexingConfig() != null
+ && tableConfig.getIndexingConfig().isCompressionStatsEnabled();
+
try {
JsonNode segmentsMetadataJson =
- getAggregateMetadataFromServer(existingTableNameWithType, columnsList, numReplica);
+ getAggregateMetadataFromServer(existingTableNameWithType, columnsList, numReplica,
+ compressionStatsEnabled);
return JsonUtils.objectToPrettyString(segmentsMetadataJson);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST);
@@ -1454,12 +1464,13 @@ private JsonNode getAggregateIndexMetadataFromServer(String tableNameWithType)
* @param numReplica num or replica for the table
* @return aggregated metadata of the table segments
*/
- private JsonNode getAggregateMetadataFromServer(String tableNameWithType, List columns, int numReplica)
+ private JsonNode getAggregateMetadataFromServer(String tableNameWithType, List columns, int numReplica,
+ boolean compressionStatsEnabled)
throws InvalidConfigException, IOException {
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager, _pinotHelixResourceManager);
return tableMetadataReader.getAggregateTableMetadata(tableNameWithType, columns, numReplica,
- _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000, compressionStatsEnabled);
}
@GET
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
index ab20f6fb7453..f352a3269889 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java
@@ -500,6 +500,8 @@ protected void nonLeaderCleanup(List tableNamesWithType) {
private void removeMetricsForTable(String tableNameWithType) {
LOGGER.info("Removing metrics from {} given it is not a table known by Helix", tableNameWithType);
+ // Remove tier-suffixed gauges that use composite keys (tableName.tierKey)
+ _tableSizeReader.clearTierMetrics(tableNameWithType);
for (ControllerGauge metric : ControllerGauge.values()) {
if (!metric.isGlobal()) {
_controllerMetrics.removeTableGauge(tableNameWithType, metric);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index a1669a2882b8..ca89cb6d2684 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -42,6 +42,9 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.pinot.common.restlet.resources.ColumnCompressionStatsInfo;
+import org.apache.pinot.common.restlet.resources.CompressionStatsSummary;
+import org.apache.pinot.common.restlet.resources.StorageBreakdownInfo;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.TableSegments;
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
@@ -93,7 +96,8 @@ public ServerSegmentMetadataReader(Executor executor, HttpClientConnectionManage
* table.
*/
public TableMetadataInfo getAggregatedTableMetadataFromServer(String tableNameWithType,
- BiMap serverEndPoints, List columns, int numReplica, int timeoutMs) {
+ BiMap serverEndPoints, List columns, int numReplica, int timeoutMs,
+ boolean compressionStatsEnabled) {
int numServers = serverEndPoints.size();
LOGGER.info("Reading aggregated segment metadata from {} servers for table: {} with timeout: {}ms", numServers,
tableNameWithType, timeoutMs);
@@ -120,6 +124,17 @@ public TableMetadataInfo getAggregatedTableMetadataFromServer(String tableNameWi
final Map maxNumMultiValuesMap = new HashMap<>();
final Map> columnIndexSizeMap = new HashMap<>();
final Map> partitionToServerPrimaryKeyCountMap = new HashMap<>();
+ // Per-column compression stats accumulators: [0]=uncompressed, [1]=compressed
+ final Map columnCompressionAccum = new HashMap<>();
+ final Map columnCodecMap = new HashMap<>();
+ final Map columnHasDictMap = new HashMap<>();
+ final Map> columnIndexNamesMap = new HashMap<>();
+ long aggRawSize = 0;
+ long aggCompressedSize = 0;
+ int aggSegmentsWithStats = 0;
+ int aggTotalSegments = 0;
+ boolean hasCompressionSummary = false;
+ final Map tierAccum = new HashMap<>(); // [count, size]
for (Map.Entry streamResponse : serviceResponse._httpResponses.entrySet()) {
try {
TableMetadataInfo tableMetadataInfo =
@@ -144,6 +159,50 @@ public TableMetadataInfo getAggregatedTableMetadataFromServer(String tableNameWi
}
return l;
}));
+ // Aggregate per-column compression stats from server responses
+ List serverColStats = tableMetadataInfo.getColumnCompressionStats();
+ if (serverColStats != null) {
+ for (ColumnCompressionStatsInfo info : serverColStats) {
+ // Skip columns with no meaningful compression data (old raw segments without persisted codec)
+ if (info.getCodec() == null && !info.hasDictionary()) {
+ continue;
+ }
+ String col = info.getColumn();
+ long[] accum = columnCompressionAccum.computeIfAbsent(col, k -> new long[2]);
+ // Only accumulate uncompressed size when it is a real value (not the -1 sentinel from dict columns)
+ if (info.getUncompressedSizeInBytes() >= 0) {
+ accum[0] += info.getUncompressedSizeInBytes();
+ }
+ accum[1] += info.getCompressedSizeInBytes();
+ if (info.getCodec() != null) {
+ columnCodecMap.merge(col, info.getCodec(),
+ (existing, incoming) -> existing.equals(incoming) ? existing : "MIXED");
+ }
+ columnHasDictMap.put(col, info.hasDictionary());
+ if (info.getIndexes() != null) {
+ columnIndexNamesMap.computeIfAbsent(col, k -> new HashSet<>()).addAll(info.getIndexes());
+ }
+ }
+ }
+ // Aggregate compressionStats summary (sum raw/compressed across servers)
+ CompressionStatsSummary serverSummary = tableMetadataInfo.getCompressionStats();
+ if (serverSummary != null) {
+ aggRawSize += serverSummary.getRawForwardIndexSizePerReplicaInBytes();
+ aggCompressedSize += serverSummary.getCompressedForwardIndexSizePerReplicaInBytes();
+ aggSegmentsWithStats += serverSummary.getSegmentsWithStats();
+ aggTotalSegments += serverSummary.getTotalSegments();
+ hasCompressionSummary = true;
+ }
+ // Aggregate storageBreakdown (sum counts and sizes per tier)
+ StorageBreakdownInfo serverBreakdown = tableMetadataInfo.getStorageBreakdown();
+ if (serverBreakdown != null && serverBreakdown.getTiers() != null) {
+ for (Map.Entry tierEntry
+ : serverBreakdown.getTiers().entrySet()) {
+ long[] vals = tierAccum.computeIfAbsent(tierEntry.getKey(), k -> new long[2]);
+ vals[0] += tierEntry.getValue().getCount();
+ vals[1] += tierEntry.getValue().getSizePerReplicaInBytes();
+ }
+ }
} catch (IOException e) {
failedParses++;
LOGGER.error("Unable to parse server {} response due to an error: ", streamResponse.getKey(), e);
@@ -165,9 +224,65 @@ public TableMetadataInfo getAggregatedTableMetadataFromServer(String tableNameWi
totalNumSegments /= numReplica;
totalNumRows /= numReplica;
+ // Build per-column compression stats list (divide by numReplica since each replica reports the same stats)
+ List columnCompressionStats = null;
+ if (!columnCompressionAccum.isEmpty()) {
+ columnCompressionStats = new ArrayList<>();
+ for (Map.Entry entry : columnCompressionAccum.entrySet()) {
+ String col = entry.getKey();
+ long[] accum = entry.getValue();
+ boolean hasDictionary = Boolean.TRUE.equals(columnHasDictMap.get(col));
+ // Dict columns have no uncompressed size; preserve -1 sentinel instead of dividing 0
+ long uncompressed = (hasDictionary && accum[0] == 0) ? -1 : accum[0] / numReplica;
+ long compressed = accum[1] / numReplica;
+ double ratio = (uncompressed > 0 && compressed > 0) ? (double) uncompressed / compressed : 0;
+ Set idxNames = columnIndexNamesMap.get(col);
+ List indexes = idxNames != null ? new ArrayList<>(idxNames) : null;
+ columnCompressionStats.add(new ColumnCompressionStatsInfo(
+ col, uncompressed, compressed, ratio, columnCodecMap.get(col), hasDictionary, indexes));
+ }
+ columnCompressionStats.sort((a, b) -> a.getColumn().compareTo(b.getColumn()));
+ }
+
+ // Build aggregated compression summary (divide by numReplica to avoid double counting)
+ CompressionStatsSummary compressionStatsSummary = null;
+ if (hasCompressionSummary) {
+ long rawPerReplica = aggRawSize / numReplica;
+ long compressedPerReplica = aggCompressedSize / numReplica;
+ double ratio = (rawPerReplica > 0 && compressedPerReplica > 0)
+ ? (double) rawPerReplica / compressedPerReplica : 0;
+ int segmentsWithStats = aggSegmentsWithStats / numReplica;
+ int totalSegments = aggTotalSegments / numReplica;
+ if (segmentsWithStats > 0) {
+ boolean isPartialCoverage = segmentsWithStats < totalSegments;
+ compressionStatsSummary = new CompressionStatsSummary(rawPerReplica, compressedPerReplica, ratio,
+ segmentsWithStats, totalSegments, isPartialCoverage);
+ }
+ }
+
+ // Build aggregated storage breakdown (divide by numReplica to avoid double counting)
+ StorageBreakdownInfo storageBreakdownInfo = null;
+ if (!tierAccum.isEmpty()) {
+ Map tiers = new HashMap<>();
+ for (Map.Entry entry : tierAccum.entrySet()) {
+ int count = (int) (entry.getValue()[0] / numReplica);
+ long size = entry.getValue()[1] / numReplica;
+ tiers.put(entry.getKey(), new StorageBreakdownInfo.TierInfo(count, size));
+ }
+ storageBreakdownInfo = new StorageBreakdownInfo(tiers);
+ }
+
+ // When compression stats flag is OFF, suppress compressionStats and columnCompressionStats
+ // but always keep storageBreakdown (tier breakdown is independent of the compression stats flag)
+ if (!compressionStatsEnabled) {
+ columnCompressionStats = null;
+ compressionStatsSummary = null;
+ }
+
TableMetadataInfo aggregateTableMetadataInfo =
new TableMetadataInfo(tableNameWithType, totalDiskSizeInBytes, totalNumSegments, totalNumRows, columnLengthMap,
- columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap, partitionToServerPrimaryKeyCountMap);
+ columnCardinalityMap, maxNumMultiValuesMap, columnIndexSizeMap, partitionToServerPrimaryKeyCountMap,
+ columnCompressionStats, compressionStatsSummary, storageBreakdownInfo);
if (failedParses != 0) {
LOGGER.warn("Failed to parse {} / {} aggregated segment metadata responses from servers.", failedParses,
serverUrls.size());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 628c917ff061..17665d0c94d4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -261,7 +261,7 @@ public JsonNode getSegmentMetadata(String tableNameWithType, String segmentName,
* @return a map of segmentName to its metadata
*/
public JsonNode getAggregateTableMetadata(String tableNameWithType, List columns, int numReplica,
- int timeoutMs)
+ int timeoutMs, boolean compressionStatsEnabled)
throws InvalidConfigException {
final Map> serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
@@ -272,7 +272,7 @@ public JsonNode getAggregateTableMetadata(String tableNameWithType, List
TableMetadataInfo aggregateTableMetadataInfo =
serverSegmentMetadataReader.getAggregatedTableMetadataFromServer(tableNameWithType, endpoints, columns,
- numReplica, timeoutMs);
+ numReplica, timeoutMs, compressionStatsEnabled);
return JsonUtils.objectToJsonNode(aggregateTableMetadataInfo);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
index de9330289daa..1a8ef416e2a6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableSizeReader.java
@@ -19,14 +19,19 @@
package org.apache.pinot.controller.util;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import javax.annotation.Nonnegative;
import javax.annotation.Nullable;
@@ -35,6 +40,7 @@
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.restlet.resources.ColumnCompressionStatsInfo;
import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.api.resources.ServerTableSizeReader;
@@ -57,6 +63,8 @@ public class TableSizeReader {
private final PinotHelixResourceManager _helixResourceManager;
private final ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
+ // Tracks emitted tier keys per table so stale tier gauges can be removed
+ private final Map> _emittedTierKeys = new ConcurrentHashMap<>();
public TableSizeReader(Executor executor, HttpClientConnectionManager connectionManager,
ControllerMetrics controllerMetrics, PinotHelixResourceManager helixResourceManager,
@@ -125,6 +133,14 @@ public TableSizeDetails getTableSizeDetails(String tableName, @Nonnegative int t
if (largestSegmentSizeOnServer != DEFAULT_SIZE_WHEN_MISSING_OR_ERROR) {
emitMetrics(realtimeTableName, ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER, largestSegmentSizeOnServer);
}
+ emitTierMetrics(realtimeTableName, tableSizeDetails._realtimeSegments._storageBreakdown);
+ if (isCompressionStatsEnabled(realtimeTableConfig)) {
+ emitCompressionMetrics(realtimeTableName, tableSizeDetails._realtimeSegments);
+ } else {
+ clearCompressionMetrics(realtimeTableName);
+ tableSizeDetails._realtimeSegments._compressionStats = null;
+ tableSizeDetails._realtimeSegments._columnCompressionStats = null;
+ }
}
if (hasOfflineTableConfig) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
@@ -151,6 +167,14 @@ public TableSizeDetails getTableSizeDetails(String tableName, @Nonnegative int t
if (largestSegmentSizeOnServer != DEFAULT_SIZE_WHEN_MISSING_OR_ERROR) {
emitMetrics(offlineTableName, ControllerGauge.LARGEST_SEGMENT_SIZE_ON_SERVER, largestSegmentSizeOnServer);
}
+ emitTierMetrics(offlineTableName, tableSizeDetails._offlineSegments._storageBreakdown);
+ if (isCompressionStatsEnabled(offlineTableConfig)) {
+ emitCompressionMetrics(offlineTableName, tableSizeDetails._offlineSegments);
+ } else {
+ clearCompressionMetrics(offlineTableName);
+ tableSizeDetails._offlineSegments._compressionStats = null;
+ tableSizeDetails._offlineSegments._columnCompressionStats = null;
+ }
}
// Set the top level sizes to DEFAULT_SIZE_WHEN_MISSING_OR_ERROR when all segments are error
@@ -164,12 +188,87 @@ public TableSizeDetails getTableSizeDetails(String tableName, @Nonnegative int t
return tableSizeDetails;
}
+ private void emitCompressionMetrics(String tableNameWithType, TableSubTypeSizeDetails subTypeDetails) {
+ CompressionStats stats = subTypeDetails._compressionStats;
+ if (stats != null && stats._segmentsWithStats > 0 && stats._compressedForwardIndexSizePerReplicaInBytes > 0) {
+ emitMetrics(tableNameWithType, ControllerGauge.TABLE_RAW_FORWARD_INDEX_SIZE_PER_REPLICA,
+ stats._rawForwardIndexSizePerReplicaInBytes);
+ emitMetrics(tableNameWithType, ControllerGauge.TABLE_COMPRESSED_FORWARD_INDEX_SIZE_PER_REPLICA,
+ stats._compressedForwardIndexSizePerReplicaInBytes);
+ // Emit ratio * 100 to preserve two decimal digits of precision as a long gauge
+ long ratioPercent = Math.round(stats._compressionRatio * 100);
+ emitMetrics(tableNameWithType, ControllerGauge.TABLE_COMPRESSION_RATIO_PERCENT, ratioPercent);
+ } else {
+ // No segments have stats — clear any previously emitted stale metrics
+ clearCompressionMetrics(tableNameWithType);
+ }
+ }
+
+ private void emitTierMetrics(String tableNameWithType, @Nullable StorageBreakdown breakdown) {
+ Set currentTierKeys = new HashSet<>();
+ if (breakdown != null && _leadControllerManager.isLeaderForTable(tableNameWithType)) {
+ for (Map.Entry tierEntry : breakdown._tiers.entrySet()) {
+ String tierKey = tierEntry.getKey();
+ currentTierKeys.add(tierKey);
+ _controllerMetrics.setOrUpdateTableGauge(tableNameWithType, tierKey,
+ ControllerGauge.TABLE_TIERED_STORAGE_SIZE, tierEntry.getValue()._sizePerReplicaInBytes);
+ }
+ }
+ // Remove gauges for tier keys that were emitted previously but are no longer present.
+ // Only track tables that actually have tiers to avoid unnecessary map entries.
+ Set previousTierKeys;
+ if (currentTierKeys.isEmpty()) {
+ previousTierKeys = _emittedTierKeys.remove(tableNameWithType);
+ } else {
+ previousTierKeys = _emittedTierKeys.put(tableNameWithType, currentTierKeys);
+ }
+ if (previousTierKeys != null) {
+ for (String oldKey : previousTierKeys) {
+ if (!currentTierKeys.contains(oldKey)) {
+ if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+ _controllerMetrics.removeTableGauge(tableNameWithType, oldKey,
+ ControllerGauge.TABLE_TIERED_STORAGE_SIZE);
+ }
+ }
+ }
+ }
+ }
+
+ private void clearCompressionMetrics(String tableNameWithType) {
+ if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
+ _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.TABLE_RAW_FORWARD_INDEX_SIZE_PER_REPLICA);
+ _controllerMetrics.removeTableGauge(tableNameWithType,
+ ControllerGauge.TABLE_COMPRESSED_FORWARD_INDEX_SIZE_PER_REPLICA);
+ _controllerMetrics.removeTableGauge(tableNameWithType, ControllerGauge.TABLE_COMPRESSION_RATIO_PERCENT);
+ }
+ }
+
+ /**
+ * Removes all tier-specific gauges previously emitted for the given table.
+ * Called from SegmentStatusChecker.removeMetricsForTable during both leader and non-leader cleanup,
+ * so no leader check is applied here (the caller decides when cleanup is appropriate).
+ */
+ public void clearTierMetrics(String tableNameWithType) {
+ Set previousTierKeys = _emittedTierKeys.remove(tableNameWithType);
+ if (previousTierKeys != null) {
+ for (String tierKey : previousTierKeys) {
+ _controllerMetrics.removeTableGauge(tableNameWithType, tierKey,
+ ControllerGauge.TABLE_TIERED_STORAGE_SIZE);
+ }
+ }
+ }
+
private void emitMetrics(String tableNameWithType, ControllerGauge controllerGauge, long value) {
if (_leadControllerManager.isLeaderForTable(tableNameWithType)) {
_controllerMetrics.setValueOfTableGauge(tableNameWithType, controllerGauge, value);
}
}
+ private static boolean isCompressionStatsEnabled(@Nullable TableConfig tableConfig) {
+ return tableConfig != null && tableConfig.getIndexingConfig() != null
+ && tableConfig.getIndexingConfig().isCompressionStatsEnabled();
+ }
+
//
// Reported size below indicates the sizes actually reported by servers on successful responses.
// Estimated sizes indicates the size estimated size with approximated calculations for errored servers
@@ -223,6 +322,21 @@ static public class TableSubTypeSizeDetails {
@JsonProperty("reportedSizePerReplicaInBytes")
public long _reportedSizePerReplicaInBytes = 0;
+ @Nullable
+ @JsonProperty("compressionStats")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public CompressionStats _compressionStats;
+
+ @Nullable
+ @JsonProperty("columnCompressionStats")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public List _columnCompressionStats;
+
+ @Nullable
+ @JsonProperty("storageBreakdown")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ public StorageBreakdown _storageBreakdown;
+
@JsonProperty("segments")
public Map _segments = new HashMap<>();
}
@@ -243,6 +357,52 @@ static public class SegmentSizeDetails {
public Map _serverInfo = new HashMap<>();
}
+ // Mutable accumulator used during per-server aggregation. Intentionally separate from the immutable
+ // CompressionStatsSummary DTO in pinot-common, which is only constructed once aggregation is complete.
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class CompressionStats {
+ @JsonProperty("rawForwardIndexSizePerReplicaInBytes")
+ public long _rawForwardIndexSizePerReplicaInBytes = 0;
+
+ @JsonProperty("compressedForwardIndexSizePerReplicaInBytes")
+ public long _compressedForwardIndexSizePerReplicaInBytes = 0;
+
+ @JsonProperty("compressionRatio")
+ public double _compressionRatio = 0;
+
+ @JsonProperty("segmentsWithStats")
+ public int _segmentsWithStats = 0;
+
+ @JsonProperty("totalSegments")
+ public int _totalSegments = 0;
+
+ @JsonProperty("isPartialCoverage")
+ public boolean _isPartialCoverage = false;
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class TierSizeInfo {
+ @JsonProperty("count")
+ public int _count = 0;
+
+ @JsonProperty("sizePerReplicaInBytes")
+ public long _sizePerReplicaInBytes = 0;
+
+ public TierSizeInfo() {
+ }
+
+ public TierSizeInfo(int count, long sizePerReplicaInBytes) {
+ _count = count;
+ _sizePerReplicaInBytes = sizePerReplicaInBytes;
+ }
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ public static class StorageBreakdown {
+ @JsonProperty("tiers")
+ public Map _tiers = new HashMap<>();
+ }
+
public TableSubTypeSizeDetails getTableSubtypeSize(String tableNameWithType, int timeoutMs,
boolean includeReplacedSegments)
throws InvalidConfigException {
@@ -293,22 +453,65 @@ public TableSubTypeSizeDetails getTableSubtypeSize(String tableNameWithType, int
// segments are not reflected in that count. Estimated size is what we estimate in case of
// errors, as described above.
// estimatedSize >= reportedSize. If no server reported error, estimatedSize == reportedSize
+ CompressionStats compressionStats = new CompressionStats();
+ StorageBreakdown storageBreakdown = new StorageBreakdown();
+ // Per-column aggregation: accumulate across segments (max across replicas per segment, sum across segments)
+ Map columnAccum = new HashMap<>(); // [rawSize, compressedSize]
+ Map columnCodecAgg = new HashMap<>();
+ Map columnDictMap = new HashMap<>();
+ Map> columnIndexesMap = new HashMap<>();
List missingSegments = new ArrayList<>();
for (Map.Entry entry : segmentToSizeDetailsMap.entrySet()) {
String segment = entry.getKey();
SegmentSizeDetails sizeDetails = entry.getValue();
- // Iterate over all segment size info, update reported size, track max segment size and number of errored servers
+ // Iterate over all segment size info: update reported size, track max segment size,
+ // count errored servers, and track max raw/compressed forward index sizes across replicas.
sizeDetails._maxReportedSizePerReplicaInBytes = DEFAULT_SIZE_WHEN_MISSING_OR_ERROR;
int errors = 0;
+ long maxRawFwdIndexSize = 0;
+ long maxCompressedFwdIndexSize = 0;
+ String segmentTier = null;
+ // Track per-column max stats across replicas for this segment
+ Map perColumnMax = new HashMap<>(); // [rawSize, compressedSize]
+ Map perColumnCodec = new HashMap<>();
for (SegmentSizeInfo sizeInfo : sizeDetails._serverInfo.values()) {
if (sizeInfo.getDiskSizeInBytes() != DEFAULT_SIZE_WHEN_MISSING_OR_ERROR) {
sizeDetails._reportedSizeInBytes += sizeInfo.getDiskSizeInBytes();
sizeDetails._maxReportedSizePerReplicaInBytes =
Math.max(sizeDetails._maxReportedSizePerReplicaInBytes, sizeInfo.getDiskSizeInBytes());
+ if (sizeInfo.getRawForwardIndexSizeBytes() > 0) {
+ maxRawFwdIndexSize = Math.max(maxRawFwdIndexSize, sizeInfo.getRawForwardIndexSizeBytes());
+ }
+ if (sizeInfo.getCompressedForwardIndexSizeBytes() > 0) {
+ maxCompressedFwdIndexSize =
+ Math.max(maxCompressedFwdIndexSize, sizeInfo.getCompressedForwardIndexSizeBytes());
+ }
+ if (sizeInfo.getTier() != null) {
+ segmentTier = sizeInfo.getTier();
+ }
+ // Track per-column stats (max across replicas)
+ Map colStats = sizeInfo.getColumnCompressionStats();
+ if (colStats != null) {
+ for (Map.Entry colEntry : colStats.entrySet()) {
+ String colName = colEntry.getKey();
+ ColumnCompressionStatsInfo colInfo = colEntry.getValue();
+ long[] maxVals = perColumnMax.computeIfAbsent(colName, k -> new long[2]);
+ if (colInfo.getUncompressedSizeInBytes() > 0) {
+ maxVals[0] = Math.max(maxVals[0], colInfo.getUncompressedSizeInBytes());
+ }
+ if (colInfo.getCompressedSizeInBytes() > 0) {
+ maxVals[1] = Math.max(maxVals[1], colInfo.getCompressedSizeInBytes());
+ }
+ if (colInfo.getCodec() != null) {
+ perColumnCodec.put(colName, colInfo.getCodec());
+ }
+ }
+ }
} else {
errors++;
}
}
+
// Update estimated size, track segments that are missing from all servers
if (errors != sizeDetails._serverInfo.size()) {
// Use max segment size from other servers to estimate the segment size not reported
@@ -317,6 +520,47 @@ public TableSubTypeSizeDetails getTableSubtypeSize(String tableNameWithType, int
subTypeSizeDetails._reportedSizeInBytes += sizeDetails._reportedSizeInBytes;
subTypeSizeDetails._estimatedSizeInBytes += sizeDetails._estimatedSizeInBytes;
subTypeSizeDetails._reportedSizePerReplicaInBytes += sizeDetails._maxReportedSizePerReplicaInBytes;
+
+ // Aggregate forward index compression stats (per-replica max)
+ if (maxRawFwdIndexSize > 0 && maxCompressedFwdIndexSize > 0) {
+ compressionStats._rawForwardIndexSizePerReplicaInBytes += maxRawFwdIndexSize;
+ compressionStats._compressedForwardIndexSizePerReplicaInBytes += maxCompressedFwdIndexSize;
+ compressionStats._segmentsWithStats++;
+ }
+
+ // Accumulate per-column compression stats across segments
+ for (Map.Entry colEntry : perColumnMax.entrySet()) {
+ String colName = colEntry.getKey();
+ long[] maxVals = colEntry.getValue();
+ long[] accum = columnAccum.computeIfAbsent(colName, k -> new long[2]);
+ accum[0] += maxVals[0];
+ accum[1] += maxVals[1];
+ String segmentCodec = perColumnCodec.get(colName);
+ if (segmentCodec != null) {
+ columnCodecAgg.merge(colName, segmentCodec,
+ (existing, incoming) -> existing.equals(incoming) ? existing : "MIXED");
+ }
+ }
+ // Track per-column dictionary/indexes from per-segment server info
+ for (SegmentSizeInfo sizeInfo : sizeDetails._serverInfo.values()) {
+ Map colStats = sizeInfo.getColumnCompressionStats();
+ if (colStats != null) {
+ for (Map.Entry colEntry : colStats.entrySet()) {
+ ColumnCompressionStatsInfo colInfo = colEntry.getValue();
+ columnDictMap.putIfAbsent(colEntry.getKey(), colInfo.hasDictionary());
+ if (colInfo.getIndexes() != null) {
+ columnIndexesMap.computeIfAbsent(colEntry.getKey(), k -> new LinkedHashSet<>())
+ .addAll(colInfo.getIndexes());
+ }
+ }
+ }
+ }
+
+ // Aggregate tier-based storage breakdown
+ String tierKey = segmentTier != null ? segmentTier : "default";
+ TierSizeInfo tierInfo = storageBreakdown._tiers.computeIfAbsent(tierKey, k -> new TierSizeInfo());
+ tierInfo._count++;
+ tierInfo._sizePerReplicaInBytes += sizeDetails._maxReportedSizePerReplicaInBytes;
} else {
// Segment is missing from all servers
missingSegments.add(segment);
@@ -327,6 +571,44 @@ public TableSubTypeSizeDetails getTableSubtypeSize(String tableNameWithType, int
}
}
+ // Compute compression ratio and coverage stats
+ compressionStats._totalSegments = segmentToSizeDetailsMap.size();
+ int nonMissingSegments = compressionStats._totalSegments - subTypeSizeDetails._missingSegments;
+ compressionStats._isPartialCoverage = compressionStats._segmentsWithStats < nonMissingSegments;
+ if (compressionStats._compressedForwardIndexSizePerReplicaInBytes > 0) {
+ compressionStats._compressionRatio =
+ (double) compressionStats._rawForwardIndexSizePerReplicaInBytes
+ / compressionStats._compressedForwardIndexSizePerReplicaInBytes;
+ }
+ // Build per-column compression stats list from accumulated data
+ List columnStatsList = null;
+ if (!columnAccum.isEmpty()) {
+ columnStatsList = new ArrayList<>();
+ for (Map.Entry colEntry : columnAccum.entrySet()) {
+ String colName = colEntry.getKey();
+ long[] accum = colEntry.getValue();
+ boolean hasDictionary = Boolean.TRUE.equals(columnDictMap.get(colName));
+ long uncompressed = (hasDictionary && accum[0] == 0) ? -1 : accum[0];
+ long compressed = accum[1];
+ double ratio = (uncompressed > 0 && compressed > 0) ? (double) uncompressed / compressed : 0;
+ Set indexSet = columnIndexesMap.get(colName);
+ List indexes = (indexSet != null && !indexSet.isEmpty()) ? new ArrayList<>(indexSet) : null;
+ columnStatsList.add(new ColumnCompressionStatsInfo(colName, uncompressed, compressed, ratio,
+ columnCodecAgg.get(colName), hasDictionary, indexes));
+ }
+ columnStatsList.sort((a, b) -> a.getColumn().compareTo(b.getColumn()));
+ }
+ subTypeSizeDetails._columnCompressionStats = columnStatsList;
+
+ // Suppress table-level compression stats when no segments have raw forward index data,
+ // but keep per-column stats (dict columns may still have valid forward index size data)
+ if (compressionStats._segmentsWithStats > 0) {
+ subTypeSizeDetails._compressionStats = compressionStats;
+ } else {
+ subTypeSizeDetails._compressionStats = null;
+ }
+ subTypeSizeDetails._storageBreakdown = storageBreakdown._tiers.isEmpty() ? null : storageBreakdown;
+
// Update metrics for missing segments
if (subTypeSizeDetails._missingSegments > 0) {
int numSegments = segmentToSizeDetailsMap.size();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ServerTableSizeReaderRawBytesTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ServerTableSizeReaderRawBytesTest.java
new file mode 100644
index 000000000000..6b4dc305736a
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ServerTableSizeReaderRawBytesTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.pinot.common.restlet.resources.ColumnCompressionStatsInfo;
+import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
+import org.apache.pinot.common.restlet.resources.TableSizeInfo;
+import org.apache.pinot.controller.api.resources.ServerTableSizeReader;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests that ServerTableSizeReader correctly deserializes SegmentSizeInfo with compression stats fields
+ * (rawForwardIndexSizeBytes, compressedForwardIndexSizeBytes, tier, columnCompressionStats).
+ */
+public class ServerTableSizeReaderRawBytesTest {
+ private static final String URI_PATH = "/table/";
+ private static final int TIMEOUT_MSEC = 5000;
+ private static final int PORT_WITH_STATS = 11100;
+ private static final int PORT_WITHOUT_STATS = 11101;
+ private static final int PORT_ERROR = 11102;
+
+ private final ExecutorService _executor = Executors.newFixedThreadPool(2);
+ private final PoolingHttpClientConnectionManager _connectionManager = new PoolingHttpClientConnectionManager();
+ private HttpServer _serverWithStats;
+ private HttpServer _serverWithoutStats;
+ private HttpServer _serverError;
+
+ @BeforeClass
+ public void setUp()
+ throws IOException {
+ // Server with compression stats
+ Map colStats = new HashMap<>();
+ colStats.put("col_a", new ColumnCompressionStatsInfo("col_a", 10000, 2000, 5.0, "LZ4", false,
+ List.of("forward_index")));
+ colStats.put("col_b", new ColumnCompressionStatsInfo("col_b", 20000, 5000, 4.0, "ZSTANDARD", false,
+ List.of("forward_index")));
+
+ List statsSegments = Arrays.asList(
+ new SegmentSizeInfo("s1", 50000, 30000, 7000, "default", colStats),
+ new SegmentSizeInfo("s2", 40000, 15000, 3000, "tier1", null));
+ TableSizeInfo statsTable = new TableSizeInfo("testTable", 90000, statsSegments);
+
+ _serverWithStats = startServer(PORT_WITH_STATS, createHandler(200, statsTable));
+
+ // Server without compression stats (backward compat)
+ List noStatsSegments = Arrays.asList(new SegmentSizeInfo("s3", 60000));
+ TableSizeInfo noStatsTable = new TableSizeInfo("testTable", 60000, noStatsSegments);
+ _serverWithoutStats = startServer(PORT_WITHOUT_STATS, createHandler(200, noStatsTable));
+
+ // Server returning 500
+ _serverError = startServer(PORT_ERROR, createHandler(500, null));
+ }
+
+ @AfterClass
+ public void tearDown() {
+ if (_serverWithStats != null) {
+ _serverWithStats.stop(0);
+ }
+ if (_serverWithoutStats != null) {
+ _serverWithoutStats.stop(0);
+ }
+ if (_serverError != null) {
+ _serverError.stop(0);
+ }
+ }
+
+ @Test
+ public void testDeserializesNewFields() {
+ ServerTableSizeReader reader = new ServerTableSizeReader(_executor, _connectionManager);
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("server0", "http://localhost:" + PORT_WITH_STATS);
+
+ Map> result =
+ reader.getSegmentSizeInfoFromServers(endpoints, "testTable", TIMEOUT_MSEC);
+ assertEquals(result.size(), 1);
+
+ List segments = result.get("server0");
+ assertNotNull(segments);
+ assertEquals(segments.size(), 2);
+
+ // s1 has compression stats
+ SegmentSizeInfo s1 = segments.get(0);
+ assertEquals(s1.getSegmentName(), "s1");
+ assertEquals(s1.getDiskSizeInBytes(), 50000);
+ assertEquals(s1.getRawForwardIndexSizeBytes(), 30000);
+ assertEquals(s1.getCompressedForwardIndexSizeBytes(), 7000);
+ assertEquals(s1.getTier(), "default");
+
+ Map colStats = s1.getColumnCompressionStats();
+ assertNotNull(colStats);
+ assertEquals(colStats.size(), 2);
+ assertEquals(colStats.get("col_a").getColumn(), "col_a");
+ assertEquals(colStats.get("col_a").getUncompressedSizeInBytes(), 10000);
+ assertEquals(colStats.get("col_a").getCompressedSizeInBytes(), 2000);
+ assertEquals(colStats.get("col_a").getCompressionRatio(), 5.0, 0.01);
+ assertEquals(colStats.get("col_a").getCodec(), "LZ4");
+ assertFalse(colStats.get("col_a").hasDictionary());
+
+ // s2 has tier but no column stats
+ SegmentSizeInfo s2 = segments.get(1);
+ assertEquals(s2.getTier(), "tier1");
+ assertEquals(s2.getRawForwardIndexSizeBytes(), 15000);
+ }
+
+ @Test
+ public void testBackwardCompatWithoutNewFields() {
+ ServerTableSizeReader reader = new ServerTableSizeReader(_executor, _connectionManager);
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("server1", "http://localhost:" + PORT_WITHOUT_STATS);
+
+ Map> result =
+ reader.getSegmentSizeInfoFromServers(endpoints, "testTable", TIMEOUT_MSEC);
+ assertEquals(result.size(), 1);
+
+ List segments = result.get("server1");
+ assertNotNull(segments);
+ assertEquals(segments.size(), 1);
+
+ SegmentSizeInfo s3 = segments.get(0);
+ assertEquals(s3.getSegmentName(), "s3");
+ assertEquals(s3.getDiskSizeInBytes(), 60000);
+ // Default values for missing fields (-1 indicates not available)
+ assertEquals(s3.getRawForwardIndexSizeBytes(), -1);
+ assertEquals(s3.getCompressedForwardIndexSizeBytes(), -1);
+ }
+
+ @Test
+ public void testErrorServerExcluded() {
+ ServerTableSizeReader reader = new ServerTableSizeReader(_executor, _connectionManager);
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("server0", "http://localhost:" + PORT_WITH_STATS);
+ endpoints.put("server_err", "http://localhost:" + PORT_ERROR);
+
+ Map> result =
+ reader.getSegmentSizeInfoFromServers(endpoints, "testTable", TIMEOUT_MSEC);
+ // Error server should be excluded
+ assertTrue(result.containsKey("server0"));
+ assertFalse(result.containsKey("server_err"));
+ }
+
+ private HttpHandler createHandler(int status, TableSizeInfo tableSize) {
+ return httpExchange -> {
+ String json = tableSize != null ? JsonUtils.objectToString(tableSize) : "error";
+ httpExchange.sendResponseHeaders(status, json.length());
+ OutputStream responseBody = httpExchange.getResponseBody();
+ responseBody.write(json.getBytes());
+ responseBody.close();
+ };
+ }
+
+ private HttpServer startServer(int port, HttpHandler handler)
+ throws IOException {
+ HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+ server.createContext(URI_PATH, handler);
+ new Thread(server::start).start();
+ return server;
+ }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableMetadataReaderCompressionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableMetadataReaderCompressionTest.java
new file mode 100644
index 000000000000..322792cfd36d
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableMetadataReaderCompressionTest.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.pinot.common.restlet.resources.ColumnCompressionStatsInfo;
+import org.apache.pinot.common.restlet.resources.CompressionStatsSummary;
+import org.apache.pinot.common.restlet.resources.StorageBreakdownInfo;
+import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
+import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests that per-column compression stats are correctly aggregated across servers in the metadata endpoint
+ * (ServerSegmentMetadataReader.getAggregatedTableMetadataFromServer).
+ */
+public class TableMetadataReaderCompressionTest {
+ private static final int PORT_SERVER0 = 11200;
+ private static final int PORT_SERVER1 = 11201;
+ private static final int TIMEOUT_MSEC = 10000;
+ private static final int NUM_REPLICAS = 2;
+
+ private final ExecutorService _executor = Executors.newFixedThreadPool(2);
+ private final PoolingHttpClientConnectionManager _connectionManager = new PoolingHttpClientConnectionManager();
+ private HttpServer _httpServer0;
+ private HttpServer _httpServer1;
+
+ @BeforeClass
+ public void setUp()
+ throws IOException {
+ // Server 0: has compression stats for col_a and col_b
+ List colStats0 = new ArrayList<>();
+ colStats0.add(new ColumnCompressionStatsInfo("col_a", 10000, 2000, 5.0, "LZ4", false,
+ List.of("forward_index")));
+ colStats0.add(new ColumnCompressionStatsInfo("col_b", 20000, 5000, 4.0, "ZSTANDARD", false,
+ List.of("forward_index", "inverted_index")));
+
+ TableMetadataInfo server0Info = new TableMetadataInfo("testTable_OFFLINE", 50000, 3, 1000,
+ Map.of("col_a", 4.0, "col_b", 100.0),
+ Map.of("col_a", 50.0, "col_b", 200.0),
+ Map.of(), Map.of(), Map.of(), colStats0);
+
+ _httpServer0 = startServer(PORT_SERVER0, createHandler(server0Info));
+
+ // Server 1 (replica): same compression stats
+ List colStats1 = new ArrayList<>();
+ colStats1.add(new ColumnCompressionStatsInfo("col_a", 10000, 2000, 5.0, "LZ4", false,
+ List.of("forward_index")));
+ colStats1.add(new ColumnCompressionStatsInfo("col_b", 20000, 5000, 4.0, "ZSTANDARD", false,
+ List.of("forward_index", "inverted_index")));
+
+ TableMetadataInfo server1Info = new TableMetadataInfo("testTable_OFFLINE", 50000, 3, 1000,
+ Map.of("col_a", 4.0, "col_b", 100.0),
+ Map.of("col_a", 50.0, "col_b", 200.0),
+ Map.of(), Map.of(), Map.of(), colStats1);
+
+ _httpServer1 = startServer(PORT_SERVER1, createHandler(server1Info));
+ }
+
+ @AfterClass
+ public void tearDown() {
+ if (_httpServer0 != null) {
+ _httpServer0.stop(0);
+ }
+ if (_httpServer1 != null) {
+ _httpServer1.stop(0);
+ }
+ }
+
+ @Test
+ public void testColumnCompressionStatsAggregation() {
+ ServerSegmentMetadataReader reader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("server0", "http://localhost:" + PORT_SERVER0);
+ endpoints.put("server1", "http://localhost:" + PORT_SERVER1);
+
+ TableMetadataInfo result = reader.getAggregatedTableMetadataFromServer(
+ "testTable_OFFLINE", endpoints, null, NUM_REPLICAS, TIMEOUT_MSEC, true);
+
+ assertNotNull(result);
+ // Disk size divided by replicas: (50000+50000) / 2 = 50000
+ assertEquals(result.getDiskSizeInBytes(), 50000);
+
+ // Per-column compression stats should be aggregated and divided by replicas
+ List colStats = result.getColumnCompressionStats();
+ assertNotNull(colStats);
+ assertEquals(colStats.size(), 2);
+
+ // Results are sorted by column name
+ ColumnCompressionStatsInfo colA = colStats.get(0);
+ assertNotNull(colA);
+ assertEquals(colA.getColumn(), "col_a");
+ // (10000+10000)/2 = 10000 uncompressed, (2000+2000)/2 = 2000 compressed
+ assertEquals(colA.getUncompressedSizeInBytes(), 10000);
+ assertEquals(colA.getCompressedSizeInBytes(), 2000);
+ assertEquals(colA.getCompressionRatio(), 5.0, 0.01);
+ assertEquals(colA.getCodec(), "LZ4");
+ assertFalse(colA.hasDictionary());
+
+ ColumnCompressionStatsInfo colB = colStats.get(1);
+ assertNotNull(colB);
+ assertEquals(colB.getColumn(), "col_b");
+ // (20000+20000)/2 = 20000 uncompressed, (5000+5000)/2 = 5000 compressed
+ assertEquals(colB.getUncompressedSizeInBytes(), 20000);
+ assertEquals(colB.getCompressedSizeInBytes(), 5000);
+ assertEquals(colB.getCompressionRatio(), 4.0, 0.01);
+ assertEquals(colB.getCodec(), "ZSTANDARD");
+ assertFalse(colB.hasDictionary());
+ }
+
+ @Test
+ public void testNoCompressionStatsFromServers() {
+ // Server with no compression stats (old server)
+ ServerSegmentMetadataReader reader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+
+ // Create a temporary server without compression stats
+ HttpServer noStatsServer = null;
+ try {
+ TableMetadataInfo noStatsInfo = new TableMetadataInfo("testTable_OFFLINE", 30000, 2, 500,
+ Map.of("col_a", 4.0), Map.of("col_a", 50.0), Map.of(), Map.of(), Map.of());
+ noStatsServer = startServer(11210, createHandler(noStatsInfo));
+
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("old_server", "http://localhost:11210");
+
+ TableMetadataInfo result = reader.getAggregatedTableMetadataFromServer(
+ "testTable_OFFLINE", endpoints, null, 1, TIMEOUT_MSEC, true);
+
+ assertNotNull(result);
+ // No compression stats should result in null list
+ assertNull(result.getColumnCompressionStats());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (noStatsServer != null) {
+ noStatsServer.stop(0);
+ }
+ }
+ }
+
+ @Test
+ public void testCompressionStatsSuppressedWhenFlagOff() {
+ ServerSegmentMetadataReader reader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("server0", "http://localhost:" + PORT_SERVER0);
+ endpoints.put("server1", "http://localhost:" + PORT_SERVER1);
+
+ // Flag OFF: compression stats and columnCompressionStats should be null,
+ // but storageBreakdown should still be preserved
+ TableMetadataInfo result = reader.getAggregatedTableMetadataFromServer(
+ "testTable_OFFLINE", endpoints, null, NUM_REPLICAS, TIMEOUT_MSEC, false);
+
+ assertNotNull(result);
+ assertNull(result.getColumnCompressionStats(), "columnCompressionStats should be null when flag is OFF");
+ assertNull(result.getCompressionStats(), "compressionStats should be null when flag is OFF");
+ // storageBreakdown is always-on; it is null here only because test servers don't send it
+ }
+
+ @Test
+ public void testCompressionSummaryAndStorageBreakdownAggregation()
+ throws IOException {
+ // Build a server response that includes CompressionStatsSummary and StorageBreakdownInfo
+ Map tiers = new HashMap<>();
+ tiers.put("default", new StorageBreakdownInfo.TierInfo(3, 150000));
+ tiers.put("cold", new StorageBreakdownInfo.TierInfo(1, 60000));
+ StorageBreakdownInfo breakdown = new StorageBreakdownInfo(tiers);
+
+ List colStats = new ArrayList<>();
+ colStats.add(new ColumnCompressionStatsInfo("col_a", 20000, 4000, 5.0, "LZ4", false, null));
+
+ CompressionStatsSummary summary = new CompressionStatsSummary(20000, 4000, 5.0, 3, 3, false);
+
+ TableMetadataInfo info = new TableMetadataInfo("testTable_OFFLINE", 200000, 4, 2000,
+ Map.of("col_a", 4.0), Map.of("col_a", 50.0),
+ Map.of(), Map.of(), Map.of(), colStats, summary, breakdown);
+
+ HttpServer summaryServer = startServer(11215, createHandler(info));
+ try {
+ ServerSegmentMetadataReader reader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("srv", "http://localhost:11215");
+
+ TableMetadataInfo result = reader.getAggregatedTableMetadataFromServer(
+ "testTable_OFFLINE", endpoints, null, 1, TIMEOUT_MSEC, true);
+
+ assertNotNull(result);
+
+ // CompressionStatsSummary should be aggregated and returned
+ CompressionStatsSummary resultSummary = result.getCompressionStats();
+ assertNotNull(resultSummary, "compressionStats should be aggregated from server response");
+ assertEquals(resultSummary.getRawForwardIndexSizePerReplicaInBytes(), 20000);
+ assertEquals(resultSummary.getCompressedForwardIndexSizePerReplicaInBytes(), 4000);
+ assertEquals(resultSummary.getCompressionRatio(), 5.0, 0.01);
+ assertEquals(resultSummary.getSegmentsWithStats(), 3);
+ assertEquals(resultSummary.getTotalSegments(), 3);
+ assertFalse(resultSummary.isPartialCoverage());
+
+ // StorageBreakdownInfo should be aggregated and divided by numReplica (1 here)
+ StorageBreakdownInfo resultBreakdown = result.getStorageBreakdown();
+ assertNotNull(resultBreakdown, "storageBreakdown should be aggregated from server response");
+ assertNotNull(resultBreakdown.getTiers());
+ assertEquals(resultBreakdown.getTiers().size(), 2);
+ StorageBreakdownInfo.TierInfo defaultTier = resultBreakdown.getTiers().get("default");
+ assertNotNull(defaultTier);
+ assertEquals(defaultTier.getCount(), 3);
+ assertEquals(defaultTier.getSizePerReplicaInBytes(), 150000);
+ } finally {
+ summaryServer.stop(0);
+ }
+ }
+
+ @Test
+ public void testDictColumnSentinelAndSkipPath()
+ throws IOException {
+ // Dict column: uncompressed=-1 sentinel, codec=null, hasDictionary=true → preserved
+ // Old raw column: uncompressed=0, codec=null, hasDictionary=false → skipped
+ List colStats = new ArrayList<>();
+ colStats.add(new ColumnCompressionStatsInfo("dict_col", -1, 8000, 0.0, null, true,
+ List.of("forward_index")));
+ colStats.add(new ColumnCompressionStatsInfo("old_raw_col", 0, 5000, 0.0, null, false, null));
+
+ TableMetadataInfo info = new TableMetadataInfo("testTable_OFFLINE", 100000, 2, 1000,
+ Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), colStats);
+
+ HttpServer server = startServer(11216, createHandler(info));
+ try {
+ ServerSegmentMetadataReader reader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+ BiMap endpoints = HashBiMap.create();
+ endpoints.put("srv", "http://localhost:11216");
+
+ TableMetadataInfo result = reader.getAggregatedTableMetadataFromServer(
+ "testTable_OFFLINE", endpoints, null, 1, TIMEOUT_MSEC, true);
+
+ assertNotNull(result);
+ List stats = result.getColumnCompressionStats();
+ assertNotNull(stats);
+ // old_raw_col (codec=null, hasDictionary=false) must be skipped
+ assertEquals(stats.size(), 1);
+ ColumnCompressionStatsInfo dictColInfo = stats.get(0);
+ assertEquals(dictColInfo.getColumn(), "dict_col");
+ // dict column: sentinel -1 preserved (not divided as 0)
+ assertEquals(dictColInfo.getUncompressedSizeInBytes(), -1);
+ assertEquals(dictColInfo.getCompressedSizeInBytes(), 8000);
+ assertTrue(dictColInfo.hasDictionary());
+ } finally {
+ server.stop(0);
+ }
+ }
+
+ private HttpHandler createHandler(TableMetadataInfo info) {
+ return httpExchange -> {
+ String json = JsonUtils.objectToString(info);
+ httpExchange.sendResponseHeaders(200, json.length());
+ OutputStream responseBody = httpExchange.getResponseBody();
+ responseBody.write(json.getBytes());
+ responseBody.close();
+ };
+ }
+
+ private HttpServer startServer(int port, HttpHandler handler)
+ throws IOException {
+ HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+ server.createContext("/tables/", handler);
+ new Thread(server::start).start();
+ return server;
+ }
+}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderCompressionStatsTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderCompressionStatsTest.java
new file mode 100644
index 000000000000..481cbda7e642
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderCompressionStatsTest.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.sun.net.httpserver.HttpHandler;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.MetricValueUtils;
+import org.apache.pinot.common.restlet.resources.ColumnCompressionStatsInfo;
+import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
+import org.apache.pinot.common.restlet.resources.TableSizeInfo;
+import org.apache.pinot.common.utils.config.TableConfigSerDeUtils;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.controller.utils.FakeHttpServer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.ArgumentMatchers;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests compression stats aggregation in TableSizeReader.
+ */
+public class TableSizeReaderCompressionStatsTest {
+ private static final String URI_PATH = "/table/";
+ private static final int TIMEOUT_MSEC = 10000;
+ private static final int NUM_REPLICAS = 2;
+
+ private final Executor _executor = Executors.newFixedThreadPool(1);
+ private final HttpClientConnectionManager _connectionManager = new PoolingHttpClientConnectionManager();
+ private final ControllerMetrics _controllerMetrics =
+ new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+ private final Map _serverMap = new HashMap<>();
+ private PinotHelixResourceManager _helix;
+ private LeadControllerManager _leadControllerManager;
+
+ @BeforeClass
+ public void setUp()
+ throws IOException {
+ _helix = mock(PinotHelixResourceManager.class);
+ _leadControllerManager = mock(LeadControllerManager.class);
+
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("compressionTable").setNumReplicas(NUM_REPLICAS).build();
+ tableConfig.getIndexingConfig().setCompressionStatsEnabled(true);
+
+ TableConfig flagOffTableConfig =
+ new TableConfigBuilder(TableType.OFFLINE).setTableName("flagOffTable").setNumReplicas(NUM_REPLICAS).build();
+ // compressionStatsEnabled defaults to false — do NOT enable it
+
+ ZkHelixPropertyStore mockPropertyStore = mock(ZkHelixPropertyStore.class);
+
+ when(mockPropertyStore.get(ArgumentMatchers.anyString(), ArgumentMatchers.eq(null),
+ ArgumentMatchers.eq(AccessOption.PERSISTENT))).thenAnswer((Answer) invocationOnMock -> {
+ String path = (String) invocationOnMock.getArguments()[0];
+ if (path.contains("offline_OFFLINE")) {
+ return TableConfigSerDeUtils.toZNRecord(tableConfig);
+ }
+ if (path.contains("flagOffTable_OFFLINE")) {
+ return TableConfigSerDeUtils.toZNRecord(flagOffTableConfig);
+ }
+ return null;
+ });
+
+ when(_helix.getPropertyStore()).thenReturn(mockPropertyStore);
+ when(_helix.getNumReplicas(any(TableConfig.class))).thenReturn(NUM_REPLICAS);
+ when(_leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+
+ // server0: segment s1 and s2 with compression stats
+ Map s1ColStats = new HashMap<>();
+ s1ColStats.put("col_a", new ColumnCompressionStatsInfo("col_a", 10000, 2000, 5.0, "LZ4", false, null));
+ s1ColStats.put("col_b", new ColumnCompressionStatsInfo("col_b", 20000, 5000, 4.0, "ZSTANDARD", false, null));
+
+ Map s2ColStats = new HashMap<>();
+ s2ColStats.put("col_a", new ColumnCompressionStatsInfo("col_a", 15000, 3000, 5.0, "LZ4", false, null));
+
+ List server0Sizes = Arrays.asList(
+ new SegmentSizeInfo("s1", 50000, 30000, 7000, "default", s1ColStats),
+ new SegmentSizeInfo("s2", 40000, 15000, 3000, "default", s2ColStats));
+ FakeCompressionServer s0 = new FakeCompressionServer(Arrays.asList("s1", "s2"), server0Sizes);
+ s0.start(URI_PATH, createHandler(200, server0Sizes));
+ _serverMap.put("server0", s0);
+
+ // server1: segment s1 and s2 (replica) with same stats
+ List server1Sizes = Arrays.asList(
+ new SegmentSizeInfo("s1", 50000, 30000, 7000, "default", s1ColStats),
+ new SegmentSizeInfo("s2", 40000, 15000, 3000, "default", s2ColStats));
+ FakeCompressionServer s1 = new FakeCompressionServer(Arrays.asList("s1", "s2"), server1Sizes);
+ s1.start(URI_PATH, createHandler(200, server1Sizes));
+ _serverMap.put("server1", s1);
+
+ // server2: segment s3 without compression stats (old server)
+ List server2Sizes = Arrays.asList(new SegmentSizeInfo("s3", 60000));
+ FakeCompressionServer s2 = new FakeCompressionServer(Arrays.asList("s3"), server2Sizes);
+ s2.start(URI_PATH, createHandler(200, server2Sizes));
+ _serverMap.put("server2", s2);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ for (FakeCompressionServer server : _serverMap.values()) {
+ server.stop();
+ }
+ }
+
+ private HttpHandler createHandler(int status, List segmentSizes) {
+ return httpExchange -> {
+ long tableSizeInBytes = 0;
+ for (SegmentSizeInfo segmentSize : segmentSizes) {
+ tableSizeInBytes += segmentSize.getDiskSizeInBytes();
+ }
+ TableSizeInfo tableInfo = new TableSizeInfo("compressionTable", tableSizeInBytes, segmentSizes);
+ String json = JsonUtils.objectToString(tableInfo);
+ httpExchange.sendResponseHeaders(status, json.length());
+ OutputStream responseBody = httpExchange.getResponseBody();
+ responseBody.write(json.getBytes());
+ responseBody.close();
+ };
+ }
+
+ private static class FakeCompressionServer extends FakeHttpServer {
+ final List _segments;
+ final List _sizes;
+
+ FakeCompressionServer(List segments, List sizes) {
+ _segments = segments;
+ _sizes = sizes;
+ }
+ }
+
+ private TableSizeReader.TableSizeDetails testRunner(String[] servers, String table)
+ throws InvalidConfigException {
+ when(_helix.getServerToSegmentsMap(anyString(), any(), anyBoolean())).thenAnswer(
+ (Answer