diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java index 79fb88f56ed7..07a3c3621ba8 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java @@ -50,13 +50,14 @@ public abstract class ServerPrometheusMetricsTest extends PinotPrometheusMetrics ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS, ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS, ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS, - ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS); + ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS, + ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS, + ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, + ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS); private static final List GAUGES_ACCEPTING_PARTITION = List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT, - ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS, - ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, - ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET); + ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT); private static final List GAUGES_ACCEPTING_RAW_TABLE_NAME = List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index e26390b9e4fb..520ae5cbaff1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -40,6 +40,8 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerTimer; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; @@ -53,7 +55,8 @@ import org.slf4j.LoggerFactory; /** - * A Class to track realtime ingestion delay for table partitions on a given server. + * Tracks realtime ingestion delay for table partitions on the current server. + * Ingestion gauges are keyed like stream consumer metrics ({@code tableNameWithType-topic-streamPartition}). * Highlights: * 1-An object of this class is hosted by each RealtimeTableDataManager. * 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table. @@ -132,6 +135,8 @@ void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffs private final ServerMetrics _serverMetrics; private final String _tableNameWithType; private final String _metricName; + @Nullable + private final String _consumerClientIdSuffix; private final RealtimeTableDataManager _realTimeTableDataManager; private final BooleanSupplier _isServerReadyToServeQueries; private final Cache _segmentsToIgnore = @@ -176,6 +181,9 @@ public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithTy _serverMetrics = serverMetrics; _tableNameWithType = tableNameWithType; _metricName = tableNameWithType; + InstanceDataManagerConfig instanceDataManagerConfig = realtimeTableDataManager.getInstanceDataManagerConfig(); + _consumerClientIdSuffix = + instanceDataManagerConfig != null ? instanceDataManagerConfig.getConsumerClientIdSuffix() : null; _realTimeTableDataManager = realtimeTableDataManager; _isServerReadyToServeQueries = isServerReadyToServeQueries; @@ -369,27 +377,40 @@ void setClock(Clock clock) { _clock = clock; } + /** Table key for ingestion gauges for {@code pinotPartitionId}, aligned with stream consumer metrics. */ + private String getIngestionGaugeTableKey(int pinotPartitionId) { + TableConfig tableConfig = _realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft(); + List streamConfigs = IngestionConfigUtils.getStreamConfigs(tableConfig); + StreamConfig streamConfig = IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs, + pinotPartitionId); + int streamPartitionId = + IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(tableConfig, pinotPartitionId); + return IngestionConfigUtils.getStreamIngestionMetricTableKey(_tableNameWithType, streamConfig.getTopicName(), + streamPartitionId, _consumerClientIdSuffix); + } + @VisibleForTesting void createMetrics(int partitionId) { int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); + String tableKey = getIngestionGaugeTableKey(partitionId); if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, + _serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () -> getPartitionIngestionOffsetLag(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, - ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> getPartitionIngestionConsumingOffset(partitionId)); + _serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, + () -> getPartitionIngestionConsumingOffset(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, - ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> getLatestPartitionOffset(partitionId)); + _serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, + () -> getLatestPartitionOffset(partitionId)); } - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS, + _serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, - ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId)); - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, - ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS, () -> getPartitionIngestionReportingStatus(partitionId)); + _serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, + () -> getPartitionEndToEndIngestionDelayMs(partitionId)); + _serverMetrics.setOrUpdateTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS, + () -> getPartitionIngestionReportingStatus(partitionId)); LOGGER.info("Successfully created ingestion metrics for partition id: {}", partitionId); } @@ -398,18 +419,16 @@ private void removeMetrics(int partitionId) { int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId); StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex); + String tableKey = getIngestionGaugeTableKey(partitionId); // Remove all metrics associated with this partition if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) { - _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); - _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET); - _serverMetrics.removePartitionGauge(_metricName, partitionId, - ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET); + _serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); + _serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET); + _serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET); } - _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS); - _serverMetrics.removePartitionGauge(_metricName, partitionId, - ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS); - _serverMetrics.removePartitionGauge(_metricName, partitionId, - ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS); + _serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_MS); + _serverMetrics.removeTableGauge(tableKey, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS); + _serverMetrics.removeTableGauge(tableKey, ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS); LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java index 40f647c429cd..1d7e736c43d2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.IndexingConfig; @@ -339,6 +340,23 @@ public static String getTableTopicUniqueClientId(String className, StreamConfig className + "-" + streamConfig.getTableNameWithType() + "-" + streamConfig.getTopicName()); } + /** + * Returns the table-key string for realtime stream server metrics: {@code table-topic-partition} and optional + * {@code consumerClientIdSuffix} when non-blank. + * + * @param tableNameWithType table name with type (e.g. {@code myTable_REALTIME}) + * @param topicName stream topic name + * @param streamPartitionId stream partition id + * @param consumerClientIdSuffix optional suffix; ignored if null or blank + */ + public static String getStreamIngestionMetricTableKey(String tableNameWithType, String topicName, + int streamPartitionId, @Nullable String consumerClientIdSuffix) { + if (StringUtils.isNotBlank(consumerClientIdSuffix)) { + return tableNameWithType + "-" + topicName + "-" + streamPartitionId + "-" + consumerClientIdSuffix; + } + return tableNameWithType + "-" + topicName + "-" + streamPartitionId; + } + /** * Returns a Map of stream config index to Set of stream partition Ids. * @param pinotPartitionIds Set of pinot partition ids. diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java index 3390126b2ffb..f8adb800c499 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java @@ -182,4 +182,13 @@ public void testGetStreamConfigIndexToStreamPartitions() { Assert.assertEquals(streamConfigIndexToStreamPartitions.get(1), new HashSet<>(Arrays.asList(100, 1))); Assert.assertEquals(streamConfigIndexToStreamPartitions.get(3), new HashSet<>(Arrays.asList(400))); } + + @Test + public void testGetStreamIngestionMetricTableKey() { + String tableWithType = "db.myTable_REALTIME"; + Assert.assertEquals(IngestionConfigUtils.getStreamIngestionMetricTableKey(tableWithType, "events", 7, null), + "db.myTable_REALTIME-events-7"); + Assert.assertEquals(IngestionConfigUtils.getStreamIngestionMetricTableKey(tableWithType, "events", 7, "host1"), + "db.myTable_REALTIME-events-7-host1"); + } }