Changes from 1 commit
51 changes: 51 additions & 0 deletions ami/jobs/tests/test_jobs.py
@@ -567,6 +567,57 @@ def test_tasks_endpoint_without_pipeline(self):
        self.assertEqual(resp.status_code, 400)
        self.assertIn("pipeline", resp.json()[0].lower())

    def test_queue_images_to_nats_embeds_pipeline_config(self):
        """Tasks queued to NATS carry the pipeline config (including project overrides)."""
        from unittest.mock import AsyncMock, MagicMock, patch

        from ami.ml.models import ProjectPipelineConfig
        from ami.ml.schemas import PipelineRequestConfigParameters

        pipeline = self._create_pipeline()
        pipeline.default_config = PipelineRequestConfigParameters({"example_param": "default"})
        pipeline.save()
        # _create_pipeline already called pipeline.projects.add(self.project) which
        # created a ProjectPipelineConfig row; update it rather than creating a duplicate.
        ProjectPipelineConfig.objects.filter(project=self.project, pipeline=pipeline).update(
            config={"example_param": "project_override"}
        )
Comment on lines +578 to +584
Contributor

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test does not verify default-key preservation in merge.

Line 578 and Line 583 only exercise an overridden key, so Line 619 cannot catch regressions where defaults are dropped instead of merged. Add one default-only key and assert it survives in task.config.

Suggested test tightening
-        pipeline.default_config = PipelineRequestConfigParameters({"example_param": "default"})
+        pipeline.default_config = PipelineRequestConfigParameters(
+            {"example_param": "default", "default_only_param": 7}
+        )
@@
         self.assertIsNotNone(task.config)
         self.assertEqual(task.config.get("example_param"), "project_override")
+        self.assertEqual(task.config.get("default_only_param"), 7)

Also applies to: 619-619
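
To make the concern concrete, here is a minimal standalone sketch of the merge behavior the tightened test is meant to protect. It assumes a shallow merge in which project overrides win over pipeline defaults; `merge_config` is a hypothetical stand-in, not the actual `Pipeline.get_config()` implementation, which is not shown in this diff.

```python
from typing import Optional


def merge_config(defaults: Optional[dict], overrides: Optional[dict]) -> dict:
    """Hypothetical shallow merge: start from defaults, let overrides win."""
    merged = dict(defaults or {})  # copy so the caller's defaults are not mutated
    merged.update(overrides or {})
    return merged


defaults = {"example_param": "default", "default_only_param": 7}
overrides = {"example_param": "project_override"}
merged = merge_config(defaults, overrides)

# Overridden key takes the project value; default-only key survives the merge.
print(merged)
```

A regression that returned only the override dict would still pass an assertion on `example_param` alone, which is why the second key matters.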

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/jobs/tests/test_jobs.py` around lines 578 - 584, The test currently only
asserts an overridden key survives the merge and misses verifying that
default-only keys are preserved; modify the test around
PipelineRequestConfigParameters usage (the pipeline.default_config assignment
and ProjectPipelineConfig.update call) to add a second key present only in
defaults (e.g., add "default_only": "value" to PipelineRequestConfigParameters)
and then assert that task.config contains both the overridden key
("example_param" with "project_override") and the default-only key; ensure the
update still only overrides the intended key via
ProjectPipelineConfig.objects.filter(...).update(...) and that the assertion
checks task.config (from whatever function creates the Task) for presence of the
default-only key.


        job = self._create_ml_job("Config propagation test", pipeline)
        job.dispatch_mode = JobDispatchMode.ASYNC_API
        job.status = JobState.STARTED
        job.save(update_fields=["dispatch_mode", "status"])

        image = SourceImage.objects.create(
            path="config_test.jpg",
            public_base_url="http://example.com",
            project=self.project,
        )

        published_tasks = []

        mock_manager = AsyncMock()
        mock_manager.log_async = AsyncMock()

        async def capture_publish(job_id, data):
            published_tasks.append(data)
            return True

        mock_manager.publish_task = capture_publish

        mock_ctx = MagicMock()
        mock_ctx.__aenter__ = AsyncMock(return_value=mock_manager)
        mock_ctx.__aexit__ = AsyncMock(return_value=False)

        with patch("ami.ml.orchestration.jobs.TaskQueueManager", return_value=mock_ctx):
            with patch("ami.ml.orchestration.jobs.AsyncJobStateManager"):
                queue_images_to_nats(job, [image])

        self.assertEqual(len(published_tasks), 1)
        task = published_tasks[0]
        self.assertIsNotNone(task.config)
        self.assertEqual(task.config.get("example_param"), "project_override")

    def test_result_endpoint_stub(self):
        """Test the result endpoint accepts results (stubbed implementation)."""
        pipeline = self._create_pipeline()
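
The async context manager mocking used in the new test can be exercised in isolation. The sketch below uses only the standard library; `queue_one` and the payload are hypothetical stand-ins for the code under test, and the real `TaskQueueManager` interface is assumed only as far as the test above shows it (`publish_task(job_id, data)` used inside `async with`).

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock

published = []


async def capture_publish(job_id, data):
    # Record what would have been published instead of talking to NATS.
    published.append((job_id, data))
    return True


manager = AsyncMock()
manager.publish_task = capture_publish

# MagicMock supports configuring __aenter__/__aexit__ (Python 3.8+),
# so the mock can be used directly in an `async with` statement.
ctx = MagicMock()
ctx.__aenter__ = AsyncMock(return_value=manager)
ctx.__aexit__ = AsyncMock(return_value=False)

# Stands in for the patched class: calling it returns the context manager mock.
factory = MagicMock(return_value=ctx)


async def queue_one(make_manager, job_id, payload):
    """Hypothetical code under test: open the manager and publish one task."""
    async with make_manager() as m:
        return await m.publish_task(job_id, payload)


result = asyncio.run(queue_one(factory, 7, {"image_id": "42"}))
```

Replacing `publish_task` with a plain `async def` rather than an `AsyncMock` keeps the captured payloads in an ordinary list, which is what lets the test assert on `task.config` directly.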
3 changes: 3 additions & 0 deletions ami/ml/orchestration/jobs.py
@@ -86,6 +86,8 @@ def queue_images_to_nats(job: "Job", images: list[SourceImage]):
    job.logger.info(f"Queuing {len(images)} images to NATS stream for job '{job.pk}'")

    # Prepare all messages outside of async context to avoid Django ORM issues
    pipeline_config = job.pipeline.get_config(project_id=job.project.pk) if job.pipeline else None

    tasks: list[tuple[int, PipelineProcessingTask]] = []
    image_ids = []
    skipped_count = 0
@@ -101,6 +103,7 @@ def queue_images_to_nats(job: "Job", images: list[SourceImage]):
            id=image_id,
            image_id=image_id,
            image_url=image_url,
            config=pipeline_config,
        )
Comment on lines 89 to 107
Copilot AI Apr 30, 2026

pipeline_config is a mutable dict and the same object is being attached to every PipelineProcessingTask. Because Pipeline.get_config() returns (and mutates) the underlying default_config dict, this means all queued tasks share a single config instance, which can lead to cross-task side effects if anything mutates it. Prefer passing an immutable snapshot (e.g., a shallow/deep copy) per task, or otherwise ensure the config is not shared/mutated after task creation.
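
The aliasing risk described in this comment can be illustrated with plain dicts standing in for `PipelineProcessingTask`. This is a sketch under assumptions: the `nested`/`threshold` keys are invented for illustration, and whether pydantic copies the value during validation in the real model is not established by this diff.

```python
import copy

shared_config = {"example_param": "project_override", "nested": {"threshold": 0.5}}

# Aliased: every task holds a reference to the same dict object.
aliased_tasks = [{"image_id": str(i), "config": shared_config} for i in range(2)]
aliased_tasks[0]["config"]["nested"]["threshold"] = 0.9
# The mutation made through task 0 is visible through task 1.
leaked = aliased_tasks[1]["config"]["nested"]["threshold"]

base_config = {"example_param": "project_override", "nested": {"threshold": 0.5}}

# Snapshot: each task gets its own deep copy, so nested values are isolated too.
isolated_tasks = [
    {"image_id": str(i), "config": copy.deepcopy(base_config)} for i in range(2)
]
isolated_tasks[0]["config"]["nested"]["threshold"] = 0.9
unchanged = isolated_tasks[1]["config"]["nested"]["threshold"]
```

A shallow copy (`dict(base_config)`) would isolate top-level keys only; nested dicts would still be shared, so the choice between shallow and deep depends on whether config values can themselves be mutable.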

        tasks.append((image.pk, task))

4 changes: 2 additions & 2 deletions ami/ml/schemas.py
@@ -239,6 +239,7 @@ class PipelineResultsResponse(pydantic.BaseModel):
     source_images: list[SourceImageResponse]
     detections: list[DetectionResponse]
     errors: list | str | None = None
+    config: PipelineRequestConfigParameters | dict | None = None


 class PipelineResultsError(pydantic.BaseModel):
@@ -257,9 +258,8 @@ class PipelineProcessingTask(pydantic.BaseModel):
     image_id: str
     image_url: str
     reply_subject: str | None = None  # The NATS subject to send the result to
-    # TODO: Do we need these?
+    config: PipelineRequestConfigParameters | dict | None = None
     # detections: list[DetectionRequest] | None = None
-    # config: PipelineRequestConfigParameters | dict | None = None


 class ProcessingServiceClientInfo(pydantic.BaseModel):
22 changes: 22 additions & 0 deletions processing_services/README.md
@@ -76,6 +76,28 @@ PipelineChoice = typing.Literal[
"new-pipeline-slug",
]
```
## NATS Pull-Mode (Async API) Contract

Processing services that operate in pull-mode (fetching tasks from Antenna via `POST /api/v2/jobs/{id}/tasks/`) receive `PipelineProcessingTask` objects. Each task now includes a `config` field carrying the pipeline configuration for that job:

```json
{
  "id": "42",
  "image_id": "42",
  "image_url": "https://...",
  "reply_subject": "antenna.results.job.7.img.42",
  "config": {
    "example_config_param": 3
  }
}
```

`config` mirrors `PipelineRequest.config` from the synchronous HTTP path. It is derived from the pipeline's `default_config` merged with any per-project `ProjectPipelineConfig` override. It may be `null` if no config is set.

Workers should read `config` from each task and apply it to their processing. If `config` is absent or null, fall back to worker-level defaults (e.g. environment variables).
Comment on lines +95 to +97
Copilot AI Apr 30, 2026

The contract says config “may be null if no config is set”, but with the current Antenna code path pipeline.get_config(...) always returns a dict (possibly empty) when a pipeline exists; tasks are not normally queued without a pipeline. Consider updating this to reflect actual behavior (empty object vs null), or clarify the specific scenario where null is expected.

Suggested change
-`config` mirrors `PipelineRequest.config` from the synchronous HTTP path. It is derived from the pipeline's `default_config` merged with any per-project `ProjectPipelineConfig` override. It may be `null` if no config is set.
-Workers should read `config` from each task and apply it to their processing. If `config` is absent or null, fall back to worker-level defaults (e.g. environment variables).
+`config` mirrors `PipelineRequest.config` from the synchronous HTTP path. It is derived from the pipeline's `default_config` merged with any per-project `ProjectPipelineConfig` override. In the normal Antenna code path, it is an object; if no defaults or overrides are set, it may be an empty object.
+Workers should read `config` from each task and apply it to their processing. If `config` is unexpectedly absent or `null` (for example, due to a malformed or legacy payload), fall back to worker-level defaults (e.g. environment variables).
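
On the worker side, the fallback behavior discussed here could be sketched as follows. This is an illustration only: `resolve_config` and the `EXAMPLE_CONFIG_PARAM` environment variable are hypothetical names, and the task is treated as an already-decoded dict rather than a `PipelineProcessingTask` instance.

```python
import os
from typing import Mapping


def resolve_config(task: dict, env: Mapping[str, str] = os.environ) -> dict:
    """Layer the task's config over worker-level defaults.

    Absent, null, and empty `config` values are all treated the same way:
    the worker defaults apply unchanged.
    """
    # Hypothetical worker-level default read from the environment.
    worker_defaults = {
        "example_config_param": int(env.get("EXAMPLE_CONFIG_PARAM", "1")),
    }
    task_config = task.get("config") or {}
    return {**worker_defaults, **task_config}


task = {
    "id": "42",
    "image_id": "42",
    "image_url": "https://...",
    "config": {"example_config_param": 3},
}
effective = resolve_config(task, env={})
```

Because `or {}` collapses `null` and `{}` into the same case, a worker written this way behaves identically under either wording of the contract.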


When returning results, populate `PipelineResultsResponse.config` with the config that was actually used, so Antenna can record it for auditing.
Copilot AI Apr 30, 2026

This doc implies Antenna will “record [results].config for auditing”, but the backend currently doesn’t appear to persist or otherwise use PipelineResultsResponse.config when saving results (it’s parsed but ignored). Either implement recording on the Antenna side, or soften/reword this guidance so workers don’t expect the value to be stored/visible.

Suggested change
-When returning results, populate `PipelineResultsResponse.config` with the config that was actually used, so Antenna can record it for auditing.
+When returning results, you may populate `PipelineResultsResponse.config` with the config that was actually used so your worker reports its effective settings consistently. Do not assume Antenna currently persists, displays, or audits this value when saving results.
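
For reference, echoing the effective config back in a result payload might look like the sketch below. It uses plain dicts and only the `PipelineResultsResponse` fields visible in this diff (`source_images`, `detections`, `errors`, `config`); the real schema may require further fields, and the shape of each `source_images` entry is an assumption. `build_results` is a hypothetical helper.

```python
def build_results(task: dict, effective_config: dict, detections: list) -> dict:
    """Assemble a result payload that reports the config actually used."""
    return {
        # Assumed shape for a source image entry; not confirmed by the diff.
        "source_images": [{"id": task["image_id"], "url": task["image_url"]}],
        "detections": detections,
        "errors": None,
        # Copy so later changes to the worker's config do not alter the payload.
        "config": dict(effective_config),
    }


task = {"id": "42", "image_id": "42", "image_url": "https://..."}
effective_config = {"example_config_param": 3}
payload = build_results(task, effective_config, detections=[])
```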

Contributor

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

README contract is ahead of in-repo example/minimal implementations.

This says workers should populate PipelineResultsResponse.config, but current in-repo examples still omit it (processing_services/minimal/api/schemas.py:213-244, processing_services/example/api/pipelines.py:117-140). Please add a short note here that those examples need the companion schema update.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@processing_services/README.md` around lines 99 - 100, Update the README to
note that the worker examples must be updated to include the new
PipelineResultsResponse.config field; specifically, mention that the minimal and
example worker schema definitions (module minimal.api.schemas) and the pipelines
example code that builds responses (module example.api.pipelines) need a
companion schema update so they populate PipelineResultsResponse.config with the
actual runtime config used when returning results.

## Demo

## `minimal` Pipelines and Output Images