Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 189 additions & 30 deletions packages/k8s/src/k8s/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,121 @@ const kc = new k8s.KubeConfig()

kc.loadFromDefault()

const k8sApi = kc.makeApiClient(k8s.CoreV1Api)
const k8sBatchV1Api = kc.makeApiClient(k8s.BatchV1Api)
const k8sAuthorizationV1Api = kc.makeApiClient(k8s.AuthorizationV1Api)

const DEFAULT_WAIT_FOR_POD_TIME_SECONDS = 10 * 60 // 10 min

const RETRYABLE_STATUS_CODES = new Set([408, 429, 500, 502, 503, 504])
const RETRYABLE_NETWORK_CODES = new Set([
'ECONNREFUSED',
'ECONNRESET',
'ETIMEDOUT',
'ENETUNREACH',
'EAI_AGAIN',
'ENOTFOUND'
])
const MAX_RETRIES = 3
const RETRY_BASE_DELAY_MS = 1000

export function isRetryableError(err: unknown): boolean {
if (err instanceof k8s.ApiException) {
return RETRYABLE_STATUS_CODES.has(err.code)
}

let current: unknown = err
while (current instanceof Error) {
if (
'code' in current &&
typeof current.code === 'string' &&
RETRYABLE_NETWORK_CODES.has(current.code)
) {
return true
}
current = (current as { cause?: unknown }).cause
}

return false
}

function retryDelay(attempt: number): number {
return RETRY_BASE_DELAY_MS * 2 ** attempt * (0.5 + Math.random())
}

export function retryAfterDelay(err: k8s.ApiException<unknown>, attempt: number): number {
const headerRetrySeconds = err.headers?.['retry-after'] ?? err.headers?.['Retry-After']
if (!headerRetrySeconds) {
return retryDelay(attempt)
}

const seconds = Number(headerRetrySeconds)
if (!Number.isFinite(seconds) || seconds <= 0) {
return retryDelay(attempt)
}

// Cap the delay to 30 seconds
const maxDelaySeconds = 30;
return Math.min(seconds * 1000, maxDelaySeconds * 1000)
}

function describeError(err: unknown): string {
if (err instanceof k8s.ApiException) {
return `status ${err.code}`
}
let current: unknown = err
while (current instanceof Error) {
if (
'code' in current &&
typeof current.code === 'string' &&
RETRYABLE_NETWORK_CODES.has(current.code)
) {
return current.code
}
current = (current as { cause?: unknown }).cause
}
return String(err)
}

function withRetryClient<T extends object>(client: T): T {
const callWithRetry = async (
fn: (...args: unknown[]) => unknown,
name: string,
args: unknown[]
): Promise<unknown> => {
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
return await fn(...args)
} catch (err) {
if (!isRetryableError(err) || attempt === MAX_RETRIES) {
throw err
}
const delay =
err instanceof k8s.ApiException && err.code === 429
? retryAfterDelay(err, attempt)
: retryDelay(attempt)
core.warning(
`K8s API call ${name} failed (${describeError(err)}), retrying in ${Math.round(delay)}ms (attempt ${attempt + 1}/${MAX_RETRIES})`
)
await sleep(delay)
}
}
}

return new Proxy(client, {
get(target, prop, receiver) {
const value = Reflect.get(target, prop, receiver)
if (typeof value !== 'function') {
return value
}
return async (...args: unknown[]) =>
callWithRetry(value.bind(target), String(prop), args)
}
})
}
Comment on lines +104 to +139

const k8sApi = withRetryClient(kc.makeApiClient(k8s.CoreV1Api))
const k8sBatchV1Api = withRetryClient(kc.makeApiClient(k8s.BatchV1Api))
const k8sAuthorizationV1Api = withRetryClient(
kc.makeApiClient(k8s.AuthorizationV1Api)
)

export const requiredPermissions = [
{
group: '',
Expand Down Expand Up @@ -176,10 +285,20 @@ export async function createJobPod(
mergePodSpecWithOptions(appPod.spec, extension.spec)
}

return await k8sApi.createNamespacedPod({
namespace: namespace(),
body: appPod
})
try {
return await k8sApi.createNamespacedPod({
namespace: namespace(),
body: appPod
})
} catch (err) {
if (err instanceof k8s.ApiException && err.code === 409) {
return await k8sApi.readNamespacedPod({
name,
namespace: namespace()
})
}
throw err
}
}

export async function createContainerStepPod(
Expand Down Expand Up @@ -229,18 +348,35 @@ export async function createContainerStepPod(
mergePodSpecWithOptions(appPod.spec, extension.spec)
}

return await k8sApi.createNamespacedPod({
namespace: namespace(),
body: appPod
})
try {
return await k8sApi.createNamespacedPod({
namespace: namespace(),
body: appPod
})
} catch (err) {
if (err instanceof k8s.ApiException && err.code === 409) {
return await k8sApi.readNamespacedPod({
name,
namespace: namespace()
})
}
throw err
}
}

export async function deletePod(name: string): Promise<void> {
await k8sApi.deleteNamespacedPod({
name,
namespace: namespace(),
gracePeriodSeconds: 0
})
try {
await k8sApi.deleteNamespacedPod({
name,
namespace: namespace(),
gracePeriodSeconds: 0
})
} catch (err) {
if (err instanceof k8s.ApiException && err.code === 404) {
return
}
throw err
}
}

export async function execPodStep(
Expand Down Expand Up @@ -614,11 +750,20 @@ export async function createDockerSecret(
'base64'
)
}

return await k8sApi.createNamespacedSecret({
namespace: namespace(),
body: secret
})
try {
return await k8sApi.createNamespacedSecret({
namespace: namespace(),
body: secret
})
} catch (err) {
if (err instanceof k8s.ApiException && err.code === 409) {
return await k8sApi.readNamespacedSecret({
name: secretName,
namespace: namespace()
})
}
throw err
}
}

export async function createSecretForEnvs(envs: {
Expand All @@ -642,18 +787,32 @@ export async function createSecretForEnvs(envs: {
secret.data[key] = Buffer.from(value).toString('base64')
}

await k8sApi.createNamespacedSecret({
namespace: namespace(),
body: secret
})
try {
await k8sApi.createNamespacedSecret({
namespace: namespace(),
body: secret
})
} catch (err) {
if (!(err instanceof k8s.ApiException && err.code === 409)) {
throw err
}
}

return secretName
Comment on lines +790 to 801
}

export async function deleteSecret(name: string): Promise<void> {
await k8sApi.deleteNamespacedSecret({
name,
namespace: namespace()
})
try {
await k8sApi.deleteNamespacedSecret({
name,
namespace: namespace()
})
} catch (err) {
if (err instanceof k8s.ApiException && err.code === 404) {
return
}
throw err
}
}

export async function pruneSecrets(): Promise<void> {
Expand Down
Loading