From 2ff807d972c86b0ad48761a01b59026b187c07e8 Mon Sep 17 00:00:00 2001 From: jishnundth Date: Mon, 11 May 2026 11:21:17 +0000 Subject: [PATCH 1/4] fix: scheduled timeout is not being propogated to task.result() --- .../services/controllers/task/controller.go | 7 ++++++- internal/services/dispatcher/server_v1.go | 1 - internal/services/scheduler/v1/scheduler.go | 5 +++-- pkg/repository/output.go | 18 ++++++++++++++++++ pkg/repository/task.go | 16 ++++++++++------ 5 files changed, 37 insertions(+), 10 deletions(-) diff --git a/internal/services/controllers/task/controller.go b/internal/services/controllers/task/controller.go index 4fad5527ff..24444d5f88 100644 --- a/internal/services/controllers/task/controller.go +++ b/internal/services/controllers/task/controller.go @@ -638,6 +638,7 @@ func (tc *TasksControllerImpl) handleTaskCancelled(ctx context.Context, tenantId msgs := msgqueue.JSONConvert[tasktypes.CancelledTaskPayload](payloads) shouldTasksNotify := make(map[int64]bool) + reasons := make(map[int64]string) for _, msg := range msgs { opts = append(opts, v1.TaskIdInsertedAtRetryCount{ @@ -647,9 +648,13 @@ func (tc *TasksControllerImpl) handleTaskCancelled(ctx context.Context, tenantId }) shouldTasksNotify[msg.TaskId] = msg.ShouldNotify + + if msg.EventMessage != "" { + reasons[msg.TaskId] = msg.EventMessage + } } - res, err := tc.repov1.Tasks().CancelTasks(ctx, tenantId, opts) + res, err := tc.repov1.Tasks().CancelTasks(ctx, tenantId, opts, reasons) if err != nil { err = fmt.Errorf("could not cancel tasks: %w", err) diff --git a/internal/services/dispatcher/server_v1.go b/internal/services/dispatcher/server_v1.go index 0cd9f3acf9..1c2d176a1c 100644 --- a/internal/services/dispatcher/server_v1.go +++ b/internal/services/dispatcher/server_v1.go @@ -532,7 +532,6 @@ func (s *DispatcherImpl) taskEventsToWorkflowRunEvent(tenantId uuid.UUID, finali case sqlcv1.V1TaskEventTypeFAILED: res.Error = &event.ErrorMessage case sqlcv1.V1TaskEventTypeCANCELLED: - //FIXME: this should be more specific for schedule timeouts res.Error = &event.ErrorMessage } diff --git a/internal/services/scheduler/v1/scheduler.go b/internal/services/scheduler/v1/scheduler.go index eb794fb040..54e5446f65 100644 --- a/internal/services/scheduler/v1/scheduler.go +++ b/internal/services/scheduler/v1/scheduler.go @@ -587,7 +587,7 @@ func (s *Scheduler) scheduleStepRuns(ctx context.Context, tenantId uuid.UUID, re schedulingTimedOut.WorkflowRunID, schedulingTimedOut.RetryCount, sqlcv1.V1EventTypeOlapSCHEDULINGTIMEDOUT, - "", + repov1.CancelledReasonSchedulingTimeout.UserMessage(), false, ) @@ -722,9 +722,10 @@ func (s *Scheduler) notifyAfterConcurrency(ctx context.Context, tenantId uuid.UU if cancelled.CancelledReason == "SCHEDULING_TIMED_OUT" { eventType = sqlcv1.V1EventTypeOlapSCHEDULINGTIMEDOUT + eventMessage = repov1.CancelledReasonSchedulingTimeout.UserMessage() shouldNotify = false } else { - eventMessage = "Cancelled due to concurrency strategy" + eventMessage = repov1.CancelledReasonConcurrencyLimit.UserMessage() } msg, err := tasktypes.CancelledTaskMessage( diff --git a/pkg/repository/output.go b/pkg/repository/output.go index fea33fd0b3..a403804a3d 100644 --- a/pkg/repository/output.go +++ b/pkg/repository/output.go @@ -9,6 +9,24 @@ import ( "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" ) +type CancelledReason string + +const ( + CancelledReasonSchedulingTimeout CancelledReason = "SCHEDULING_TIMED_OUT" + + CancelledReasonConcurrencyLimit CancelledReason = "CONCURRENCY_LIMIT" +) + +func (r CancelledReason) UserMessage() string { + switch r { + case CancelledReasonSchedulingTimeout: + return "task did not start within its schedule timeout" + case CancelledReasonConcurrencyLimit: + return "task was cancelled by a concurrency strategy" + } + return "" +} + type TaskOutputEvent struct { IsFailure bool `json:"is_failure"` diff --git a/pkg/repository/task.go b/pkg/repository/task.go index 33b7c577ac..7c8d2d941e 100644 --- a/pkg/repository/task.go +++ b/pkg/repository/task.go @@ -245,7 +245,7 @@ type TaskRepository interface { FailTasks(ctx context.Context, tenantId uuid.UUID, tasks []FailTaskOpts) (*FailTasksResponse, error) - CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error) + CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount, reasons map[int64]string) (*FinalizedTaskResponse, error) ListTasks(ctx context.Context, tenantId uuid.UUID, tasks []int64) ([]*sqlcv1.V1Task, error) @@ -1036,7 +1036,7 @@ func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tena return res, nil } -func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error) { +func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount, reasons map[int64]string) (*FinalizedTaskResponse, error) { ctx, span := telemetry.NewSpan(ctx, "TaskRepositoryImpl.CancelTasks") defer span.End() @@ -1059,7 +1059,7 @@ func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID defer rollback() // release queue items - res, err := r.cancelTasks(ctx, tx, tenantId, tasks) + res, err := r.cancelTasks(ctx, tx, tenantId, tasks, reasons) if err != nil { err = fmt.Errorf("failed to cancel tasks: %w", err) @@ -1079,7 +1079,7 @@ func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID return res, nil } -func (r *sharedRepository) cancelTasks(ctx context.Context, dbtx sqlcv1.DBTX, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error) { +func (r *sharedRepository) cancelTasks(ctx context.Context, dbtx sqlcv1.DBTX, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount, reasons map[int64]string) (*FinalizedTaskResponse, error) { // get a unique set of task ids and retry counts tasks = uniqueSet(tasks) @@ -1093,9 +1093,13 @@ func (r *sharedRepository) cancelTasks(ctx context.Context, dbtx sqlcv1.DBTX, te outputs := make([][]byte, len(releasedTasks)) for i, releasedTask := range releasedTasks { - out := NewCancelledTaskOutputEvent(releasedTask).Bytes() + out := NewCancelledTaskOutputEvent(releasedTask) - outputs[i] = out + if reason, ok := reasons[releasedTask.ID]; ok { + out.ErrorMessage = reason + } + + outputs[i] = out.Bytes() } internalEvents, err := r.createTaskEventsAfterRelease( From 1dac8a749788ddafd5fbc8563ddf4ed3ef9f2372 Mon Sep 17 00:00:00 2001 From: jishnundth Date: Mon, 11 May 2026 18:27:23 +0000 Subject: [PATCH 2/4] fix: make grpc message size configurable for TS sdk --- sdks/typescript/CHANGELOG.md | 6 +++ sdks/typescript/package.json | 2 +- .../hatchet-client/client-config.test.ts | 45 +++++++++++++++++++ .../clients/hatchet-client/client-config.ts | 19 +++++++- .../util/config-loader/config-loader.test.ts | 8 ++++ .../src/util/config-loader/config-loader.ts | 24 +++++++++- sdks/typescript/src/util/grpc-helpers.ts | 2 + sdks/typescript/src/v1/client/admin.ts | 8 ++-- sdks/typescript/src/version.ts | 2 +- 9 files changed, 109 insertions(+), 7 deletions(-) diff --git a/sdks/typescript/CHANGELOG.md b/sdks/typescript/CHANGELOG.md index f5d6632433..f0af434ca2 100644 --- a/sdks/typescript/CHANGELOG.md +++ b/sdks/typescript/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to Hatchet's TypeScript SDK will be documented in this chang The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.22.4] - 2026-05-19 + +### Added + +- Adds `grpc_max_recv_message_length` and `grpc_max_send_message_length` to client config, also configurable via env vars. Defaults to 4MB. + ## [1.22.3] - 2026-05-18 ### Fixed diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json index 218f8501a2..a2652736a3 100644 --- a/sdks/typescript/package.json +++ b/sdks/typescript/package.json @@ -1,6 +1,6 @@ { "name": "@hatchet-dev/typescript-sdk", - "version": "1.22.3", + "version": "1.22.4", "description": "Background task orchestration & visibility for developers", "types": "dist/index.d.ts", "files": [ diff --git a/sdks/typescript/src/clients/hatchet-client/client-config.test.ts b/sdks/typescript/src/clients/hatchet-client/client-config.test.ts index 3c0a3851ac..672b3ccd4a 100644 --- a/sdks/typescript/src/clients/hatchet-client/client-config.test.ts +++ b/sdks/typescript/src/clients/hatchet-client/client-config.test.ts @@ -54,3 +54,48 @@ describe('ClientConfigSchema cancellation timing', () => { ).toThrow(); }); }); + +describe('ClientConfigSchema gRPC max message length', () => { + it('applies 4MB defaults for both recv and send', () => { + const cfg = ClientConfigSchema.parse(baseConfig()); + expect(cfg.grpc_max_recv_message_length).toBe(4 * 1024 * 1024); + expect(cfg.grpc_max_send_message_length).toBe(4 * 1024 * 1024); + }); + + it('accepts custom positive integer values', () => { + const cfg = ClientConfigSchema.parse({ + ...baseConfig(), + grpc_max_recv_message_length: 8 * 1024 * 1024, + grpc_max_send_message_length: 16 * 1024 * 1024, + }); + expect(cfg.grpc_max_recv_message_length).toBe(8 * 1024 * 1024); + expect(cfg.grpc_max_send_message_length).toBe(16 * 1024 * 1024); + }); + + it('rejects invalid values', () => { + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + grpc_max_recv_message_length: 0, + }) + ).toThrow(); + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + grpc_max_send_message_length: -1, + }) + ).toThrow(); + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + grpc_max_recv_message_length: 1.5, + }) + ).toThrow(); + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + grpc_max_send_message_length: '4mb' as any, + }) + ).toThrow(); + }); +}); diff --git a/sdks/typescript/src/clients/hatchet-client/client-config.ts b/sdks/typescript/src/clients/hatchet-client/client-config.ts index f584810655..a87a03f476 100644 --- a/sdks/typescript/src/clients/hatchet-client/client-config.ts +++ b/sdks/typescript/src/clients/hatchet-client/client-config.ts @@ -54,6 +54,18 @@ export const ClientConfigSchema = z.object({ middleware: TaskMiddlewareSchema, cancellation_grace_period: DurationMsSchema.optional().default(1000), cancellation_warning_threshold: DurationMsSchema.optional().default(300), + grpc_max_recv_message_length: z + .number() + .int() + .positive() + .optional() + .default(4 * 1024 * 1024), + grpc_max_send_message_length: z + .number() + .int() + .positive() + .optional() + .default(4 * 1024 * 1024), }); export type LogConstructor = (context: string, logLevel?: LogLevel) => Logger; @@ -127,10 +139,15 @@ type ClientConfigInferred = z.infer; export type ClientConfig = Omit< ClientConfigInferred, - 'cancellation_grace_period' | 'cancellation_warning_threshold' + | 'cancellation_grace_period' + | 'cancellation_warning_threshold' + | 'grpc_max_recv_message_length' + | 'grpc_max_send_message_length' > & { cancellation_grace_period?: number; cancellation_warning_threshold?: number; + grpc_max_recv_message_length?: number; + grpc_max_send_message_length?: number; } & { credentials?: ChannelCredentials; } & { diff --git a/sdks/typescript/src/util/config-loader/config-loader.test.ts b/sdks/typescript/src/util/config-loader/config-loader.test.ts index 42b663ddb9..84ce533ffa 100644 --- a/sdks/typescript/src/util/config-loader/config-loader.test.ts +++ b/sdks/typescript/src/util/config-loader/config-loader.test.ts @@ -4,6 +4,8 @@ describe('ConfigLoader', () => { beforeEach(() => { // Clear env vars that might leak from other tests delete process.env.HATCHET_CLIENT_TLS_STRATEGY; + delete process.env.HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH; + delete process.env.HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH; process.env.HATCHET_CLIENT_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjoiMTI3LjAuMC4xOjgwODAiLCJzZXJ2ZXJfdXJsIjoiaHR0cDovL2xvY2FsaG9zdDo4MDgwIiwic3ViIjoiNzA3ZDA4NTUtODBhYi00ZTFmLWExNTYtZjFjNDU0NmNiZjUyIn0K.abcdef'; @@ -14,6 +16,8 @@ describe('ConfigLoader', () => { process.env.HATCHET_CLIENT_TLS_SERVER_NAME = 'TLS_SERVER_NAME'; process.env.HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED = 'true'; process.env.HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT = '8001'; + process.env.HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH = String(8 * 1024 * 1024); + process.env.HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH = String(16 * 1024 * 1024); }); it('should load from environment variables', () => { @@ -41,6 +45,8 @@ describe('ConfigLoader', () => { excludedAttributes: [], includeTaskNameInSpanName: false, }, + grpc_max_recv_message_length: 8 * 1024 * 1024, + grpc_max_send_message_length: 16 * 1024 * 1024, }); }); @@ -97,6 +103,8 @@ describe('ConfigLoader', () => { excludedAttributes: ['additional_metadata'], includeTaskNameInSpanName: true, }, + grpc_max_recv_message_length: 8 * 1024 * 1024, + grpc_max_send_message_length: 16 * 1024 * 1024, }); }); diff --git a/sdks/typescript/src/util/config-loader/config-loader.ts b/sdks/typescript/src/util/config-loader/config-loader.ts index a831953638..ae6048b4bd 100644 --- a/sdks/typescript/src/util/config-loader/config-loader.ts +++ b/sdks/typescript/src/util/config-loader/config-loader.ts @@ -21,7 +21,9 @@ type EnvVars = | 'HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED' | 'HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT' | 'HATCHET_CLIENT_OPENTELEMETRY_EXCLUDED_ATTRIBUTES' - | 'HATCHET_CLIENT_OPENTELEMETRY_INCLUDE_TASK_NAME_IN_SPAN_NAME'; + | 'HATCHET_CLIENT_OPENTELEMETRY_INCLUDE_TASK_NAME_IN_SPAN_NAME' + | 'HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH' + | 'HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH'; type TLSStrategy = 'tls' | 'mtls'; @@ -105,6 +107,18 @@ export class ConfigLoader { this.env('HATCHET_CLIENT_OPENTELEMETRY_INCLUDE_TASK_NAME_IN_SPAN_NAME') === 'true', }; + const grpcMaxRecvMessageLength = + override?.grpc_max_recv_message_length ?? + yaml?.grpc_max_recv_message_length ?? + this.parseIntEnv(this.env('HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH')) ?? + 4 * 1024 * 1024; + + const grpcMaxSendMessageLength = + override?.grpc_max_send_message_length ?? + yaml?.grpc_max_send_message_length ?? + this.parseIntEnv(this.env('HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH')) ?? + 4 * 1024 * 1024; + return { token: override?.token ?? yaml?.token ?? this.env('HATCHET_CLIENT_TOKEN'), host_port: grpcBroadcastAddress, @@ -119,9 +133,17 @@ export class ConfigLoader { tenant_id: tenantId, namespace: namespace ? `${namespace}`.toLowerCase() : '', otel: otelConfig, + grpc_max_recv_message_length: grpcMaxRecvMessageLength, + grpc_max_send_message_length: grpcMaxSendMessageLength, }; } + private static parseIntEnv(value: string | undefined): number | undefined { + if (!value) return undefined; + const parsed = parseInt(value, 10); + return Number.isNaN(parsed) ? undefined : parsed; + } + private static parseJsonArray(value: string): string[] { try { const parsed = JSON.parse(value); diff --git a/sdks/typescript/src/util/grpc-helpers.ts b/sdks/typescript/src/util/grpc-helpers.ts index 9a2a8749e4..d4c066a074 100644 --- a/sdks/typescript/src/util/grpc-helpers.ts +++ b/sdks/typescript/src/util/grpc-helpers.ts @@ -19,6 +19,8 @@ export const channelFactory = (config: ClientConfig, credentials: ChannelCredent 'grpc.keepalive_permit_without_calls': 1, // Enable gzip compression for all calls on this channel 'grpc.default_compression_algorithm': 2, // 2 = Gzip compression + 'grpc.max_send_message_length': config.grpc_max_send_message_length ?? 4 * 1024 * 1024, + 'grpc.max_receive_message_length': config.grpc_max_recv_message_length ?? 4 * 1024 * 1024, }); export const addTokenMiddleware = (token: string) => diff --git a/sdks/typescript/src/v1/client/admin.ts b/sdks/typescript/src/v1/client/admin.ts index f4f6398fff..87540053cc 100644 --- a/sdks/typescript/src/v1/client/admin.ts +++ b/sdks/typescript/src/v1/client/admin.ts @@ -248,9 +248,11 @@ export class AdminClient { }; }); - const limit = 4 * 1024 * 1024; // FIXME configurable GRPC limit - - const batches = batch(workflowRequests, batchSize, limit); + const batches = batch( + workflowRequests, + batchSize, + this.config.grpc_max_send_message_length ?? 4 * 1024 * 1024 + ); this.logger.debug(`batching ${batches.length} batches`); diff --git a/sdks/typescript/src/version.ts b/sdks/typescript/src/version.ts index e9957e3875..d0eefc1020 100644 --- a/sdks/typescript/src/version.ts +++ b/sdks/typescript/src/version.ts @@ -1 +1 @@ -export const HATCHET_VERSION = '1.22.1'; +export const HATCHET_VERSION = '1.22.4'; From 2fd15cbd4b5f01ef9b40904de02e5e18e5b54afa Mon Sep 17 00:00:00 2001 From: jishnundth Date: Mon, 11 May 2026 18:29:52 +0000 Subject: [PATCH 3/4] Revert "fix: scheduled timeout is not being propogated to task.result()" This reverts commit 8697370184916fa4ed2ddd5e41e08ffa041df635. --- .../services/controllers/task/controller.go | 7 +------ internal/services/dispatcher/server_v1.go | 1 + internal/services/scheduler/v1/scheduler.go | 5 ++--- pkg/repository/output.go | 18 ------------------ pkg/repository/task.go | 16 ++++++---------- 5 files changed, 10 insertions(+), 37 deletions(-) diff --git a/internal/services/controllers/task/controller.go b/internal/services/controllers/task/controller.go index 24444d5f88..4fad5527ff 100644 --- a/internal/services/controllers/task/controller.go +++ b/internal/services/controllers/task/controller.go @@ -638,7 +638,6 @@ func (tc *TasksControllerImpl) handleTaskCancelled(ctx context.Context, tenantId msgs := msgqueue.JSONConvert[tasktypes.CancelledTaskPayload](payloads) shouldTasksNotify := make(map[int64]bool) - reasons := make(map[int64]string) for _, msg := range msgs { opts = append(opts, v1.TaskIdInsertedAtRetryCount{ @@ -648,13 +647,9 @@ func (tc *TasksControllerImpl) handleTaskCancelled(ctx context.Context, tenantId }) shouldTasksNotify[msg.TaskId] = msg.ShouldNotify - - if msg.EventMessage != "" { - reasons[msg.TaskId] = msg.EventMessage - } } - res, err := tc.repov1.Tasks().CancelTasks(ctx, tenantId, opts, reasons) + res, err := tc.repov1.Tasks().CancelTasks(ctx, tenantId, opts) if err != nil { err = fmt.Errorf("could not cancel tasks: %w", err) diff --git a/internal/services/dispatcher/server_v1.go b/internal/services/dispatcher/server_v1.go index 1c2d176a1c..0cd9f3acf9 100644 --- a/internal/services/dispatcher/server_v1.go +++ b/internal/services/dispatcher/server_v1.go @@ -532,6 +532,7 @@ func (s *DispatcherImpl) taskEventsToWorkflowRunEvent(tenantId uuid.UUID, finali case sqlcv1.V1TaskEventTypeFAILED: res.Error = &event.ErrorMessage case sqlcv1.V1TaskEventTypeCANCELLED: + //FIXME: this should be more specific for schedule timeouts res.Error = &event.ErrorMessage } diff --git a/internal/services/scheduler/v1/scheduler.go b/internal/services/scheduler/v1/scheduler.go index 54e5446f65..eb794fb040 100644 --- a/internal/services/scheduler/v1/scheduler.go +++ b/internal/services/scheduler/v1/scheduler.go @@ -587,7 +587,7 @@ func (s *Scheduler) scheduleStepRuns(ctx context.Context, tenantId uuid.UUID, re schedulingTimedOut.WorkflowRunID, schedulingTimedOut.RetryCount, sqlcv1.V1EventTypeOlapSCHEDULINGTIMEDOUT, - repov1.CancelledReasonSchedulingTimeout.UserMessage(), + "", false, ) @@ -722,10 +722,9 @@ func (s *Scheduler) notifyAfterConcurrency(ctx context.Context, tenantId uuid.UU if cancelled.CancelledReason == "SCHEDULING_TIMED_OUT" { eventType = sqlcv1.V1EventTypeOlapSCHEDULINGTIMEDOUT - eventMessage = repov1.CancelledReasonSchedulingTimeout.UserMessage() shouldNotify = false } else { - eventMessage = repov1.CancelledReasonConcurrencyLimit.UserMessage() + eventMessage = "Cancelled due to concurrency strategy" } msg, err := tasktypes.CancelledTaskMessage( diff --git a/pkg/repository/output.go b/pkg/repository/output.go index a403804a3d..fea33fd0b3 100644 --- a/pkg/repository/output.go +++ b/pkg/repository/output.go @@ -9,24 +9,6 @@ import ( "github.com/hatchet-dev/hatchet/pkg/repository/sqlcv1" ) -type CancelledReason string - -const ( - CancelledReasonSchedulingTimeout CancelledReason = "SCHEDULING_TIMED_OUT" - - CancelledReasonConcurrencyLimit CancelledReason = "CONCURRENCY_LIMIT" -) - -func (r CancelledReason) UserMessage() string { - switch r { - case CancelledReasonSchedulingTimeout: - return "task did not start within its schedule timeout" - case CancelledReasonConcurrencyLimit: - return "task was cancelled by a concurrency strategy" - } - return "" -} - type TaskOutputEvent struct { IsFailure bool `json:"is_failure"` diff --git a/pkg/repository/task.go b/pkg/repository/task.go index 7c8d2d941e..33b7c577ac 100644 --- a/pkg/repository/task.go +++ b/pkg/repository/task.go @@ -245,7 +245,7 @@ type TaskRepository interface { FailTasks(ctx context.Context, tenantId uuid.UUID, tasks []FailTaskOpts) (*FailTasksResponse, error) - CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount, reasons map[int64]string) (*FinalizedTaskResponse, error) + CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error) ListTasks(ctx context.Context, tenantId uuid.UUID, tasks []int64) ([]*sqlcv1.V1Task, error) @@ -1036,7 +1036,7 @@ func (r *TaskRepositoryImpl) ListFinalizedWorkflowRuns(ctx context.Context, tena return res, nil } -func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount, reasons map[int64]string) (*FinalizedTaskResponse, error) { +func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error) { ctx, span := telemetry.NewSpan(ctx, "TaskRepositoryImpl.CancelTasks") defer span.End() @@ -1059,7 +1059,7 @@ func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID defer rollback() // release queue items - res, err := r.cancelTasks(ctx, tx, tenantId, tasks, reasons) + res, err := r.cancelTasks(ctx, tx, tenantId, tasks) if err != nil { err = fmt.Errorf("failed to cancel tasks: %w", err) @@ -1079,7 +1079,7 @@ func (r *TaskRepositoryImpl) CancelTasks(ctx context.Context, tenantId uuid.UUID return res, nil } -func (r *sharedRepository) cancelTasks(ctx context.Context, dbtx sqlcv1.DBTX, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount, reasons map[int64]string) (*FinalizedTaskResponse, error) { +func (r *sharedRepository) cancelTasks(ctx context.Context, dbtx sqlcv1.DBTX, tenantId uuid.UUID, tasks []TaskIdInsertedAtRetryCount) (*FinalizedTaskResponse, error) { // get a unique set of task ids and retry counts tasks = uniqueSet(tasks) @@ -1093,13 +1093,9 @@ func (r *sharedRepository) cancelTasks(ctx context.Context, dbtx sqlcv1.DBTX, te outputs := make([][]byte, len(releasedTasks)) for i, releasedTask := range releasedTasks { - out := NewCancelledTaskOutputEvent(releasedTask) + out := NewCancelledTaskOutputEvent(releasedTask).Bytes() - if reason, ok := reasons[releasedTask.ID]; ok { - out.ErrorMessage = reason - } - - outputs[i] = out.Bytes() + outputs[i] = out } internalEvents, err := r.createTaskEventsAfterRelease( From d6fd8e8fd52247d65a6f81bdea72e238c4eaef48 Mon Sep 17 00:00:00 2001 From: jishnundth Date: Tue, 19 May 2026 16:09:59 +0000 Subject: [PATCH 4/4] fix: int env parsing, reject malformed values --- .../util/config-loader/config-loader.test.ts | 14 ++++++++++++++ .../src/util/config-loader/config-loader.ts | 17 +++++++++++------ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/sdks/typescript/src/util/config-loader/config-loader.test.ts b/sdks/typescript/src/util/config-loader/config-loader.test.ts index 84ce533ffa..d167a3ff9b 100644 --- a/sdks/typescript/src/util/config-loader/config-loader.test.ts +++ b/sdks/typescript/src/util/config-loader/config-loader.test.ts @@ -50,6 +50,20 @@ describe('ConfigLoader', () => { }); }); + it('should throw on a malformed grpc max recv message length env var', () => { + process.env.HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH = '4mb'; + expect(() => ConfigLoader.loadClientConfig()).toThrow( + /HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH.*"4mb".*positive integer/ + ); + }); + + it('should throw on a malformed grpc max send message length env var', () => { + process.env.HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH = '4mb'; + expect(() => ConfigLoader.loadClientConfig()).toThrow( + /HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH.*"4mb".*positive integer/ + ); + }); + it('should throw an error if the file is not found', () => { expect(() => ConfigLoader.loadClientConfig( diff --git a/sdks/typescript/src/util/config-loader/config-loader.ts b/sdks/typescript/src/util/config-loader/config-loader.ts index ae6048b4bd..cb23390c0f 100644 --- a/sdks/typescript/src/util/config-loader/config-loader.ts +++ b/sdks/typescript/src/util/config-loader/config-loader.ts @@ -110,13 +110,13 @@ export class ConfigLoader { const grpcMaxRecvMessageLength = override?.grpc_max_recv_message_length ?? yaml?.grpc_max_recv_message_length ?? - this.parseIntEnv(this.env('HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH')) ?? + this.parseIntEnv('HATCHET_CLIENT_GRPC_MAX_RECV_MESSAGE_LENGTH') ?? 4 * 1024 * 1024; const grpcMaxSendMessageLength = override?.grpc_max_send_message_length ?? yaml?.grpc_max_send_message_length ?? - this.parseIntEnv(this.env('HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH')) ?? + this.parseIntEnv('HATCHET_CLIENT_GRPC_MAX_SEND_MESSAGE_LENGTH') ?? 4 * 1024 * 1024; return { @@ -138,10 +138,15 @@ export class ConfigLoader { }; } - private static parseIntEnv(value: string | undefined): number | undefined { - if (!value) return undefined; - const parsed = parseInt(value, 10); - return Number.isNaN(parsed) ? undefined : parsed; + private static parseIntEnv(envName: EnvVars): number | undefined { + const value = this.env(envName); + if (value === undefined || value === '') return undefined; + if (!/^\d+$/.test(value.trim())) { + throw new Error( + `Invalid value for ${envName}: "${value}". Expected a positive integer.` + ); + } + return parseInt(value, 10); } private static parseJsonArray(value: string): string[] {