
fix(dataloader): switch to forkserver context + add timeout + explicit teardown (closes #140, partial #145)#148

Open
mihow wants to merge 2 commits into main from fix/dataloader-shmem-leak

Conversation

@mihow
Collaborator

@mihow mihow commented May 20, 2026

Plain-language summary

Our ML workers — the "ami workers" that pull jobs off the Antenna queue and run detection + classification on bug images — have a long-running memory leak in the PyTorch DataLoader. Each batch leaves behind a few megabytes of shared memory that PyTorch never reclaims. On big jobs the leak adds up fast: a routine 400-image production job recently used up the entire host's RAM in under 20 minutes and got OOM-killed, which dropped about half its images on the floor.

This PR ships three small, contained changes: two DataLoader configuration settings plus an explicit cleanup at end of job. Together they should keep worker memory roughly flat across batches instead of climbing until the kernel kills the process. Both DataLoader settings are env-configurable, so an operator can opt any single host out if one of them surfaces a problem with a specific pipeline.

Motivation (2026-05-19 incident with numbers): #140 (comment)

What changes

1. multiprocessing_context="forkserver" on the DataLoader

On Linux, Python's default start method is fork (through Python 3.13), which makes each DataLoader subprocess inherit the parent's entire memory state via copy-on-write. When the parent process already has large CUDA buffers and pinned memory loaded (a normal state for a running ML worker), every DataLoader subprocess carries that state along as "shared memory", and PyTorch sometimes can't clean it up on subprocess shutdown. forkserver instead forks each subprocess from a small, clean server process that never loaded that state, sidestepping both the inheritance and the cleanup race.

  • New env var: AMI_ANTENNA_API_DATALOADER_MP_CONTEXT
  • Default: forkserver
  • Allowed values: fork, spawn, forkserver, or empty (let PyTorch pick its default)

This is the fix listed in both #140 and #145 that nobody has tried in production yet.

2. timeout=300 on the DataLoader

When the shared-memory cleanup race in #140 hits, the main thread currently hangs forever in multiprocessing.Queue.get(), waiting for a subprocess that is already dead. A non-zero timeout converts the silent hang into a RuntimeError that the worker can log and recover from by restarting. Zero risk to the happy path: workers normally finish a batch in seconds, not minutes.

  • New env var: AMI_ANTENNA_API_DATALOADER_TIMEOUT_S
  • Default: 300 (seconds)
  • Set to 0 to disable
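A minimal sketch of how the two env vars above could map onto DataLoader keyword arguments. It reads the environment directly, which is an assumption made to keep the example self-contained (the PR routes these values through the project's settings object), and the helper name dataloader_kwargs_from_env is hypothetical.

```python
import os
from typing import Any, Mapping, Optional


def dataloader_kwargs_from_env(
    num_workers: int,
    env: Optional[Mapping[str, str]] = None,
) -> dict[str, Any]:
    """Translate the two env vars into DataLoader keyword arguments."""
    env = os.environ if env is None else env
    kwargs: dict[str, Any] = {"num_workers": num_workers}

    # With num_workers == 0 there are no subprocesses, so neither knob applies.
    if num_workers == 0:
        return kwargs

    # Unset falls back to the default; an explicitly empty value means
    # "let PyTorch pick its default start method".
    mp_context = env.get(
        "AMI_ANTENNA_API_DATALOADER_MP_CONTEXT", "forkserver"
    ).strip()
    if mp_context:
        kwargs["multiprocessing_context"] = mp_context

    # 0 disables the timeout, which is also DataLoader's own default.
    timeout_s = float(env.get("AMI_ANTENNA_API_DATALOADER_TIMEOUT_S", "300"))
    if timeout_s < 0:
        raise ValueError("timeout must be >= 0")
    if timeout_s > 0:
        kwargs["timeout"] = timeout_s

    return kwargs


# Usage (requires torch): DataLoader(dataset, **dataloader_kwargs_from_env(2))
print(dataloader_kwargs_from_env(2, env={}))
```

Returning plain keyword arguments keeps the "disabled" cases simple: an empty context or a zero timeout leaves the key out, so the DataLoader falls back to its own defaults.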

3. Explicit teardown in _process_job

In the finally: block of _process_job, after the result_poster.shutdown() call:

del loader
del batch_source
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()

Right now the DataLoader gets dropped implicitly when the function returns. That can leave child processes' shared-memory segments and pipe FDs alive for an unpredictable amount of time. Explicit cleanup tells Python to release references immediately and asks PyTorch to flush its GPU buffer cache. Cheap insurance regardless of which multiprocessing_context ends up active, and directly addresses the per-batch pipe-FD accumulation pattern observed in #145.
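To illustrate why the teardown pairs del with gc.collect(), here is a small sketch. FakeLoader is a hypothetical stand-in for the DataLoader, built with a reference cycle so that dropping the last name is not enough to free it; the torch import is optional so the sketch also runs on a machine without PyTorch.

```python
import gc
import weakref


class FakeLoader:
    """Hypothetical stand-in for a DataLoader that owns worker resources."""

    def __init__(self):
        # Self-reference cycle: reference counting alone cannot free this
        # object, so it lingers until the cyclic garbage collector runs.
        self.me = self


def process_job() -> bool:
    # Bind the name up front so `del` in the finally block cannot fail
    # if construction raises before the loader exists.
    loader = None
    probe = None
    try:
        loader = FakeLoader()
        probe = weakref.ref(loader)  # lets us observe when the object dies
        # ... iterate over batches here ...
    finally:
        del loader    # drop our reference
        gc.collect()  # free objects that are only kept alive by cycles
        try:
            import torch  # optional: only relevant on GPU workers

            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except ImportError:
            pass
    # True when the loader object was actually reclaimed.
    return probe() is None


print(process_job())
```

Pre-binding the name to None is one way to make the cleanup safe when the job fails before the loader is created.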

Verification on antenna-dev-arctia (H100 24GB MIG, ~68 GiB host RAM)

Supervisor ami-worker (2 procs, --num_workers 1, AMI_NUM_WORKERS=1) restarted with PYTHONPATH pointing at the patched worktree. Ran three back-to-back test_ml_job_e2e jobs against quebec_vermont_moths_2023:

| Job | Images | Detections | Classifications | Failed | Runtime |
|---|---|---|---|---|---|
| 170 (small) | 10 | 68 | 121 | 0 | 18 s |
| 171 (stress) | 212 | 1343 | 2393 | 0 | 66 s |
| 173 (stress 2) | 140 | 902 | 1380 | 0 | 46 s |

Zero image failures across ~360 processed images on the exact pipeline + parallel-2-workers configuration that lost 49% of images on production on 2026-05-19. No OOM kills in dmesg. Both workers stayed RUNNING throughout.

RSS / shmem-rss trajectory (sampled every 3s during the stress run):

| Worker | min RSS | peak RSS | final RSS | peak shmem | final shmem |
|---|---|---|---|---|---|
| worker_00 | 1.9 GB | 3.8 GB | 3.8 GB | 1.9 GB | 1.8 GB |
| worker_01 | 2.8 GB | 5.0 GB | 4.5 GB | 3.1 GB | 2.5 GB |

For comparison, the 2026-05-19 production incident saw single-worker peaks of 17–35 GB shmem-rss on the same pipeline before the OS killed the process. The catastrophic per-job blow-up is gone; the residual growth that remains across multiple jobs is the territory of #144 Part B (out of scope here).
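For anyone repeating the measurement, a sampler along these lines would produce comparable numbers. This is a sketch, not the script used for the table: it assumes Linux and assumes the shmem figure corresponds to the RssShmem field of /proc/<pid>/status.

```python
import time
from pathlib import Path

FIELDS = ("VmRSS", "RssShmem")


def parse_status(text: str) -> dict[str, int]:
    """Extract selected memory fields (in kB) from /proc/<pid>/status text."""
    out = {}
    for line in text.splitlines():
        key, _, rest = line.partition(":")
        if key in FIELDS:
            out[key] = int(rest.split()[0])  # the value is followed by "kB"
    return out


def read_status(pid: int) -> dict[str, int]:
    """Read the current memory fields of a live process (Linux only)."""
    return parse_status(Path(f"/proc/{pid}/status").read_text())


def watch(pid: int, interval_s: float = 3.0, samples: int = 10) -> None:
    """Print one sample per interval; 3 s matches the interval quoted above."""
    for _ in range(samples):
        print(read_status(pid))
        time.sleep(interval_s)
```

Calling watch() with a worker's PID during a job gives the raw series from which min, peak and final values can be read off.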

DataLoader subprocesses cleaned up cleanly after each job — pgrep -c pt_data_worker returned 0 between jobs.

Test suite on the worktree: 21/21 passed (pytest trapdata/antenna/tests/), including the new 8-test test_dataloader_hygiene.py regression suite that guards the configuration surface so a future refactor can't silently remove the fix.

Pipeline-pickling smoke test

Under forkserver, the dataset object is pickled and sent to each subprocess as it starts. Verified manually that get_rest_dataloader() with num_workers=2 + forkserver spawns cleanly using RESTDataset (the shared dataset class for all pipelines). The end-to-end run above used quebec_vermont_moths_2023. The other pipelines (panama, uk_denmark, costa_rica_turing, kenya-uganda_turing, anguilla_turing, global, insect_orders, moth_binary) share the same RESTDataset, so the pickling path is the same; they will be exercised in the staged rollout.
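If a pipeline ever does fail to start under forkserver or spawn, a quick way to find the culprit is to try pickling each attribute of the dataset separately. The sketch below uses a hypothetical dataset-like object rather than RESTDataset, and the helper name unpicklable_attrs is made up for illustration.

```python
import pickle
import threading
from types import SimpleNamespace


def unpicklable_attrs(obj) -> dict[str, str]:
    """Return {attribute name: error} for attributes that fail to pickle."""
    failures = {}
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception as exc:  # pickle raises several exception types
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


# Hypothetical dataset-like object: the URL and batch size pickle fine,
# the lock does not.
dataset = SimpleNamespace(
    base_url="https://example.invalid/api",
    batch_size=4,
    lock=threading.Lock(),
)
print(unpicklable_attrs(dataset))
```

An object can still be picklable as a whole if its class customises pickling (for example via `__getstate__`), so this check is a diagnostic aid rather than a replacement for actually starting the workers.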

Out of scope (deliberate)

Relationship to other open work

Orthogonal to PR #139 (env-knob plumbing on feature/uv-migration). This PR is based on main and does not modify the same files in conflicting ways.

Staged rollout plan (post-merge)

  1. Build wheel from the merged PR. Install on one prod GPU host first.
  2. Pull a heavy job; watch dmesg, ps -o pid,rss,cmd -C ami, pgrep -c pt_data_worker for 30+ minutes.
  3. If RSS stays bounded and no RuntimeError: could not unlink ... events appear, roll to the rest of the fleet.
  4. The pipe-FD leak in #145 (DataLoader pipe FD leak: ~10k pipes/day accumulate, worker hits EMFILE silently after ~6 days uptime) surfaces at ~6 days of uptime, so the real verdict on the teardown fix comes at 7 days. Watch one full 24h cycle before declaring it fixed.
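For step 4, pipe accumulation can be tracked directly instead of waiting for EMFILE. The following is a sketch that assumes Linux /proc; both function names are hypothetical and not part of this PR.

```python
import os
from collections import Counter


def classify_fd_targets(targets) -> Counter:
    """Count FD link targets by kind, e.g. 'pipe:[12345]' -> 'pipe'."""
    kinds = Counter()
    for target in targets:
        if target.startswith("/"):
            kinds["file"] += 1
        else:
            # Non-path targets look like 'pipe:[...]', 'socket:[...]',
            # or 'anon_inode:[...]'.
            kinds[target.split(":", 1)[0]] += 1
    return kinds


def fd_targets(pid: int) -> list[str]:
    """Read where each open FD of a process points (Linux /proc only)."""
    fd_dir = f"/proc/{pid}/fd"
    targets = []
    for name in os.listdir(fd_dir):
        try:
            targets.append(os.readlink(os.path.join(fd_dir, name)))
        except OSError:
            continue  # FD was closed between listdir() and readlink()
    return targets


# Usage on a worker host: classify_fd_targets(fd_targets(worker_pid))["pipe"]
```

Logging the pipe count once per job would show whether it returns to a baseline after the teardown or keeps growing.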

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added configuration options to control DataLoader subprocess startup method and per-batch timeout for improved worker stability and hang recovery.
  • Refactor

    • Improved worker shutdown to explicitly release data loader and batch resources, run garbage collection, and clear GPU caches.
  • Tests

    • Added regression tests covering subprocess configuration, timeout behavior, overrides, and backwards compatibility.


…t teardown

Closes #140, partial #145.

## What this fixes (plain language)

The ML worker that pulls jobs off the Antenna queue has a memory leak in
its PyTorch DataLoader. Each batch leaves behind a few megabytes of
shared memory that PyTorch never reclaims. On big jobs the leak adds up
fast: a routine 400-image job recently used up the entire host's RAM in
under 20 minutes and got killed by the OS, taking ~half its images down
with it.

Two small DataLoader-config changes plus an explicit cleanup at end of
job are enough to make worker memory stay roughly flat across batches
instead of climbing until the kernel kills the process.

## The three changes

1. **`multiprocessing_context="forkserver"`** (new env var
   `AMI_ANTENNA_API_DATALOADER_MP_CONTEXT`, default `forkserver`).
   The Linux default is `fork`, which makes each DataLoader subprocess
   inherit the parent's entire memory state via copy-on-write. When the
   parent has large CUDA buffers and pinned memory loaded, every
   subprocess carries that state as "shared memory" and PyTorch
   sometimes can't clean it up on shutdown. `forkserver` forks
   subprocesses from a clean server process, sidestepping both the
   inheritance and the cleanup race.

2. **`timeout=300`** (new env var `AMI_ANTENNA_API_DATALOADER_TIMEOUT_S`,
   default 300s). When a DataLoader subprocess dies mid-shared-memory
   cleanup, the main thread currently blocks forever on
   `multiprocessing.Queue.get()`. A timeout converts that silent hang
   into a `RuntimeError` the supervisor can restart from.

3. **Explicit teardown in `_process_job`** — `del loader; gc.collect();
   torch.cuda.empty_cache()` in the `finally:` block. Drops the
   DataLoader reference immediately instead of relying on the garbage
   collector to run `__del__` at some indeterminate later time. Also
   addresses the pipe-FD accumulation pattern observed in #145.

All three are conservative, and operators can revert either DataLoader
setting individually via env vars without a code change.

## Out of scope

- **`pin_memory` default not changed.** #138 and #122 disagree on the
  right default for HTTP-sourced workloads; that needs its own measured
  ablation, not a guess buried in this PR.
- LRU model eviction (#144 Part A) and the worker drain-and-exit
  recycler (#147) are separate follow-ups.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai

coderabbitai Bot commented May 20, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0a0d4be5-ee14-4c7b-8f2f-8ac03c09dda2

📥 Commits

Reviewing files that changed from the base of the PR and between ce5a2ed and 0083ae8.

📒 Files selected for processing (2)
  • trapdata/antenna/datasets.py
  • trapdata/antenna/tests/test_dataloader_hygiene.py
💤 Files with no reviewable changes (1)
  • trapdata/antenna/tests/test_dataloader_hygiene.py

📝 Walkthrough

Walkthrough

Adds Settings and example env entries for DataLoader multiprocessing context and timeout, wires those into get_rest_dataloader(), explicitly tears down DataLoader/batch iterator on worker shutdown, and adds regression tests for defaults, overrides, validation, and backwards compatibility.

Changes

DataLoader Subprocess Hygiene

| Layer / File(s) | Summary |
|---|---|
| Configuration Schema (trapdata/settings.py, .env.example) | New antenna_api_dataloader_mp_context (default "forkserver") and antenna_api_dataloader_timeout_s (default 300) settings and .env.example guidance. |
| DataLoader Context and Timeout Configuration (trapdata/antenna/datasets.py) | get_rest_dataloader() imports torch.multiprocessing, documents the new settings, validates and resolves the configured multiprocessing start method when num_workers > 0, computes per-batch timeout only when enabled, and passes multiprocessing_context= and timeout= to DataLoader. |
| Worker Shutdown Resource Cleanup (trapdata/antenna/worker.py) | In _process_job finally-block, attempts to delete loader and batch_source if present, calls gc.collect(), and clears CUDA caches via torch.cuda.empty_cache() when available. |
| DataLoader Hygiene Test Coverage (trapdata/antenna/tests/test_dataloader_hygiene.py) | Regression tests verify defaults (forkserver + 300s when num_workers>0, no subprocesses → None/timeout 0), operator overrides (spawn, empty-string fallback, timeout=0), invalid-context validation, and backwards compatibility when settings fields are missing. |

Sequence Diagram(s)

No sequence diagram generated for this change.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰
I hop through queues and threads so spry,
Forkserver guides the workers nigh,
Timeouts watch the batches run,
We clear the caches when we're done,
Now loaders sleep beneath the sky.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 66.67% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The PR title clearly and specifically summarizes the three main changes: switching to forkserver multiprocessing context, adding DataLoader timeout, and implementing explicit teardown. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |



@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@trapdata/antenna/datasets.py`:
- Around line 453-458: The code currently validates mp_context against a
hardcoded tuple and then calls mp.get_context(mp_context); replace that with a
platform-aware check using multiprocessing.get_all_start_methods(): compute
allowed = set(mp.get_all_start_methods()), treat empty/None "" as allowed
default, and if mp_context is provided ensure mp_context is in allowed; if not,
raise ValueError that includes the actual supported methods (allowed) and the
invalid value; only call mp.get_context(mp_context) when mp_context is non-empty
so dataloader_mp_context is set safely.

In `@trapdata/antenna/tests/test_dataloader_hygiene.py`:
- Line 15: Remove the unused import of multiprocessing from the module: delete
the top-level import line "import multiprocessing" in the test file (the unused
symbol multiprocessing is triggering Flake8 F401), ensuring no other code
references multiprocessing remains; run the test/flake checks afterwards to
confirm the warning is resolved.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0be389c7-0408-45bd-a2b8-ef0a1c174c1c

📥 Commits

Reviewing files that changed from the base of the PR and between a33746a and ce5a2ed.

📒 Files selected for processing (5)
  • .env.example
  • trapdata/antenna/datasets.py
  • trapdata/antenna/tests/test_dataloader_hygiene.py
  • trapdata/antenna/worker.py
  • trapdata/settings.py

Comment thread trapdata/antenna/datasets.py Outdated
Comment thread trapdata/antenna/tests/test_dataloader_hygiene.py Outdated
…sed import

CodeRabbit feedback on PR #148:
- datasets.py: replace hardcoded ("fork","spawn","forkserver") tuple with
  multiprocessing.get_all_start_methods() so the validation matches what the
  host actually supports (e.g. Windows supports only "spawn").
- test_dataloader_hygiene.py: drop unused `import multiprocessing` (Flake8 F401).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mihow
Collaborator Author

mihow commented May 20, 2026

Claude says: Addressed both CodeRabbit findings in 0083ae8:

  • trapdata/antenna/datasets.py — replaced the hardcoded ("fork", "spawn", "forkserver") tuple with multiprocessing.get_all_start_methods() so the validation matches what the running interpreter / platform actually supports. The error message now lists the actually-supported methods rather than the hardcoded set. Backwards-compatible with the existing test_invalid_mp_context_raises test (matches on antenna_api_dataloader_mp_context).
  • trapdata/antenna/tests/test_dataloader_hygiene.py — removed the unused import multiprocessing (Flake8 F401). All other "multiprocessing" string hits in the file are docstring text and DataLoader attribute names (loader.multiprocessing_context), not symbol references.
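The platform-aware validation described in the first bullet might look roughly like the sketch below. It uses the standard-library multiprocessing module instead of torch.multiprocessing to stay dependency-free, and resolve_mp_context is a hypothetical name, not the function in datasets.py.

```python
import multiprocessing as mp


def resolve_mp_context(name):
    """Return a multiprocessing context for `name`, or None for "" / None."""
    if not name:
        return None  # caller lets the DataLoader use its default
    allowed = mp.get_all_start_methods()  # what this platform supports
    if name not in allowed:
        raise ValueError(
            f"antenna_api_dataloader_mp_context={name!r} is not supported "
            f"on this platform; choose one of {allowed} or leave it empty"
        )
    return mp.get_context(name)


# "spawn" is available on every platform Python supports.
print(resolve_mp_context("spawn").get_start_method())
```

Calling get_context() only after the membership check means an unsupported value fails with a message that lists the methods the host really offers.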

Local sanity check on arctia (Python 3.12.13):

  • pytest trapdata/antenna/tests/test_dataloader_hygiene.py -v → 8/8 PASSED
  • flake8 trapdata/antenna/datasets.py trapdata/antenna/tests/test_dataloader_hygiene.py → clean

CI failures — empirical pre-existence check

Ran the 3 failing tests on both fix/dataloader-shmem-leak (HEAD = ce5a2ed) and the merge base a33746a on main, same venv (Python 3.12.13) on arctia:

| Test | fix branch | main a33746a |
|---|---|---|
| test_models.py::TestSourceImageSchema::test_url | FAIL (Wikipedia 400) | FAIL (Wikipedia 400) |
| test_api.py::TestInferenceAPI::test_config_num_classification_predictions | PASS locally | PASS locally |
| test_api.py::TestInferenceAPI::test_logits_in_classification_response | PASS locally | PASS locally |

test_url is a confirmed external regression: upload.wikimedia.org now returns 400 Client Error: Use thumbnail sizes listed on https://w.wiki/GHai for the hardcoded URL …/Wikipedia-logo-v2.svg/103px-Wikipedia-logo-v2.svg.png. Wikipedia tightened thumbnail-size policy; the test needs a different fixture URL. Not caused by this PR.

The two insect_orders_2025 "No detections found" failures pass locally on both branches with the same code/deps — same behavior on main as on the fix branch, which is consistent with them being CI-runner-environment / model-cache flakes rather than something this PR introduced. The PR's diff is also isolated to trapdata/antenna/ + trapdata/settings.py + .env.example; it doesn't touch the trapdata/api/ test path nor the legacy DataLoader at trapdata/ml/models/base.py:273.

Happy to file a follow-up issue for both classes of failure if useful — the Wikipedia one needs a fixture-URL change, the insect_orders_2025 ones probably need a closer look at the CI runner / cache state.
