Skip to content
Open
Show file tree
Hide file tree
Changes from 52 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
52421b9
Update AbstractMetrics.java
rsrkpatwari1234 Apr 12, 2026
dcf4bd9
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 12, 2026
6ef8ada
Update RealtimeTableDataManager.java
rsrkpatwari1234 Apr 12, 2026
b7e6a3e
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
4526485
Update PinotPrometheusMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
36237ef
Update ServerPrometheusMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
8aad969
Update AbstractMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
2ca726c
Update AbstractMetrics.java
rsrkpatwari1234 Apr 12, 2026
9922d54
Update AbstractMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
b6fa133
Update PinotPrometheusMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
4597452
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 12, 2026
ab8ced7
Update RealtimeTableDataManager.java
rsrkpatwari1234 Apr 12, 2026
ff5bd09
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 12, 2026
fc7a92d
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
2112cf6
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 Apr 12, 2026
fb55606
Update RealtimeTableDataManager.java
rsrkpatwari1234 Apr 12, 2026
dfeadfa
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
8e239e9
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
264cbbb
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
aaf74b1
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 13, 2026
5e0819c
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 14, 2026
7a87040
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 14, 2026
5f89f3e
Update AbstractMetrics.java
rsrkpatwari1234 Apr 20, 2026
e32e45d
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 20, 2026
23630e0
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 20, 2026
128854d
Update AbstractMetricsTest.java
rsrkpatwari1234 Apr 20, 2026
7bc3550
Merge branch 'apache:master' into rsrkpatwari1234-issue-18099
rsrkpatwari1234 Apr 24, 2026
71abbd0
Update IngestionConfigUtils.java
rsrkpatwari1234 May 1, 2026
41eca2b
Merge branch 'apache:master' into rsrkpatwari1234-issue-18099
rsrkpatwari1234 May 1, 2026
0c11831
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 1, 2026
1a31661
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
a29fa6a
Update AbstractMetrics.java
rsrkpatwari1234 May 1, 2026
647554d
Update AbstractMetrics.java
rsrkpatwari1234 May 1, 2026
fe0d33c
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
bb45a22
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
7fc4a19
Update IngestionDelayTracker.java
rsrkpatwari1234 May 1, 2026
1d6b8a5
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
71ea605
Merge branch 'apache:master' into rsrkpatwari1234-issue-18099
rsrkpatwari1234 May 19, 2026
5a5e392
Update AbstractMetrics.java
rsrkpatwari1234 May 19, 2026
09fffac
Update PinotPrometheusMetricsTest.java
rsrkpatwari1234 May 19, 2026
5d24d7e
Update ServerPrometheusMetricsTest.java
rsrkpatwari1234 May 19, 2026
ef97f39
Update AbstractMetricsTest.java
rsrkpatwari1234 May 19, 2026
4498c4e
Update RealtimeTableDataManager.java
rsrkpatwari1234 May 19, 2026
d0b3415
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 19, 2026
819f60b
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 May 19, 2026
239b2e5
Update RealtimeTableDataManager.java
rsrkpatwari1234 May 19, 2026
fcb21a6
Update AbstractMetrics.java
rsrkpatwari1234 May 19, 2026
bd3cb8e
Update IngestionDelayTracker.java
rsrkpatwari1234 May 19, 2026
278d959
Update IngestionConfigUtils.java
rsrkpatwari1234 May 19, 2026
0fa2e7f
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 19, 2026
622e6fd
Update ServerPrometheusMetricsTest.java
rsrkpatwari1234 May 19, 2026
12c911b
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 19, 2026
90d7941
Update pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionCo…
rsrkpatwari1234 May 19, 2026
9066315
Update IngestionConfigUtils.java
rsrkpatwari1234 May 19, 2026
8f12b14
Update IngestionDelayTracker.java
rsrkpatwari1234 May 19, 2026
0743774
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,14 @@ public abstract class ServerPrometheusMetricsTest extends PinotPrometheusMetrics
ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS,
ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS);
ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS);

private static final List<ServerGauge> GAUGES_ACCEPTING_PARTITION =
List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT);

private static final List<ServerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME =
List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
Expand All @@ -53,7 +56,8 @@
import org.slf4j.LoggerFactory;

/**
* A Class to track realtime ingestion delay for table partitions on a given server.
* Tracks realtime ingestion delay for table partitions on the current server.
* Ingestion gauges are keyed like stream consumer metrics ({@code tableNameWithType-topic-streamPartition}).
* Highlights:
* 1-An object of this class is hosted by each RealtimeTableDataManager.
* 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table.
Expand Down Expand Up @@ -132,6 +136,8 @@ void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffs
private final ServerMetrics _serverMetrics;
private final String _tableNameWithType;
private final String _metricName;
@Nullable
private final String _consumerClientIdSuffix;
private final RealtimeTableDataManager _realTimeTableDataManager;
private final BooleanSupplier _isServerReadyToServeQueries;
private final Cache<String, Boolean> _segmentsToIgnore =
Expand Down Expand Up @@ -176,6 +182,10 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy
_serverMetrics = serverMetrics;
_tableNameWithType = tableNameWithType;
_metricName = tableNameWithType;
InstanceDataManagerConfig instanceDataManagerConfig = realtimeTableDataManager.getInstanceDataManagerConfig();
String consumerClientIdSuffix =
instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
_consumerClientIdSuffix = StringUtils.isNotBlank(consumerClientIdSuffix) ? consumerClientIdSuffix : null;
_realTimeTableDataManager = realtimeTableDataManager;
_isServerReadyToServeQueries = isServerReadyToServeQueries;

Expand Down Expand Up @@ -369,27 +379,40 @@ void setClock(Clock clock) {
_clock = clock;
}

/** Table key for ingestion gauges for {@code pinotPartitionId}, aligned with stream consumer metrics. */
private String getIngestionGaugeTableKey(int pinotPartitionId) {
TableConfig tableConfig = _realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft();
List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig);
StreamConfig streamConfig = IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs,
pinotPartitionId);
int streamPartitionId =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) Is this a behavior change? What happens to single-topic streams?

Copy link
Copy Markdown
Contributor Author

@rsrkpatwari1234 rsrkpatwari1234 May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For single-topic tables, IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(tableConfig, pinotPartitionId) returns pinotPartitionId as-is (no % 10000 remap). That matches RealtimeSegmentDataManager, which uses _streamPartitionId = _partitionGroupId when there is only one stream. So the physical partition that the delay refers to is unchanged for single-topic tables.

We only change the metric identity from “table + Pinot partition only” to the same tableNameWithType-topic-streamPartition shape as stream consumer metrics, so a topic label (and consistent partition labeling) appears. Hence, users will have to update any dashboards and alerts that rely on these metrics.

IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(tableConfig, pinotPartitionId);
return IngestionConfigUtils.getStreamIngestionMetricTableKey(_tableNameWithType, streamConfig.getTopicName(),
streamPartitionId, _consumerClientIdSuffix);
}

@VisibleForTesting
void createMetrics(int partitionId) {
int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
String tableKey = getIngestionGaugeTableKey(partitionId);

if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) {
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
_serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
() -> getPartitionIngestionOffsetLag(partitionId));

_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> getPartitionIngestionConsumingOffset(partitionId));
_serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET,
() -> getPartitionIngestionConsumingOffset(partitionId));

_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> getLatestPartitionOffset(partitionId));
_serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
() -> getLatestPartitionOffset(partitionId));
}
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
_serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_MS,
() -> getPartitionIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS, () -> getPartitionIngestionReportingStatus(partitionId));
_serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
() -> getPartitionEndToEndIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS,
() -> getPartitionIngestionReportingStatus(partitionId));

LOGGER.info("Successfully created ingestion metrics for partition id: {}", partitionId);
}
Expand All @@ -398,18 +421,16 @@ private void removeMetrics(int partitionId) {
int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
StreamMetadataProvider streamMetadataProvider =
_streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
String tableKey = getIngestionGaugeTableKey(partitionId);
// Remove all metrics associated with this partition
if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) {
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
_serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
_serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
_serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
}
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS);
_serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removeTableGauge(tableKey, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS);

LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,23 @@ public static String getTableTopicUniqueClientId(String className, StreamConfig
className + "-" + streamConfig.getTableNameWithType() + "-" + streamConfig.getTopicName());
}

/**
* Returns the table-key string used for realtime stream server metrics, in the form
* {@code tableNameWithType-topicName-streamPartitionId}, with {@code -consumerClientIdSuffix} appended when the
* suffix is non-blank.
*
* @param tableNameWithType table name with type (e.g. {@code myTable_REALTIME})
* @param topicName stream topic name
* @param streamPartitionId stream partition id
* @param consumerClientIdSuffix optional suffix; ignored if null or blank
* @return the metric table key
*/
public static String getStreamIngestionMetricTableKey(String tableNameWithType, String topicName,
int streamPartitionId, String consumerClientIdSuffix) {
Comment thread
rsrkpatwari1234 marked this conversation as resolved.
Outdated
if (StringUtils.isNotBlank(consumerClientIdSuffix)) {
return tableNameWithType + "-" + topicName + "-" + streamPartitionId + "-" + consumerClientIdSuffix;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) Is this a behavior change? This is not the same way that AbstractMetrics connects the parts.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no behavior change; realtime consumption works in the same manner. _clientId in RealtimeSegmentDataManager is already built with the same rule: append consumer.client.id.suffix only when StringUtils.isNotBlank(...). getStreamIngestionMetricTableKey mirrors that so ingestion-delay metric keys stay consistent with the consumer’s metric table key.

AbstractMetrics does not define how topic, partition, or suffix are encoded. It only receives a fully formed first segment and composes names like gaugeName + "." + getTableName(thatString). That is why we added the function here

}
return tableNameWithType + "-" + topicName + "-" + streamPartitionId;
}

/**
* Returns a Map of stream config index to Set of stream partition Ids.
* @param pinotPartitionIds Set of pinot partition ids.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -182,4 +183,17 @@ public void testGetStreamConfigIndexToStreamPartitions() {
Assert.assertEquals(streamConfigIndexToStreamPartitions.get(1), new HashSet<>(Arrays.asList(100, 1)));
Assert.assertEquals(streamConfigIndexToStreamPartitions.get(3), new HashSet<>(Arrays.asList(400)));
}

/**
 * Checks the metric table key produced by {@code IngestionConfigUtils.getStreamIngestionMetricTableKey}:
 * first with a null consumer client id suffix, then with a non-blank one.
 */
@Test
public void testGetStreamIngestionMetricTableKey() {
  String tableWithType = "db.myTable_REALTIME";

  // Minimal stream config; the topic name used in the expected keys is read back from it.
  Map<String, String> streamProperties = new HashMap<>();
  streamProperties.put("streamType", "kafka");
  streamProperties.put("stream.kafka.topic.name", "events");
  StreamConfig streamConfig = new StreamConfig("db.myTable_REALTIME", streamProperties);

  // Null suffix: the key ends at the stream partition id.
  String keyWithoutSuffix =
      IngestionConfigUtils.getStreamIngestionMetricTableKey(tableWithType, streamConfig.getTopicName(), 7, null);
  Assert.assertEquals(keyWithoutSuffix, "db.myTable_REALTIME-events-7");

  // Non-blank suffix: appended after the partition id with a '-' separator.
  String keyWithSuffix =
      IngestionConfigUtils.getStreamIngestionMetricTableKey(tableWithType, streamConfig.getTopicName(), 7, "host1");
  Assert.assertEquals(keyWithSuffix, "db.myTable_REALTIME-events-7-host1");
}
}
Loading