Skip to content
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
52421b9
Update AbstractMetrics.java
rsrkpatwari1234 Apr 12, 2026
dcf4bd9
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 12, 2026
6ef8ada
Update RealtimeTableDataManager.java
rsrkpatwari1234 Apr 12, 2026
b7e6a3e
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
4526485
Update PinotPrometheusMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
36237ef
Update ServerPrometheusMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
8aad969
Update AbstractMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
2ca726c
Update AbstractMetrics.java
rsrkpatwari1234 Apr 12, 2026
9922d54
Update AbstractMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
b6fa133
Update PinotPrometheusMetricsTest.java
rsrkpatwari1234 Apr 12, 2026
4597452
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 12, 2026
ab8ced7
Update RealtimeTableDataManager.java
rsrkpatwari1234 Apr 12, 2026
ff5bd09
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 12, 2026
fc7a92d
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
2112cf6
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 Apr 12, 2026
fb55606
Update RealtimeTableDataManager.java
rsrkpatwari1234 Apr 12, 2026
dfeadfa
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
8e239e9
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
264cbbb
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 12, 2026
aaf74b1
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 13, 2026
5e0819c
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 14, 2026
7a87040
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 Apr 14, 2026
5f89f3e
Update AbstractMetrics.java
rsrkpatwari1234 Apr 20, 2026
e32e45d
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 20, 2026
23630e0
Update IngestionDelayTracker.java
rsrkpatwari1234 Apr 20, 2026
128854d
Update AbstractMetricsTest.java
rsrkpatwari1234 Apr 20, 2026
7bc3550
Merge branch 'apache:master' into rsrkpatwari1234-issue-18099
rsrkpatwari1234 Apr 24, 2026
71abbd0
Update IngestionConfigUtils.java
rsrkpatwari1234 May 1, 2026
41eca2b
Merge branch 'apache:master' into rsrkpatwari1234-issue-18099
rsrkpatwari1234 May 1, 2026
0c11831
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 1, 2026
1a31661
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
a29fa6a
Update AbstractMetrics.java
rsrkpatwari1234 May 1, 2026
647554d
Update AbstractMetrics.java
rsrkpatwari1234 May 1, 2026
fe0d33c
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
bb45a22
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
7fc4a19
Update IngestionDelayTracker.java
rsrkpatwari1234 May 1, 2026
1d6b8a5
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 1, 2026
71ea605
Merge branch 'apache:master' into rsrkpatwari1234-issue-18099
rsrkpatwari1234 May 19, 2026
5a5e392
Update AbstractMetrics.java
rsrkpatwari1234 May 19, 2026
09fffac
Update PinotPrometheusMetricsTest.java
rsrkpatwari1234 May 19, 2026
5d24d7e
Update ServerPrometheusMetricsTest.java
rsrkpatwari1234 May 19, 2026
ef97f39
Update AbstractMetricsTest.java
rsrkpatwari1234 May 19, 2026
4498c4e
Update RealtimeTableDataManager.java
rsrkpatwari1234 May 19, 2026
d0b3415
Update RealtimeSegmentDataManager.java
rsrkpatwari1234 May 19, 2026
819f60b
Update IngestionDelayTrackerTest.java
rsrkpatwari1234 May 19, 2026
239b2e5
Update RealtimeTableDataManager.java
rsrkpatwari1234 May 19, 2026
fcb21a6
Update AbstractMetrics.java
rsrkpatwari1234 May 19, 2026
bd3cb8e
Update IngestionDelayTracker.java
rsrkpatwari1234 May 19, 2026
278d959
Update IngestionConfigUtils.java
rsrkpatwari1234 May 19, 2026
0fa2e7f
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 19, 2026
622e6fd
Update ServerPrometheusMetricsTest.java
rsrkpatwari1234 May 19, 2026
12c911b
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 19, 2026
90d7941
Update pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionCo…
rsrkpatwari1234 May 19, 2026
9066315
Update IngestionConfigUtils.java
rsrkpatwari1234 May 19, 2026
8f12b14
Update IngestionDelayTracker.java
rsrkpatwari1234 May 19, 2026
0743774
Update IngestionConfigUtilsTest.java
rsrkpatwari1234 May 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.Utils;
import org.apache.pinot.spi.metrics.PinotGauge;
import org.apache.pinot.spi.metrics.PinotMeter;
Expand Down Expand Up @@ -565,6 +566,32 @@ public void setOrUpdatePartitionGauge(final String tableName, final int partitio
setOrUpdateGauge(fullGaugeName, valueSupplier);
}

/**
* Registers a per-partition table gauge keyed by table name with type, Kafka (or other stream) topic, and partition
* group id. The resulting MBean name matches {@code server.yml} rules so Prometheus exports {@code topic} and
* {@code partition} labels (see apache/pinot#18099).
*/
public void setOrUpdatePartitionGaugeForStreamTopic(final String tableNameWithType, final String topicName,
final int partitionGroupId, final G gauge, final Supplier<Long> valueSupplier) {
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName must not be blank");
setOrUpdateTableGauge(composeStreamTopicPartitionKey(tableNameWithType, topicName, partitionGroupId), gauge,
valueSupplier);
}

/**
* Removes a gauge registered via {@link #setOrUpdatePartitionGaugeForStreamTopic}.
*/
public void removePartitionGaugeForStreamTopic(final String tableNameWithType, final String topicName,
final int partitionGroupId, final G gauge) {
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName must not be blank");
removeTableGauge(composeStreamTopicPartitionKey(tableNameWithType, topicName, partitionGroupId), gauge);
}

private static String composeStreamTopicPartitionKey(String tableNameWithType, String topicName,
int partitionGroupId) {
return tableNameWithType + "-" + topicName + "-" + partitionGroupId;
}

/**
* @deprecated please use setOrUpdateGauge(final String metricName, final Supplier<Long> valueSupplier) instead.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,24 @@ public void testPartitionGauges() {
Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty());
}

@Test
public void testPartitionGaugesWithStreamTopic() {
ControllerMetrics controllerMetrics = buildTestMetrics();
String tableWithType = "myTable_REALTIME";
String topic = "events";
int partitionGroupId = 3;
String compositeKey = tableWithType + "-" + topic + "-" + partitionGroupId;

controllerMetrics.setOrUpdatePartitionGaugeForStreamTopic(tableWithType, topic, partitionGroupId,
ControllerGauge.VERSION, () -> 7L);
Assert.assertEquals(MetricValueUtils.getGaugeValue(controllerMetrics,
ControllerGauge.VERSION.getGaugeName() + "." + compositeKey), 7);

controllerMetrics.removePartitionGaugeForStreamTopic(tableWithType, topic, partitionGroupId,
ControllerGauge.VERSION);
Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty());
}

@Test
public void testAddCallbackGauges() {
ControllerMetrics controllerMetrics = buildTestMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ public static class ExportedLabels {
public static final List<String> PARTITION_TABLENAME_TABLETYPE =
List.of(PARTITION, "3", TABLE, ExportedLabelValues.TABLENAME, TABLETYPE, TableType.REALTIME.toString());

public static final List<String> PARTITIONNUM_TABLENAME_TABLETYPE_KAFKATOPIC =
List.of(PARTITION, "3", TABLE, ExportedLabelValues.TABLENAME, TABLETYPE, TableType.REALTIME.toString(), TOPIC,
KAFKA_TOPIC);

public static final List<String> TABLENAME_TABLETYPE_CONTROLLER_TASKTYPE =
List.of(TABLE, ExportedLabelValues.TABLENAME, TABLETYPE, TABLETYPE_REALTIME, TASKTYPE,
CONTROLLER_PERIODIC_TASK_CHC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,12 @@ public abstract class ServerPrometheusMetricsTest extends PinotPrometheusMetrics

private static final List<ServerGauge> GAUGES_ACCEPTING_PARTITION =
List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS,
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);

private static final List<ServerGauge> GAUGES_ACCEPTING_PARTITION_WITH_STREAM_TOPIC =
List.of(ServerGauge.REALTIME_INGESTION_DELAY_MS, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);

private static final List<ServerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME =
List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
Expand Down Expand Up @@ -122,6 +125,10 @@ public void gaugeTest(ServerGauge serverGauge) {
addGaugeWithLabels(serverGauge, CLIENT_ID);
assertGaugeExportedCorrectly(serverGauge.getGaugeName(),
ExportedLabels.PARTITION_TABLENAME_TABLETYPE_KAFKATOPIC, EXPORTED_METRIC_PREFIX);
} else if (GAUGES_ACCEPTING_PARTITION_WITH_STREAM_TOPIC.contains(serverGauge)) {
addPartitionGaugeWithStreamTopicLabels(serverGauge, TABLE_NAME_WITH_TYPE);
assertGaugeExportedCorrectly(serverGauge.getGaugeName(),
ExportedLabels.PARTITIONNUM_TABLENAME_TABLETYPE_KAFKATOPIC, EXPORTED_METRIC_PREFIX);
} else if (GAUGES_ACCEPTING_PARTITION.contains(serverGauge)) {
addPartitionGaugeWithLabels(serverGauge, TABLE_NAME_WITH_TYPE);
assertGaugeExportedCorrectly(serverGauge.getGaugeName(), ExportedLabels.PARTITION_TABLENAME_TABLETYPE,
Expand All @@ -145,6 +152,10 @@ private void addPartitionGaugeWithLabels(ServerGauge serverGauge, String labels)
_serverMetrics.setValueOfPartitionGauge(labels, 3, serverGauge, 100L);
}

private void addPartitionGaugeWithStreamTopicLabels(ServerGauge serverGauge, String tableNameWithType) {
_serverMetrics.setValueOfTableGauge(tableNameWithType + "-" + KAFKA_TOPIC + "-3", serverGauge, 100L);
}

public void addMeterWithLabels(ServerMeter serverMeter, String labels) {
_serverMetrics.addMeteredTableValue(labels, serverMeter, 4L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
Expand Down Expand Up @@ -98,12 +99,15 @@ private static class IngestionInfo {
@Nullable
volatile StreamPartitionMsgOffset _currentOffset;
volatile long _firstStreamIngestionTimeMs;
@Nullable
final String _streamTopicName;

IngestionInfo(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
long firstStreamIngestionTimeMs) {
long firstStreamIngestionTimeMs, @Nullable String streamTopicName) {
_ingestionTimeMs = ingestionTimeMs;
_currentOffset = currentOffset;
_firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
_streamTopicName = streamTopicName;
}

void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset,
Expand Down Expand Up @@ -333,9 +337,9 @@ public Thread newThread(Runnable r) {
*/
private void removePartitionId(int partitionId) {
_partitionsHostedByThisServer.remove(partitionId);
_ingestionInfoMap.remove(partitionId);
IngestionInfo removed = _ingestionInfoMap.remove(partitionId);
_partitionsTracked.computeIfPresent(partitionId, (k, v) -> {
removeMetrics(partitionId);
removeMetrics(partitionId, removed != null ? removed._streamTopicName : null);
return null;
});

Expand Down Expand Up @@ -384,17 +388,26 @@ void createMetrics(int partitionId) {
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> getLatestPartitionOffset(partitionId));
}
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
() -> getPartitionIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId));
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
@Nullable String streamTopicName = ingestionInfo != null ? ingestionInfo._streamTopicName : null;
if (StringUtils.isNotBlank(streamTopicName)) {
_serverMetrics.setOrUpdatePartitionGaugeForStreamTopic(_metricName, streamTopicName, partitionId,
Comment thread
rsrkpatwari1234 marked this conversation as resolved.
Outdated
ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionGaugeForStreamTopic(_metricName, streamTopicName, partitionId,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId));
} else {
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
() -> getPartitionIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId));
}
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS, () -> getPartitionIngestionReportingStatus(partitionId));

LOGGER.info("Successfully created ingestion metrics for partition id: {}", partitionId);
}

private void removeMetrics(int partitionId) {
private void removeMetrics(int partitionId, @Nullable String streamTopicName) {
int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
StreamMetadataProvider streamMetadataProvider =
_streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
Expand All @@ -410,6 +423,12 @@ private void removeMetrics(int partitionId) {
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS);
if (StringUtils.isNotBlank(streamTopicName)) {
_serverMetrics.removePartitionGaugeForStreamTopic(_metricName, streamTopicName, partitionId,
Comment thread
rsrkpatwari1234 marked this conversation as resolved.
Outdated
ServerGauge.REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGaugeForStreamTopic(_metricName, streamTopicName, partitionId,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
}

LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId);
}
Expand All @@ -420,12 +439,14 @@ private void removeMetrics(int partitionId) {
* @param segmentName name of the consuming segment
* @param partitionId partition id of the consuming segment (directly passed in to avoid parsing the
* segment name)
* @param streamTopicName stream topic name for this consumer (e.g. Kafka topic); when blank, metrics use
* the legacy name without a topic label
* @param ingestionTimeMs ingestion time of the last consumed message (from {@link StreamMessageMetadata})
* @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
* {@link StreamMessageMetadata})
* @param currentOffset offset of the last consumed message (from {@link StreamMessageMetadata})
*/
public void updateMetrics(String segmentName, int partitionId, long ingestionTimeMs,
public void updateMetrics(String segmentName, int partitionId, @Nullable String streamTopicName, long ingestionTimeMs,
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset) {
if (!_isServerReadyToServeQueries.getAsBoolean() || _realTimeTableDataManager.isShutDown()) {
// Do not update the ingestion delay metrics during server startup period
Expand All @@ -444,7 +465,7 @@ public void updateMetrics(String segmentName, int partitionId, long ingestionTim
return v;
}
if (v == null) {
return new IngestionInfo(ingestionTimeMs, currentOffset, firstStreamIngestionTimeMs);
return new IngestionInfo(ingestionTimeMs, currentOffset, firstStreamIngestionTimeMs, streamTopicName);
}
v.update(ingestionTimeMs, currentOffset, firstStreamIngestionTimeMs);
return v;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2110,7 +2110,8 @@ private void updateIngestionMetrics(StreamMessageMetadata metadata) {
if (metadata != null) {
try {
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId,
metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset());
_streamConfig.getTopicName(), metadata.getRecordIngestionTimeMs(),
metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset());
} catch (Exception e) {
_segmentLogger.warn("Failed to update the ingestion metrics", e);
}
Expand All @@ -2123,8 +2124,8 @@ private void updateIngestionMetrics(StreamMessageMetadata metadata) {
*/
private void setIngestionDelayToZero() {
long currentTimeMs = System.currentTimeMillis();
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, currentTimeMs, currentTimeMs,
null);
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, _streamConfig.getTopicName(),
currentTimeMs, currentTimeMs, null);
}

// This should be done during commit? We may not always commit when we build a segment....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,16 +310,18 @@ protected void doShutdown() {
* @param segmentName name of the consuming segment
* @param partitionId partition id of the consuming segment (directly passed in to avoid parsing
* the segment name)
* @param streamTopicName stream topic for this consumer (e.g. Kafka topic); used for ingestion
* delay metric labels
* @param ingestionTimeMs ingestion time of the last consumed message (from
* {@link StreamMessageMetadata})
* @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from
* {@link StreamMessageMetadata})
* @param currentOffset offset of the last consumed message (from {@link StreamMessageMetadata})
*/
public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs,
long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset) {
_ingestionDelayTracker.updateMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs,
currentOffset);
public void updateIngestionMetrics(String segmentName, int partitionId, @Nullable String streamTopicName,
long ingestionTimeMs, long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset) {
_ingestionDelayTracker.updateMetrics(segmentName, partitionId, streamTopicName, ingestionTimeMs,
firstStreamIngestionTimeMs, currentOffset);
}

/**
Expand Down
Loading
Loading