Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2

### :rocket: Features

* feat(sdk-metrics): add maxExportBatchSize option to PeriodicExportingMetricReader [#6655](https://github.com/open-telemetry/opentelemetry-js/pull/6655) @psx95

### :bug: Bug Fixes

### :books: Documentation
Expand Down
92 changes: 92 additions & 0 deletions packages/sdk-metrics/src/export/MetricDataSplitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import type { DataPoint, ResourceMetrics, ScopeMetrics } from './MetricData';

/**
* Splits a ResourceMetrics object into smaller ResourceMetrics objects
* such that no batch exceeds maxExportBatchSize data points.
* @param resourceMetrics The metrics to split.
* @param maxExportBatchSize The maximum number of data points per batch.
* @internal
*/
export function splitMetricData(
resourceMetrics: ResourceMetrics,
maxExportBatchSize: number
): ResourceMetrics[] {
if (!Number.isInteger(maxExportBatchSize) || maxExportBatchSize <= 0) {
throw new Error('maxExportBatchSize must be a positive integer');
}
const batches: ResourceMetrics[] = [];
let currentBatchPoints = 0;
let currentScopeMetrics: ScopeMetrics[] = [];

function flush() {
if (currentScopeMetrics.length > 0) {
batches.push({
resource: resourceMetrics.resource,
scopeMetrics: currentScopeMetrics,
});
currentScopeMetrics = [];
currentBatchPoints = 0;
}
}

// Iterate through all scopes in the input metrics
for (const scopeMetric of resourceMetrics.scopeMetrics) {
let scopeMetricCopy: ScopeMetrics | null = null;

// Iterate through all metrics within the current scope
for (const metric of scopeMetric.metrics) {
let dataPointsRemaining = metric.dataPoints;
let metricCopy: typeof metric | undefined = undefined;

// If a metric has no data points, add it directly to the current batch
if (dataPointsRemaining.length === 0) {
if (!scopeMetricCopy) {
scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] };
currentScopeMetrics.push(scopeMetricCopy);
}
scopeMetricCopy.metrics.push(metric);
continue;
}

// Chunk the data points of the current metric across batches
while (dataPointsRemaining.length > 0) {
const spaceLeft = maxExportBatchSize - currentBatchPoints;
const take = Math.min(spaceLeft, dataPointsRemaining.length);
const chunk = dataPointsRemaining.slice(0, take);
dataPointsRemaining = dataPointsRemaining.slice(take);

// Ensure we have a ScopeMetrics object in the current batch
if (!scopeMetricCopy) {
scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] };
currentScopeMetrics.push(scopeMetricCopy);
metricCopy = undefined; // Reset because we are starting a new batch
}

// Ensure we have a MetricData object for this specific metric in the current batch.
if (!metricCopy) {
metricCopy = { ...metric, dataPoints: [] };
scopeMetricCopy.metrics.push(metricCopy);
}

(metricCopy.dataPoints as DataPoint<unknown>[]).push(
...(chunk as DataPoint<unknown>[])
);
currentBatchPoints += take;

// If the current batch is full, flush it and start a new one
if (currentBatchPoints === maxExportBatchSize) {
flush();
scopeMetricCopy = null; // Force recreation of scope copy in the next batch
}
}
}
}

flush();
return batches;
}
84 changes: 70 additions & 14 deletions packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import type { PushMetricExporter } from './MetricExporter';
import { callWithTimeout, TimeoutError } from '../utils';
import type { MetricProducer } from './MetricProducer';
import { InstrumentType } from './MetricData';
import { splitMetricData } from './MetricDataSplitter';

export type PeriodicExportingMetricReaderOptions = {
/**
Expand Down Expand Up @@ -50,6 +51,11 @@ export type PeriodicExportingMetricReaderOptions = {
observableUpDownCounter?: number;
default?: number;
};
/**
* The maximum batch size for exports. If configured, the reader will split
* batches larger than this size into smaller batches.
*/
maxExportBatchSize?: number;
};

/**
Expand All @@ -61,13 +67,16 @@ export class PeriodicExportingMetricReader extends MetricReader {
private _exporter: PushMetricExporter;
private readonly _exportInterval: number;
private readonly _exportTimeout: number;
private readonly _maxExportBatchSize?: number;
private _ongoingExportPromise: Promise<void> | null = null;

constructor(options: PeriodicExportingMetricReaderOptions) {
const {
exporter,
exportIntervalMillis = 60000,
metricProducers,
cardinalityLimits,
maxExportBatchSize,
} = options;
let { exportTimeoutMillis = 30000 } = options;

Expand Down Expand Up @@ -111,6 +120,13 @@ export class PeriodicExportingMetricReader extends MetricReader {
throw Error('exportTimeoutMillis must be greater than 0');
}

if (
maxExportBatchSize !== undefined &&
(!Number.isInteger(maxExportBatchSize) || maxExportBatchSize <= 0)
) {
throw Error('maxExportBatchSize must be a positive integer');
}

if (exportIntervalMillis < exportTimeoutMillis) {
if (
'exportIntervalMillis' in options &&
Expand All @@ -132,25 +148,25 @@ export class PeriodicExportingMetricReader extends MetricReader {
this._exportInterval = exportIntervalMillis;
this._exportTimeout = exportTimeoutMillis;
this._exporter = exporter;
this._maxExportBatchSize = maxExportBatchSize;
}

private async _runOnce(): Promise<void> {
try {
await callWithTimeout(this._doRun(), this._exportTimeout);
await this._doRun();
} catch (err) {
if (err instanceof TimeoutError) {
api.diag.error(
'Export took longer than %s milliseconds and timed out.',
this._exportTimeout
);
return;
}

globalErrorHandler(err);
}
}

private async _doRun(): Promise<void> {
if (this._ongoingExportPromise) {
api.diag.debug(
'PeriodicExportingMetricReader: export already in progress, skipping'
);
return;
}

const { resourceMetrics, errors } = await this.collect({
timeoutMillis: this._exportTimeout,
});
Expand All @@ -175,11 +191,51 @@ export class PeriodicExportingMetricReader extends MetricReader {
return;
}

const result = await internal._export(this._exporter, resourceMetrics);
if (result.code !== ExportResultCode.SUCCESS) {
throw new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
);
const batches = this._maxExportBatchSize
? splitMetricData(resourceMetrics, this._maxExportBatchSize)
: [resourceMetrics];
Comment thread
psx95 marked this conversation as resolved.

const currentExport = async () => {
let anyErr: Error | null = null;
for (const batch of batches) {
try {
const result = await callWithTimeout(
Comment on lines +200 to +202
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the spec say if the batches must run serially in order or are they allowed to execute the batches in parallel?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The related part of the spec states:

The reader MUST ensure all metric data points from a single Collect() are provided to Export before metric data points from a subsequent Collect() so that metric points are sent in-order.

So the points need to be sent in-order, would sending batches out-of-order (in case of parallel export) cause the points to be sent out-of-order?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that's right. It makes sense but it's unfortunate because a single collect call would only ever return one point per timeseries, so you know there is no risk of out-of-order points.

internal._export(this._exporter, batch),
this._exportTimeout
);
if (result.code !== ExportResultCode.SUCCESS) {
const err = new Error(
`PeriodicExportingMetricReader: metrics export failed (error ${result.error})`
);
api.diag.error(err.message);
anyErr = err;
}
} catch (e) {
if (e instanceof TimeoutError) {
// We do not report TimeoutError to the globalErrorHandler in _runOnce().
api.diag.error(
`PeriodicExportingMetricReader: metrics export timed out after ${this._exportTimeout}ms`
);
break;
} else {
Comment on lines +200 to +220
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, I missed the documentation in callWithTimeout that states:

NOTE: this operation will continue even after it throws a {@link TimeoutError}.

If this is the case, I can modify the loop in _doRun to stop processing further batches if a TimeoutError occurs. This prevents starting new exports while the previous one might still be active in the background.

Important

Breaking the loop on timeout means that if one batch fails due to timeout, subsequent batches in the same export cycle will be skipped. This prioritizes safety (no concurrent exports) over data delivery (trying to send remaining batches)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did the existing code already have this issue if there was a previous export still running?

Copy link
Copy Markdown
Author

@psx95 psx95 May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing code also had the same issue (concurrent exports).

I don't see any check before invoking _runOnce() that would verify if the operation in the runOnce function (i.e., export) is still working.

This means that _runOnce() could technically be invoked again while the underlying export operation from previous await callWithTimeout(this._doRun(), this._exportTimeout) is still being run.

api.diag.error(
'PeriodicExportingMetricReader: metrics export threw error',
e
);
anyErr = e instanceof Error ? e : new Error(String(e));
}
}
}
if (anyErr) {
throw anyErr;
}
};

this._ongoingExportPromise = currentExport();
try {
await this._ongoingExportPromise;
} finally {
this._ongoingExportPromise = null;
}
}

Expand Down
Loading
Loading