diff --git a/packages/k8s/package-lock.json b/packages/k8s/package-lock.json index ea382461..8fc73668 100644 --- a/packages/k8s/package-lock.json +++ b/packages/k8s/package-lock.json @@ -112,7 +112,6 @@ "integrity": "sha512-H3mcG6ZDLTlYfaSNi0iOKkigqMFvkTKlGUYlD8GW7nNOYRrevuA46iTypPyv+06V3fEmvvazfntkBU34L0azAw==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@babel/code-frame": "^7.28.6", "@babel/generator": "^7.28.6", @@ -3266,7 +3265,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -4452,7 +4450,6 @@ "integrity": "sha512-F26gjC0yWN8uAA5m5Ss8ZQf5nDHWGlN/xWZIh8S5SRbsEKBovwZhxGd6LJlbZYxBgCYOtreSUyb8hpXyGC5O4A==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@jest/core": "30.2.0", "@jest/types": "30.2.0", @@ -5086,7 +5083,6 @@ "resolved": "https://registry.npmjs.org/jsep/-/jsep-1.4.0.tgz", "integrity": "sha512-B7qPcEVE3NVkmSJbaYxvv4cHkVW7DQsZz13pUMrfS8z8Q/BuShN+gcTXrUlPiGqM2/t/EEaI030bpxMqY8gMlw==", "license": "MIT", - "peer": true, "engines": { "node": ">= 10.16.0" } @@ -6467,7 +6463,6 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "dev": true, "license": "Apache-2.0", - "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -6815,7 +6810,6 @@ "resolved": "https://registry.npmjs.org/ws/-/ws-8.19.0.tgz", "integrity": "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==", "license": "MIT", - "peer": true, "engines": { "node": ">=10.0.0" }, diff --git a/packages/k8s/src/k8s/heartbeat.ts b/packages/k8s/src/k8s/heartbeat.ts new file mode 100644 index 00000000..b83f1654 --- /dev/null +++ b/packages/k8s/src/k8s/heartbeat.ts @@ -0,0 +1,136 @@ +import * as core from '@actions/core' + +export interface HeartbeatWebSocket { + readyState: number + ping(): void + close(): void + on(event: string, listener: (...args: any[]) => void): this + once(event: string, listener: (...args: any[]) => void): this +} + +export function parsePositiveMsEnv( + value: string | undefined, + fallback: number +): number { + const parsed = Number.parseInt(value ?? '', 10) + return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback +} + +export class WebSocketHeartbeat { + private pingInterval: ReturnType | null = null + private pongTimeout: ReturnType | null = null + private lastHeartbeatLog = 0 + private static readonly LOG_INTERVAL_MS = 2 * 60 * 1000 + + constructor( + private readonly pingPeriodMs: number, + private readonly pongDeadlineMs: number + ) {} + + private shouldLog(): boolean { + const now = Date.now() + if (now - this.lastHeartbeatLog >= WebSocketHeartbeat.LOG_INTERVAL_MS) { + this.lastHeartbeatLog = now + return true + } + return false + } + + stop(): void { + if (this.shouldLog()) { + core.debug('[Heartbeat] stopping heartbeat') + } + if (this.pingInterval) { + clearInterval(this.pingInterval) + this.pingInterval = null + } + if (this.pongTimeout) { + clearTimeout(this.pongTimeout) + this.pongTimeout = null + } + } + + start(ws: HeartbeatWebSocket, reject: (err: Error) => void): void { + core.debug( + `[Heartbeat] Starting with period=${this.pingPeriodMs}ms, deadline=${this.pongDeadlineMs}ms` + ) + this.lastHeartbeatLog = Date.now() + + const resetPongTimeout = (): void => { + if (this.pongTimeout) { + clearTimeout(this.pongTimeout) + this.pongTimeout = null + } + this.pongTimeout = setTimeout(() => { + core.warning( + `[Heartbeat] No pong received in ${this.pongDeadlineMs}ms, closing stale connection` + ) + this.stop() + try { + ws.close() + } catch { + // ignore errors closing an already-closing socket + } + reject( + new Error( + `WebSocket heartbeat timeout: no pong within ${this.pongDeadlineMs}ms` + ) + ) + }, this.pongDeadlineMs) + } + + // Arm the deadline only after the first ping is sent, not immediately on + // start, so a slow CONNECTING socket cannot time out before any exchange. + let deadlineArmed = false + + ws.on('pong', () => { + if (this.shouldLog()) { + core.debug('[Heartbeat] Pong received') + } + if (deadlineArmed) { + resetPongTimeout() + } + }) + + ws.on('error', (err: Error) => { + core.error(`[Heartbeat] WebSocket error: ${err.message}`) + this.stop() + }) + + ws.on('close', () => { + core.debug('[Heartbeat] WebSocket closed, stopping heartbeat') + this.stop() + }) + + // WebSocket readyState: 0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED + this.pingInterval = setInterval(() => { + if (ws.readyState === 0) { + // Still connecting — skip this tick but keep the interval alive + return + } + if (ws.readyState === 1) { + try { + ws.ping() + if (!deadlineArmed) { + deadlineArmed = true + resetPongTimeout() + } + if (this.shouldLog()) { + core.debug('[Heartbeat] Ping sent') + } + } catch (err) { + core.error(`[Heartbeat] Ping failed: ${err}`) + this.stop() + } + } else { + // CLOSING (2) or CLOSED (3) + if (this.shouldLog()) { + core.debug( + `[Heartbeat] WebSocket closing/closed (readyState=${ws.readyState}), stopping heartbeat` + ) + } + this.stop() + } + }, this.pingPeriodMs) + } +} diff --git a/packages/k8s/src/k8s/index.ts b/packages/k8s/src/k8s/index.ts index ae773da3..8e19a812 100644 --- a/packages/k8s/src/k8s/index.ts +++ b/packages/k8s/src/k8s/index.ts @@ -24,6 +24,8 @@ import { WORK_VOLUME } from './utils' import * as shlex from 'shlex' +import { parsePositiveMsEnv, WebSocketHeartbeat } from './heartbeat' +import type { HeartbeatWebSocket } from './heartbeat' const kc = new k8s.KubeConfig() @@ -250,9 +252,31 @@ export async function execPodStep( stdin?: stream.Readable ): Promise { const exec = new k8s.Exec(kc) + core.debug( + `[execPodStep] Starting: cmd="${command[0]}" (${command.length} args), pod=${podName}, container=${containerName}` + ) command = fixArgs(command) - return await new Promise(function (resolve, reject) { + + const DEFAULT_PING_PERIOD_MS = 5000 + const pingPeriodMs = parsePositiveMsEnv( + process.env.ACTIONS_RUNNER_HEARTBEAT_PERIOD_MS, + DEFAULT_PING_PERIOD_MS + ) + const pongDeadlineMs = parsePositiveMsEnv( + process.env.ACTIONS_RUNNER_HEARTBEAT_DEADLINE_MS, + pingPeriodMs * 12 + 1000 + ) + core.debug( + `[execPodStep] Heartbeat config: pingPeriodMs=${pingPeriodMs}, pongDeadlineMs=${pongDeadlineMs}` + ) + + const heartbeat = new WebSocketHeartbeat(pingPeriodMs, pongDeadlineMs) + + return new Promise((resolve, reject) => { + core.debug('[execPodStep] About to call exec.exec') + let ws: HeartbeatWebSocket | null = null + exec .exec( namespace(), @@ -263,22 +287,85 @@ export async function execPodStep( process.stderr, stdin ?? null, false /* tty */, - resp => { - core.debug(`execPodStep response: ${JSON.stringify(resp)}`) + async resp => { + core.debug( + `[execPodStep] execPodStep response: ${JSON.stringify(resp)}` + ) + + heartbeat.stop() + + // Close WebSocket and wait for it before resolving/rejecting + const closeWebSocket = async (): Promise => { + const socket = ws + if ( + socket && + (socket.readyState === 1 || socket.readyState === 0) + ) { + return new Promise(closeResolve => { + const closeTimeout = setTimeout(() => { + core.warning( + '[execPodStep] WebSocket close timeout, forcing cleanup' + ) + closeResolve() + }, 5000) + + socket.once('close', () => { + clearTimeout(closeTimeout) + core.debug('[execPodStep] WebSocket closed cleanly') + closeResolve() + }) + socket.close() + }) + } + } + if (resp.status === 'Success') { + core.debug(`[execPodStep] Success, code: ${resp.code}`) + await closeWebSocket() resolve(resp.code || 0) } else { core.debug( - JSON.stringify({ - message: resp?.message, - details: resp?.details - }) + `[execPodStep] Failure: ${JSON.stringify({ message: resp?.message, details: resp?.details })}` ) + await closeWebSocket() reject(new Error(resp?.message || 'execPodStep failed')) } } ) - .catch(e => reject(e)) + .then(websocket => { + core.debug('[execPodStep] exec.exec resolved, ws object received') + ws = websocket + if (ws) { + heartbeat.start(ws, reject) + } else { + core.warning('[Heartbeat] WebSocket is null, heartbeat not started') + } + }) + .catch(async e => { + heartbeat.stop() + core.error(`[execPodStep] exec.exec threw error: ${e}`) + + // Close WebSocket before rejecting with timeout protection + const socket = ws + if (socket && (socket.readyState === 1 || socket.readyState === 0)) { + await new Promise(closeResolve => { + const closeTimeout = setTimeout(() => { + core.warning( + '[execPodStep] WebSocket close timeout in error handler' + ) + closeResolve() + }, 5000) + + socket.once('close', () => { + clearTimeout(closeTimeout) + closeResolve() + }) + socket.close() + }) + } + + reject(e) + }) }) } diff --git a/packages/k8s/tests/heartbeat-test.ts b/packages/k8s/tests/heartbeat-test.ts new file mode 100644 index 00000000..3b882a11 --- /dev/null +++ b/packages/k8s/tests/heartbeat-test.ts @@ -0,0 +1,207 @@ +import { EventEmitter } from 'events' +import { parsePositiveMsEnv, WebSocketHeartbeat } from '../src/k8s/heartbeat' +import type { HeartbeatWebSocket } from '../src/k8s/heartbeat' + +jest.mock('@actions/core', () => ({ + debug: jest.fn(), + warning: jest.fn(), + error: jest.fn(), + info: jest.fn() +})) + +// Minimal WebSocket test double backed by EventEmitter +class MockWebSocket extends EventEmitter implements HeartbeatWebSocket { + readyState: number + ping = jest.fn() + close = jest.fn() + + constructor(readyState = 1) { + super() + this.readyState = readyState + } +} + +describe('parsePositiveMsEnv', () => { + it('returns the fallback for undefined input', () => { + expect(parsePositiveMsEnv(undefined, 5000)).toBe(5000) + }) + + it('returns the fallback for an empty string', () => { + expect(parsePositiveMsEnv('', 5000)).toBe(5000) + }) + + it('returns the fallback for non-numeric input', () => { + expect(parsePositiveMsEnv('abc', 5000)).toBe(5000) + }) + + it('returns the fallback for zero', () => { + expect(parsePositiveMsEnv('0', 5000)).toBe(5000) + }) + + it('returns the fallback for negative values', () => { + expect(parsePositiveMsEnv('-100', 5000)).toBe(5000) + }) + + it('returns the fallback for NaN', () => { + expect(parsePositiveMsEnv('NaN', 5000)).toBe(5000) + }) + + it('parses a valid positive integer', () => { + expect(parsePositiveMsEnv('3000', 5000)).toBe(3000) + }) + + it('ignores fractional parts (parseInt semantics)', () => { + expect(parsePositiveMsEnv('1500.9', 5000)).toBe(1500) + }) +}) + +describe('WebSocketHeartbeat', () => { + beforeEach(() => { + jest.useFakeTimers() + }) + + afterEach(() => { + jest.useRealTimers() + }) + + describe('CONNECTING state (readyState === 0)', () => { + it('skips ping while connecting but keeps the interval running', () => { + const ws = new MockWebSocket(0) // CONNECTING + const reject = jest.fn() + const hb = new WebSocketHeartbeat(100, 10000) + + hb.start(ws, reject) + + jest.advanceTimersByTime(500) // fire interval several times + + expect(ws.ping).not.toHaveBeenCalled() + expect(reject).not.toHaveBeenCalled() + + hb.stop() + }) + + it('starts sending pings once the socket transitions to OPEN', () => { + const ws = new MockWebSocket(0) + const reject = jest.fn() + const hb = new WebSocketHeartbeat(100, 10000) + + hb.start(ws, reject) + + jest.advanceTimersByTime(100) // one interval tick while CONNECTING + expect(ws.ping).not.toHaveBeenCalled() + + ws.readyState = 1 // now OPEN + jest.advanceTimersByTime(100) // exactly one more tick + + expect(ws.ping).toHaveBeenCalledTimes(1) + + hb.stop() + }) + }) + + describe('pong timeout', () => { + it('closes the socket and rejects the promise when no pong is received after first ping', () => { + const ws = new MockWebSocket(1) + const reject = jest.fn() + // pingPeriodMs=100, pongDeadlineMs=200 + // t=100: first ping sent → deadline armed, fires at t=300 + const hb = new WebSocketHeartbeat(100, 200) + + hb.start(ws, reject) + + jest.advanceTimersByTime(350) // past first ping + deadline + + expect(ws.close).toHaveBeenCalledTimes(1) + expect(reject).toHaveBeenCalledTimes(1) + expect(reject.mock.calls[0][0]).toBeInstanceOf(Error) + expect(reject.mock.calls[0][0].message).toMatch(/heartbeat timeout/) + }) + + it('does not reject while the socket is still CONNECTING, even past pongDeadlineMs', () => { + const ws = new MockWebSocket(0) // CONNECTING — never transitions + const reject = jest.fn() + const hb = new WebSocketHeartbeat(100, 200) + + hb.start(ws, reject) + + // Advance well past what the deadline would have been if armed at start + jest.advanceTimersByTime(1000) + + expect(reject).not.toHaveBeenCalled() + + hb.stop() + }) + + it('resets the pong deadline when a pong is received', () => { + const ws = new MockWebSocket(1) + const reject = jest.fn() + // pingPeriodMs=100, pongDeadlineMs=500 + // t=100: first ping → deadline at t=600 + const hb = new WebSocketHeartbeat(100, 500) + + hb.start(ws, reject) + + jest.advanceTimersByTime(400) // t=400, deadline at t=600 + ws.emit('pong') // reset deadline → now fires at t=900 + + jest.advanceTimersByTime(400) // t=800, deadline not yet reached + + expect(reject).not.toHaveBeenCalled() + + hb.stop() + }) + }) + + describe('stop()', () => { + it('clears all timers so no pings or timeout callbacks fire after stop', () => { + const ws = new MockWebSocket(1) + const reject = jest.fn() + const hb = new WebSocketHeartbeat(100, 500) + + hb.start(ws, reject) + + jest.advanceTimersByTime(50) + hb.stop() + + const pingCountAtStop = (ws.ping as jest.Mock).mock.calls.length + + jest.advanceTimersByTime(1000) // well past both intervals + + expect(ws.ping).toHaveBeenCalledTimes(pingCountAtStop) + expect(reject).not.toHaveBeenCalled() + }) + + it('stops automatically when the WebSocket emits close', () => { + const ws = new MockWebSocket(1) + const reject = jest.fn() + const hb = new WebSocketHeartbeat(100, 500) + + hb.start(ws, reject) + + ws.emit('close') + + jest.advanceTimersByTime(1000) + + expect(reject).not.toHaveBeenCalled() + }) + }) + + describe('CLOSING/CLOSED state', () => { + it('stops the heartbeat when readyState is CLOSING (2)', () => { + const ws = new MockWebSocket(1) + const reject = jest.fn() + const hb = new WebSocketHeartbeat(100, 10000) + + hb.start(ws, reject) + + jest.advanceTimersByTime(150) // one successful ping + expect(ws.ping).toHaveBeenCalledTimes(1) + + ws.readyState = 2 // CLOSING + jest.advanceTimersByTime(200) + + // ping should not have been called again after transitioning to CLOSING + expect(ws.ping).toHaveBeenCalledTimes(1) + }) + }) +})