From 20b402f24ed54cd139c9c1b90be557af916bb259 Mon Sep 17 00:00:00 2001 From: Rekha Seethamraju Date: Thu, 14 May 2026 14:06:49 -0700 Subject: [PATCH] Emit topic name in ingestion delay metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Include the Kafka topic name as part of the ingestion delay gauge metric name so that operators can distinguish metrics by topic when a table has multiple upstream streams or when a topic is renamed. - Add `setOrUpdatePartitionTopicGauge` / `removePartitionTopicGauge` to AbstractMetrics, composing the metric name as .... - Update IngestionDelayTracker.createMetrics / removeMetrics to use the new topic-aware APIs, looking up the topic name from the StreamConfig. - Rename RealtimeTableDataManager.getHostedPartitionsGroupIds → getHostedConsumingPartitionsGroupIds to clarify that only CONSUMING segments are returned. - Add unit tests in AbstractMetricsTest and IngestionDelayTrackerTest covering the new APIs and verifying the topic name appears in the metric name. --- .../pinot/common/metrics/AbstractMetrics.java | 34 +++++++++ .../common/metrics/AbstractMetricsTest.java | 21 ++++++ .../realtime/IngestionDelayTracker.java | 57 +++++++++------ .../realtime/RealtimeTableDataManager.java | 2 +- .../realtime/IngestionDelayTrackerTest.java | 73 +++++++++++++++++++ 5 files changed, 162 insertions(+), 25 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index c594188855c9..9572cee84ddd 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -569,6 +569,21 @@ public void setOrUpdatePartitionGauge(final String tableName, final int partitio setOrUpdateGauge(fullGaugeName, valueSupplier); } + /** + * Install a per-partition topic table gauge. + * + * @param tableName The table name + * @param partitionId The partition id + * @param topicName The topic name + * @param gauge The gauge to use + * @param valueSupplier The supplier function used to retrieve the value of the gauge. + */ + public void setOrUpdatePartitionTopicGauge(final String tableName, final int partitionId, + final String topicName, final G gauge, final Supplier valueSupplier) { + final String fullGaugeName = composeTableTopicGaugeName(tableName, String.valueOf(partitionId), topicName, gauge); + setOrUpdateGauge(fullGaugeName, valueSupplier); + } + /** * @deprecated please use setOrUpdateGauge(final String metricName, final Supplier valueSupplier) instead. * @@ -764,6 +779,20 @@ public void removePartitionGauge(final String tableName, final int partitionId, removeGauge(fullGaugeName); } + /** + * Removes a table gauge given the table name, the partition id, topic name and the gauge. + * The add/remove is expected to work correctly in case of being invoked across multiple threads. + * @param tableName table name + * @param partitionId The partition id + * @param topicName The topic name + * @param gauge the gauge to be removed + */ + public void removePartitionTopicGauge(final String tableName, final int partitionId, + final String topicName, final G gauge) { + final String fullGaugeName = composeTableTopicGaugeName(tableName, String.valueOf(partitionId), topicName, gauge); + removeGauge(fullGaugeName); + } + /** * Removes a table gauge given the table name, the key and the gauge. * The add/remove is expected to work correctly in case of being invoked across multiple threads. @@ -788,6 +817,11 @@ private String composeTableGaugeName(final String tableName, final String key, f return gauge.getGaugeName() + "." + getTableName(tableName) + "." + key; } + private String composeTableTopicGaugeName(final String tableName, final String key, + final String topicName, final G gauge) { + return gauge.getGaugeName() + "." + getTableName(tableName) + "." + key + "." + topicName; + } + public String composePluginGaugeName(String pluginName, Gauge gauge) { return gauge.getGaugeName() + "." + pluginName; } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java index 6ff9c4e2331a..8647875a30b3 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/AbstractMetricsTest.java @@ -407,6 +407,27 @@ public void testPartitionGauges() { Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty()); } + @Test + public void testPartitionTopicGauges() { + ControllerMetrics controllerMetrics = buildTestMetrics(); + String table = "test_table"; + int partitionId = 7; + String topicName = "test_topic"; + + controllerMetrics.setOrUpdatePartitionTopicGauge(table, partitionId, topicName, ControllerGauge.VERSION, () -> 1L); + Assert.assertEquals( + getGaugeValue(controllerMetrics, + ControllerGauge.VERSION.getGaugeName() + "." + table + "." + partitionId + "." + topicName), 1); + + controllerMetrics.setOrUpdatePartitionTopicGauge(table, partitionId, topicName, ControllerGauge.VERSION, () -> 2L); + Assert.assertEquals( + getGaugeValue(controllerMetrics, + ControllerGauge.VERSION.getGaugeName() + "." + table + "." + partitionId + "." + topicName), 2); + + controllerMetrics.removePartitionTopicGauge(table, partitionId, topicName, ControllerGauge.VERSION); + Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty()); + } + @Test public void testAddCallbackGauges() { ControllerMetrics controllerMetrics = buildTestMetrics(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index e26390b9e4fb..202b27d08410 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -372,43 +372,52 @@ void setClock(Clock clock) { @VisibleForTesting void createMetrics(int partitionId) { int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); + List streamConfigs = + IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft()); + String topicName = streamConfigs.get(streamConfigIndex).getTopicName(); StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, - () -> getPartitionIngestionOffsetLag(partitionId)); + _serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName, + ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () -> getPartitionIngestionOffsetLag(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, + _serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> getPartitionIngestionConsumingOffset(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, + _serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> getLatestPartitionOffset(partitionId)); } - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS, - () -> getPartitionIngestionDelayMs(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, + _serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName, + ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionId)); + _serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, + _serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS, () -> getPartitionIngestionReportingStatus(partitionId)); LOGGER.info("Successfully created ingestion metrics for partition id: {}", partitionId); } - private void removeMetrics(int partitionId) { + @VisibleForTesting + void removeMetrics(int partitionId) { int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); - StreamMetadataProvider streamMetadataProvider = - _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); - // Remove all metrics associated with this partition + List streamConfigs = + IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft()); + String topicName = streamConfigs.get(streamConfigIndex).getTopicName(); + StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); + if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { - _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); - _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET); - _serverMetrics.removePartitionGauge(_metricName, partitionId, + _serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName, + ServerGauge.REALTIME_INGESTION_OFFSET_LAG); + _serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName, + ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET); + _serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET); } - _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS); - _serverMetrics.removePartitionGauge(_metricName, partitionId, + _serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName, + ServerGauge.REALTIME_INGESTION_DELAY_MS); + _serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS); - _serverMetrics.removePartitionGauge(_metricName, partitionId, + _serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS); LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId); @@ -503,26 +512,26 @@ public void timeoutInactivePartitions() { // involves network traffic and may be inefficient. List partitionsToVerify = getPartitionsToBeVerified(); if (partitionsToVerify.isEmpty()) { - // Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state. + // Don't make the call to getHostedConsumingPartitionsGroupIds() as it involves checking ideal state. return; } - Set partitionsHostedByThisServer; + Set consumingPartitionsHostedByThisServer; try { - partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds(); + consumingPartitionsHostedByThisServer = _realTimeTableDataManager.getHostedConsumingPartitionsGroupIds(); } catch (Exception e) { LOGGER.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType, e.getClass(), e.getMessage()); return; } for (int partitionId : partitionsToVerify) { - if (!partitionsHostedByThisServer.contains(partitionId)) { - // Partition is not hosted in this server anymore, stop tracking it + if (!consumingPartitionsHostedByThisServer.contains(partitionId)) { + // Partition is not hosted in this server anymore or is not consuming anymore, stop tracking it removePartitionId(partitionId); } } ConcurrentHashMap newMap = new ConcurrentHashMap<>(); - partitionsHostedByThisServer.forEach(p -> newMap.put(p, true)); + consumingPartitionsHostedByThisServer.forEach(p -> newMap.put(p, true)); _partitionsHostedByThisServer = newMap; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 15ec71fe50f4..e9c73ffd74e2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -398,7 +398,7 @@ public StreamMetadataProvider getStreamMetadataProvider(RealtimeSegmentDataManag * Returns all partitionGroupIds for the partitions hosted by this server for current table. * @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns. */ - public Set getHostedPartitionsGroupIds() { + public Set getHostedConsumingPartitionsGroupIds() { Set partitionsHostedByThisServer = new HashSet<>(); List segments = TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, _tableNameWithType, CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java index 80d5444e3ed9..2646f6012ed9 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -51,7 +51,10 @@ import org.testng.annotations.Test; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; public class IngestionDelayTrackerTest { @@ -572,4 +575,74 @@ private void assertIncreasing(Map> metrics, String key) { List values = metrics.get(key); Assert.assertTrue(values.get(values.size() - 1) > values.get(0), key + " not increasing"); } + + @Test + public void testCreateMetricsUsesTopicName() { + // The stream config in REALTIME_TABLE_DATA_MANAGER uses topic name "test" + String expectedTopicName = "test"; + int partitionId = 0; + + // Use a fresh ServerMetrics mock so we can verify calls on it + ServerMetrics serverMetricsMock = mock(ServerMetrics.class); + IngestionDelayTracker tracker = + new IngestionDelayTracker(serverMetricsMock, REALTIME_TABLE_NAME, REALTIME_TABLE_DATA_MANAGER, () -> true); + tracker.createMetrics(partitionId); + + // Verify that all ingestion gauges include the topic name + verify(serverMetricsMock).setOrUpdatePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_DELAY_MS), any()); + verify(serverMetricsMock).setOrUpdatePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS), any()); + verify(serverMetricsMock).setOrUpdatePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS), any()); + // FakeStreamMetadataProvider.supportsOffsetLag() returns true, so these should also be called + verify(serverMetricsMock).setOrUpdatePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_OFFSET_LAG), any()); + verify(serverMetricsMock).setOrUpdatePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET), any()); + verify(serverMetricsMock).setOrUpdatePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET), any()); + + tracker.shutdown(); + } + + @Test + public void testRemoveMetricsUsesTopicName() { + String expectedTopicName = "test"; + int partitionId = 0; + + ServerMetrics serverMetricsMock = mock(ServerMetrics.class); + IngestionDelayTracker tracker = + new IngestionDelayTracker(serverMetricsMock, REALTIME_TABLE_NAME, REALTIME_TABLE_DATA_MANAGER, () -> true); + // createMetrics must be called first so the stream metadata provider is set up + tracker.createMetrics(partitionId); + tracker.removeMetrics(partitionId); + + verify(serverMetricsMock).removePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_DELAY_MS)); + verify(serverMetricsMock).removePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS)); + verify(serverMetricsMock).removePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS)); + verify(serverMetricsMock).removePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_OFFSET_LAG)); + verify(serverMetricsMock).removePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET)); + verify(serverMetricsMock).removePartitionTopicGauge( + eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName), + eq(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET)); + + tracker.shutdown(); + } }