Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,21 @@ public void setOrUpdatePartitionGauge(final String tableName, final int partitio
setOrUpdateGauge(fullGaugeName, valueSupplier);
}

/**
* Install a per-partition topic table gauge.
*
* @param tableName The table name
* @param partitionId The partition id
* @param topicName The topic name
* @param gauge The gauge to use
* @param valueSupplier The supplier function used to retrieve the value of the gauge.
*/
public void setOrUpdatePartitionTopicGauge(final String tableName, final int partitionId,
final String topicName, final G gauge, final Supplier<Long> valueSupplier) {
final String fullGaugeName = composeTableTopicGaugeName(tableName, String.valueOf(partitionId), topicName, gauge);
setOrUpdateGauge(fullGaugeName, valueSupplier);
}

/**
* @deprecated please use setOrUpdateGauge(final String metricName, final Supplier<Long> valueSupplier) instead.
*
Expand Down Expand Up @@ -764,6 +779,20 @@ public void removePartitionGauge(final String tableName, final int partitionId,
removeGauge(fullGaugeName);
}

/**
* Removes a table gauge given the table name, the partition id, topic name and the gauge.
* The add/remove is expected to work correctly in case of being invoked across multiple threads.
* @param tableName table name
* @param partitionId The partition id
* @param topicName The topic name
* @param gauge the gauge to be removed
*/
public void removePartitionTopicGauge(final String tableName, final int partitionId,
final String topicName, final G gauge) {
final String fullGaugeName = composeTableTopicGaugeName(tableName, String.valueOf(partitionId), topicName, gauge);
removeGauge(fullGaugeName);
}

/**
* Removes a table gauge given the table name, the key and the gauge.
* The add/remove is expected to work correctly in case of being invoked across multiple threads.
Expand All @@ -788,6 +817,11 @@ private String composeTableGaugeName(final String tableName, final String key, f
return gauge.getGaugeName() + "." + getTableName(tableName) + "." + key;
}

private String composeTableTopicGaugeName(final String tableName, final String key,
final String topicName, final G gauge) {
return gauge.getGaugeName() + "." + getTableName(tableName) + "." + key + "." + topicName;
}

public String composePluginGaugeName(String pluginName, Gauge gauge) {
return gauge.getGaugeName() + "." + pluginName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,27 @@ public void testPartitionGauges() {
Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty());
}

@Test
public void testPartitionTopicGauges() {
ControllerMetrics controllerMetrics = buildTestMetrics();
String table = "test_table";
int partitionId = 7;
String topicName = "test_topic";

controllerMetrics.setOrUpdatePartitionTopicGauge(table, partitionId, topicName, ControllerGauge.VERSION, () -> 1L);
Assert.assertEquals(
getGaugeValue(controllerMetrics,
ControllerGauge.VERSION.getGaugeName() + "." + table + "." + partitionId + "." + topicName), 1);

controllerMetrics.setOrUpdatePartitionTopicGauge(table, partitionId, topicName, ControllerGauge.VERSION, () -> 2L);
Assert.assertEquals(
getGaugeValue(controllerMetrics,
ControllerGauge.VERSION.getGaugeName() + "." + table + "." + partitionId + "." + topicName), 2);

controllerMetrics.removePartitionTopicGauge(table, partitionId, topicName, ControllerGauge.VERSION);
Assert.assertTrue(controllerMetrics.getMetricsRegistry().allMetrics().isEmpty());
}

@Test
public void testAddCallbackGauges() {
ControllerMetrics controllerMetrics = buildTestMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,43 +372,52 @@ void setClock(Clock clock) {
@VisibleForTesting
void createMetrics(int partitionId) {
int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft());
String topicName = streamConfigs.get(streamConfigIndex).getTopicName();
StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);

if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) {
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
() -> getPartitionIngestionOffsetLag(partitionId));
_serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () -> getPartitionIngestionOffsetLag(partitionId));

_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
_serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () -> getPartitionIngestionConsumingOffset(partitionId));

_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
_serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () -> getLatestPartitionOffset(partitionId));
}
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS,
() -> getPartitionIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
_serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, () -> getPartitionEndToEndIngestionDelayMs(partitionId));
_serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
_serverMetrics.setOrUpdatePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS, () -> getPartitionIngestionReportingStatus(partitionId));

LOGGER.info("Successfully created ingestion metrics for partition id: {}", partitionId);
}

private void removeMetrics(int partitionId) {
@VisibleForTesting
void removeMetrics(int partitionId) {
int streamConfigIndex = IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
StreamMetadataProvider streamMetadataProvider =
_streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
// Remove all metrics associated with this partition
List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft());
String topicName = streamConfigs.get(streamConfigIndex).getTopicName();
StreamMetadataProvider streamMetadataProvider = _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);

if (streamMetadataProvider != null && streamMetadataProvider.supportsOffsetLag()) {
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
_serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
_serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
_serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
}
_serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
_serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
_serverMetrics.removePartitionGauge(_metricName, partitionId,
_serverMetrics.removePartitionTopicGauge(_metricName, partitionId, topicName,
ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS);

LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId);
Expand Down Expand Up @@ -503,26 +512,26 @@ public void timeoutInactivePartitions() {
// involves network traffic and may be inefficient.
List<Integer> partitionsToVerify = getPartitionsToBeVerified();
if (partitionsToVerify.isEmpty()) {
// Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state.
// Don't make the call to getHostedConsumingPartitionsGroupIds() as it involves checking ideal state.
return;
}
Set<Integer> partitionsHostedByThisServer;
Set<Integer> consumingPartitionsHostedByThisServer;
try {
partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds();
consumingPartitionsHostedByThisServer = _realTimeTableDataManager.getHostedConsumingPartitionsGroupIds();
} catch (Exception e) {
LOGGER.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType,
e.getClass(), e.getMessage());
return;
}
for (int partitionId : partitionsToVerify) {
if (!partitionsHostedByThisServer.contains(partitionId)) {
// Partition is not hosted in this server anymore, stop tracking it
if (!consumingPartitionsHostedByThisServer.contains(partitionId)) {
// Partition is not hosted in this server anymore or is not consuming anymore, stop tracking it
removePartitionId(partitionId);
}
}

ConcurrentHashMap<Integer, Boolean> newMap = new ConcurrentHashMap<>();
partitionsHostedByThisServer.forEach(p -> newMap.put(p, true));
consumingPartitionsHostedByThisServer.forEach(p -> newMap.put(p, true));
_partitionsHostedByThisServer = newMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ public StreamMetadataProvider getStreamMetadataProvider(RealtimeSegmentDataManag
* Returns all partitionGroupIds for the partitions hosted by this server for current table.
* @apiNote this involves Zookeeper read and should not be used frequently due to efficiency concerns.
*/
public Set<Integer> getHostedPartitionsGroupIds() {
public Set<Integer> getHostedConsumingPartitionsGroupIds() {
Set<Integer> partitionsHostedByThisServer = new HashSet<>();
List<String> segments = TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, _tableNameWithType,
CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
import org.testng.annotations.Test;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;


public class IngestionDelayTrackerTest {
Expand Down Expand Up @@ -572,4 +575,74 @@ private void assertIncreasing(Map<String, List<Long>> metrics, String key) {
List<Long> values = metrics.get(key);
Assert.assertTrue(values.get(values.size() - 1) > values.get(0), key + " not increasing");
}

@Test
public void testCreateMetricsUsesTopicName() {
// The stream config in REALTIME_TABLE_DATA_MANAGER uses topic name "test"
String expectedTopicName = "test";
int partitionId = 0;

// Use a fresh ServerMetrics mock so we can verify calls on it
ServerMetrics serverMetricsMock = mock(ServerMetrics.class);
IngestionDelayTracker tracker =
new IngestionDelayTracker(serverMetricsMock, REALTIME_TABLE_NAME, REALTIME_TABLE_DATA_MANAGER, () -> true);
tracker.createMetrics(partitionId);

// Verify that all ingestion gauges include the topic name
verify(serverMetricsMock).setOrUpdatePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_DELAY_MS), any());
verify(serverMetricsMock).setOrUpdatePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS), any());
verify(serverMetricsMock).setOrUpdatePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS), any());
// FakeStreamMetadataProvider.supportsOffsetLag() returns true, so these should also be called
verify(serverMetricsMock).setOrUpdatePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_OFFSET_LAG), any());
verify(serverMetricsMock).setOrUpdatePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET), any());
verify(serverMetricsMock).setOrUpdatePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET), any());

tracker.shutdown();
}

@Test
public void testRemoveMetricsUsesTopicName() {
String expectedTopicName = "test";
int partitionId = 0;

ServerMetrics serverMetricsMock = mock(ServerMetrics.class);
IngestionDelayTracker tracker =
new IngestionDelayTracker(serverMetricsMock, REALTIME_TABLE_NAME, REALTIME_TABLE_DATA_MANAGER, () -> true);
// createMetrics must be called first so the stream metadata provider is set up
tracker.createMetrics(partitionId);
tracker.removeMetrics(partitionId);

verify(serverMetricsMock).removePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_DELAY_MS));
verify(serverMetricsMock).removePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS));
verify(serverMetricsMock).removePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_DELAY_REPORTING_STATUS));
verify(serverMetricsMock).removePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_OFFSET_LAG));
verify(serverMetricsMock).removePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET));
verify(serverMetricsMock).removePartitionTopicGauge(
eq(REALTIME_TABLE_NAME), eq(partitionId), eq(expectedTopicName),
eq(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET));

tracker.shutdown();
}
}
Loading