Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions packages/k8s/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

128 changes: 128 additions & 0 deletions packages/k8s/src/k8s/heartbeat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import * as core from '@actions/core'

/**
 * Minimal structural view of a WebSocket as used by the heartbeat logic:
 * a state probe, ping/close operations, and event subscription.
 */
export interface HeartbeatWebSocket {
/** Connection state code: 0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED. */
readyState: number
/** Invoked once per heartbeat tick while the socket is OPEN; the heartbeat then waits for a 'pong' event. */
ping(): void
/** Invoked to tear the connection down, e.g. after the pong deadline expires. */
close(): void
/** Subscribes a listener to an event; the heartbeat subscribes to 'pong', 'error' and 'close'. */
on(event: string, listener: (...args: any[]) => void): this
/** Same shape as `on`; presumably fires the listener at most once (EventEmitter-style) — confirm against the concrete socket type. */
once(event: string, listener: (...args: any[]) => void): this
}

/**
 * Parses a millisecond duration taken from an environment variable.
 *
 * The value must be a plain base-10 integer (surrounding whitespace and a
 * leading `+` are tolerated). Anything else — units such as `"10s"`,
 * exponents such as `"1e3"`, fractions, negative numbers, zero, an empty or
 * missing value — yields `fallback` instead of being silently truncated to a
 * tiny number of milliseconds.
 *
 * Values above 2^31 - 1 are also rejected: Node.js timers coerce such delays
 * to 1ms, which would turn a "very long" setting into an immediate timeout.
 *
 * @param value - Raw environment value, or `undefined` when the variable is unset.
 * @param fallback - Returned whenever `value` is not a usable positive integer.
 * @returns The parsed number of milliseconds, or `fallback`.
 */
export function parsePositiveMsEnv(
  value: string | undefined,
  fallback: number
): number {
  // Largest delay setTimeout/setInterval honour before falling back to 1ms.
  const maxTimerDelayMs = 2 ** 31 - 1

  const text = (value ?? '').trim()
  // Digits only: Number.parseInt would accept "10s" as 10 and "1e3" as 1.
  if (!/^\+?\d+$/.test(text)) {
    return fallback
  }

  const parsed = Number(text)
  return parsed > 0 && parsed <= maxTimerDelayMs ? parsed : fallback
}

/**
 * Ping/pong liveness monitor for a WebSocket.
 *
 * After `start()`, the socket is pinged every `pingPeriodMs` while it is
 * OPEN. Every received pong pushes the deadline `pongDeadlineMs` into the
 * future; if the deadline expires the socket is closed and the supplied
 * `reject` callback is invoked with a timeout error.
 *
 * Listeners are attached with `ws.on` and are never detached (the
 * `HeartbeatWebSocket` interface exposes no way to remove them), so every
 * callback checks a session counter and becomes a no-op once the heartbeat
 * has been stopped or restarted.
 */
export class WebSocketHeartbeat {
  private pingInterval: ReturnType<typeof setInterval> | null = null
  private pongTimeout: ReturnType<typeof setTimeout> | null = null
  private lastHeartbeatLog = 0
  // Bumped by every start() and stop(); callbacks from an older session
  // compare against it to detect that they are stale.
  private session = 0
  private static readonly LOG_INTERVAL_MS = 2 * 60 * 1000

  /**
   * @param pingPeriodMs - Interval between pings, in milliseconds.
   * @param pongDeadlineMs - Maximum time without a pong before the connection
   *   is treated as stale, in milliseconds.
   */
  constructor(
    private readonly pingPeriodMs: number,
    private readonly pongDeadlineMs: number
  ) {}

  /**
   * Rate limiter for routine debug output: returns true (and restarts the
   * window) at most once per LOG_INTERVAL_MS.
   */
  private shouldLog(): boolean {
    const now = Date.now()
    if (now - this.lastHeartbeatLog >= WebSocketHeartbeat.LOG_INTERVAL_MS) {
      this.lastHeartbeatLog = now
      return true
    }
    return false
  }

  /** Cancels the ping interval and the pending pong deadline, if any. */
  private clearTimers(): void {
    if (this.pingInterval) {
      clearInterval(this.pingInterval)
      this.pingInterval = null
    }
    if (this.pongTimeout) {
      clearTimeout(this.pongTimeout)
      this.pongTimeout = null
    }
  }

  /**
   * Stops pinging and cancels the pong deadline. Safe to call repeatedly.
   * Events delivered by the socket afterwards no longer arm any timer.
   */
  stop(): void {
    if (this.shouldLog()) {
      core.debug('[Heartbeat] stopping heartbeat')
    }
    // Invalidate callbacks registered by the current session.
    this.session++
    this.clearTimers()
  }

  /**
   * Begins monitoring `ws`. Any monitoring started by an earlier call is
   * cancelled first.
   *
   * @param ws - Socket to ping and observe.
   * @param reject - Called with an Error when no pong arrives within the
   *   deadline; the socket is closed before it is called.
   */
  start(ws: HeartbeatWebSocket, reject: (err: Error) => void): void {
    // A restart must not leak the timers of the previous session.
    this.clearTimers()
    const session = ++this.session
    const isCurrent = (): boolean => session === this.session

    core.debug(
      `[Heartbeat] Starting with period=${this.pingPeriodMs}ms, deadline=${this.pongDeadlineMs}ms`
    )
    this.lastHeartbeatLog = Date.now()

    const resetPongTimeout = (): void => {
      if (this.pongTimeout) {
        clearTimeout(this.pongTimeout)
        this.pongTimeout = null
      }
      this.pongTimeout = setTimeout(() => {
        if (!isCurrent()) {
          return
        }
        core.warning(
          `[Heartbeat] No pong received in ${this.pongDeadlineMs}ms, closing stale connection`
        )
        this.stop()
        try {
          ws.close()
        } catch {
          // ignore errors closing an already-closing socket
        }
        reject(
          new Error(
            `WebSocket heartbeat timeout: no pong within ${this.pongDeadlineMs}ms`
          )
        )
      }, this.pongDeadlineMs)
    }

    ws.on('pong', () => {
      // A pong that was in flight when stop() ran must not re-arm the
      // deadline: the timer would keep the event loop alive and later report
      // a bogus stale connection.
      if (!isCurrent()) {
        return
      }
      if (this.shouldLog()) {
        core.debug('[Heartbeat] Pong received')
      }
      resetPongTimeout()
    })

    ws.on('error', (err: Error) => {
      core.error(`[Heartbeat] WebSocket error: ${err.message}`)
      // Only the session that owns this listener may stop the heartbeat.
      if (isCurrent()) {
        this.stop()
      }
    })

    ws.on('close', () => {
      core.debug('[Heartbeat] WebSocket closed, stopping heartbeat')
      if (isCurrent()) {
        this.stop()
      }
    })

    // Arm the deadline immediately so a socket that never answers is caught.
    resetPongTimeout()

    // WebSocket readyState: 0 = CONNECTING, 1 = OPEN, 2 = CLOSING, 3 = CLOSED
    this.pingInterval = setInterval(() => {
      if (!isCurrent()) {
        return
      }
      if (ws.readyState === 0) {
        // Still connecting — skip this tick but keep the interval alive
        return
      }
      if (ws.readyState === 1) {
        try {
          ws.ping()
          if (this.shouldLog()) {
            core.debug('[Heartbeat] Ping sent')
          }
        } catch (err) {
          // NOTE(review): `reject` is not called on this path; presumably the
          // socket's own 'error'/'close' handling surfaces the failure to the
          // caller — confirm.
          core.error(`[Heartbeat] Ping failed: ${err}`)
          this.stop()
        }
      } else {
        // CLOSING (2) or CLOSED (3)
        if (this.shouldLog()) {
          core.debug(
            `[Heartbeat] WebSocket closing/closed (readyState=${ws.readyState}), stopping heartbeat`
          )
        }
        this.stop()
      }
    }, this.pingPeriodMs)
  }
}
100 changes: 92 additions & 8 deletions packages/k8s/src/k8s/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
WORK_VOLUME
} from './utils'
import * as shlex from 'shlex'
import { parsePositiveMsEnv, WebSocketHeartbeat } from './heartbeat'
import type { HeartbeatWebSocket } from './heartbeat'

const kc = new k8s.KubeConfig()

Expand Down Expand Up @@ -250,9 +252,31 @@ export async function execPodStep(
stdin?: stream.Readable
): Promise<number> {
const exec = new k8s.Exec(kc)
core.debug(
`[execPodStep] Starting: cmd="${command[0]}" (${command.length} args), pod=${podName}, container=${containerName}`
)

command = fixArgs(command)
return await new Promise(function (resolve, reject) {

const DEFAULT_PING_PERIOD_MS = 5000
const pingPeriodMs = parsePositiveMsEnv(
process.env.ACTIONS_RUNNER_HEARTBEAT_PERIOD_MS,
DEFAULT_PING_PERIOD_MS
)
const pongDeadlineMs = parsePositiveMsEnv(
process.env.ACTIONS_RUNNER_HEARTBEAT_DEADLINE_MS,
pingPeriodMs * 12 + 1000
)
core.debug(
`[execPodStep] Heartbeat config: pingPeriodMs=${pingPeriodMs}, pongDeadlineMs=${pongDeadlineMs}`
)

const heartbeat = new WebSocketHeartbeat(pingPeriodMs, pongDeadlineMs)

return new Promise<number>((resolve, reject) => {
core.debug('[execPodStep] About to call exec.exec')
let ws: HeartbeatWebSocket | null = null

Comment thread
madAndroid marked this conversation as resolved.
exec
.exec(
namespace(),
Expand All @@ -263,22 +287,82 @@ export async function execPodStep(
process.stderr,
stdin ?? null,
false /* tty */,
resp => {
core.debug(`execPodStep response: ${JSON.stringify(resp)}`)
async resp => {
core.debug(
`[execPodStep] execPodStep response: ${JSON.stringify(resp)}`
)

heartbeat.stop()

// Close WebSocket and wait for it before resolving/rejecting
const closeWebSocket = async (): Promise<void> => {
const socket = ws
if (socket && (socket.readyState === 1 || socket.readyState === 0)) {
return new Promise<void>(closeResolve => {
const closeTimeout = setTimeout(() => {
core.warning(
'[execPodStep] WebSocket close timeout, forcing cleanup'
)
closeResolve()
}, 5000)

socket.once('close', () => {
clearTimeout(closeTimeout)
core.debug('[execPodStep] WebSocket closed cleanly')
closeResolve()
})
socket.close()
})
}
}

if (resp.status === 'Success') {
core.debug(`[execPodStep] Success, code: ${resp.code}`)
await closeWebSocket()
resolve(resp.code || 0)
} else {
core.debug(
JSON.stringify({
message: resp?.message,
details: resp?.details
})
`[execPodStep] Failure: ${JSON.stringify({ message: resp?.message, details: resp?.details })}`
)
await closeWebSocket()
reject(new Error(resp?.message || 'execPodStep failed'))
}
}
)
.catch(e => reject(e))
.then(websocket => {
core.debug('[execPodStep] exec.exec resolved, ws object received')
ws = websocket
if (ws) {
heartbeat.start(ws, reject)
} else {
core.warning('[Heartbeat] WebSocket is null, heartbeat not started')
}
})
.catch(async e => {
heartbeat.stop()
core.error(`[execPodStep] exec.exec threw error: ${e}`)

// Close WebSocket before rejecting with timeout protection
const socket = ws
if (socket && (socket.readyState === 1 || socket.readyState === 0)) {
await new Promise<void>(closeResolve => {
const closeTimeout = setTimeout(() => {
core.warning(
'[execPodStep] WebSocket close timeout in error handler'
)
closeResolve()
}, 5000)

socket.once('close', () => {
clearTimeout(closeTimeout)
closeResolve()
})
socket.close()
})
}

reject(e)
})
})
}

Expand Down
Loading
Loading