From 8979c035224e3951720de82e9a238a5bfccf50dc Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 29 Apr 2026 18:45:22 +0000 Subject: [PATCH 01/17] Add initial implementation for metrics batching --- .../export/PeriodicExportingMetricReader.ts | 116 +++++++++- .../PeriodicExportingMetricReader.test.ts | 204 ++++++++++++++++++ 2 files changed, 313 insertions(+), 7 deletions(-) diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index df489066883..e0a04cd447c 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -13,7 +13,7 @@ import { MetricReader } from './MetricReader'; import type { PushMetricExporter } from './MetricExporter'; import { callWithTimeout, TimeoutError } from '../utils'; import type { MetricProducer } from './MetricProducer'; -import { InstrumentType } from './MetricData'; +import { InstrumentType, ResourceMetrics, ScopeMetrics } from './MetricData'; export type PeriodicExportingMetricReaderOptions = { /** @@ -50,6 +50,11 @@ export type PeriodicExportingMetricReaderOptions = { observableUpDownCounter?: number; default?: number; }; + /** + * The maximum batch size for exports. If configured, the reader will split + * batches larger than this size into smaller batches. + */ + maxExportBatchSize?: number; }; /** @@ -61,6 +66,8 @@ export class PeriodicExportingMetricReader extends MetricReader { private _exporter: PushMetricExporter; private readonly _exportInterval: number; private readonly _exportTimeout: number; + private readonly _maxExportBatchSize?: number; + private _previousExportPromise: Promise = Promise.resolve(); constructor(options: PeriodicExportingMetricReaderOptions) { const { @@ -68,6 +75,7 @@ export class PeriodicExportingMetricReader extends MetricReader { exportIntervalMillis = 60000, metricProducers, cardinalityLimits, + maxExportBatchSize, } = options; let { exportTimeoutMillis = 30000 } = options; @@ -132,6 +140,81 @@ export class PeriodicExportingMetricReader extends MetricReader { this._exportInterval = exportIntervalMillis; this._exportTimeout = exportTimeoutMillis; this._exporter = exporter; + this._maxExportBatchSize = maxExportBatchSize; + } + + private _splitResourceMetrics( + resourceMetrics: ResourceMetrics, + maxExportBatchSize: number + ): ResourceMetrics[] { + const batches: ResourceMetrics[] = []; + let currentBatchPoints = 0; + let currentScopeMetrics: ScopeMetrics[] = []; + + function flush() { + if (currentScopeMetrics.length > 0) { + batches.push({ + resource: resourceMetrics.resource, + scopeMetrics: currentScopeMetrics, + }); + currentScopeMetrics = []; + currentBatchPoints = 0; + } + } + + for (const scopeMetric of resourceMetrics.scopeMetrics) { + let scopeMetricCopy: ScopeMetrics | null = null; + + for (const metric of scopeMetric.metrics) { + let dataPointsRemaining = metric.dataPoints; + + if (dataPointsRemaining.length === 0) { + if (!scopeMetricCopy) { + scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; + currentScopeMetrics.push(scopeMetricCopy); + } + scopeMetricCopy.metrics.push(metric); + continue; + } + + while (dataPointsRemaining.length > 0) { + const spaceLeft = maxExportBatchSize - currentBatchPoints; + if (spaceLeft === 0) { + flush(); + scopeMetricCopy = null; + continue; + } + + const take = Math.min(spaceLeft, dataPointsRemaining.length); + const chunk = dataPointsRemaining.slice(0, take); + dataPointsRemaining = dataPointsRemaining.slice(take); + + if (!scopeMetricCopy) { + scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; + currentScopeMetrics.push(scopeMetricCopy); + } + + let metricCopy = scopeMetricCopy.metrics.find( + m => m.descriptor.name === metric.descriptor.name + ); + if (!metricCopy) { + metricCopy = { ...metric, dataPoints: [] }; + scopeMetricCopy.metrics.push(metricCopy); + } + + (metricCopy.dataPoints as any[]).push(...chunk); + currentBatchPoints += take; + + if (currentBatchPoints === maxExportBatchSize) { + flush(); + scopeMetricCopy = null; + } + } + } + } + + flush(); + return batches; } private async _runOnce(): Promise { @@ -175,12 +258,31 @@ export class PeriodicExportingMetricReader extends MetricReader { return; } - const result = await internal._export(this._exporter, resourceMetrics); - if (result.code !== ExportResultCode.SUCCESS) { - throw new Error( - `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` - ); - } + const batches = this._maxExportBatchSize + ? this._splitResourceMetrics(resourceMetrics, this._maxExportBatchSize) + : [resourceMetrics]; + + const currentExport = async () => { + let anyErr: Error | null = null; + for (const batch of batches) { + const result = await internal._export(this._exporter, batch); + if (result.code !== ExportResultCode.SUCCESS) { + const err = new Error( + `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` + ); + api.diag.error(err.message); + anyErr = err; + } + } + if (anyErr) { + throw anyErr; + } + }; + + // Schedules the current export to run after all previously scheduled exports have finished. + const promise = this._previousExportPromise.then(currentExport); + this._previousExportPromise = promise.catch(() => { }); + await promise; } protected override onInitialized(): void { diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 35e19b0ece1..2c4d95d5b77 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -42,6 +42,8 @@ class TestMetricExporter implements PushMetricExporter { public throwExport = false; public throwFlush = false; public rejectExport = false; + public concurrentCalls = 0; + public maxConcurrentCalls = 0; private _batches: ResourceMetrics[] = []; private _shutdown: boolean = false; @@ -50,11 +52,15 @@ class TestMetricExporter implements PushMetricExporter { resultCallback: (result: ExportResult) => void ): void { this._batches.push(metrics); + this.concurrentCalls++; + this.maxConcurrentCalls = Math.max(this.maxConcurrentCalls, this.concurrentCalls); if (this.throwExport) { + this.concurrentCalls--; throw new Error('Error during export'); } setTimeout(() => { + this.concurrentCalls--; if (this.rejectExport) { resultCallback({ code: ExportResultCode.FAILED, @@ -782,5 +788,203 @@ describe('PeriodicExportingMetricReader', () => { await assert.rejects(() => reader.shutdown(), /Error during forceFlush/); }); + + describe('maxExportBatchSize', () => { + it('should split batches when exceeding maxExportBatchSize', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + maxExportBatchSize: 2, + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + await reader.forceFlush(); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 2); + + // First batch should have 2 data points + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[1].value, 2); + + // Second batch should have 1 data point + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + + await reader.shutdown(); + }); + + it('should synchronize concurrent exports', async () => { + const exporter = new TestMetricExporter(); + exporter.exportTime = 50; // Make export take some time + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const producer = new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }); + reader.setMetricProducer(producer); + + // Trigger two exports quickly + const p1 = reader.forceFlush(); + const p2 = reader.forceFlush(); + + await Promise.all([p1, p2]); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 2); + + // Assert that they didn't overlap + assert.strictEqual(exporter.maxConcurrentCalls, 1); + + await reader.shutdown(); + }); + + it('should split data points across metrics if needed', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + maxExportBatchSize: 2, + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + ], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm2', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + await reader.forceFlush(); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 2); + + // First batch should have 2 data points (m1:1, m2:2) + assert.strictEqual(exports[0].scopeMetrics[0].metrics.length, 2); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints.length, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints[0].value, 2); + + // Second batch should have 1 data point (m2:3) + assert.strictEqual(exports[1].scopeMetrics[0].metrics.length, 1); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + + await reader.shutdown(); + }); + + it('should continue exporting remaining batches if one fails', async () => { + const exporter = new TestMetricExporter(); + exporter.rejectExport = true; // Fail all exports + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + maxExportBatchSize: 1, + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + ], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + // Call forceFlush to trigger the export + await reader.forceFlush(); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 2); // Both batches should have been attempted + + await reader.shutdown(); + }); + }); }); }); From bbbd01d80d9e6b1f30d78ee5646ad6286f6f9d03 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 29 Apr 2026 19:29:52 +0000 Subject: [PATCH 02/17] Move metric splitting functionality to different class --- .../src/export/MetricDataSplitter.ts | 91 ++++++ .../export/PeriodicExportingMetricReader.ts | 79 +---- .../test/export/MetricDataSplitter.test.ts | 292 ++++++++++++++++++ .../PeriodicExportingMetricReader.test.ts | 109 ------- 4 files changed, 386 insertions(+), 185 deletions(-) create mode 100644 packages/sdk-metrics/src/export/MetricDataSplitter.ts create mode 100644 packages/sdk-metrics/test/export/MetricDataSplitter.test.ts diff --git a/packages/sdk-metrics/src/export/MetricDataSplitter.ts b/packages/sdk-metrics/src/export/MetricDataSplitter.ts new file mode 100644 index 00000000000..d094064c67d --- /dev/null +++ b/packages/sdk-metrics/src/export/MetricDataSplitter.ts @@ -0,0 +1,91 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { ResourceMetrics, ScopeMetrics } from './MetricData'; + +export class MetricDataSplitter { + /** + * Splits a ResourceMetrics object into smaller ResourceMetrics objects + * such that no batch exceeds maxExportBatchSize data points. + * @param resourceMetrics The metrics to split. + * @param maxExportBatchSize The maximum number of data points per batch. + */ + static split( + resourceMetrics: ResourceMetrics, + maxExportBatchSize: number + ): ResourceMetrics[] { + if (maxExportBatchSize <= 0) { + throw new Error('maxExportBatchSize must be greater than 0'); + } + const batches: ResourceMetrics[] = []; + let currentBatchPoints = 0; + let currentScopeMetrics: ScopeMetrics[] = []; + + function flush() { + if (currentScopeMetrics.length > 0) { + batches.push({ + resource: resourceMetrics.resource, + scopeMetrics: currentScopeMetrics, + }); + currentScopeMetrics = []; + currentBatchPoints = 0; + } + } + + for (const scopeMetric of resourceMetrics.scopeMetrics) { + let scopeMetricCopy: ScopeMetrics | null = null; + + for (const metric of scopeMetric.metrics) { + let dataPointsRemaining = metric.dataPoints; + + if (dataPointsRemaining.length === 0) { + if (!scopeMetricCopy) { + scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; + currentScopeMetrics.push(scopeMetricCopy); + } + scopeMetricCopy.metrics.push(metric); + continue; + } + + while (dataPointsRemaining.length > 0) { + const spaceLeft = maxExportBatchSize - currentBatchPoints; + if (spaceLeft === 0) { + flush(); + scopeMetricCopy = null; + continue; + } + + const take = Math.min(spaceLeft, dataPointsRemaining.length); + const chunk = dataPointsRemaining.slice(0, take); + dataPointsRemaining = dataPointsRemaining.slice(take); + + if (!scopeMetricCopy) { + scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; + currentScopeMetrics.push(scopeMetricCopy); + } + + let metricCopy = scopeMetricCopy.metrics.find( + m => m.descriptor.name === metric.descriptor.name + ); + if (!metricCopy) { + metricCopy = { ...metric, dataPoints: [] }; + scopeMetricCopy.metrics.push(metricCopy); + } + + (metricCopy.dataPoints as any[]).push(...chunk); + currentBatchPoints += take; + + if (currentBatchPoints === maxExportBatchSize) { + flush(); + scopeMetricCopy = null; + } + } + } + } + + flush(); + return batches; + } +} diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index e0a04cd447c..3d096d84f92 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -13,7 +13,8 @@ import { MetricReader } from './MetricReader'; import type { PushMetricExporter } from './MetricExporter'; import { callWithTimeout, TimeoutError } from '../utils'; import type { MetricProducer } from './MetricProducer'; -import { InstrumentType, ResourceMetrics, ScopeMetrics } from './MetricData'; +import { InstrumentType } from './MetricData'; +import { MetricDataSplitter } from './MetricDataSplitter'; export type PeriodicExportingMetricReaderOptions = { /** @@ -143,80 +144,6 @@ export class PeriodicExportingMetricReader extends MetricReader { this._maxExportBatchSize = maxExportBatchSize; } - private _splitResourceMetrics( - resourceMetrics: ResourceMetrics, - maxExportBatchSize: number - ): ResourceMetrics[] { - const batches: ResourceMetrics[] = []; - let currentBatchPoints = 0; - let currentScopeMetrics: ScopeMetrics[] = []; - - function flush() { - if (currentScopeMetrics.length > 0) { - batches.push({ - resource: resourceMetrics.resource, - scopeMetrics: currentScopeMetrics, - }); - currentScopeMetrics = []; - currentBatchPoints = 0; - } - } - - for (const scopeMetric of resourceMetrics.scopeMetrics) { - let scopeMetricCopy: ScopeMetrics | null = null; - - for (const metric of scopeMetric.metrics) { - let dataPointsRemaining = metric.dataPoints; - - if (dataPointsRemaining.length === 0) { - if (!scopeMetricCopy) { - scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; - currentScopeMetrics.push(scopeMetricCopy); - } - scopeMetricCopy.metrics.push(metric); - continue; - } - - while (dataPointsRemaining.length > 0) { - const spaceLeft = maxExportBatchSize - currentBatchPoints; - if (spaceLeft === 0) { - flush(); - scopeMetricCopy = null; - continue; - } - - const take = Math.min(spaceLeft, dataPointsRemaining.length); - const chunk = dataPointsRemaining.slice(0, take); - dataPointsRemaining = dataPointsRemaining.slice(take); - - if (!scopeMetricCopy) { - scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; - currentScopeMetrics.push(scopeMetricCopy); - } - - let metricCopy = scopeMetricCopy.metrics.find( - m => m.descriptor.name === metric.descriptor.name - ); - if (!metricCopy) { - metricCopy = { ...metric, dataPoints: [] }; - scopeMetricCopy.metrics.push(metricCopy); - } - - (metricCopy.dataPoints as any[]).push(...chunk); - currentBatchPoints += take; - - if (currentBatchPoints === maxExportBatchSize) { - flush(); - scopeMetricCopy = null; - } - } - } - } - - flush(); - return batches; - } - private async _runOnce(): Promise { try { await callWithTimeout(this._doRun(), this._exportTimeout); @@ -259,7 +186,7 @@ export class PeriodicExportingMetricReader extends MetricReader { } const batches = this._maxExportBatchSize - ? this._splitResourceMetrics(resourceMetrics, this._maxExportBatchSize) + ? MetricDataSplitter.split(resourceMetrics, this._maxExportBatchSize) : [resourceMetrics]; const currentExport = async () => { diff --git a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts new file mode 100644 index 00000000000..4ed4761bb24 --- /dev/null +++ b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts @@ -0,0 +1,292 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import { MetricDataSplitter } from '../../src/export/MetricDataSplitter'; +import { DataPointType, ResourceMetrics } from '../../src/export/MetricData'; +import { AggregationTemporality } from '../../src/export/AggregationTemporality'; +import { ValueType } from '@opentelemetry/api'; +import * as assert from 'assert'; + +describe('MetricDataSplitter', () => { + const dummyResource = { attributes: {} } as any; + + it('should split batches when exceeding maxExportBatchSize', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + + // First batch should have 2 data points + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints[1].value, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.description, 'desc1'); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.unit, 'unit1'); + + // Second batch should have 1 data point + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.description, 'desc1'); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.unit, 'unit1'); + }); + + it('should split data points across metrics if needed', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + + // First batch should have 2 data points (m1:1, m2:2) + assert.strictEqual(batches[0].scopeMetrics[0].metrics.length, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + + assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].dataPoints.length, 1); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].dataPoints[0].value, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].descriptor.name, 'm2'); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].descriptor.description, 'desc2'); + + // Second batch should have 1 data point (m2:3) + assert.strictEqual(batches[1].scopeMetrics[0].metrics.length, 1); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.description, 'desc2'); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + }); + + it('should handle empty data points', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 1); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 0); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + }); + + it('should split across multiple scopes and fill batches', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'scope1' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + { + scope: { name: 'scope2' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 4 }, + ], + descriptor: { name: 'm3', description: 'desc3', unit: 'unit3', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + + // Batch 1: scope1, m1 (2 points) + assert.strictEqual(batches[0].scopeMetrics.length, 1); + assert.strictEqual(batches[0].scopeMetrics[0].scope.name, 'scope1'); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + + // Batch 2: scope1, m2 (1 point) AND scope2, m3 (1 point) + assert.strictEqual(batches[1].scopeMetrics.length, 2); + assert.strictEqual(batches[1].scopeMetrics[0].scope.name, 'scope1'); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.description, 'desc2'); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + + assert.strictEqual(batches[1].scopeMetrics[1].scope.name, 'scope2'); + assert.strictEqual(batches[1].scopeMetrics[1].metrics[0].descriptor.name, 'm3'); + assert.strictEqual(batches[1].scopeMetrics[1].metrics[0].descriptor.unit, 'unit3'); + assert.strictEqual(batches[1].scopeMetrics[1].metrics[0].dataPoints.length, 1); + }); + + it('should split a single metric across multiple batches', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 4 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 5 }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 3); + + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + + assert.strictEqual(batches[2].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(batches[2].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + }); + + it('should handle partly filled batches with multiple scopes', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'scope1' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + { + scope: { name: 'scope2' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }], + descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 3); + + assert.strictEqual(batches.length, 1); + assert.strictEqual(batches[0].scopeMetrics.length, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(batches[0].scopeMetrics[1].metrics[0].descriptor.name, 'm2'); + assert.strictEqual(batches[0].scopeMetrics[1].metrics[0].dataPoints.length, 1); + }); + + it('should throw when maxExportBatchSize is less than or equal to 0', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [], + }; + + assert.throws( + () => MetricDataSplitter.split(resourceMetrics, 0), + /maxExportBatchSize must be greater than 0/ + ); + + assert.throws( + () => MetricDataSplitter.split(resourceMetrics, -1), + /maxExportBatchSize must be greater than 0/ + ); + }); +}); diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 2c4d95d5b77..114aa46e24b 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -790,55 +790,7 @@ describe('PeriodicExportingMetricReader', () => { }); describe('maxExportBatchSize', () => { - it('should split batches when exceeding maxExportBatchSize', async () => { - const exporter = new TestMetricExporter(); - const reader = new PeriodicExportingMetricReader({ - exporter: exporter, - exportIntervalMillis: MAX_32_BIT_INT, - maxExportBatchSize: 2, - }); - - const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, - scopeMetrics: [ - { - scope: { name: 'test' }, - metrics: [ - { - dataPointType: DataPointType.GAUGE, - dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, - ], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, - aggregationTemporality: AggregationTemporality.CUMULATIVE, - }, - ], - }, - ], - }; - reader.setMetricProducer( - new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) - ); - - await reader.forceFlush(); - - const exports = exporter.getExports(); - assert.strictEqual(exports.length, 2); - - // First batch should have 2 data points - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[1].value, 2); - - // Second batch should have 1 data point - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); - - await reader.shutdown(); - }); it('should synchronize concurrent exports', async () => { const exporter = new TestMetricExporter(); @@ -883,67 +835,6 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); - it('should split data points across metrics if needed', async () => { - const exporter = new TestMetricExporter(); - const reader = new PeriodicExportingMetricReader({ - exporter: exporter, - exportIntervalMillis: MAX_32_BIT_INT, - maxExportBatchSize: 2, - }); - - const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, - scopeMetrics: [ - { - scope: { name: 'test' }, - metrics: [ - { - dataPointType: DataPointType.GAUGE, - dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - ], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, - aggregationTemporality: AggregationTemporality.CUMULATIVE, - }, - { - dataPointType: DataPointType.GAUGE, - dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, - ], - descriptor: { name: 'm2', description: '', unit: '', valueType: ValueType.INT }, - aggregationTemporality: AggregationTemporality.CUMULATIVE, - }, - ], - }, - ], - }; - - reader.setMetricProducer( - new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) - ); - - await reader.forceFlush(); - - const exports = exporter.getExports(); - assert.strictEqual(exports.length, 2); - - // First batch should have 2 data points (m1:1, m2:2) - assert.strictEqual(exports[0].scopeMetrics[0].metrics.length, 2); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints.length, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints[0].value, 2); - - // Second batch should have 1 data point (m2:3) - assert.strictEqual(exports[1].scopeMetrics[0].metrics.length, 1); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); - - await reader.shutdown(); - }); - it('should continue exporting remaining batches if one fails', async () => { const exporter = new TestMetricExporter(); exporter.rejectExport = true; // Fail all exports From ab9f04cddc3ec8b7512ab2f511266e354242c7af Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 29 Apr 2026 19:35:50 +0000 Subject: [PATCH 03/17] Add test cases for other data types for completion --- .../test/export/MetricDataSplitter.test.ts | 225 +++++++++++++++++- 1 file changed, 219 insertions(+), 6 deletions(-) diff --git a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts index 4ed4761bb24..39d219b27ed 100644 --- a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts +++ b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts @@ -12,7 +12,11 @@ import * as assert from 'assert'; describe('MetricDataSplitter', () => { const dummyResource = { attributes: {} } as any; - it('should split batches when exceeding maxExportBatchSize', () => { + // ========================================================================== + // GAUGE Tests + // ========================================================================== + + it('should split batches when exceeding maxExportBatchSize (GAUGE)', () => { const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -54,7 +58,7 @@ describe('MetricDataSplitter', () => { assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.unit, 'unit1'); }); - it('should split data points across metrics if needed', () => { + it('should split data points across metrics if needed (GAUGE)', () => { const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -106,7 +110,7 @@ describe('MetricDataSplitter', () => { assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); }); - it('should handle empty data points', () => { + it('should handle empty data points (GAUGE)', () => { const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -131,7 +135,7 @@ describe('MetricDataSplitter', () => { assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); }); - it('should split across multiple scopes and fill batches', () => { + it('should split across multiple scopes and fill batches (GAUGE)', () => { const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -196,7 +200,7 @@ describe('MetricDataSplitter', () => { assert.strictEqual(batches[1].scopeMetrics[1].metrics[0].dataPoints.length, 1); }); - it('should split a single metric across multiple batches', () => { + it('should split a single metric across multiple batches (GAUGE)', () => { const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -234,7 +238,7 @@ describe('MetricDataSplitter', () => { assert.strictEqual(batches[2].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); }); - it('should handle partly filled batches with multiple scopes', () => { + it('should handle partly filled batches with multiple scopes (GAUGE)', () => { const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -273,6 +277,215 @@ describe('MetricDataSplitter', () => { assert.strictEqual(batches[0].scopeMetrics[1].metrics[0].dataPoints.length, 1); }); + // ========================================================================== + // SUM Tests + // ========================================================================== + + it('should split batches when exceeding maxExportBatchSize (SUM)', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.SUM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + isMonotonic: true, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual((batches[0].scopeMetrics[0].metrics[0] as any).isMonotonic, true); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + }); + + it('should split across multiple scopes and fill batches (SUM)', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'scope1' }, + metrics: [ + { + dataPointType: DataPointType.SUM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + isMonotonic: true, + }, + { + dataPointType: DataPointType.SUM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + isMonotonic: true, + }, + ], + }, + { + scope: { name: 'scope2' }, + metrics: [ + { + dataPointType: DataPointType.SUM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 4 }, + ], + descriptor: { name: 'm3', description: 'desc3', unit: 'unit3', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + isMonotonic: true, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + assert.strictEqual(batches[0].scopeMetrics.length, 1); + assert.strictEqual(batches[1].scopeMetrics.length, 2); + }); + + // ========================================================================== + // HISTOGRAM Tests + // ========================================================================== + + it('should split batches when exceeding maxExportBatchSize (HISTOGRAM)', () => { + const dummyHistogram = { buckets: { boundaries: [1, 2], counts: [1, 1, 1] }, sum: 3, count: 3 }; + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.HISTOGRAM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + }); + + it('should split across multiple scopes and fill batches (HISTOGRAM)', () => { + const dummyHistogram = { buckets: { boundaries: [1, 2], counts: [1, 1, 1] }, sum: 3, count: 3 }; + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'scope1' }, + metrics: [ + { + dataPointType: DataPointType.HISTOGRAM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + { + dataPointType: DataPointType.HISTOGRAM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + ], + descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + { + scope: { name: 'scope2' }, + metrics: [ + { + dataPointType: DataPointType.HISTOGRAM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + ], + descriptor: { name: 'm3', description: 'desc3', unit: 'unit3', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + assert.strictEqual(batches[0].scopeMetrics.length, 1); + assert.strictEqual(batches[1].scopeMetrics.length, 2); + }); + + // ========================================================================== + // EXPONENTIAL_HISTOGRAM Tests + // ========================================================================== + + it('should split batches when exceeding maxExportBatchSize (EXPONENTIAL_HISTOGRAM)', () => { + const dummyExponentialHistogram = { sum: 3, count: 3, scale: 1, zeroCount: 0, positive: { offset: 1, bucketCounts: [1, 1, 1] }, negative: { offset: 1, bucketCounts: [] } }; + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.EXPONENTIAL_HISTOGRAM, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyExponentialHistogram }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyExponentialHistogram }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyExponentialHistogram }, + ], + descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const batches = MetricDataSplitter.split(resourceMetrics, 2); + + assert.strictEqual(batches.length, 2); + assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + }); + + // ========================================================================== + // Validation Tests + // ========================================================================== + it('should throw when maxExportBatchSize is less than or equal to 0', () => { const resourceMetrics: ResourceMetrics = { resource: dummyResource, From 8069cbaebc17651b7ddaeaa94d13afeb5a45ba33 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 29 Apr 2026 21:31:35 +0000 Subject: [PATCH 04/17] Apply the timeout to individual batches instead of collective export --- .../export/PeriodicExportingMetricReader.ts | 34 +-- .../PeriodicExportingMetricReader.test.ts | 200 ++++++++++++++++++ 2 files changed, 219 insertions(+), 15 deletions(-) diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index 3d096d84f92..f5c278cda55 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -146,16 +146,8 @@ export class PeriodicExportingMetricReader extends MetricReader { private async _runOnce(): Promise { try { - await callWithTimeout(this._doRun(), this._exportTimeout); + await this._doRun(); } catch (err) { - if (err instanceof TimeoutError) { - api.diag.error( - 'Export took longer than %s milliseconds and timed out.', - this._exportTimeout - ); - return; - } - globalErrorHandler(err); } } @@ -192,13 +184,25 @@ export class PeriodicExportingMetricReader extends MetricReader { const currentExport = async () => { let anyErr: Error | null = null; for (const batch of batches) { - const result = await internal._export(this._exporter, batch); - if (result.code !== ExportResultCode.SUCCESS) { - const err = new Error( - `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` + try { + const result = await callWithTimeout( + internal._export(this._exporter, batch), + this._exportTimeout ); - api.diag.error(err.message); - anyErr = err; + if (result.code !== ExportResultCode.SUCCESS) { + const err = new Error( + `PeriodicExportingMetricReader: metrics export failed (error ${result.error})` + ); + api.diag.error(err.message); + anyErr = err; + } + } catch (e) { + if (e instanceof TimeoutError) { + api.diag.error(`PeriodicExportingMetricReader: metrics export timed out after ${this._exportTimeout}ms`); + } else { + api.diag.error('PeriodicExportingMetricReader: metrics export threw error', e); + } + anyErr = e instanceof Error ? e : new Error(String(e)); } } if (anyErr) { diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 114aa46e24b..2470772bf5c 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -790,7 +790,55 @@ describe('PeriodicExportingMetricReader', () => { }); describe('maxExportBatchSize', () => { + it('should split batches when exceeding maxExportBatchSize', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + maxExportBatchSize: 2, + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + await reader.forceFlush(); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 2); + + // First batch should have 2 data points + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[1].value, 2); + // Second batch should have 1 data point + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + + await reader.shutdown(); + }); it('should synchronize concurrent exports', async () => { const exporter = new TestMetricExporter(); @@ -835,6 +883,67 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); + it('should split data points across metrics if needed', async () => { + const exporter = new TestMetricExporter(); + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + maxExportBatchSize: 2, + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + ], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + ], + descriptor: { name: 'm2', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + await reader.forceFlush(); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 2); + + // First batch should have 2 data points (m1:1, m2:2) + assert.strictEqual(exports[0].scopeMetrics[0].metrics.length, 2); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints.length, 1); + assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints[0].value, 2); + + // Second batch should have 1 data point (m2:3) + assert.strictEqual(exports[1].scopeMetrics[0].metrics.length, 1); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + + await reader.shutdown(); + }); + it('should continue exporting remaining batches if one fails', async () => { const exporter = new TestMetricExporter(); exporter.rejectExport = true; // Fail all exports @@ -876,6 +985,97 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); + + it('should apply timeout to individual batches and not combination', async () => { + const exporter = new TestMetricExporter(); + exporter.exportTime = 15; // Each batch takes 15ms + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + maxExportBatchSize: 1, // Results in 2 batches + exportTimeoutMillis: 20, // Timeout is 20ms + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + ], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + // If timeout applied to all, this would fail because total time is 30ms > 20ms. + // Since it applies to individual, it should succeed! + await reader.forceFlush(); + + const exports = exporter.getExports(); + assert.strictEqual(exports.length, 2); + + await reader.shutdown(); + }); + + it('should report timeout to global error handler when a batch times out', async () => { + const exporter = new TestMetricExporter(); + exporter.exportTime = 50; // Make it take 50ms + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 20, // Timeout is 20ms + }); + + const resourceMetrics: ResourceMetrics = { + resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }], + descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + const errorHandlerStub = sinon.stub(); + setGlobalErrorHandler(errorHandlerStub); + + await reader.forceFlush(); + + sinon.assert.calledOnce(errorHandlerStub); + const error = errorHandlerStub.firstCall.args[0]; + assert.ok(error instanceof TimeoutError || error.message === 'Operation timed out.'); + + // Restore global error handler to avoid affecting other tests + setGlobalErrorHandler(() => {}); + + await reader.shutdown(); + }); }); }); }); From b0f9122c0d39f1c335f766b5024d75183e0af768 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 29 Apr 2026 21:40:03 +0000 Subject: [PATCH 05/17] Add check to validate the value for maxExportBatchSize --- .../export/PeriodicExportingMetricReader.ts | 4 +++ .../PeriodicExportingMetricReader.test.ts | 25 +++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index f5c278cda55..5a21836051a 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -120,6 +120,10 @@ export class PeriodicExportingMetricReader extends MetricReader { throw Error('exportTimeoutMillis must be greater than 0'); } + if (maxExportBatchSize !== undefined && maxExportBatchSize <= 0) { + throw Error('maxExportBatchSize must be greater than 0'); + } + if (exportIntervalMillis < exportTimeoutMillis) { if ( 'exportIntervalMillis' in options && diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 2470772bf5c..489e3c5dfba 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -207,6 +207,31 @@ describe('PeriodicExportingMetricReader', () => { ); }); + it('should throw when maxExportBatchSize less or equal to 0', () => { + const exporter = new TestDeltaMetricExporter(); + assert.throws( + () => + new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 1, + exportTimeoutMillis: 1, + maxExportBatchSize: 0, + }), + /maxExportBatchSize must be greater than 0/ + ); + + assert.throws( + () => + new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 1, + exportTimeoutMillis: 1, + maxExportBatchSize: -1, + }), + /maxExportBatchSize must be greater than 0/ + ); + }); + it('should throw when timeout less or equal to interval', () => { const exporter = new TestDeltaMetricExporter(); assert.throws( From 79a0cada7ba282a3dd59e6330f26e1ce41e74653 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 29 Apr 2026 21:40:28 +0000 Subject: [PATCH 06/17] Mark MetricDataSplitter class as internal --- packages/sdk-metrics/src/export/MetricDataSplitter.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/sdk-metrics/src/export/MetricDataSplitter.ts b/packages/sdk-metrics/src/export/MetricDataSplitter.ts index d094064c67d..f96b074ee61 100644 --- a/packages/sdk-metrics/src/export/MetricDataSplitter.ts +++ b/packages/sdk-metrics/src/export/MetricDataSplitter.ts @@ -5,6 +5,9 @@ import { ResourceMetrics, ScopeMetrics } from './MetricData'; +/** + * @internal + */ export class MetricDataSplitter { /** * Splits a ResourceMetrics object into smaller ResourceMetrics objects From 37304bd16cd57ad35a56a91dbb568f68c2c7d60e Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 29 Apr 2026 21:54:49 +0000 Subject: [PATCH 07/17] Do not report timeout errors to globalErrorHandler --- .../export/PeriodicExportingMetricReader.ts | 3 +- .../PeriodicExportingMetricReader.test.ts | 46 ------------------- 2 files changed, 2 insertions(+), 47 deletions(-) diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index 5a21836051a..b7e5b951331 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -202,11 +202,12 @@ export class PeriodicExportingMetricReader extends MetricReader { } } catch (e) { if (e instanceof TimeoutError) { + // We do not report TimeoutError to the globalErrorHandler in _runOnce(). api.diag.error(`PeriodicExportingMetricReader: metrics export timed out after ${this._exportTimeout}ms`); } else { api.diag.error('PeriodicExportingMetricReader: metrics export threw error', e); + anyErr = e instanceof Error ? e : new Error(String(e)); } - anyErr = e instanceof Error ? e : new Error(String(e)); } } if (anyErr) { diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 489e3c5dfba..180cef71891 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -1055,52 +1055,6 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); - - it('should report timeout to global error handler when a batch times out', async () => { - const exporter = new TestMetricExporter(); - exporter.exportTime = 50; // Make it take 50ms - - const reader = new PeriodicExportingMetricReader({ - exporter: exporter, - exportIntervalMillis: MAX_32_BIT_INT, - exportTimeoutMillis: 20, // Timeout is 20ms - }); - - const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, - scopeMetrics: [ - { - scope: { name: 'test' }, - metrics: [ - { - dataPointType: DataPointType.GAUGE, - dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, - aggregationTemporality: AggregationTemporality.CUMULATIVE, - }, - ], - }, - ], - }; - - reader.setMetricProducer( - new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) - ); - - const errorHandlerStub = sinon.stub(); - setGlobalErrorHandler(errorHandlerStub); - - await reader.forceFlush(); - - sinon.assert.calledOnce(errorHandlerStub); - const error = errorHandlerStub.firstCall.args[0]; - assert.ok(error instanceof TimeoutError || error.message === 'Operation timed out.'); - - // Restore global error handler to avoid affecting other tests - setGlobalErrorHandler(() => {}); - - await reader.shutdown(); - }); }); }); }); From 76d13eaaf5f43afd35f32ba5695dd32f5ecf84b6 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 30 Apr 2026 15:15:48 +0000 Subject: [PATCH 08/17] Add Changelog entry --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 393591a32a2..38d1f837b38 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2 ### :rocket: Features +* feat(sdk-metrics): add maxExportBatchSize option to PeriodicExportingMetricReader [#6641](https://github.com/open-telemetry/opentelemetry-js/issues/6641) @psx95 + ### :bug: Bug Fixes ### :books: Documentation From 6aaa163d9b4e299450a2a417f7e86f5a624c298c Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 30 Apr 2026 15:24:43 +0000 Subject: [PATCH 09/17] Fix lint issues --- .../src/export/MetricDataSplitter.ts | 6 +- .../export/PeriodicExportingMetricReader.ts | 11 +- .../test/export/MetricDataSplitter.test.ts | 660 +++++++++++++++--- .../PeriodicExportingMetricReader.test.ts | 241 +++++-- 4 files changed, 759 insertions(+), 159 deletions(-) diff --git a/packages/sdk-metrics/src/export/MetricDataSplitter.ts b/packages/sdk-metrics/src/export/MetricDataSplitter.ts index f96b074ee61..53adae07f21 100644 --- a/packages/sdk-metrics/src/export/MetricDataSplitter.ts +++ b/packages/sdk-metrics/src/export/MetricDataSplitter.ts @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { ResourceMetrics, ScopeMetrics } from './MetricData'; +import type { DataPoint, ResourceMetrics, ScopeMetrics } from './MetricData'; /** * @internal @@ -77,7 +77,9 @@ export class MetricDataSplitter { scopeMetricCopy.metrics.push(metricCopy); } - (metricCopy.dataPoints as any[]).push(...chunk); + (metricCopy.dataPoints as DataPoint[]).push( + ...(chunk as DataPoint[]) + ); currentBatchPoints += take; if (currentBatchPoints === maxExportBatchSize) { diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index b7e5b951331..9ab6272372b 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -203,9 +203,14 @@ export class PeriodicExportingMetricReader extends MetricReader { } catch (e) { if (e instanceof TimeoutError) { // We do not report TimeoutError to the globalErrorHandler in _runOnce(). - api.diag.error(`PeriodicExportingMetricReader: metrics export timed out after ${this._exportTimeout}ms`); + api.diag.error( + `PeriodicExportingMetricReader: metrics export timed out after ${this._exportTimeout}ms` + ); } else { - api.diag.error('PeriodicExportingMetricReader: metrics export threw error', e); + api.diag.error( + 'PeriodicExportingMetricReader: metrics export threw error', + e + ); anyErr = e instanceof Error ? e : new Error(String(e)); } } @@ -217,7 +222,7 @@ export class PeriodicExportingMetricReader extends MetricReader { // Schedules the current export to run after all previously scheduled exports have finished. const promise = this._previousExportPromise.then(currentExport); - this._previousExportPromise = promise.catch(() => { }); + this._previousExportPromise = promise.catch(() => {}); await promise; } diff --git a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts index 39d219b27ed..82f38b52d43 100644 --- a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts +++ b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts @@ -4,7 +4,8 @@ */ import { MetricDataSplitter } from '../../src/export/MetricDataSplitter'; -import { DataPointType, ResourceMetrics } from '../../src/export/MetricData'; +import type { ResourceMetrics } from '../../src/export/MetricData'; +import { DataPointType } from '../../src/export/MetricData'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { ValueType } from '@opentelemetry/api'; import * as assert from 'assert'; @@ -26,11 +27,31 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -41,21 +62,54 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 2); - + // First batch should have 2 data points - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints[1].value, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.description, 'desc1'); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.unit, 'unit1'); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints[0].value, + 1 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints[1].value, + 2 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.description, + 'desc1' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.unit, + 'unit1' + ); // Second batch should have 1 data point - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.description, 'desc1'); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.unit, 'unit1'); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints[0].value, + 3 + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.description, + 'desc1' + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.unit, + 'unit1' + ); }); it('should split data points across metrics if needed (GAUGE)', () => { @@ -68,18 +122,43 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, ], - descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + descriptor: { + name: 'm2', + description: 'desc2', + unit: 'unit2', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -90,24 +169,57 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 2); - + // First batch should have 2 data points (m1:1, m2:2) assert.strictEqual(batches[0].scopeMetrics[0].metrics.length, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); - - assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].dataPoints.length, 1); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].dataPoints[0].value, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].descriptor.name, 'm2'); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[1].descriptor.description, 'desc2'); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints[0].value, + 1 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[1].dataPoints.length, + 1 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[1].dataPoints[0].value, + 2 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[1].descriptor.name, + 'm2' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[1].descriptor.description, + 'desc2' + ); // Second batch should have 1 data point (m2:3) assert.strictEqual(batches[1].scopeMetrics[0].metrics.length, 1); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.description, 'desc2'); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.name, + 'm2' + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.description, + 'desc2' + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints[0].value, + 3 + ); }); it('should handle empty data points (GAUGE)', () => { @@ -120,7 +232,12 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -131,8 +248,14 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 1); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 0); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 0 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); }); it('should split across multiple scopes and fill batches (GAUGE)', () => { @@ -145,18 +268,43 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, ], - descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + descriptor: { + name: 'm2', + description: 'desc2', + unit: 'unit2', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -167,9 +315,19 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 4 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 4, + }, ], - descriptor: { name: 'm3', description: 'desc3', unit: 'unit3', valueType: ValueType.INT }, + descriptor: { + name: 'm3', + description: 'desc3', + unit: 'unit3', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -180,24 +338,48 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 2); - + // Batch 1: scope1, m1 (2 points) assert.strictEqual(batches[0].scopeMetrics.length, 1); assert.strictEqual(batches[0].scopeMetrics[0].scope.name, 'scope1'); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); // Batch 2: scope1, m2 (1 point) AND scope2, m3 (1 point) assert.strictEqual(batches[1].scopeMetrics.length, 2); assert.strictEqual(batches[1].scopeMetrics[0].scope.name, 'scope1'); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.description, 'desc2'); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); - + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.name, + 'm2' + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.description, + 'desc2' + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual(batches[1].scopeMetrics[1].scope.name, 'scope2'); - assert.strictEqual(batches[1].scopeMetrics[1].metrics[0].descriptor.name, 'm3'); - assert.strictEqual(batches[1].scopeMetrics[1].metrics[0].descriptor.unit, 'unit3'); - assert.strictEqual(batches[1].scopeMetrics[1].metrics[0].dataPoints.length, 1); + assert.strictEqual( + batches[1].scopeMetrics[1].metrics[0].descriptor.name, + 'm3' + ); + assert.strictEqual( + batches[1].scopeMetrics[1].metrics[0].descriptor.unit, + 'unit3' + ); + assert.strictEqual( + batches[1].scopeMetrics[1].metrics[0].dataPoints.length, + 1 + ); }); it('should split a single metric across multiple batches (GAUGE)', () => { @@ -210,13 +392,43 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 4 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 5 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 4, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 5, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -227,15 +439,33 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 3); - - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); - - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); - - assert.strictEqual(batches[2].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(batches[2].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); + + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + + assert.strictEqual( + batches[2].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + batches[2].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); }); it('should handle partly filled batches with multiple scopes (GAUGE)', () => { @@ -247,8 +477,20 @@ describe('MetricDataSplitter', () => { metrics: [ { dataPointType: DataPointType.GAUGE, - dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + dataPoints: [ + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + ], + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -258,8 +500,20 @@ describe('MetricDataSplitter', () => { metrics: [ { dataPointType: DataPointType.GAUGE, - dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }], - descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + dataPoints: [ + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + ], + descriptor: { + name: 'm2', + description: 'desc2', + unit: 'unit2', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -271,10 +525,22 @@ describe('MetricDataSplitter', () => { assert.strictEqual(batches.length, 1); assert.strictEqual(batches[0].scopeMetrics.length, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].descriptor.name, 'm1'); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(batches[0].scopeMetrics[1].metrics[0].descriptor.name, 'm2'); - assert.strictEqual(batches[0].scopeMetrics[1].metrics[0].dataPoints.length, 1); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + batches[0].scopeMetrics[1].metrics[0].descriptor.name, + 'm2' + ); + assert.strictEqual( + batches[0].scopeMetrics[1].metrics[0].dataPoints.length, + 1 + ); }); // ========================================================================== @@ -291,11 +557,31 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.SUM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, isMonotonic: true, }, @@ -307,9 +593,18 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual((batches[0].scopeMetrics[0].metrics[0] as any).isMonotonic, true); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); + assert.strictEqual( + (batches[0].scopeMetrics[0].metrics[0] as any).isMonotonic, + true + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); }); it('should split across multiple scopes and fill batches (SUM)', () => { @@ -322,19 +617,44 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.SUM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, isMonotonic: true, }, { dataPointType: DataPointType.SUM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, ], - descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + descriptor: { + name: 'm2', + description: 'desc2', + unit: 'unit2', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, isMonotonic: true, }, @@ -346,9 +666,19 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.SUM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 4 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 4, + }, ], - descriptor: { name: 'm3', description: 'desc3', unit: 'unit3', valueType: ValueType.INT }, + descriptor: { + name: 'm3', + description: 'desc3', + unit: 'unit3', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, isMonotonic: true, }, @@ -369,7 +699,11 @@ describe('MetricDataSplitter', () => { // ========================================================================== it('should split batches when exceeding maxExportBatchSize (HISTOGRAM)', () => { - const dummyHistogram = { buckets: { boundaries: [1, 2], counts: [1, 1, 1] }, sum: 3, count: 3 }; + const dummyHistogram = { + buckets: { boundaries: [1, 2], counts: [1, 1, 1] }, + sum: 3, + count: 3, + }; const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -379,11 +713,31 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.HISTOGRAM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyHistogram, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyHistogram, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyHistogram, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -394,12 +748,22 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); }); it('should split across multiple scopes and fill batches (HISTOGRAM)', () => { - const dummyHistogram = { buckets: { boundaries: [1, 2], counts: [1, 1, 1] }, sum: 3, count: 3 }; + const dummyHistogram = { + buckets: { boundaries: [1, 2], counts: [1, 1, 1] }, + sum: 3, + count: 3, + }; const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -409,18 +773,43 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.HISTOGRAM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyHistogram, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyHistogram, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, { dataPointType: DataPointType.HISTOGRAM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyHistogram, + }, ], - descriptor: { name: 'm2', description: 'desc2', unit: 'unit2', valueType: ValueType.INT }, + descriptor: { + name: 'm2', + description: 'desc2', + unit: 'unit2', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -431,9 +820,19 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.HISTOGRAM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyHistogram }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyHistogram, + }, ], - descriptor: { name: 'm3', description: 'desc3', unit: 'unit3', valueType: ValueType.INT }, + descriptor: { + name: 'm3', + description: 'desc3', + unit: 'unit3', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -453,7 +852,14 @@ describe('MetricDataSplitter', () => { // ========================================================================== it('should split batches when exceeding maxExportBatchSize (EXPONENTIAL_HISTOGRAM)', () => { - const dummyExponentialHistogram = { sum: 3, count: 3, scale: 1, zeroCount: 0, positive: { offset: 1, bucketCounts: [1, 1, 1] }, negative: { offset: 1, bucketCounts: [] } }; + const dummyExponentialHistogram = { + sum: 3, + count: 3, + scale: 1, + zeroCount: 0, + positive: { offset: 1, bucketCounts: [1, 1, 1] }, + negative: { offset: 1, bucketCounts: [] }, + }; const resourceMetrics: ResourceMetrics = { resource: dummyResource, scopeMetrics: [ @@ -463,11 +869,31 @@ describe('MetricDataSplitter', () => { { dataPointType: DataPointType.EXPONENTIAL_HISTOGRAM, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyExponentialHistogram }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyExponentialHistogram }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: dummyExponentialHistogram }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyExponentialHistogram, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyExponentialHistogram, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: dummyExponentialHistogram, + }, ], - descriptor: { name: 'm1', description: 'desc1', unit: 'unit1', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -478,8 +904,14 @@ describe('MetricDataSplitter', () => { const batches = MetricDataSplitter.split(resourceMetrics, 2); assert.strictEqual(batches.length, 2); - assert.strictEqual(batches[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual(batches[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); + assert.strictEqual( + batches[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); }); // ========================================================================== diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 180cef71891..c891b5c40b8 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -53,7 +53,10 @@ class TestMetricExporter implements PushMetricExporter { ): void { this._batches.push(metrics); this.concurrentCalls++; - this.maxConcurrentCalls = Math.max(this.maxConcurrentCalls, this.concurrentCalls); + this.maxConcurrentCalls = Math.max( + this.maxConcurrentCalls, + this.concurrentCalls + ); if (this.throwExport) { this.concurrentCalls--; @@ -824,7 +827,11 @@ describe('PeriodicExportingMetricReader', () => { }); const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + resource: { + attributes: {}, + merge: sinon.stub(), + getRawAttributes: () => [], + } as any, scopeMetrics: [ { scope: { name: 'test' }, @@ -832,11 +839,31 @@ describe('PeriodicExportingMetricReader', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, ], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: '', + unit: '', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -845,7 +872,10 @@ describe('PeriodicExportingMetricReader', () => { }; reader.setMetricProducer( - new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + new TestMetricProducer({ + resourceMetrics: resourceMetrics, + errors: [], + }) ); await reader.forceFlush(); @@ -854,13 +884,28 @@ describe('PeriodicExportingMetricReader', () => { assert.strictEqual(exports.length, 2); // First batch should have 2 data points - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 2); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[1].value, 2); + assert.strictEqual( + exports[0].scopeMetrics[0].metrics[0].dataPoints.length, + 2 + ); + assert.strictEqual( + exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, + 1 + ); + assert.strictEqual( + exports[0].scopeMetrics[0].metrics[0].dataPoints[1].value, + 2 + ); // Second batch should have 1 data point - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + assert.strictEqual( + exports[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, + 3 + ); await reader.shutdown(); }); @@ -874,15 +919,31 @@ describe('PeriodicExportingMetricReader', () => { }); const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + resource: { + attributes: {}, + merge: sinon.stub(), + getRawAttributes: () => [], + } as any, scopeMetrics: [ { scope: { name: 'test' }, metrics: [ { dataPointType: DataPointType.GAUGE, - dataPoints: [{ startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + dataPoints: [ + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + ], + descriptor: { + name: 'm1', + description: '', + unit: '', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -890,7 +951,10 @@ describe('PeriodicExportingMetricReader', () => { ], }; - const producer = new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }); + const producer = new TestMetricProducer({ + resourceMetrics: resourceMetrics, + errors: [], + }); reader.setMetricProducer(producer); // Trigger two exports quickly @@ -917,7 +981,11 @@ describe('PeriodicExportingMetricReader', () => { }); const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + resource: { + attributes: {}, + merge: sinon.stub(), + getRawAttributes: () => [], + } as any, scopeMetrics: [ { scope: { name: 'test' }, @@ -925,18 +993,43 @@ describe('PeriodicExportingMetricReader', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, ], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: '', + unit: '', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 3 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 3, + }, ], - descriptor: { name: 'm2', description: '', unit: '', valueType: ValueType.INT }, + descriptor: { + name: 'm2', + description: '', + unit: '', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -945,7 +1038,10 @@ describe('PeriodicExportingMetricReader', () => { }; reader.setMetricProducer( - new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + new TestMetricProducer({ + resourceMetrics: resourceMetrics, + errors: [], + }) ); await reader.forceFlush(); @@ -955,16 +1051,37 @@ describe('PeriodicExportingMetricReader', () => { // First batch should have 2 data points (m1:1, m2:2) assert.strictEqual(exports[0].scopeMetrics[0].metrics.length, 2); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints.length, 1); - assert.strictEqual(exports[0].scopeMetrics[0].metrics[1].dataPoints[0].value, 2); + assert.strictEqual( + exports[0].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + exports[0].scopeMetrics[0].metrics[0].dataPoints[0].value, + 1 + ); + assert.strictEqual( + exports[0].scopeMetrics[0].metrics[1].dataPoints.length, + 1 + ); + assert.strictEqual( + exports[0].scopeMetrics[0].metrics[1].dataPoints[0].value, + 2 + ); // Second batch should have 1 data point (m2:3) assert.strictEqual(exports[1].scopeMetrics[0].metrics.length, 1); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].descriptor.name, 'm2'); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints.length, 1); - assert.strictEqual(exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, 3); + assert.strictEqual( + exports[1].scopeMetrics[0].metrics[0].descriptor.name, + 'm2' + ); + assert.strictEqual( + exports[1].scopeMetrics[0].metrics[0].dataPoints.length, + 1 + ); + assert.strictEqual( + exports[1].scopeMetrics[0].metrics[0].dataPoints[0].value, + 3 + ); await reader.shutdown(); }); @@ -979,7 +1096,11 @@ describe('PeriodicExportingMetricReader', () => { }); const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + resource: { + attributes: {}, + merge: sinon.stub(), + getRawAttributes: () => [], + } as any, scopeMetrics: [ { scope: { name: 'test' }, @@ -987,10 +1108,25 @@ describe('PeriodicExportingMetricReader', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, ], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: '', + unit: '', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -999,7 +1135,10 @@ describe('PeriodicExportingMetricReader', () => { }; reader.setMetricProducer( - new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + new TestMetricProducer({ + resourceMetrics: resourceMetrics, + errors: [], + }) ); // Call forceFlush to trigger the export @@ -1023,7 +1162,11 @@ describe('PeriodicExportingMetricReader', () => { }); const resourceMetrics: ResourceMetrics = { - resource: { attributes: {}, merge: sinon.stub(), getRawAttributes: () => [] } as any, + resource: { + attributes: {}, + merge: sinon.stub(), + getRawAttributes: () => [], + } as any, scopeMetrics: [ { scope: { name: 'test' }, @@ -1031,10 +1174,25 @@ describe('PeriodicExportingMetricReader', () => { { dataPointType: DataPointType.GAUGE, dataPoints: [ - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 1 }, - { startTime: [0, 0], endTime: [0, 0], attributes: {}, value: 2 }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, ], - descriptor: { name: 'm1', description: '', unit: '', valueType: ValueType.INT }, + descriptor: { + name: 'm1', + description: '', + unit: '', + valueType: ValueType.INT, + }, aggregationTemporality: AggregationTemporality.CUMULATIVE, }, ], @@ -1043,7 +1201,10 @@ describe('PeriodicExportingMetricReader', () => { }; reader.setMetricProducer( - new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + new TestMetricProducer({ + resourceMetrics: resourceMetrics, + errors: [], + }) ); // If timeout applied to all, this would fail because total time is 30ms > 20ms. From b9835f8cad6d19efada712c6cdee659b25b9de08 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 30 Apr 2026 16:19:02 +0000 Subject: [PATCH 10/17] Improve timeout tests --- .../PeriodicExportingMetricReader.test.ts | 95 ++++++++++++++++++- 1 file changed, 90 insertions(+), 5 deletions(-) diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index c891b5c40b8..972f9a3c6d2 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -32,7 +32,7 @@ import { DEFAULT_AGGREGATION_SELECTOR, DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR, } from '../../src/export/AggregationSelector'; -import { ValueType } from '@opentelemetry/api'; +import { ValueType, diag } from '@opentelemetry/api'; const MAX_32_BIT_INT = 2 ** 31 - 1; @@ -1151,6 +1151,7 @@ describe('PeriodicExportingMetricReader', () => { }); it('should apply timeout to individual batches and not combination', async () => { + const clock = sinon.useFakeTimers(); const exporter = new TestMetricExporter(); exporter.exportTime = 15; // Each batch takes 15ms @@ -1207,14 +1208,98 @@ describe('PeriodicExportingMetricReader', () => { }) ); - // If timeout applied to all, this would fail because total time is 30ms > 20ms. - // Since it applies to individual, it should succeed! - await reader.forceFlush(); + // Call _runOnce directly to avoid forceFlush timer issues in this specific test + const runOncePromise = (reader as any)._runOnce(); + + // Batch 1 should be scheduled. Tick 15ms to complete it. + await clock.tickAsync(15); + + // Batch 2 should be scheduled. Tick 15ms to complete it. + await clock.tickAsync(15); + + await runOncePromise; const exports = exporter.getExports(); assert.strictEqual(exports.length, 2); - await reader.shutdown(); + clock.restore(); + }); + + it('should log timeout error to diag and not propagate to globalErrorHandler', async () => { + const clock = sinon.useFakeTimers(); + const exporter = new TestMetricExporter(); + exporter.exportTime = 50; // Make it take 50ms + + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + maxExportBatchSize: 1, + exportTimeoutMillis: 20, // Timeout is 20ms + }); + + const resourceMetrics: ResourceMetrics = { + resource: { + attributes: {}, + merge: sinon.stub(), + getRawAttributes: () => [], + } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + ], + descriptor: { + name: 'm1', + description: '', + unit: '', + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + reader.setMetricProducer( + new TestMetricProducer({ + resourceMetrics: resourceMetrics, + errors: [], + }) + ); + + const diagErrorStub = sinon.stub(diag, 'error'); + const errorHandlerStub = sinon.stub(); + setGlobalErrorHandler(errorHandlerStub); + + const runOncePromise = (reader as any)._runOnce(); + + // Tick 20ms to trigger timeout + await clock.tickAsync(20); + + await runOncePromise; + + sinon.assert.calledOnce(diagErrorStub); + assert.match( + diagErrorStub.firstCall.args[0], + /metrics export timed out/ + ); + + // Global error handler should not be called for timeout errors + sinon.assert.notCalled(errorHandlerStub); + + // Restore global error handler + setGlobalErrorHandler(() => {}); + clock.restore(); }); }); }); From ea31ad66c0c3a9d79164fae36741ae2ad8eabd68 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 30 Apr 2026 17:08:46 +0000 Subject: [PATCH 11/17] Remove dead code from MetricDataSplitter --- packages/sdk-metrics/src/export/MetricDataSplitter.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/packages/sdk-metrics/src/export/MetricDataSplitter.ts b/packages/sdk-metrics/src/export/MetricDataSplitter.ts index 53adae07f21..02f163775c1 100644 --- a/packages/sdk-metrics/src/export/MetricDataSplitter.ts +++ b/packages/sdk-metrics/src/export/MetricDataSplitter.ts @@ -54,12 +54,6 @@ export class MetricDataSplitter { while (dataPointsRemaining.length > 0) { const spaceLeft = maxExportBatchSize - currentBatchPoints; - if (spaceLeft === 0) { - flush(); - scopeMetricCopy = null; - continue; - } - const take = Math.min(spaceLeft, dataPointsRemaining.length); const chunk = dataPointsRemaining.slice(0, take); dataPointsRemaining = dataPointsRemaining.slice(take); From 8648301f0818925bd6ef26521e2d88d4689d8e50 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Thu, 30 Apr 2026 21:25:54 +0000 Subject: [PATCH 12/17] Replace the MetricDataSplitter class with a free function --- .../src/export/MetricDataSplitter.ts | 124 +++++++++--------- .../export/PeriodicExportingMetricReader.ts | 4 +- .../test/export/MetricDataSplitter.test.ts | 30 ++--- 3 files changed, 77 insertions(+), 81 deletions(-) diff --git a/packages/sdk-metrics/src/export/MetricDataSplitter.ts b/packages/sdk-metrics/src/export/MetricDataSplitter.ts index 02f163775c1..12fba23c62c 100644 --- a/packages/sdk-metrics/src/export/MetricDataSplitter.ts +++ b/packages/sdk-metrics/src/export/MetricDataSplitter.ts @@ -6,85 +6,81 @@ import type { DataPoint, ResourceMetrics, ScopeMetrics } from './MetricData'; /** + * Splits a ResourceMetrics object into smaller ResourceMetrics objects + * such that no batch exceeds maxExportBatchSize data points. + * @param resourceMetrics The metrics to split. + * @param maxExportBatchSize The maximum number of data points per batch. * @internal */ -export class MetricDataSplitter { - /** - * Splits a ResourceMetrics object into smaller ResourceMetrics objects - * such that no batch exceeds maxExportBatchSize data points. - * @param resourceMetrics The metrics to split. - * @param maxExportBatchSize The maximum number of data points per batch. - */ - static split( - resourceMetrics: ResourceMetrics, - maxExportBatchSize: number - ): ResourceMetrics[] { - if (maxExportBatchSize <= 0) { - throw new Error('maxExportBatchSize must be greater than 0'); - } - const batches: ResourceMetrics[] = []; - let currentBatchPoints = 0; - let currentScopeMetrics: ScopeMetrics[] = []; +export function splitMetricData( + resourceMetrics: ResourceMetrics, + maxExportBatchSize: number +): ResourceMetrics[] { + if (maxExportBatchSize <= 0) { + throw new Error('maxExportBatchSize must be greater than 0'); + } + const batches: ResourceMetrics[] = []; + let currentBatchPoints = 0; + let currentScopeMetrics: ScopeMetrics[] = []; - function flush() { - if (currentScopeMetrics.length > 0) { - batches.push({ - resource: resourceMetrics.resource, - scopeMetrics: currentScopeMetrics, - }); - currentScopeMetrics = []; - currentBatchPoints = 0; - } + function flush() { + if (currentScopeMetrics.length > 0) { + batches.push({ + resource: resourceMetrics.resource, + scopeMetrics: currentScopeMetrics, + }); + currentScopeMetrics = []; + currentBatchPoints = 0; } + } - for (const scopeMetric of resourceMetrics.scopeMetrics) { - let scopeMetricCopy: ScopeMetrics | null = null; + for (const scopeMetric of resourceMetrics.scopeMetrics) { + let scopeMetricCopy: ScopeMetrics | null = null; - for (const metric of scopeMetric.metrics) { - let dataPointsRemaining = metric.dataPoints; + for (const metric of scopeMetric.metrics) { + let dataPointsRemaining = metric.dataPoints; - if (dataPointsRemaining.length === 0) { - if (!scopeMetricCopy) { - scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; - currentScopeMetrics.push(scopeMetricCopy); - } - scopeMetricCopy.metrics.push(metric); - continue; + if (dataPointsRemaining.length === 0) { + if (!scopeMetricCopy) { + scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; + currentScopeMetrics.push(scopeMetricCopy); } + scopeMetricCopy.metrics.push(metric); + continue; + } - while (dataPointsRemaining.length > 0) { - const spaceLeft = maxExportBatchSize - currentBatchPoints; - const take = Math.min(spaceLeft, dataPointsRemaining.length); - const chunk = dataPointsRemaining.slice(0, take); - dataPointsRemaining = dataPointsRemaining.slice(take); + while (dataPointsRemaining.length > 0) { + const spaceLeft = maxExportBatchSize - currentBatchPoints; + const take = Math.min(spaceLeft, dataPointsRemaining.length); + const chunk = dataPointsRemaining.slice(0, take); + dataPointsRemaining = dataPointsRemaining.slice(take); - if (!scopeMetricCopy) { - scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; - currentScopeMetrics.push(scopeMetricCopy); - } + if (!scopeMetricCopy) { + scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; + currentScopeMetrics.push(scopeMetricCopy); + } - let metricCopy = scopeMetricCopy.metrics.find( - m => m.descriptor.name === metric.descriptor.name - ); - if (!metricCopy) { - metricCopy = { ...metric, dataPoints: [] }; - scopeMetricCopy.metrics.push(metricCopy); - } + let metricCopy = scopeMetricCopy.metrics.find( + m => m.descriptor.name === metric.descriptor.name + ); + if (!metricCopy) { + metricCopy = { ...metric, dataPoints: [] }; + scopeMetricCopy.metrics.push(metricCopy); + } - (metricCopy.dataPoints as DataPoint[]).push( - ...(chunk as DataPoint[]) - ); - currentBatchPoints += take; + (metricCopy.dataPoints as DataPoint[]).push( + ...(chunk as DataPoint[]) + ); + currentBatchPoints += take; - if (currentBatchPoints === maxExportBatchSize) { - flush(); - scopeMetricCopy = null; - } + if (currentBatchPoints === maxExportBatchSize) { + flush(); + scopeMetricCopy = null; } } } - - flush(); - return batches; } + + flush(); + return batches; } diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index 9ab6272372b..df5d9a19237 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -14,7 +14,7 @@ import type { PushMetricExporter } from './MetricExporter'; import { callWithTimeout, TimeoutError } from '../utils'; import type { MetricProducer } from './MetricProducer'; import { InstrumentType } from './MetricData'; -import { MetricDataSplitter } from './MetricDataSplitter'; +import { splitMetricData } from './MetricDataSplitter'; export type PeriodicExportingMetricReaderOptions = { /** @@ -182,7 +182,7 @@ export class PeriodicExportingMetricReader extends MetricReader { } const batches = this._maxExportBatchSize - ? MetricDataSplitter.split(resourceMetrics, this._maxExportBatchSize) + ? splitMetricData(resourceMetrics, this._maxExportBatchSize) : [resourceMetrics]; const currentExport = async () => { diff --git a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts index 82f38b52d43..d9df383ce4f 100644 --- a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts +++ b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts @@ -3,14 +3,14 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { MetricDataSplitter } from '../../src/export/MetricDataSplitter'; +import { splitMetricData } from '../../src/export/MetricDataSplitter'; import type { ResourceMetrics } from '../../src/export/MetricData'; import { DataPointType } from '../../src/export/MetricData'; import { AggregationTemporality } from '../../src/export/AggregationTemporality'; import { ValueType } from '@opentelemetry/api'; import * as assert from 'assert'; -describe('MetricDataSplitter', () => { +describe('splitMetricData', () => { const dummyResource = { attributes: {} } as any; // ========================================================================== @@ -59,7 +59,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); @@ -166,7 +166,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); @@ -245,7 +245,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 1); assert.strictEqual( @@ -335,7 +335,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); @@ -436,7 +436,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 3); @@ -521,7 +521,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 3); + const batches = splitMetricData(resourceMetrics, 3); assert.strictEqual(batches.length, 1); assert.strictEqual(batches[0].scopeMetrics.length, 2); @@ -590,7 +590,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); assert.strictEqual( @@ -687,7 +687,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); assert.strictEqual(batches[0].scopeMetrics.length, 1); @@ -745,7 +745,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); assert.strictEqual( @@ -840,7 +840,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); assert.strictEqual(batches[0].scopeMetrics.length, 1); @@ -901,7 +901,7 @@ describe('MetricDataSplitter', () => { ], }; - const batches = MetricDataSplitter.split(resourceMetrics, 2); + const batches = splitMetricData(resourceMetrics, 2); assert.strictEqual(batches.length, 2); assert.strictEqual( @@ -925,12 +925,12 @@ describe('MetricDataSplitter', () => { }; assert.throws( - () => MetricDataSplitter.split(resourceMetrics, 0), + () => splitMetricData(resourceMetrics, 0), /maxExportBatchSize must be greater than 0/ ); assert.throws( - () => MetricDataSplitter.split(resourceMetrics, -1), + () => splitMetricData(resourceMetrics, -1), /maxExportBatchSize must be greater than 0/ ); }); From 6424d497a3e71e73bdff2ffed34252e7c1f7cbd1 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Mon, 4 May 2026 11:21:21 -0400 Subject: [PATCH 13/17] Update validation check for maxExportBatchSize value Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../sdk-metrics/src/export/MetricDataSplitter.ts | 4 ++-- .../src/export/PeriodicExportingMetricReader.ts | 7 +++++-- .../test/export/MetricDataSplitter.test.ts | 9 +++++++-- .../export/PeriodicExportingMetricReader.test.ts | 15 +++++++++++++-- 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/packages/sdk-metrics/src/export/MetricDataSplitter.ts b/packages/sdk-metrics/src/export/MetricDataSplitter.ts index 12fba23c62c..c937299fab2 100644 --- a/packages/sdk-metrics/src/export/MetricDataSplitter.ts +++ b/packages/sdk-metrics/src/export/MetricDataSplitter.ts @@ -16,8 +16,8 @@ export function splitMetricData( resourceMetrics: ResourceMetrics, maxExportBatchSize: number ): ResourceMetrics[] { - if (maxExportBatchSize <= 0) { - throw new Error('maxExportBatchSize must be greater than 0'); + if (!Number.isInteger(maxExportBatchSize) || maxExportBatchSize <= 0) { + throw new Error('maxExportBatchSize must be a positive integer'); } const batches: ResourceMetrics[] = []; let currentBatchPoints = 0; diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index df5d9a19237..b93f40f477f 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -120,8 +120,11 @@ export class PeriodicExportingMetricReader extends MetricReader { throw Error('exportTimeoutMillis must be greater than 0'); } - if (maxExportBatchSize !== undefined && maxExportBatchSize <= 0) { - throw Error('maxExportBatchSize must be greater than 0'); + if ( + maxExportBatchSize !== undefined && + (!Number.isInteger(maxExportBatchSize) || maxExportBatchSize <= 0) + ) { + throw Error('maxExportBatchSize must be a positive integer'); } if (exportIntervalMillis < exportTimeoutMillis) { diff --git a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts index d9df383ce4f..8bf013a81a7 100644 --- a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts +++ b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts @@ -926,12 +926,17 @@ describe('splitMetricData', () => { assert.throws( () => splitMetricData(resourceMetrics, 0), - /maxExportBatchSize must be greater than 0/ + /maxExportBatchSize must be a positive integer/ ); assert.throws( () => splitMetricData(resourceMetrics, -1), - /maxExportBatchSize must be greater than 0/ + /maxExportBatchSize must be a positive integer/ + ); + + assert.throws( + () => splitMetricData(resourceMetrics, -1.5), + /maxExportBatchSize must be a positive integer/ ); }); }); diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 972f9a3c6d2..087a4f7d7a7 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -220,7 +220,7 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 1, maxExportBatchSize: 0, }), - /maxExportBatchSize must be greater than 0/ + /maxExportBatchSize must be a positive integer/ ); assert.throws( @@ -231,7 +231,18 @@ describe('PeriodicExportingMetricReader', () => { exportTimeoutMillis: 1, maxExportBatchSize: -1, }), - /maxExportBatchSize must be greater than 0/ + /maxExportBatchSize must be a positive integer/ + ); + + assert.throws( + () => + new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: 1, + exportTimeoutMillis: 1, + maxExportBatchSize: 1.5, + }), + /maxExportBatchSize must be a positive integer/ ); }); From c935ad0fccf6d5b3907c971ce93ac4979405a760 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Mon, 4 May 2026 16:33:21 +0000 Subject: [PATCH 14/17] Prevent concurrent export by breaking export loop on Timeout error --- .../export/PeriodicExportingMetricReader.ts | 1 + .../PeriodicExportingMetricReader.test.ts | 63 +++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index b93f40f477f..dc7d37c42c4 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -209,6 +209,7 @@ export class PeriodicExportingMetricReader extends MetricReader { api.diag.error( `PeriodicExportingMetricReader: metrics export timed out after ${this._exportTimeout}ms` ); + break; } else { api.diag.error( 'PeriodicExportingMetricReader: metrics export threw error', diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 087a4f7d7a7..eb979757a77 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -921,6 +921,69 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); + it('should not export batches simultaneously when one times out', async () => { + const exporter = new TestMetricExporter(); + exporter.exportTime = 50; // Make export take some time + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 20, // Timeout smaller than export time + maxExportBatchSize: 1, + }); + + const resourceMetrics: ResourceMetrics = { + resource: { + attributes: {}, + merge: sinon.stub(), + getRawAttributes: () => [], + } as any, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + ], + descriptor: { + name: 'm1', + description: '', + unit: '', + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + ], + }, + ], + }; + + const producer = new TestMetricProducer({ + resourceMetrics: resourceMetrics, + errors: [], + }); + reader.setMetricProducer(producer); + + await reader.forceFlush(); + + // Assert that they didn't overlap + assert.strictEqual(exporter.maxConcurrentCalls, 1, 'Batches should not be exported simultaneously'); + + await reader.shutdown(); + }); + it('should synchronize concurrent exports', async () => { const exporter = new TestMetricExporter(); exporter.exportTime = 50; // Make export take some time From d4180bec4ccc7f30d8214e7b2a0c1058704992fa Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Mon, 4 May 2026 17:15:15 +0000 Subject: [PATCH 15/17] Fix incorrect merge of distinct metric streams The incorrect merge happened because metrics were only being grouped using descriptor.name and the SDK can produce multiple incompatible storages per name. --- .../src/export/MetricDataSplitter.ts | 14 +++- .../test/export/MetricDataSplitter.test.ts | 75 +++++++++++++++++++ .../PeriodicExportingMetricReader.test.ts | 6 +- 3 files changed, 90 insertions(+), 5 deletions(-) diff --git a/packages/sdk-metrics/src/export/MetricDataSplitter.ts b/packages/sdk-metrics/src/export/MetricDataSplitter.ts index c937299fab2..6a2a6f2f787 100644 --- a/packages/sdk-metrics/src/export/MetricDataSplitter.ts +++ b/packages/sdk-metrics/src/export/MetricDataSplitter.ts @@ -34,12 +34,16 @@ export function splitMetricData( } } + // Iterate through all scopes in the input metrics for (const scopeMetric of resourceMetrics.scopeMetrics) { let scopeMetricCopy: ScopeMetrics | null = null; + // Iterate through all metrics within the current scope for (const metric of scopeMetric.metrics) { let dataPointsRemaining = metric.dataPoints; + let metricCopy: typeof metric | undefined = undefined; + // If a metric has no data points, add it directly to the current batch if (dataPointsRemaining.length === 0) { if (!scopeMetricCopy) { scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; @@ -49,20 +53,21 @@ export function splitMetricData( continue; } + // Chunk the data points of the current metric across batches while (dataPointsRemaining.length > 0) { const spaceLeft = maxExportBatchSize - currentBatchPoints; const take = Math.min(spaceLeft, dataPointsRemaining.length); const chunk = dataPointsRemaining.slice(0, take); dataPointsRemaining = dataPointsRemaining.slice(take); + // Ensure we have a ScopeMetrics object in the current batch if (!scopeMetricCopy) { scopeMetricCopy = { scope: scopeMetric.scope, metrics: [] }; currentScopeMetrics.push(scopeMetricCopy); + metricCopy = undefined; // Reset because we are starting a new batch } - let metricCopy = scopeMetricCopy.metrics.find( - m => m.descriptor.name === metric.descriptor.name - ); + // Ensure we have a MetricData object for this specific metric in the current batch. if (!metricCopy) { metricCopy = { ...metric, dataPoints: [] }; scopeMetricCopy.metrics.push(metricCopy); @@ -73,9 +78,10 @@ export function splitMetricData( ); currentBatchPoints += take; + // If the current batch is full, flush it and start a new one if (currentBatchPoints === maxExportBatchSize) { flush(); - scopeMetricCopy = null; + scopeMetricCopy = null; // Force recreation of scope copy in the next batch } } } diff --git a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts index 8bf013a81a7..2c5feaf54b8 100644 --- a/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts +++ b/packages/sdk-metrics/test/export/MetricDataSplitter.test.ts @@ -914,6 +914,81 @@ describe('splitMetricData', () => { ); }); + it('should not merge metrics with the same name but different types', () => { + const resourceMetrics: ResourceMetrics = { + resource: dummyResource, + scopeMetrics: [ + { + scope: { name: 'test' }, + metrics: [ + { + dataPointType: DataPointType.GAUGE, + dataPoints: [ + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 1, + }, + ], + descriptor: { + name: 'm1', + description: 'desc1', + unit: 'unit1', + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + }, + { + dataPointType: DataPointType.SUM, + dataPoints: [ + { + startTime: [0, 0], + endTime: [0, 0], + attributes: {}, + value: 2, + }, + ], + descriptor: { + name: 'm1', // Same name! + description: 'desc2', + unit: 'unit2', + valueType: ValueType.INT, + }, + aggregationTemporality: AggregationTemporality.CUMULATIVE, + isMonotonic: true, + }, + ], + }, + ], + }; + + const batches = splitMetricData(resourceMetrics, 10); + + assert.strictEqual(batches.length, 1); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics.length, + 2, + 'Should keep distinct metrics separate' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].descriptor.name, + 'm1' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[1].descriptor.name, + 'm1' + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[0].dataPointType, + DataPointType.GAUGE + ); + assert.strictEqual( + batches[0].scopeMetrics[0].metrics[1].dataPointType, + DataPointType.SUM + ); + }); + // ========================================================================== // Validation Tests // ========================================================================== diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index eb979757a77..234d783e825 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -979,7 +979,11 @@ describe('PeriodicExportingMetricReader', () => { await reader.forceFlush(); // Assert that they didn't overlap - assert.strictEqual(exporter.maxConcurrentCalls, 1, 'Batches should not be exported simultaneously'); + assert.strictEqual( + exporter.maxConcurrentCalls, + 1, + 'Batches should not be exported simultaneously' + ); await reader.shutdown(); }); From 1cf992a2326c5d83c273bff893c106352313e573 Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Mon, 4 May 2026 19:23:39 +0000 Subject: [PATCH 16/17] Fix CHANGELOG entry --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38d1f837b38..5656bd602dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2 ### :rocket: Features -* feat(sdk-metrics): add maxExportBatchSize option to PeriodicExportingMetricReader [#6641](https://github.com/open-telemetry/opentelemetry-js/issues/6641) @psx95 +* feat(sdk-metrics): add maxExportBatchSize option to PeriodicExportingMetricReader [#6655](https://github.com/open-telemetry/opentelemetry-js/pull/6655) @psx95 ### :bug: Bug Fixes From 5158764b7891dfce8e3fb4abe84b66188f9e395c Mon Sep 17 00:00:00 2001 From: Pranav Sharma Date: Wed, 6 May 2026 19:56:05 +0000 Subject: [PATCH 17/17] Skip collect + export cycle if export ongoing --- .../export/PeriodicExportingMetricReader.ts | 19 +++++--- .../PeriodicExportingMetricReader.test.ts | 45 +++++++++++++++++-- 2 files changed, 55 insertions(+), 9 deletions(-) diff --git a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts index dc7d37c42c4..28b4832a96a 100644 --- a/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts +++ b/packages/sdk-metrics/src/export/PeriodicExportingMetricReader.ts @@ -68,7 +68,7 @@ export class PeriodicExportingMetricReader extends MetricReader { private readonly _exportInterval: number; private readonly _exportTimeout: number; private readonly _maxExportBatchSize?: number; - private _previousExportPromise: Promise = Promise.resolve(); + private _ongoingExportPromise: Promise | null = null; constructor(options: PeriodicExportingMetricReaderOptions) { const { @@ -160,6 +160,13 @@ export class PeriodicExportingMetricReader extends MetricReader { } private async _doRun(): Promise { + if (this._ongoingExportPromise) { + api.diag.debug( + 'PeriodicExportingMetricReader: export already in progress, skipping' + ); + return; + } + const { resourceMetrics, errors } = await this.collect({ timeoutMillis: this._exportTimeout, }); @@ -224,10 +231,12 @@ export class PeriodicExportingMetricReader extends MetricReader { } }; - // Schedules the current export to run after all previously scheduled exports have finished. - const promise = this._previousExportPromise.then(currentExport); - this._previousExportPromise = promise.catch(() => {}); - await promise; + this._ongoingExportPromise = currentExport(); + try { + await this._ongoingExportPromise; + } finally { + this._ongoingExportPromise = null; + } } protected override onInitialized(): void { diff --git a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts index 234d783e825..9433639ad2a 100644 --- a/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts +++ b/packages/sdk-metrics/test/export/PeriodicExportingMetricReader.test.ts @@ -524,6 +524,39 @@ describe('PeriodicExportingMetricReader', () => { exporter.throwExport = false; await reader.shutdown(); }); + + it('should not initiate collect when an export is ongoing', async () => { + const exporter = new TestMetricExporter(); + exporter.exportTime = 100; // Make it slow + const reader = new PeriodicExportingMetricReader({ + exporter: exporter, + exportIntervalMillis: MAX_32_BIT_INT, + exportTimeoutMillis: 80, + }); + + reader.setMetricProducer( + new TestMetricProducer({ resourceMetrics: resourceMetrics, errors: [] }) + ); + + const collectSpy = sinon.spy(reader, 'collect'); + + // Trigger first export + const firstFlush = reader.forceFlush(); + + // Wait a bit to ensure the first call has passed the `this.collect` call + // and is now in the export phase. + await new Promise(resolve => setTimeout(resolve, 10)); + + // Trigger second export + await reader.forceFlush(); + + // The second forceFlush should have skipped collection. + assert.strictEqual(collectSpy.callCount, 1); + + // Wait for the first export to complete to clean up + await firstFlush; + await reader.shutdown(); + }); }); describe('forceFlush', () => { @@ -988,7 +1021,7 @@ describe('PeriodicExportingMetricReader', () => { await reader.shutdown(); }); - it('should synchronize concurrent exports', async () => { + it('should skip subsequent export when one is ongoing', async () => { const exporter = new TestMetricExporter(); exporter.exportTime = 50; // Make export take some time const reader = new PeriodicExportingMetricReader({ @@ -1035,16 +1068,20 @@ describe('PeriodicExportingMetricReader', () => { }); reader.setMetricProducer(producer); - // Trigger two exports quickly + // Trigger first export const p1 = reader.forceFlush(); + // Wait a bit to ensure the first call has passed the `this.collect` call + // and is now in the export phase. + await new Promise(resolve => setTimeout(resolve, 10)); + // Trigger second export const p2 = reader.forceFlush(); await Promise.all([p1, p2]); const exports = exporter.getExports(); - assert.strictEqual(exports.length, 2); + assert.strictEqual(exports.length, 1); - // Assert that they didn't overlap + // Assert that they didn't overlap (only 1 ran) assert.strictEqual(exporter.maxConcurrentCalls, 1); await reader.shutdown();