Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions packages/k8s/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 128 additions & 0 deletions packages/k8s/src/k8s/heartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import * as core from '@actions/core'

/**
 * Minimal socket surface the heartbeat relies on.
 *
 * Only the members listed here are touched by the heartbeat code, so any
 * object offering them (a real WebSocket client or a test double) can be used.
 */
export interface HeartbeatWebSocket {
  /** Connection state: 0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED. */
  readyState: number

  /** Registers a listener that is invoked every time `event` is emitted. */
  on(event: string, listener: (...args: any[]) => void): this

  /** Registers a listener for `event`; intended for one-shot notification. */
  once(event: string, listener: (...args: any[]) => void): this

  /** Sends a ping frame to the peer. */
  ping(): void

  /** Starts closing the connection. */
  close(): void
}

/**
 * Parses a millisecond duration taken from an environment variable.
 *
 * The value is accepted only when, after trimming surrounding whitespace, it
 * consists solely of decimal digits (an optional leading `+` is allowed) and
 * denotes a positive safe integer. Anything else yields `fallback`.
 *
 * Partially numeric input such as `"1e3"`, `"1.5"` or `"5000ms"` is rejected
 * on purpose: `Number.parseInt` would silently truncate those to `1`, `1` and
 * `5000`, turning a typo into a surprising (possibly 1 ms) timer period.
 *
 * @param value - Raw environment variable value, or `undefined` when unset.
 * @param fallback - Value returned when `value` is missing or invalid.
 * @returns The parsed positive integer, or `fallback`.
 */
export function parsePositiveMsEnv(
  value: string | undefined,
  fallback: number
): number {
  const text = value?.trim() ?? ''
  // Require the whole string to be an integer literal, not just a numeric prefix.
  if (!/^\+?\d+$/.test(text)) {
    return fallback
  }
  const parsed = Number.parseInt(text, 10)
  // isSafeInteger also rejects digit strings too long to represent exactly.
  return Number.isSafeInteger(parsed) && parsed > 0 ? parsed : fallback
}

/**
 * Ping/pong liveness monitor for a WebSocket.
 *
 * After `start()`, a ping is sent every `pingPeriodMs` while the socket is
 * OPEN. Every pong (and `start()` itself) re-arms a deadline timer of
 * `pongDeadlineMs`; when that timer fires the socket is closed and the
 * `reject` callback given to `start()` is invoked with a timeout error.
 *
 * Each `start()` opens a new session. `stop()` (or a later `start()`) ends the
 * session, after which listeners registered for it no longer touch any timer.
 * Without that guard a pong arriving after `stop()` would re-arm the deadline
 * timer, keeping the event loop alive and eventually closing the socket and
 * calling `reject` for a heartbeat that was already shut down.
 */
export class WebSocketHeartbeat {
  /** Minimum spacing between throttled debug log lines. */
  private static readonly LOG_INTERVAL_MS = 2 * 60 * 1000

  private pingInterval: ReturnType<typeof setInterval> | null = null
  private pongTimeout: ReturnType<typeof setTimeout> | null = null
  private lastHeartbeatLog = 0
  /** Token of the session created by the latest `start()`; `null` when stopped. */
  private activeSession: symbol | null = null

  /**
   * @param pingPeriodMs - Delay between ping attempts, in milliseconds.
   * @param pongDeadlineMs - Maximum time to wait for a pong before the
   *   connection is treated as stale, in milliseconds.
   */
  constructor(
    private readonly pingPeriodMs: number,
    private readonly pongDeadlineMs: number
  ) {}

  /**
   * Throttle for chatty debug output.
   *
   * @returns `true` at most once per `LOG_INTERVAL_MS`; a `true` result also
   *   restarts the throttle window.
   */
  private shouldLog(): boolean {
    const now = Date.now()
    if (now - this.lastHeartbeatLog >= WebSocketHeartbeat.LOG_INTERVAL_MS) {
      this.lastHeartbeatLog = now
      return true
    }
    return false
  }

  /** Cancels the ping interval and the pong deadline timer, if armed. */
  private clearTimers(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval)
      this.pingInterval = null
    }
    if (this.pongTimeout) {
      clearTimeout(this.pongTimeout)
      this.pongTimeout = null
    }
  }

  /**
   * Ends the current session and cancels all timers. Safe to call repeatedly
   * and before `start()`.
   */
  stop(): void {
    if (this.shouldLog()) {
      core.debug('[Heartbeat] stopping heartbeat')
    }
    // Invalidate the session first so listeners still attached to the socket
    // cannot re-arm a timer afterwards.
    this.activeSession = null
    this.clearTimers()
  }

  /**
   * Begins monitoring `ws`.
   *
   * Registers `pong`, `error` and `close` listeners on the socket, arms the
   * pong deadline immediately and schedules periodic pings. Timers left over
   * from a previous `start()` are cancelled first.
   *
   * @param ws - Socket to monitor.
   * @param reject - Called with an `Error` when no pong arrives within
   *   `pongDeadlineMs`. It is not called for socket `error`/`close` events or
   *   for a failed ping; those only stop the heartbeat.
   */
  start(ws: HeartbeatWebSocket, reject: (err: Error) => void): void {
    core.debug(
      `[Heartbeat] Starting with period=${this.pingPeriodMs}ms, deadline=${this.pongDeadlineMs}ms`
    )
    this.lastHeartbeatLog = Date.now()

    // A second start() must not leak the previous interval/timeout handles.
    this.clearTimers()
    const session = Symbol('heartbeat-session')
    this.activeSession = session
    const isActive = (): boolean => this.activeSession === session

    const resetPongTimeout = (): void => {
      if (!isActive()) {
        return
      }
      if (this.pongTimeout) {
        clearTimeout(this.pongTimeout)
        this.pongTimeout = null
      }
      this.pongTimeout = setTimeout(() => {
        core.warning(
          `[Heartbeat] No pong received in ${this.pongDeadlineMs}ms, closing stale connection`
        )
        this.stop()
        try {
          ws.close()
        } catch {
          // ignore errors closing an already-closing socket
        }
        reject(
          new Error(
            `WebSocket heartbeat timeout: no pong within ${this.pongDeadlineMs}ms`
          )
        )
      }, this.pongDeadlineMs)
    }

    ws.on('pong', () => {
      // Pongs that arrive after stop() (or for an older session) are ignored.
      if (!isActive()) {
        return
      }
      if (this.shouldLog()) {
        core.debug('[Heartbeat] Pong received')
      }
      resetPongTimeout()
    })

    ws.on('error', (err: Error) => {
      core.error(`[Heartbeat] WebSocket error: ${err.message}`)
      // Only the socket of the current session may stop the heartbeat.
      if (isActive()) {
        this.stop()
      }
    })

    ws.on('close', () => {
      core.debug('[Heartbeat] WebSocket closed, stopping heartbeat')
      if (isActive()) {
        this.stop()
      }
    })

    resetPongTimeout()

    // WebSocket readyState: 0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED
    this.pingInterval = setInterval(() => {
      if (ws.readyState === 0) {
        // Still connecting — skip this tick but keep the interval alive
        return
      }
      if (ws.readyState === 1) {
        try {
          ws.ping()
          if (this.shouldLog()) {
            core.debug('[Heartbeat] Ping sent')
          }
        } catch (err) {
          core.error(`[Heartbeat] Ping failed: ${err}`)
          this.stop()
        }
      } else {
        // CLOSING (2) or CLOSED (3)
        if (this.shouldLog()) {
          core.debug(
            `[Heartbeat] WebSocket closing/closed (readyState=${ws.readyState}), stopping heartbeat`
          )
        }
        this.stop()
      }
    }, this.pingPeriodMs)
  }
}
100 changes: 92 additions & 8 deletions packages/k8s/src/k8s/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
WORK_VOLUME
} from './utils'
import * as shlex from 'shlex'
import { parsePositiveMsEnv, WebSocketHeartbeat } from './heartbeat'
import type { HeartbeatWebSocket } from './heartbeat'

const kc = new k8s.KubeConfig()

Expand Down Expand Up @@ -250,9 +252,31 @@ export async function execPodStep(
stdin?: stream.Readable
): Promise<number> {
const exec = new k8s.Exec(kc)
core.debug(
`[execPodStep] Starting: cmd="${command[0]}" (${command.length} args), pod=${podName}, container=${containerName}`
)

command = fixArgs(command)
return await new Promise(function (resolve, reject) {

const DEFAULT_PING_PERIOD_MS = 5000
const pingPeriodMs = parsePositiveMsEnv(
process.env.ACTIONS_RUNNER_HEARTBEAT_PERIOD_MS,
DEFAULT_PING_PERIOD_MS
)
const pongDeadlineMs = parsePositiveMsEnv(
process.env.ACTIONS_RUNNER_HEARTBEAT_DEADLINE_MS,
pingPeriodMs * 12 + 1000
)
core.debug(
`[execPodStep] Heartbeat config: pingPeriodMs=${pingPeriodMs}, pongDeadlineMs=${pongDeadlineMs}`
)

const heartbeat = new WebSocketHeartbeat(pingPeriodMs, pongDeadlineMs)

return new Promise<number>((resolve, reject) => {
core.debug('[execPodStep] About to call exec.exec')
let ws: HeartbeatWebSocket | null = null

Comment thread
madAndroid marked this conversation as resolved.
exec
.exec(
namespace(),
Expand All @@ -263,22 +287,82 @@ export async function execPodStep(
process.stderr,
stdin ?? null,
false /* tty */,
resp => {
core.debug(`execPodStep response: ${JSON.stringify(resp)}`)
async resp => {
core.debug(
`[execPodStep] execPodStep response: ${JSON.stringify(resp)}`
)

heartbeat.stop()

// Close WebSocket and wait for it before resolving/rejecting
const closeWebSocket = async (): Promise<void> => {
const socket = ws
if (socket && (socket.readyState === 1 || socket.readyState === 0)) {
return new Promise<void>(closeResolve => {
const closeTimeout = setTimeout(() => {
core.warning(
'[execPodStep] WebSocket close timeout, forcing cleanup'
)
closeResolve()
}, 5000)

socket.once('close', () => {
clearTimeout(closeTimeout)
core.debug('[execPodStep] WebSocket closed cleanly')
closeResolve()
})
socket.close()
})
}
}

if (resp.status === 'Success') {
core.debug(`[execPodStep] Success, code: ${resp.code}`)
await closeWebSocket()
resolve(resp.code || 0)
} else {
core.debug(
JSON.stringify({
message: resp?.message,
details: resp?.details
})
`[execPodStep] Failure: ${JSON.stringify({ message: resp?.message, details: resp?.details })}`
)
await closeWebSocket()
reject(new Error(resp?.message || 'execPodStep failed'))
}
}
)
.catch(e => reject(e))
.then(websocket => {
core.debug('[execPodStep] exec.exec resolved, ws object received')
ws = websocket
if (ws) {
heartbeat.start(ws, reject)
} else {
core.warning('[Heartbeat] WebSocket is null, heartbeat not started')
}
})
.catch(async e => {
heartbeat.stop()
core.error(`[execPodStep] exec.exec threw error: ${e}`)

// Close WebSocket before rejecting with timeout protection
const socket = ws
if (socket && (socket.readyState === 1 || socket.readyState === 0)) {
await new Promise<void>(closeResolve => {
const closeTimeout = setTimeout(() => {
core.warning(
'[execPodStep] WebSocket close timeout in error handler'
)
closeResolve()
}, 5000)

socket.once('close', () => {
clearTimeout(closeTimeout)
closeResolve()
})
socket.close()
})
}

reject(e)
})
})
}

Expand Down
Loading