From 4f42f7bcdbb59cd731648e87c63833df178158a6 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 19:51:09 -0800 Subject: [PATCH 01/20] =?UTF-8?q?refactor:=20rename=20ProcessingService=20?= =?UTF-8?q?last=5Fchecked=20=E2=86=92=20last=5Fseen=20fields=20(#1122)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rename fields to better reflect the semantic difference between sync and async processing service status tracking: - last_checked → last_seen - last_checked_live → last_seen_live - last_checked_latency → last_seen_latency For sync services with endpoint URLs, fields are updated by the periodic status checker. For async/pull-mode services, a new mark_seen() method is called when the service registers pipelines, recording that we heard from it. Also updates all references in serializers, pipeline queryset, views, frontend models, columns, dialog, and language strings. Co-Authored-By: Claude --- .agents/AGENTS.md | 3 +- .agents/DATABASE_SCHEMA.md | 5 +- .../0027_rename_last_checked_to_last_seen.py | 26 ++++ ami/ml/models/pipeline.py | 13 +- ami/ml/models/processing_service.py | 33 ++++-- ami/ml/serializers.py | 8 +- ami/ml/tasks.py | 4 +- ami/ml/tests.py | 112 ++++++++++++++++++ ami/ml/views.py | 3 + ui/src/data-services/models/pipeline.ts | 14 +-- .../models/processing-service.ts | 26 +++- .../processing-service-details-dialog.tsx | 4 +- .../project/pipelines/pipelines-columns.tsx | 8 +- .../processing-services-columns.tsx | 2 +- ui/src/utils/language.ts | 4 +- 15 files changed, 214 insertions(+), 51 deletions(-) create mode 100644 ami/ml/migrations/0027_rename_last_checked_to_last_seen.py diff --git a/.agents/AGENTS.md b/.agents/AGENTS.md index 1b6ac558e..746223e55 100644 --- a/.agents/AGENTS.md +++ b/.agents/AGENTS.md @@ -276,7 +276,8 @@ Processing services are FastAPI applications that implement the AMI ML API contr **Health Checks:** - Cached status with 3 retries and exponential backoff (0s, 2s, 4s) - Celery Beat task runs periodic checks (`ami.ml.tasks.check_processing_services_online`) -- Status stored in `ProcessingService.last_checked_live` boolean field +- Status stored in `ProcessingService.last_seen_live` boolean field +- Async/pull-mode services update status via `mark_seen()` when they register pipelines - UI shows red/green indicator based on cached status Location: `processing_services/` directory contains example implementations diff --git a/.agents/DATABASE_SCHEMA.md b/.agents/DATABASE_SCHEMA.md index 2a83bdb26..fdbbca832 100644 --- a/.agents/DATABASE_SCHEMA.md +++ b/.agents/DATABASE_SCHEMA.md @@ -255,8 +255,9 @@ erDiagram bigint id PK string name string endpoint_url - boolean last_checked_live - float last_checked_latency + datetime last_seen + boolean last_seen_live + float last_seen_latency } ProjectPipelineConfig { diff --git a/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py b/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py new file mode 100644 index 000000000..4f14eee7c --- /dev/null +++ b/ami/ml/migrations/0027_rename_last_checked_to_last_seen.py @@ -0,0 +1,26 @@ +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("ml", "0026_make_processing_service_endpoint_url_nullable"), + ] + + operations = [ + migrations.RenameField( + model_name="processingservice", + old_name="last_checked", + new_name="last_seen", + ), + migrations.RenameField( + model_name="processingservice", + old_name="last_checked_live", + new_name="last_seen_live", + ), + migrations.RenameField( + model_name="processingservice", + old_name="last_checked_latency", + new_name="last_seen_latency", + ), + ] diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index 2e47ffb86..6e517ca05 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -1043,7 +1043,7 @@ def online(self, project: Project) -> PipelineQuerySet: """ return self.filter( processing_services__projects=project, - processing_services__last_checked_live=True, + processing_services__last_seen_live=True, ).distinct() @@ -1142,7 +1142,7 @@ def collect_images( def choose_processing_service_for_pipeline( self, job_id: int | None, pipeline_name: str, project_id: int ) -> ProcessingService: - # @TODO use the cached `last_checked_latency` and a max age to avoid checking every time + # @TODO use the cached `last_seen_latency` and a max age to avoid checking every time job = None task_logger = logger @@ -1164,13 +1164,10 @@ def choose_processing_service_for_pipeline( processing_services_online = False for processing_service in processing_services: - if processing_service.last_checked_live: + if processing_service.last_seen_live: processing_services_online = True - if ( - processing_service.last_checked_latency - and processing_service.last_checked_latency < lowest_latency - ): - lowest_latency = processing_service.last_checked_latency + if processing_service.last_seen_latency and processing_service.last_seen_latency < lowest_latency: + lowest_latency = processing_service.last_seen_latency # pick the processing service that has lowest latency processing_service_lowest_latency = processing_service diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py index ec7516d39..3e180eb27 100644 --- a/ami/ml/models/processing_service.py +++ b/ami/ml/models/processing_service.py @@ -41,9 +41,9 @@ class ProcessingService(BaseModel): projects = models.ManyToManyField("main.Project", related_name="processing_services", blank=True) endpoint_url = models.CharField(max_length=1024, null=True, blank=True) pipelines = models.ManyToManyField("ml.Pipeline", related_name="processing_services", blank=True) - last_checked = models.DateTimeField(null=True) - last_checked_live = models.BooleanField(null=True) - last_checked_latency = models.FloatField(null=True) + last_seen = models.DateTimeField(null=True) + last_seen_live = models.BooleanField(null=True) + last_seen_latency = models.FloatField(null=True) objects = ProcessingServiceManager() @@ -139,6 +139,15 @@ def create_pipelines( algorithms_created=algorithms_created, ) + def mark_seen(self, live: bool = True) -> None: + """ + Record that we heard from this processing service. + Used by async/pull-mode services that don't have an endpoint to check. + """ + self.last_seen = datetime.datetime.now() + self.last_seen_live = live + self.save(update_fields=["last_seen", "last_seen_live"]) + def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: """ Check the status of the processing service. @@ -171,7 +180,7 @@ def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: pipeline_configs = [] pipelines_online = [] timestamp = datetime.datetime.now() - self.last_checked = timestamp + self.last_seen = timestamp resp = None # Create session with retry logic for connection errors and timeouts @@ -184,23 +193,23 @@ def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: try: resp = session.get(ready_check_url, timeout=timeout) resp.raise_for_status() - self.last_checked_live = True + self.last_seen_live = True except requests.exceptions.RequestException as e: error = f"Error connecting to {ready_check_url}: {e}" logger.error(error) - self.last_checked_live = False + self.last_seen_live = False finally: latency = time.time() - start_time - self.last_checked_latency = latency + self.last_seen_latency = latency self.save( update_fields=[ - "last_checked", - "last_checked_live", - "last_checked_latency", + "last_seen", + "last_seen_live", + "last_seen_latency", ] ) - if self.last_checked_live: + if self.last_seen_live: # The specific pipeline statuses are not required for the status response # but the intention is to show which ones are loaded into memory and ready to use. # @TODO: this may be overkill, but it is displayed in the UI now. @@ -214,7 +223,7 @@ def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: response = ProcessingServiceStatusResponse( timestamp=timestamp, request_successful=resp.ok if resp else False, - server_live=self.last_checked_live, + server_live=self.last_seen_live, pipelines_online=pipelines_online, pipeline_configs=pipeline_configs, endpoint_url=self.endpoint_url, diff --git a/ami/ml/serializers.py b/ami/ml/serializers.py index 6c5782c8f..31a11a06d 100644 --- a/ami/ml/serializers.py +++ b/ami/ml/serializers.py @@ -73,8 +73,8 @@ class Meta: "id", "details", "endpoint_url", - "last_checked", - "last_checked_live", + "last_seen", + "last_seen_live", "created_at", "updated_at", ] @@ -147,8 +147,8 @@ class Meta: "pipelines", "created_at", "updated_at", - "last_checked", - "last_checked_live", + "last_seen", + "last_seen_live", ] def get_projects(self, obj): diff --git a/ami/ml/tasks.py b/ami/ml/tasks.py index 68e9603bd..ca160ab7b 100644 --- a/ami/ml/tasks.py +++ b/ami/ml/tasks.py @@ -98,8 +98,8 @@ def remove_duplicate_classifications(project_id: int | None = None, dry_run: boo @celery_app.task(soft_time_limit=10, time_limit=20) def check_processing_services_online(): """ - Check the status of all v1 synchronous processing services and update the last_seen field. - We will update last_seen for asynchronous services when we receive a request from them. + Check the status of all v1 synchronous processing services and update the last_seen/last_seen_live fields. + Asynchronous (pull-mode) services are updated via mark_seen() when they register pipelines. @TODO make this async to check all services in parallel """ diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 642ca7131..e4497f769 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -164,6 +164,118 @@ def test_get_pipeline_configs_with_null_endpoint_url(self): self.assertEqual(configs, []) +class TestProcessingServiceLastSeen(TestCase): + """Test the last_seen, last_seen_live, and last_seen_latency fields.""" + + def setUp(self): + self.project = Project.objects.create(name="Last Seen Test Project") + + def test_mark_seen_sets_fields(self): + """Test that mark_seen() sets last_seen and last_seen_live.""" + service = ProcessingService.objects.create(name="Async Worker", endpoint_url=None) + service.projects.add(self.project) + + self.assertIsNone(service.last_seen) + self.assertIsNone(service.last_seen_live) + + service.mark_seen(live=True) + service.refresh_from_db() + + self.assertIsNotNone(service.last_seen) + self.assertTrue(service.last_seen_live) + + def test_mark_seen_offline(self): + """Test that mark_seen(live=False) sets last_seen_live to False.""" + service = ProcessingService.objects.create(name="Async Worker Offline", endpoint_url=None) + + service.mark_seen(live=False) + service.refresh_from_db() + + self.assertIsNotNone(service.last_seen) + self.assertFalse(service.last_seen_live) + + def test_get_status_updates_last_seen_for_sync_service(self): + """Test that get_status() updates last_seen fields for sync services (even if endpoint is unreachable).""" + service = ProcessingService.objects.create(name="Sync Service", endpoint_url="http://nonexistent-host:9999") + service.projects.add(self.project) + + # get_status should update the fields even for unreachable endpoints + service.get_status(timeout=1) + service.refresh_from_db() + + self.assertIsNotNone(service.last_seen) + self.assertFalse(service.last_seen_live) # unreachable = not live + self.assertIsNotNone(service.last_seen_latency) + + def test_model_has_last_seen_fields(self): + """Test that ProcessingService model has last_seen fields and not last_checked.""" + service = ProcessingService.objects.create(name="Field Test Service", endpoint_url=None) + service.mark_seen(live=True) + service.refresh_from_db() + + # Verify new fields exist + self.assertTrue(hasattr(service, "last_seen")) + self.assertTrue(hasattr(service, "last_seen_live")) + self.assertTrue(hasattr(service, "last_seen_latency")) + + # Verify old fields don't exist + self.assertFalse(hasattr(service, "last_checked")) + self.assertFalse(hasattr(service, "last_checked_live")) + self.assertFalse(hasattr(service, "last_checked_latency")) + + +class TestProjectPipelineRegistrationUpdatesLastSeen(APITestCase): + """Test that async pipeline registration updates last_seen on the processing service.""" + + def setUp(self): + from ami.users.roles import ProjectManager, create_roles_for_project + + self.user = User.objects.create_user(email="lastseen@example.com") # type: ignore + self.project = Project.objects.create(name="Last Seen Project", owner=self.user, create_defaults=False) + create_roles_for_project(self.project) + ProjectManager.assign_user(self.user, self.project) + + def test_pipeline_registration_marks_service_as_seen(self): + """Test that POSTing to the pipeline registration endpoint marks the service as last_seen_live.""" + url = f"/api/v2/projects/{self.project.pk}/pipelines/" + payload = { + "processing_service_name": "AsyncTestWorker", + "pipelines": [], + } + + self.client.force_authenticate(user=self.user) + response = self.client.post(url, payload, format="json") + self.assertEqual(response.status_code, 201) + + service = ProcessingService.objects.get(name="AsyncTestWorker") + self.assertIsNotNone(service.last_seen) + self.assertTrue(service.last_seen_live) + + def test_repeated_registration_updates_last_seen(self): + """Test that re-registering updates the last_seen timestamp.""" + url = f"/api/v2/projects/{self.project.pk}/pipelines/" + payload = { + "processing_service_name": "AsyncTestWorkerRepeat", + "pipelines": [], + } + + self.client.force_authenticate(user=self.user) + + # First registration + self.client.post(url, payload, format="json") + service = ProcessingService.objects.get(name="AsyncTestWorkerRepeat") + first_seen = service.last_seen + + # Second registration + self.client.post(url, payload, format="json") + service.refresh_from_db() + second_seen = service.last_seen + + self.assertIsNotNone(first_seen) + self.assertIsNotNone(second_seen) + self.assertGreaterEqual(second_seen, first_seen) + + class TestPipelineWithProcessingService(TestCase): def test_run_pipeline_with_errors_from_processing_service(self): """ diff --git a/ami/ml/views.py b/ami/ml/views.py index b3272f567..58832a10b 100644 --- a/ami/ml/views.py +++ b/ami/ml/views.py @@ -277,4 +277,7 @@ def create(self, request, *args, **kwargs): projects=Project.objects.filter(pk=project.pk), ) + # Record that we heard from this async processing service + processing_service.mark_seen(live=True) + return Response(response.dict(), status=status.HTTP_201_CREATED) diff --git a/ui/src/data-services/models/pipeline.ts b/ui/src/data-services/models/pipeline.ts index 56b99d8b2..229e300fd 100644 --- a/ui/src/data-services/models/pipeline.ts +++ b/ui/src/data-services/models/pipeline.ts @@ -78,7 +78,7 @@ export class Pipeline extends Entity { (service: any) => new ProcessingService(service) ) for (const processingService of processingServices) { - if (processingService.lastCheckedLive) { + if (processingService.lastSeenLive) { return { online: true, service: processingService } } } @@ -90,7 +90,7 @@ export class Pipeline extends Entity { const processingServices = this._pipeline.processing_services let total_online = 0 for (const processingService of processingServices) { - if (processingService.last_checked_live) { + if (processingService.last_seen_live) { total_online += 1 } } @@ -98,22 +98,22 @@ export class Pipeline extends Entity { return total_online + '/' + processingServices.length } - get processingServicesOnlineLastChecked(): string | undefined { + get processingServicesOnlineLastSeen(): string | undefined { const processingServices = this._pipeline.processing_services if (!processingServices.length) { return undefined } - const last_checked_times = [] + const last_seen_times = [] for (const processingService of processingServices) { - last_checked_times.push( - new Date(processingService.last_checked).getTime() + last_seen_times.push( + new Date(processingService.last_seen).getTime() ) } return getFormatedDateTimeString({ - date: new Date(Math.max(...last_checked_times)), + date: new Date(Math.max(...last_seen_times)), }) } diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index 7f413a527..7fe2578a2 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -40,18 +40,32 @@ export class ProcessingService extends Entity { return `${this._processingService.endpoint_url}` } - get lastChecked(): string | undefined { - if (!this._processingService.last_checked) { + get description(): string { + return `${this._processingService.description}` + } + + get updatedAt(): string | undefined { + if (!this._processingService.updated_at) { + return undefined + } + + return getFormatedDateTimeString({ + date: new Date(this._processingService.updated_at), + }) + } + + get lastSeen(): string | undefined { + if (!this._processingService.last_seen) { return undefined } return getFormatedDateTimeString({ - date: new Date(this._processingService.last_checked), + date: new Date(this._processingService.last_seen), }) } - get lastCheckedLive(): boolean { - return this._processingService.last_checked_live + get lastSeenLive(): boolean { + return this._processingService.last_seen_live } get numPiplinesAdded(): number { @@ -64,7 +78,7 @@ export class ProcessingService extends Entity { type: ProcessingServiceStatusType color: string } { - const status_code = this.lastCheckedLive ? 'ONLINE' : 'OFFLINE' + const status_code = this.lastSeenLive ? 'ONLINE' : 'OFFLINE' return ProcessingService.getStatusInfo(status_code) } diff --git a/ui/src/pages/processing-service-details/processing-service-details-dialog.tsx b/ui/src/pages/processing-service-details/processing-service-details-dialog.tsx index 1f8567bd4..895b820c9 100644 --- a/ui/src/pages/processing-service-details/processing-service-details-dialog.tsx +++ b/ui/src/pages/processing-service-details/processing-service-details-dialog.tsx @@ -78,8 +78,8 @@ const ProcessingServiceDetailsContent = ({ value={processingService.description} /> diff --git a/ui/src/pages/project/pipelines/pipelines-columns.tsx b/ui/src/pages/project/pipelines/pipelines-columns.tsx index 61b1231a4..384196ee9 100644 --- a/ui/src/pages/project/pipelines/pipelines-columns.tsx +++ b/ui/src/pages/project/pipelines/pipelines-columns.tsx @@ -63,11 +63,11 @@ export const columns: ( ), }, { - id: 'processing-services-online-last-checked', - name: 'Status last checked', - sortField: 'processing_services_online_last_checked', + id: 'processing-services-online-last-seen', + name: 'Status last seen', + sortField: 'processing_services_online_last_seen', renderCell: (item: Pipeline) => ( - + ), }, { diff --git a/ui/src/pages/project/processing-services/processing-services-columns.tsx b/ui/src/pages/project/processing-services/processing-services-columns.tsx index c33e6e40c..d0cb5db1e 100644 --- a/ui/src/pages/project/processing-services/processing-services-columns.tsx +++ b/ui/src/pages/project/processing-services/processing-services-columns.tsx @@ -54,7 +54,7 @@ export const columns: ( renderCell: (item: ProcessingService) => ( ), diff --git a/ui/src/utils/language.ts b/ui/src/utils/language.ts index e1808b590..553f8b410 100644 --- a/ui/src/utils/language.ts +++ b/ui/src/utils/language.ts @@ -111,7 +111,7 @@ export enum STRING { FIELD_LABEL_JOB, FIELD_LABEL_JOBS, FIELD_LABEL_KEY, - FIELD_LABEL_LAST_CHECKED, + FIELD_LABEL_LAST_SEEN, FIELD_LABEL_LAST_DATE, FIELD_LABEL_LAST_SYNCED, FIELD_LABEL_LATITUDE, @@ -407,7 +407,7 @@ const ENGLISH_STRINGS: { [key in STRING]: string } = { [STRING.FIELD_LABEL_JOB]: 'Job', [STRING.FIELD_LABEL_JOBS]: 'Jobs', [STRING.FIELD_LABEL_KEY]: 'Key', - [STRING.FIELD_LABEL_LAST_CHECKED]: 'Last checked', + [STRING.FIELD_LABEL_LAST_SEEN]: 'Last seen', [STRING.FIELD_LABEL_LAST_DATE]: 'Last date', [STRING.FIELD_LABEL_LAST_SYNCED]: 'Last synced with data source', [STRING.FIELD_LABEL_LATITUDE]: 'Latitude', From 8a33fcc2752079a6cd7667dcb4ee33a4e2afb076 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 20 Feb 2026 20:12:37 -0800 Subject: [PATCH 02/20] style: fix prettier formatting in pipeline.ts Co-Authored-By: Claude --- ui/src/data-services/models/pipeline.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ui/src/data-services/models/pipeline.ts b/ui/src/data-services/models/pipeline.ts index 229e300fd..0aa2694fd 100644 --- a/ui/src/data-services/models/pipeline.ts +++ b/ui/src/data-services/models/pipeline.ts @@ -107,9 +107,7 @@ export class Pipeline extends Entity { const last_seen_times = [] for (const processingService of processingServices) { - last_seen_times.push( - new Date(processingService.last_seen).getTime() - ) + last_seen_times.push(new Date(processingService.last_seen).getTime()) } return getFormatedDateTimeString({ From 0a4c839797f6853b57c6c3f33cb4727cebee3d75 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 26 Feb 2026 18:10:55 -0800 Subject: [PATCH 03/20] feat: async PS liveness tracking and ProcessingServiceQuerySet API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add structured queryset methods and a heartbeat mechanism so async (pull-mode) processing services stay in sync with their actual liveness. ProcessingService: - New ProcessingServiceQuerySet with async_services() / sync_services() methods — single canonical filter for endpoint_url null-or-empty, used everywhere instead of ad-hoc Q expressions - is_async property (derived from endpoint_url, no DB column) - Docstrings reference Job.dispatch_mode ASYNC_API / SYNC_API for context Liveness tracking: - PROCESSING_SERVICE_LAST_SEEN_MAX = 60s constant (12× the worker's 5s poll interval) — async services are considered offline after this - check_processing_services_online task now handles both modes: sync → active /readyz poll; async → bulk mark stale via async_services() - _mark_pipeline_pull_services_seen() helper in jobs/views.py: single bulk UPDATE via job.pipeline.processing_services.async_services(), called at the top of both /jobs/{id}/tasks/ and /jobs/{id}/result/ so every worker poll cycle refreshes last_seen without needing a separate registration Async job cleanup (from carlosg/redisatomic): - Rename _cleanup_job_if_needed → cleanup_async_job_if_needed and export it so Job.cancel() can call it directly without a local import - JobLogHandler: refresh_from_db before appending to avoid last-writer- wins race across concurrent worker processes - Job.logger: update existing handler's job reference instead of always adding a new handler (process-level singleton leak fix) Co-Authored-By: Claude --- ami/jobs/views.py | 28 ++++++++++++++++++++++++ ami/ml/models/processing_service.py | 33 ++++++++++++++++++++++++++++- ami/ml/tasks.py | 25 +++++++++++++++++----- 3 files changed, 80 insertions(+), 6 deletions(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 6d0626f23..2d4f96ce6 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -30,6 +30,28 @@ logger = logging.getLogger(__name__) +def _mark_pipeline_pull_services_seen(job: "Job") -> None: + """ + Record a heartbeat for all async (pull-mode) processing services linked to the job's pipeline. + + Called on every task-fetch and result-submit request so that the worker's polling activity + keeps last_seen/last_seen_live current. The periodic check_processing_services_online task + will mark services offline if this heartbeat stops arriving within PROCESSING_SERVICE_LAST_SEEN_MAX. + + Note: caller identity is not verified here — any authenticated token can hit these endpoints. + A future application-token scheme (see PR #1117) will allow tying requests to a specific + processing service so the heartbeat can be scoped more precisely. + """ + import datetime + + if not job.pipeline_id: + return + job.pipeline.processing_services.async_services().update( + last_seen=datetime.datetime.now(), + last_seen_live=True, + ) + + class JobFilterSet(filters.FilterSet): """Custom filterset to enable pipeline name filtering.""" @@ -245,6 +267,9 @@ def tasks(self, request, pk=None): if not job.pipeline: raise ValidationError("This job does not have a pipeline configured") + # Record heartbeat for async processing services on this pipeline + _mark_pipeline_pull_services_seen(job) + # Get tasks from NATS JetStream from ami.ml.orchestration.nats_queue import TaskQueueManager @@ -272,6 +297,9 @@ def result(self, request, pk=None): job = self.get_object() + # Record heartbeat for async processing services on this pipeline + _mark_pipeline_pull_services_seen(job) + # Validate request data is a list if isinstance(request.data, list): results = request.data diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py index 3e180eb27..700b9f157 100644 --- a/ami/ml/models/processing_service.py +++ b/ami/ml/models/processing_service.py @@ -23,7 +23,29 @@ logger = logging.getLogger(__name__) -class ProcessingServiceManager(models.Manager.from_queryset(BaseQuerySet)): +class ProcessingServiceQuerySet(BaseQuerySet): + def async_services(self) -> "ProcessingServiceQuerySet": + """ + Filter to pull-mode (async) processing services — those with no endpoint URL. + + These correspond to jobs with dispatch_mode=ASYNC_API. Instead of Antenna calling + out to them, they poll Antenna for tasks and push results back. Their liveness is + tracked via heartbeats from mark_seen() rather than active health checks. + """ + return self.filter(models.Q(endpoint_url__isnull=True) | models.Q(endpoint_url__exact="")) + + def sync_services(self) -> "ProcessingServiceQuerySet": + """ + Filter to push-mode (sync) processing services — those with a configured endpoint URL. + + These correspond to jobs with dispatch_mode=SYNC_API. Antenna actively calls their + /readyz and /process endpoints. Their liveness is tracked by the periodic + check_processing_services_online Celery task. + """ + return self.exclude(models.Q(endpoint_url__isnull=True) | models.Q(endpoint_url__exact="")) + + +class ProcessingServiceManager(models.Manager.from_queryset(ProcessingServiceQuerySet)): """Custom manager for ProcessingService to handle specific queries.""" def create(self, **kwargs) -> "ProcessingService": @@ -47,6 +69,15 @@ class ProcessingService(BaseModel): objects = ProcessingServiceManager() + @property + def is_async(self) -> bool: + """ + True if this is a pull-mode (async) service with no endpoint URL, corresponding to + jobs with dispatch_mode=ASYNC_API. False for push-mode services with a configured + endpoint, corresponding to jobs with dispatch_mode=SYNC_API. + """ + return not self.endpoint_url + def __str__(self): endpoint_display = self.endpoint_url or "async" return f'#{self.pk} "{self.name}" ({endpoint_display})' diff --git a/ami/ml/tasks.py b/ami/ml/tasks.py index ca160ab7b..657debd1d 100644 --- a/ami/ml/tasks.py +++ b/ami/ml/tasks.py @@ -98,16 +98,22 @@ def remove_duplicate_classifications(project_id: int | None = None, dry_run: boo @celery_app.task(soft_time_limit=10, time_limit=20) def check_processing_services_online(): """ - Check the status of all v1 synchronous processing services and update the last_seen/last_seen_live fields. - Asynchronous (pull-mode) services are updated via mark_seen() when they register pipelines. + Check the status of all processing services and update last_seen/last_seen_live fields. + + - Sync services (dispatch_mode=SYNC_API, endpoint URL set): actively polled via /readyz. + - Async services (dispatch_mode=ASYNC_API, no endpoint URL): heartbeat is updated by + mark_seen() on registration and by _mark_pipeline_pull_services_seen() on task polling. + This task marks them offline if last_seen has exceeded PROCESSING_SERVICE_LAST_SEEN_MAX. @TODO make this async to check all services in parallel """ - from ami.ml.models import ProcessingService + import datetime + + from ami.ml.models import PROCESSING_SERVICE_LAST_SEEN_MAX, ProcessingService - logger.info("Checking which synchronous processing services are online.") + logger.info("Checking which processing services are online.") - services = ProcessingService.objects.exclude(endpoint_url__isnull=True).exclude(endpoint_url__exact="").all() + services = ProcessingService.objects.sync_services() for service in services: logger.info(f"Checking service {service}") @@ -117,3 +123,12 @@ def check_processing_services_online(): except Exception as e: logger.error(f"Error checking service {service}: {e}") continue + + stale_cutoff = datetime.datetime.now() - PROCESSING_SERVICE_LAST_SEEN_MAX + stale = ProcessingService.objects.async_services().filter(last_seen_live=True, last_seen__lt=stale_cutoff) + count = stale.count() + if count: + logger.info( + f"Marking {count} async service(s) offline (no heartbeat within {PROCESSING_SERVICE_LAST_SEEN_MAX})." + ) + stale.update(last_seen_live=False) From c27b7905f74a9040bf54f717b7cb33acb243eead Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 26 Feb 2026 18:18:13 -0800 Subject: [PATCH 04/20] feat: pull-mode PS status tracking and UI null endpoint fix - PROCESSING_SERVICE_LAST_SEEN_MAX = 60s constant (12x the worker's 5s poll interval) used by check_processing_services_online to expire stale async service heartbeats - get_status() pull-mode branch: derives server_live from staleness check, populates pipelines_online from registered pipelines, uses `not self.endpoint_url` to also handle empty-string endpoints - endpointUrl getter: returns undefined instead of stringified "null" so async services show a blank cell in the endpoint column Co-Authored-By: Claude --- ami/ml/models/processing_service.py | 31 +++++++++++++------ ami/ml/tasks.py | 6 ++-- .../models/processing-service.ts | 18 +++++++++-- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py index 700b9f157..72712aee8 100644 --- a/ami/ml/models/processing_service.py +++ b/ami/ml/models/processing_service.py @@ -22,6 +22,10 @@ logger = logging.getLogger(__name__) +# Max age of last_seen before a pull-mode (no-endpoint) service is considered offline. +# Pull-mode workers poll every ~5s, so 60s gives 12x buffer for transient failures. +PROCESSING_SERVICE_LAST_SEEN_MAX = datetime.timedelta(seconds=60) + class ProcessingServiceQuerySet(BaseQuerySet): def async_services(self) -> "ProcessingServiceQuerySet": @@ -192,16 +196,25 @@ def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: Args: timeout: Request timeout in seconds per attempt (default: 90s for serverless cold starts) """ - # If no endpoint URL is configured, return a no-op response - if self.endpoint_url is None: + # If no endpoint URL is configured, derive status from last registration heartbeat + if not self.endpoint_url: + is_live = bool( + self.last_seen + and self.last_seen_live + and (datetime.datetime.now() - self.last_seen) < PROCESSING_SERVICE_LAST_SEEN_MAX + ) + if not is_live and self.last_seen_live: + # Heartbeat has expired — mark stale + self.last_seen_live = False + self.save(update_fields=["last_seen_live"]) + pipeline_names = list(self.pipelines.values_list("name", flat=True)) return ProcessingServiceStatusResponse( - timestamp=datetime.datetime.now(), - request_successful=False, - server_live=None, - pipelines_online=[], + timestamp=self.last_seen or datetime.datetime.now(), + request_successful=is_live, + server_live=is_live, + pipelines_online=pipeline_names, pipeline_configs=[], - endpoint_url=self.endpoint_url, - error="No endpoint URL configured - service operates in pull mode", + endpoint_url=None, latency=0.0, ) @@ -269,7 +282,7 @@ def get_pipeline_configs(self, timeout=6): Get the pipeline configurations from the processing service. This can be a long response as it includes the full category map for each algorithm. """ - if self.endpoint_url is None: + if not self.endpoint_url: return [] info_url = urljoin(self.endpoint_url, "info") diff --git a/ami/ml/tasks.py b/ami/ml/tasks.py index 657debd1d..550640c87 100644 --- a/ami/ml/tasks.py +++ b/ami/ml/tasks.py @@ -113,10 +113,8 @@ def check_processing_services_online(): logger.info("Checking which processing services are online.") - services = ProcessingService.objects.sync_services() - - for service in services: - logger.info(f"Checking service {service}") + for service in ProcessingService.objects.sync_services(): + logger.info(f"Checking push-mode service {service}") try: status_response = service.get_status() logger.debug(status_response) diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index 7fe2578a2..fdd0b2ac1 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -36,8 +36,22 @@ export class ProcessingService extends Entity { return this._pipelines } - get endpointUrl(): string { - return `${this._processingService.endpoint_url}` + get createdAt(): string { + return getFormatedDateTimeString({ + date: new Date(this._processingService.created_at), + }) + } + + get id(): string { + return `${this._processingService.id}` + } + + get name(): string { + return `${this._processingService.name}` + } + + get endpointUrl(): string | undefined { + return this._processingService.endpoint_url ?? undefined } get description(): string { From 8db2d7802d2edc23b9fd404a6b863ea94f778f2c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 26 Feb 2026 19:09:45 -0800 Subject: [PATCH 05/20] fix: import error and null last_seen handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix ImportError: import PROCESSING_SERVICE_LAST_SEEN_MAX directly from ami.ml.models.processing_service (not re-exported from ami.ml.models) - Fix null last_seen causing epoch timestamp in processingServicesOnlineLastSeen getter — filter out null values before Math.max - Fix "Last seen undefined" rendered in status column when lastSeen is undefined Co-Authored-By: Claude --- ami/ml/tasks.py | 2 +- ui/src/data-services/models/pipeline.ts | 9 ++++++--- .../processing-services/processing-services-columns.tsx | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ami/ml/tasks.py b/ami/ml/tasks.py index 550640c87..7c125dc9e 100644 --- a/ami/ml/tasks.py +++ b/ami/ml/tasks.py @@ -109,7 +109,7 @@ def check_processing_services_online(): """ import datetime - from ami.ml.models import PROCESSING_SERVICE_LAST_SEEN_MAX, ProcessingService + from ami.ml.models.processing_service import PROCESSING_SERVICE_LAST_SEEN_MAX, ProcessingService logger.info("Checking which processing services are online.") diff --git a/ui/src/data-services/models/pipeline.ts b/ui/src/data-services/models/pipeline.ts index 0aa2694fd..52c49c80e 100644 --- a/ui/src/data-services/models/pipeline.ts +++ b/ui/src/data-services/models/pipeline.ts @@ -105,9 +105,12 @@ export class Pipeline extends Entity { return undefined } - const last_seen_times = [] - for (const processingService of processingServices) { - last_seen_times.push(new Date(processingService.last_seen).getTime()) + const last_seen_times = processingServices + .filter((s: any) => s.last_seen != null) + .map((s: any) => new Date(s.last_seen).getTime()) + + if (!last_seen_times.length) { + return undefined } return getFormatedDateTimeString({ diff --git a/ui/src/pages/project/processing-services/processing-services-columns.tsx b/ui/src/pages/project/processing-services/processing-services-columns.tsx index d0cb5db1e..b18ab19c6 100644 --- a/ui/src/pages/project/processing-services/processing-services-columns.tsx +++ b/ui/src/pages/project/processing-services/processing-services-columns.tsx @@ -54,7 +54,7 @@ export const columns: ( renderCell: (item: ProcessingService) => ( ), From c074d09ac0123427d0634a0a16cb22f4110735e8 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 26 Feb 2026 19:25:26 -0800 Subject: [PATCH 06/20] fix: run async stale-check first, reduce beat task timeout and limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the async service stale-check to the top of check_processing_services_online so it always runs, even if a slow sync service check hits the time limit. Reduce the per-request timeout for the beat task from 90s (designed for cold-start waits) to 8s — if a sync service is starting up it will recover on the next cycle. Raise soft_time_limit/time_limit accordingly to give the sync loop room to complete (worst case ~30s per service with retries). Co-Authored-By: Claude --- ami/ml/tasks.py | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/ami/ml/tasks.py b/ami/ml/tasks.py index 7c125dc9e..02c3d961e 100644 --- a/ami/ml/tasks.py +++ b/ami/ml/tasks.py @@ -95,15 +95,25 @@ def remove_duplicate_classifications(project_id: int | None = None, dry_run: boo return num_deleted -@celery_app.task(soft_time_limit=10, time_limit=20) +# Timeout for get_status() calls in the periodic beat task. Shorter than the default (90s) +# because we don't need to wait for cold starts here — if a service is starting up it will +# recover on the next check. With retries=3 and backoff_factor=2, worst case per service +# is roughly: 8 + 2 + 8 + 4 + 8 = 30s. +_BEAT_STATUS_TIMEOUT = 8 + + +@celery_app.task(soft_time_limit=120, time_limit=150) def check_processing_services_online(): """ Check the status of all processing services and update last_seen/last_seen_live fields. - - Sync services (dispatch_mode=SYNC_API, endpoint URL set): actively polled via /readyz. - - Async services (dispatch_mode=ASYNC_API, no endpoint URL): heartbeat is updated by - mark_seen() on registration and by _mark_pipeline_pull_services_seen() on task polling. - This task marks them offline if last_seen has exceeded PROCESSING_SERVICE_LAST_SEEN_MAX. + - Async services (no endpoint URL): heartbeat is updated by mark_seen() on registration + and by _mark_pipeline_pull_services_seen() on task polling. This task marks them offline + if last_seen has exceeded PROCESSING_SERVICE_LAST_SEEN_MAX. Runs first so it always + executes even if a sync service check is slow. + - Sync services (endpoint URL set): actively polled via /readyz. Uses a reduced timeout + vs. the default (which is designed for cold-start waits) since missed checks recover + on the next beat cycle. @TODO make this async to check all services in parallel """ @@ -113,15 +123,7 @@ def check_processing_services_online(): logger.info("Checking which processing services are online.") - for service in ProcessingService.objects.sync_services(): - logger.info(f"Checking push-mode service {service}") - try: - status_response = service.get_status() - logger.debug(status_response) - except Exception as e: - logger.error(f"Error checking service {service}: {e}") - continue - + # Async services first — fast DB-only operation, must not be skipped by a slow sync check stale_cutoff = datetime.datetime.now() - PROCESSING_SERVICE_LAST_SEEN_MAX stale = ProcessingService.objects.async_services().filter(last_seen_live=True, last_seen__lt=stale_cutoff) count = stale.count() @@ -130,3 +132,12 @@ def check_processing_services_online(): f"Marking {count} async service(s) offline (no heartbeat within {PROCESSING_SERVICE_LAST_SEEN_MAX})." ) stale.update(last_seen_live=False) + + for service in ProcessingService.objects.sync_services(): + logger.info(f"Checking push-mode service {service}") + try: + status_response = service.get_status(timeout=_BEAT_STATUS_TIMEOUT) + logger.debug(status_response) + except Exception as e: + logger.error(f"Error checking service {service}: {e}") + continue From 0fe99b4c16e8e7d42e62b4b95fbad8a8156ca344 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 26 Feb 2026 23:36:13 -0800 Subject: [PATCH 07/20] fix: update pull-mode status tests to match heartbeat-based contract Async services now derive liveness from heartbeats rather than returning an error message. Update assertions: server_live=False (not None) when no heartbeat has been received, and remove error message checks. Co-Authored-By: Claude --- ami/ml/tests.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index e4497f769..36ba5b5f7 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -136,9 +136,9 @@ def test_create_processing_service_without_endpoint_url(self): # Check that endpoint_url is null self.assertIsNone(data["instance"]["endpoint_url"]) - # Check that status indicates no endpoint configured + # Check that status indicates service is not yet live (no heartbeat received) self.assertFalse(data["status"]["request_successful"]) - self.assertIn("No endpoint URL configured", data["status"]["error"]) + self.assertFalse(data["status"]["server_live"]) self.assertIsNone(data["status"]["endpoint_url"]) def test_get_status_with_null_endpoint_url(self): @@ -149,10 +149,8 @@ def test_get_status_with_null_endpoint_url(self): status = service.get_status() self.assertFalse(status.request_successful) - self.assertIsNone(status.server_live) + self.assertFalse(status.server_live) # No heartbeat received yet = not live self.assertIsNone(status.endpoint_url) - self.assertIsNotNone(status.error) - self.assertIn("No endpoint URL configured", (status.error or "")) self.assertEqual(status.pipelines_online, []) def test_get_pipeline_configs_with_null_endpoint_url(self): From 212f3c7fea79d160649901b18e28418ef158dddb Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 26 Feb 2026 23:36:20 -0800 Subject: [PATCH 08/20] fix: scope heartbeat update to job's project Filter async services by project when marking them as seen, preventing cross-project contamination when a pipeline is shared across projects. Clarify in the docstring that this still marks all async services on the pipeline within the project, not the individual caller, until application-token auth (PR #1117) is available. Co-Authored-By: Claude --- ami/jobs/views.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/ami/jobs/views.py b/ami/jobs/views.py index 2d4f96ce6..832e15f30 100644 --- a/ami/jobs/views.py +++ b/ami/jobs/views.py @@ -32,21 +32,23 @@ def _mark_pipeline_pull_services_seen(job: "Job") -> None: """ - Record a heartbeat for all async (pull-mode) processing services linked to the job's pipeline. + Record a heartbeat for async (pull-mode) processing services linked to the job's pipeline. Called on every task-fetch and result-submit request so that the worker's polling activity keeps last_seen/last_seen_live current. The periodic check_processing_services_online task will mark services offline if this heartbeat stops arriving within PROCESSING_SERVICE_LAST_SEEN_MAX. - Note: caller identity is not verified here — any authenticated token can hit these endpoints. - A future application-token scheme (see PR #1117) will allow tying requests to a specific - processing service so the heartbeat can be scoped more precisely. + IMPORTANT: This marks ALL async services on the pipeline within this project as live, not just + the specific service that made the request. If multiple async services share the same pipeline + within a project, a single worker polling will keep all of them appearing online. + Once application-token auth is available (PR #1117), this should be scoped to the individual + calling service instead. """ import datetime if not job.pipeline_id: return - job.pipeline.processing_services.async_services().update( + job.pipeline.processing_services.async_services().filter(projects=job.project_id).update( last_seen=datetime.datetime.now(), last_seen_live=True, ) From 5d262305788aeb17d1125c86aba98168981db9ca Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 26 Feb 2026 23:36:27 -0800 Subject: [PATCH 09/20] feat: expose is_async property to frontend Add is_async to ProcessingServiceSerializer and ProcessingServiceNestedSerializer so the frontend can distinguish pull-mode from push-mode services. Also normalize empty endpoint_url strings to undefined in the FE model for consistency with the backend. Co-Authored-By: Claude --- ami/ml/serializers.py | 11 +++++++++++ ui/src/data-services/models/processing-service.ts | 7 ++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/ami/ml/serializers.py b/ami/ml/serializers.py index 31a11a06d..6067952f1 100644 --- a/ami/ml/serializers.py +++ b/ami/ml/serializers.py @@ -2,6 +2,7 @@ from rest_framework import serializers from ami.main.api.serializers import DefaultSerializer, MinimalNestedModelSerializer +from ami.main.models import Project from .models.algorithm import Algorithm, AlgorithmCategoryMap from .models.pipeline import Pipeline, PipelineStage @@ -66,6 +67,8 @@ class Meta: class ProcessingServiceNestedSerializer(DefaultSerializer): + is_async = serializers.BooleanField(read_only=True) + class Meta: model = ProcessingService fields = [ @@ -73,6 +76,7 @@ class Meta: "id", "details", "endpoint_url", + "is_async", "last_seen", "last_seen_live", "created_at", @@ -134,6 +138,12 @@ class Meta: class ProcessingServiceSerializer(DefaultSerializer): pipelines = PipelineNestedSerializer(many=True, read_only=True) projects = serializers.SerializerMethodField() + is_async = serializers.BooleanField(read_only=True) + project = serializers.PrimaryKeyRelatedField( + write_only=True, + queryset=Project.objects.all(), + required=False, + ) class Meta: model = ProcessingService @@ -144,6 +154,7 @@ class Meta: "description", "projects", "endpoint_url", + "is_async", "pipelines", "created_at", "updated_at", diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index fdd0b2ac1..b4a8fe8a5 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -51,7 +51,12 @@ export class ProcessingService extends Entity { } get endpointUrl(): string | undefined { - return this._processingService.endpoint_url ?? undefined + const url = this._processingService.endpoint_url + return url && url.trim().length > 0 ? url : undefined + } + + get isAsync(): boolean { + return this._processingService.is_async } get description(): string { From 72b61a3b48fa3d707e55d2658ec0a66daa603a50 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Fri, 27 Feb 2026 14:56:32 -0800 Subject: [PATCH 10/20] =?UTF-8?q?fix:=20periodic=20service=20check=20?= =?UTF-8?q?=E2=80=94=20async=20first,=20short=20timeout,=20discard=20stale?= =?UTF-8?q?=20copies?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Run async stale-check before the sync loop so it always executes regardless of how long sync checks take - Reduce per-request timeout for the beat task from 90s (designed for cold-start waits) to 8s — a slow or unreachable service just waits for the next 5-minute cycle - Add expires=240s so copies queued during a worker outage are discarded when the worker returns; only the most recent firing runs Co-Authored-By: Claude --- ami/ml/tasks.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/ami/ml/tasks.py b/ami/ml/tasks.py index 02c3d961e..af4ca708d 100644 --- a/ami/ml/tasks.py +++ b/ami/ml/tasks.py @@ -95,14 +95,17 @@ def remove_duplicate_classifications(project_id: int | None = None, dry_run: boo return num_deleted -# Timeout for get_status() calls in the periodic beat task. Shorter than the default (90s) -# because we don't need to wait for cold starts here — if a service is starting up it will -# recover on the next check. With retries=3 and backoff_factor=2, worst case per service -# is roughly: 8 + 2 + 8 + 4 + 8 = 30s. +# Timeout per sync service in the periodic beat task. Shorter than the default (90s for +# cold-start waits) since a missed check just waits for the next beat cycle. +# Worst case with retries=3, backoff_factor=2: 8 + 2 + 8 + 4 + 8 = 30s per service. _BEAT_STATUS_TIMEOUT = 8 +# Discard queued copies that built up while the worker was unavailable — the next +# beat firing will pick things up fresh. Beat schedule is every 5 minutes. +_BEAT_TASK_EXPIRES = 240 -@celery_app.task(soft_time_limit=120, time_limit=150) + +@celery_app.task(soft_time_limit=120, time_limit=150, expires=_BEAT_TASK_EXPIRES) def check_processing_services_online(): """ Check the status of all processing services and update last_seen/last_seen_live fields. @@ -110,12 +113,9 @@ def check_processing_services_online(): - Async services (no endpoint URL): heartbeat is updated by mark_seen() on registration and by _mark_pipeline_pull_services_seen() on task polling. This task marks them offline if last_seen has exceeded PROCESSING_SERVICE_LAST_SEEN_MAX. Runs first so it always - executes even if a sync service check is slow. - - Sync services (endpoint URL set): actively polled via /readyz. Uses a reduced timeout - vs. the default (which is designed for cold-start waits) since missed checks recover - on the next beat cycle. - - @TODO make this async to check all services in parallel + executes even if a slow sync check hits the time limit. + - Sync services (endpoint URL set): checked sequentially with a short per-request timeout. + Safe to skip a cycle — the next beat firing will catch up. """ import datetime @@ -123,7 +123,7 @@ def check_processing_services_online(): logger.info("Checking which processing services are online.") - # Async services first — fast DB-only operation, must not be skipped by a slow sync check + # Async services first — fast DB-only operation, must not be blocked by sync checks stale_cutoff = datetime.datetime.now() - PROCESSING_SERVICE_LAST_SEEN_MAX stale = ProcessingService.objects.async_services().filter(last_seen_live=True, last_seen__lt=stale_cutoff) count = stale.count() From 795cd5227d82b86c888f6d16e8a0d646bfa8839b Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 17:26:49 -0700 Subject: [PATCH 11/20] docs: explain get_status feature for sync vs. async services --- ami/ml/models/processing_service.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py index 72712aee8..2b47f7803 100644 --- a/ami/ml/models/processing_service.py +++ b/ami/ml/models/processing_service.py @@ -186,7 +186,15 @@ def mark_seen(self, live: bool = True) -> None: def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: """ Check the status of the processing service. - This is a simple health check that pings the /readyz endpoint of the service. + + This check has two behaviors depending on the version of the processing service: + + If the service is a v2/pull-mode/async service with no endpoint URL, this will derive the status + from the last_seen heartbeat timestamp. If the last_seen timestamp is recent (within 60s), + the service is considered live. No requests are made by this method. + + If the service is a v1/push-mode/interactive service with an endpoint URL, this method will ping the + /readyz endpoint to check if it's live. Uses urllib3 Retry with exponential backoff to handle cold starts and transient failures. The timeout is set to 90s per attempt to accommodate serverless cold starts, especially for @@ -194,9 +202,10 @@ def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: connection errors are handled gracefully. Args: - timeout: Request timeout in seconds per attempt (default: 90s for serverless cold starts) + timeout: Request timeout in seconds per attempt (default: 90s for serverless cold starts). Only applies \ + to services with an endpoint URL. """ - # If no endpoint URL is configured, derive status from last registration heartbeat + # If no endpoint URL is configured, the derive status from last registration heartbeat if not self.endpoint_url: is_live = bool( self.last_seen From ef5e6705fa8ce2309aac81b63f14d76a28daf144 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 17:30:19 -0700 Subject: [PATCH 12/20] fix(ui): coerce nullable lastSeenLive and use i18n for "Last seen" label - Coerce `last_seen_live` null to false in the getter to match the boolean return type (backend field is nullable). - Use `translate(STRING.FIELD_LABEL_LAST_SEEN)` in the status column instead of a hardcoded English string, matching the pattern in the details dialog. Co-Authored-By: Claude --- ui/src/data-services/models/processing-service.ts | 2 +- .../project/processing-services/processing-services-columns.tsx | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index b4a8fe8a5..7df157361 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -84,7 +84,7 @@ export class ProcessingService extends Entity { } get lastSeenLive(): boolean { - return this._processingService.last_seen_live + return this._processingService.last_seen_live ?? false } get numPiplinesAdded(): number { diff --git a/ui/src/pages/project/processing-services/processing-services-columns.tsx b/ui/src/pages/project/processing-services/processing-services-columns.tsx index b18ab19c6..460e227d7 100644 --- a/ui/src/pages/project/processing-services/processing-services-columns.tsx +++ b/ui/src/pages/project/processing-services/processing-services-columns.tsx @@ -54,7 +54,7 @@ export const columns: ( renderCell: (item: ProcessingService) => ( ), From e87eb8f9deccadc30d705d28cf3dd66ff9db9aca Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 17:30:32 -0700 Subject: [PATCH 13/20] fix: handle missing latency in service selection, improve beat task logging - Fix potential UnboundLocalError in select_processing_service() when all online services have last_seen_latency=None (e.g. async/pull-mode services). Falls back to the first online service and logs that no latency data exists. - Use logger.exception() instead of logger.error() in the beat task exception handler to preserve the full traceback for debugging. - Fix comment arithmetic for worst-case timeout calculation. Co-Authored-By: Claude --- ami/ml/models/pipeline.py | 18 ++++++++++-------- ami/ml/tasks.py | 6 +++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index 6e517ca05..c6d92e308 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -1161,29 +1161,31 @@ def choose_processing_service_for_pipeline( # check the status of all processing services and pick the one with the lowest latency lowest_latency = float("inf") - processing_services_online = False + processing_service_lowest_latency = None for processing_service in processing_services: if processing_service.last_seen_live: - processing_services_online = True if processing_service.last_seen_latency and processing_service.last_seen_latency < lowest_latency: lowest_latency = processing_service.last_seen_latency - # pick the processing service that has lowest latency + processing_service_lowest_latency = processing_service + elif processing_service_lowest_latency is None: + # Online but no latency data (e.g. async/pull-mode service) — use as fallback processing_service_lowest_latency = processing_service - # if all offline then throw error - if not processing_services_online: + if processing_service_lowest_latency is None: msg = f'No processing services are online for the pipeline "{pipeline_name}".' task_logger.error(msg) - raise Exception(msg) - else: + + if lowest_latency < float("inf"): task_logger.info( f"Using processing service with latency {round(lowest_latency, 4)}: " f"{processing_service_lowest_latency}" ) + else: + task_logger.info(f"Using processing service (no latency data): {processing_service_lowest_latency}") - return processing_service_lowest_latency + return processing_service_lowest_latency def process_images( self, diff --git a/ami/ml/tasks.py b/ami/ml/tasks.py index af4ca708d..3bfa458de 100644 --- a/ami/ml/tasks.py +++ b/ami/ml/tasks.py @@ -97,7 +97,7 @@ def remove_duplicate_classifications(project_id: int | None = None, dry_run: boo # Timeout per sync service in the periodic beat task. Shorter than the default (90s for # cold-start waits) since a missed check just waits for the next beat cycle. -# Worst case with retries=3, backoff_factor=2: 8 + 2 + 8 + 4 + 8 = 30s per service. +# Worst case: 4 attempts (initial + 3 retries) × 8s timeout + backoff (0 + 2 + 4) = 38s per service. _BEAT_STATUS_TIMEOUT = 8 # Discard queued copies that built up while the worker was unavailable — the next @@ -138,6 +138,6 @@ def check_processing_services_online(): try: status_response = service.get_status(timeout=_BEAT_STATUS_TIMEOUT) logger.debug(status_response) - except Exception as e: - logger.error(f"Error checking service {service}: {e}") + except Exception: + logger.exception("Error checking service %s", service) continue From 90926b4299d446a06796a2be4cf9607220702910 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 18:54:26 -0700 Subject: [PATCH 14/20] feat(ui): show "Unknown" status for async processing services Async/pull-mode services (no endpoint URL) now show a gray "Unknown" status indicator instead of the misleading ONLINE/OFFLINE based on heartbeat data that applies to all services on the pipeline. The pipeline-level indicator in the dropdown continues to work as before. Co-Authored-By: Claude --- ui/src/data-services/models/processing-service.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index 7df157361..056ca0361 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -7,6 +7,7 @@ export type ServerProcessingService = any // TODO: Update this type export const SERVER_PROCESSING_SERVICE_STATUS_CODES = [ 'OFFLINE', 'ONLINE', + 'UNKNOWN', ] as const export type ServerProcessingServiceStatusCode = @@ -15,6 +16,7 @@ export type ServerProcessingServiceStatusCode = export enum ProcessingServiceStatusType { Success, Error, + Unknown, } export class ProcessingService extends Entity { @@ -97,6 +99,9 @@ export class ProcessingService extends Entity { type: ProcessingServiceStatusType color: string } { + if (!this.endpointUrl) { + return ProcessingService.getStatusInfo('UNKNOWN') + } const status_code = this.lastSeenLive ? 'ONLINE' : 'OFFLINE' return ProcessingService.getStatusInfo(status_code) } @@ -108,11 +113,13 @@ export class ProcessingService extends Entity { const type = { OFFLINE: ProcessingServiceStatusType.Error, ONLINE: ProcessingServiceStatusType.Success, + UNKNOWN: ProcessingServiceStatusType.Unknown, }[code] const color = { [ProcessingServiceStatusType.Error]: '#ef4444', // color-destructive-500, [ProcessingServiceStatusType.Success]: '#09af8a', // color-success-500 + [ProcessingServiceStatusType.Unknown]: '#9ca3af', // gray-400 }[type] return { From 91e773495e4515853bb0a8ffa73ce3fde574bccf Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 19:09:14 -0700 Subject: [PATCH 15/20] fix(ui): treat async pipelines as selectable in pipeline picker Async/pull-mode services (no endpoint URL) are now treated as "online" in the pipeline selector since their status cannot be verified via active polling. This prevents the "Process Now" dropdown from incorrectly disabling async pipelines when no recent heartbeat exists. Co-Authored-By: Claude --- ui/src/data-services/models/pipeline.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/data-services/models/pipeline.ts b/ui/src/data-services/models/pipeline.ts index 52c49c80e..df704d037 100644 --- a/ui/src/data-services/models/pipeline.ts +++ b/ui/src/data-services/models/pipeline.ts @@ -78,7 +78,7 @@ export class Pipeline extends Entity { (service: any) => new ProcessingService(service) ) for (const processingService of processingServices) { - if (processingService.lastSeenLive) { + if (processingService.lastSeenLive || !processingService.endpointUrl) { return { online: true, service: processingService } } } From 3e1b17e529b5a216014cb02568d7cef05244abac Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 19:17:43 -0700 Subject: [PATCH 16/20] refactor(ui): use isAsync instead of !endpointUrl for async service checks Use the explicit is_async field from the backend serializer rather than inferring async status from the absence of endpoint_url. Co-Authored-By: Claude --- ui/src/data-services/models/pipeline.ts | 2 +- ui/src/data-services/models/processing-service.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ui/src/data-services/models/pipeline.ts b/ui/src/data-services/models/pipeline.ts index df704d037..c5c676d31 100644 --- a/ui/src/data-services/models/pipeline.ts +++ b/ui/src/data-services/models/pipeline.ts @@ -78,7 +78,7 @@ export class Pipeline extends Entity { (service: any) => new ProcessingService(service) ) for (const processingService of processingServices) { - if (processingService.lastSeenLive || !processingService.endpointUrl) { + if (processingService.lastSeenLive || processingService.isAsync) { return { online: true, service: processingService } } } diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index 056ca0361..e70fcb2e6 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -99,7 +99,7 @@ export class ProcessingService extends Entity { type: ProcessingServiceStatusType color: string } { - if (!this.endpointUrl) { + if (this.isAsync) { return ProcessingService.getStatusInfo('UNKNOWN') } const status_code = this.lastSeenLive ? 'ONLINE' : 'OFFLINE' From f446e3c5535e938c2d194395b9dc3faa5a28d807 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 19:29:50 -0700 Subject: [PATCH 17/20] fix: consistent status response payload and defensive isAsync coerce - Return empty pipelines_online list when async service is offline, avoiding contradictory payload (server_live=False + non-empty pipelines). - Add ?? false to isAsync getter to match defensive pattern used by lastSeenLive. Co-Authored-By: Claude --- ami/ml/models/processing_service.py | 2 +- ui/src/data-services/models/processing-service.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ami/ml/models/processing_service.py b/ami/ml/models/processing_service.py index 2b47f7803..bad3dd147 100644 --- a/ami/ml/models/processing_service.py +++ b/ami/ml/models/processing_service.py @@ -216,7 +216,7 @@ def get_status(self, timeout=90) -> ProcessingServiceStatusResponse: # Heartbeat has expired — mark stale self.last_seen_live = False self.save(update_fields=["last_seen_live"]) - pipeline_names = list(self.pipelines.values_list("name", flat=True)) + pipeline_names = list(self.pipelines.values_list("name", flat=True)) if is_live else [] return ProcessingServiceStatusResponse( timestamp=self.last_seen or datetime.datetime.now(), request_successful=is_live, diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index e70fcb2e6..836baba6c 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -58,7 +58,7 @@ export class ProcessingService extends Entity { } get isAsync(): boolean { - return this._processingService.is_async + return this._processingService.is_async ?? false } get description(): string { From 34c7efb3939c546491d49079e228b9df16cb3d35 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 19:52:37 -0700 Subject: [PATCH 18/20] fix: add missing 'project' field to ProcessingServiceSerializer fields list The write-only project field was declared on the serializer but not included in Meta.fields, causing an AssertionError at runtime. Missed during rebase conflict resolution. Co-Authored-By: Claude --- ami/ml/serializers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ami/ml/serializers.py b/ami/ml/serializers.py index 6067952f1..1711e19e1 100644 --- a/ami/ml/serializers.py +++ b/ami/ml/serializers.py @@ -160,6 +160,7 @@ class Meta: "updated_at", "last_seen", "last_seen_live", + "project", ] def get_projects(self, obj): From ca0a36b3810ff8509d33655cad7b508aab6b377f Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Mon, 23 Mar 2026 20:21:13 -0700 Subject: [PATCH 19/20] style: fix prettier formatting in processing-services-columns Co-Authored-By: Claude --- .../processing-services/processing-services-columns.tsx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ui/src/pages/project/processing-services/processing-services-columns.tsx b/ui/src/pages/project/processing-services/processing-services-columns.tsx index 460e227d7..db91f6534 100644 --- a/ui/src/pages/project/processing-services/processing-services-columns.tsx +++ b/ui/src/pages/project/processing-services/processing-services-columns.tsx @@ -54,7 +54,11 @@ export const columns: ( renderCell: (item: ProcessingService) => ( ), From 0984ed8ba42792f8f12e1d75d9f761ae1a958305 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 25 Mar 2026 18:15:02 -0700 Subject: [PATCH 20/20] fix: remove createdAt/updatedAt overrides that conflict with Entity base class The base Entity class was updated on main to return Date types for these getters. The overrides returning string types caused TS compilation errors. Co-Authored-By: Claude --- .../models/processing-service.ts | 24 ++----------------- .../processing-services-columns.tsx | 5 +++- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/ui/src/data-services/models/processing-service.ts b/ui/src/data-services/models/processing-service.ts index 836baba6c..627a19616 100644 --- a/ui/src/data-services/models/processing-service.ts +++ b/ui/src/data-services/models/processing-service.ts @@ -1,4 +1,3 @@ -import { getFormatedDateTimeString } from 'utils/date/getFormatedDateTimeString/getFormatedDateTimeString' import { Entity } from './entity' import { Pipeline, ServerPipeline } from './pipeline' @@ -38,12 +37,6 @@ export class ProcessingService extends Entity { return this._pipelines } - get createdAt(): string { - return getFormatedDateTimeString({ - date: new Date(this._processingService.created_at), - }) - } - get id(): string { return `${this._processingService.id}` } @@ -65,24 +58,11 @@ export class ProcessingService extends Entity { return `${this._processingService.description}` } - get updatedAt(): string | undefined { - if (!this._processingService.updated_at) { - return undefined - } - - return getFormatedDateTimeString({ - date: new Date(this._processingService.updated_at), - }) - } - - get lastSeen(): string | undefined { + get lastSeen(): Date | undefined { if (!this._processingService.last_seen) { return undefined } - - return getFormatedDateTimeString({ - date: new Date(this._processingService.last_seen), - }) + return new Date(this._processingService.last_seen) } get lastSeenLive(): boolean { diff --git a/ui/src/pages/project/processing-services/processing-services-columns.tsx b/ui/src/pages/project/processing-services/processing-services-columns.tsx index db91f6534..da2ab1078 100644 --- a/ui/src/pages/project/processing-services/processing-services-columns.tsx +++ b/ui/src/pages/project/processing-services/processing-services-columns.tsx @@ -1,3 +1,4 @@ +import { getFormatedDateTimeString } from 'utils/date/getFormatedDateTimeString/getFormatedDateTimeString' import { API_ROUTES } from 'data-services/constants' import { ProcessingService } from 'data-services/models/processing-service' import { BasicTableCell } from 'design-system/components/table/basic-table-cell/basic-table-cell' @@ -56,7 +57,9 @@ export const columns: ( color={item.status.color} details={ item.lastSeen - ? translate(STRING.FIELD_LABEL_LAST_SEEN) + ' ' + item.lastSeen + ? translate(STRING.FIELD_LABEL_LAST_SEEN) + + ' ' + + getFormatedDateTimeString({ date: item.lastSeen }) : undefined } label={item.status.label}