Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .autover/changes/e1a240df-673e-4a7d-af74-197103533038.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"Projects": [
{
"Name": "Amazon.Lambda.RuntimeSupport",
"Type": "Minor",
"ChangelogMessages": [
"Add LambdaBootstrapBuilder.Create(Func<Stream, ILambdaContext, Task<Stream>>, ILambdaSerializer) overload (and matching HandlerWrapper.GetHandlerWrapper) so stream-stream handlers can expose a serializer via ILambdaContext.Serializer. Enables frameworks that own envelope (de)serialization but delegate inner-payload (de)serialization to a user-supplied serializer."
]
}
]
}
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ global.json

**/cdk.out/**
**/.DS_Store

# JetBrains Rider per-project cache
**/*.lscache
314 changes: 196 additions & 118 deletions Docs/durable-execution-design.md

Large diffs are not rendered by default.

62 changes: 61 additions & 1 deletion Libraries/Libraries.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 18
VisualStudioVersion = 18.5.11709.299 stable
VisualStudioVersion = 18.5.11709.299
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}"
EndProject
Expand Down Expand Up @@ -155,6 +155,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ResponseStreamingFunctionHa
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AspNetCoreStreamingApiGatewayTest", "test\Amazon.Lambda.RuntimeSupport.Tests\AspNetCoreStreamingApiGatewayTest\AspNetCoreStreamingApiGatewayTest.csproj", "{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution", "src\Amazon.Lambda.DurableExecution\Amazon.Lambda.DurableExecution.csproj", "{9097B5A4-E100-47FD-A676-0B666A36FAFF}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.Tests", "test\Amazon.Lambda.DurableExecution.Tests\Amazon.Lambda.DurableExecution.Tests.csproj", "{57150BA6-3826-431F-8F58-B1D11FAFC5D4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.IntegrationTests", "test\Amazon.Lambda.DurableExecution.IntegrationTests\Amazon.Lambda.DurableExecution.IntegrationTests.csproj", "{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Amazon.Lambda.DurableExecution.AotPublishTest", "test\Amazon.Lambda.DurableExecution.AotPublishTest\Amazon.Lambda.DurableExecution.AotPublishTest.csproj", "{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -969,6 +977,54 @@ Global
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x64.Build.0 = Release|Any CPU
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x86.ActiveCfg = Release|Any CPU
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93}.Release|x86.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x64.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x64.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x86.ActiveCfg = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Debug|x86.Build.0 = Debug|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|Any CPU.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x64.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x64.Build.0 = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x86.ActiveCfg = Release|Any CPU
{9097B5A4-E100-47FD-A676-0B666A36FAFF}.Release|x86.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x64.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x64.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x86.ActiveCfg = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Debug|x86.Build.0 = Debug|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|Any CPU.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x64.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x64.Build.0 = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x86.ActiveCfg = Release|Any CPU
{57150BA6-3826-431F-8F58-B1D11FAFC5D4}.Release|x86.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x64.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x64.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x86.ActiveCfg = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Debug|x86.Build.0 = Debug|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|Any CPU.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x64.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x64.Build.0 = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x86.ActiveCfg = Release|Any CPU
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27}.Release|x86.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x64.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x64.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x86.ActiveCfg = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Debug|x86.Build.0 = Debug|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|Any CPU.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x64.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x64.Build.0 = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x86.ActiveCfg = Release|Any CPU
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -1045,6 +1101,10 @@ Global
{80594C21-C6EB-469E-83CC-68F9F661CA5E} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{E404A7AC-812B-BC03-CA76-02C0BC2BA7F9} = {B5BD0336-7D08-492C-8489-42C987E29B39}
{0768FA72-CF49-2B59-BC4C-E4CE579E5D93} = {B5BD0336-7D08-492C-8489-42C987E29B39}
{9097B5A4-E100-47FD-A676-0B666A36FAFF} = {AAB54E74-20B1-42ED-BC3D-CE9F7BC7FD12}
{57150BA6-3826-431F-8F58-B1D11FAFC5D4} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{CA132CAB-FF4F-4312-B3A3-66DE9D360F27} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
{16B1B1CC-3AFC-4DC7-8DB6-D14AE12924A2} = {1DE4EE60-45BA-4EF7-BE00-B9EB861E4C69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {503678A4-B8D1-4486-8915-405A3E9CF0EB}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
<Nullable>enable</Nullable>
<ImplicitUsings>enable</ImplicitUsings>
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
<WarningsAsErrors>IL2026,IL2067,IL2075,IL3050</WarningsAsErrors>
<!-- DurableExecution intentionally consumes the preview ILambdaContext.Serializer
API. The whole package is in development (0.x), so suppressing project-wide
is appropriate; downstream users still see AWSLAMBDA001 in their own code. -->
<NoWarn>$(NoWarn);AWSLAMBDA001</NoWarn>
</PropertyGroup>

<ItemGroup>
Expand Down

This file was deleted.

130 changes: 130 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using Amazon.Lambda.Core;
using Amazon.Lambda.DurableExecution.Internal;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// Implementation of <see cref="IDurableContext"/>. Constructs and dispatches
/// per-operation classes (<see cref="StepOperation{T}"/>, <see cref="WaitOperation"/>);
/// the replay logic lives in those classes.
/// </summary>
internal sealed class DurableContext : IDurableContext
{
private readonly ExecutionState _state;
private readonly TerminationManager _terminationManager;
private readonly OperationIdGenerator _idGenerator;
private readonly string _durableExecutionArn;
private readonly CheckpointBatcher? _batcher;

public DurableContext(
ExecutionState state,
TerminationManager terminationManager,
OperationIdGenerator idGenerator,
string durableExecutionArn,
ILambdaContext lambdaContext,
CheckpointBatcher? batcher = null)
{
_state = state;
_terminationManager = terminationManager;
_idGenerator = idGenerator;
_durableExecutionArn = durableExecutionArn;
_batcher = batcher;
LambdaContext = lambdaContext;
}

// Replay-safe logger ships in a follow-up PR; see IDurableContext.Logger doc.
public ILogger Logger => NullLogger.Instance;
Comment thread
GarrettBeatty marked this conversation as resolved.
public IExecutionContext ExecutionContext => new DurableExecutionContext(_durableExecutionArn);
public ILambdaContext LambdaContext { get; }
Comment thread
GarrettBeatty marked this conversation as resolved.

public Task<T> StepAsync<T>(
Func<IStepContext, Task<T>> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
=> RunStep(func, name, config, cancellationToken);

public async Task StepAsync(
Func<IStepContext, Task> func,
string? name = null,
StepConfig? config = null,
CancellationToken cancellationToken = default)
{
// Void steps don't carry a meaningful payload — wrap with an object?-typed
// step that always returns null. The serializer isn't actually invoked
// with a non-null value, so any registered ILambdaSerializer suffices.
await RunStep<object?>(
async (ctx) => { await func(ctx); return null; },
name, config, cancellationToken);
}

private Task<T> RunStep<T>(
Func<IStepContext, Task<T>> func,
string? name,
StepConfig? config,
CancellationToken cancellationToken)
{
var serializer = LambdaContext.Serializer
?? throw new InvalidOperationException(
"No ILambdaSerializer is registered on ILambdaContext.Serializer. " +
"In the class library programming model, register one with " +
"[assembly: LambdaSerializer(typeof(...))]. In an executable / custom " +
"runtime, pass it to LambdaBootstrapBuilder.Create(handler, serializer). " +
"In tests, set TestLambdaContext.Serializer.");

var operationId = _idGenerator.NextId();
var op = new StepOperation<T>(
operationId, name, func, config, serializer, Logger,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}

public Task WaitAsync(
TimeSpan duration,
string? name = null,
CancellationToken cancellationToken = default)
{
// Service timer granularity is 1 second; sub-second waits would round to 0.
// WaitOptions.WaitSeconds is integer in [1, 31_622_400] (1 second to ~1 year).
if (duration < TimeSpan.FromSeconds(1))
throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at least 1 second.");

if (duration > TimeSpan.FromSeconds(31_622_400))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be validating this on our end?

throw new ArgumentOutOfRangeException(nameof(duration), duration, "Wait duration must be at most 31,622,400 seconds (~1 year).");

cancellationToken.ThrowIfCancellationRequested();

var operationId = _idGenerator.NextId();
var waitSeconds = (int)Math.Max(1, Math.Ceiling(duration.TotalSeconds));
Comment thread
GarrettBeatty marked this conversation as resolved.
var op = new WaitOperation(
operationId, name, waitSeconds,
_state, _terminationManager, _durableExecutionArn, _batcher);
return op.ExecuteAsync(cancellationToken);
}
}

internal sealed class DurableExecutionContext : IExecutionContext
{
public DurableExecutionContext(string durableExecutionArn)
{
DurableExecutionArn = durableExecutionArn;
}

public string DurableExecutionArn { get; }
}

internal sealed class StepContext : IStepContext
{
public StepContext(string operationId, int attemptNumber, ILogger logger)
{
OperationId = operationId;
AttemptNumber = attemptNumber;
Logger = logger;
}

public ILogger Logger { get; }
public int AttemptNumber { get; }
public string OperationId { get; }
}
112 changes: 112 additions & 0 deletions Libraries/src/Amazon.Lambda.DurableExecution/DurableEntryPoint.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using Amazon.Lambda;
using Amazon.Lambda.Core;
using Amazon.Lambda.DurableExecution.Internal;
using Amazon.Lambda.DurableExecution.Services;
using Amazon.Lambda.Model;
using Amazon.Runtime;

namespace Amazon.Lambda.DurableExecution;

/// <summary>
/// AOT-friendly entry point for a durable workflow. Owns (de)serialization of
/// the wire envelope so users only register their own POCO types in their
/// <c>JsonSerializerContext</c> — the library's <see cref="DurableEnvelopeJsonContext"/>
/// handles envelope JSON, the user's <see cref="ILambdaSerializer"/> (read from
/// <see cref="ILambdaContext.Serializer"/>) handles only <typeparamref name="TInput"/>
/// and <typeparamref name="TOutput"/>.
/// </summary>
/// <typeparam name="TInput">The workflow's input payload type.</typeparam>
/// <typeparam name="TOutput">The workflow's return type.</typeparam>
/// <example>
/// <code>
/// private static readonly DurableEntryPoint&lt;OrderEvent, OrderResult&gt; _entry = new(MyWorkflow);
///
/// static async Task Main()
/// {
/// await LambdaBootstrapBuilder
/// .Create(_entry.InvokeAsync, new SourceGeneratorLambdaJsonSerializer&lt;MyJsonContext&gt;())
/// .Build()
/// .RunAsync();
/// }
/// </code>
/// </example>
public sealed class DurableEntryPoint<TInput, TOutput>
{
private static readonly Lazy<IAmazonLambda> _cachedLambdaClient =
new(() => new AmazonLambdaClient(), LazyThreadSafetyMode.ExecutionAndPublication);

private readonly Func<TInput, IDurableContext, Task<TOutput>> _workflow;
private readonly IAmazonLambda _lambdaClient;

/// <summary>
/// Creates an entry point that uses a default <see cref="AmazonLambdaClient"/>,
/// constructed lazily and cached process-wide.
/// </summary>
public DurableEntryPoint(Func<TInput, IDurableContext, Task<TOutput>> workflow)
: this(workflow, _cachedLambdaClient.Value)
{
}

/// <summary>
/// Creates an entry point that uses the supplied <see cref="IAmazonLambda"/> client
/// for checkpoint and state-fetch calls.
/// </summary>
public DurableEntryPoint(Func<TInput, IDurableContext, Task<TOutput>> workflow, IAmazonLambda lambdaClient)
{
_workflow = workflow ?? throw new ArgumentNullException(nameof(workflow));
_lambdaClient = lambdaClient ?? throw new ArgumentNullException(nameof(lambdaClient));
}

/// <summary>
/// Lambda handler entry point. Register this method with <c>LambdaBootstrapBuilder</c>
/// alongside an <see cref="ILambdaSerializer"/> that knows how to (de)serialize
/// <typeparamref name="TInput"/> / <typeparamref name="TOutput"/>.
/// </summary>
public async Task<Stream> InvokeAsync(Stream input, ILambdaContext context)
{
var output = await DurableEntryPointCore.InvokeAsync(_workflow, input, context, _lambdaClient);
var ms = new MemoryStream();
JsonSerializer.Serialize(ms, output, DurableEnvelopeJsonContext.Default.DurableExecutionInvocationOutput);
ms.Position = 0;
return ms;
}
}

/// <summary>
/// AOT-friendly entry point for a void durable workflow.
/// See <see cref="DurableEntryPoint{TInput,TOutput}"/> for details.
/// </summary>
public sealed class DurableEntryPoint<TInput>
{
private readonly DurableEntryPoint<TInput, object?> _inner;

/// <summary>
/// Creates an entry point that uses a default <see cref="AmazonLambdaClient"/>,
/// constructed lazily and cached process-wide.
/// </summary>
public DurableEntryPoint(Func<TInput, IDurableContext, Task> workflow)
{
if (workflow == null) throw new ArgumentNullException(nameof(workflow));
_inner = new DurableEntryPoint<TInput, object?>(async (i, c) => { await workflow(i, c); return null; });
}

/// <summary>
/// Creates an entry point that uses the supplied <see cref="IAmazonLambda"/> client
/// for checkpoint and state-fetch calls.
/// </summary>
public DurableEntryPoint(Func<TInput, IDurableContext, Task> workflow, IAmazonLambda lambdaClient)
{
if (workflow == null) throw new ArgumentNullException(nameof(workflow));
_inner = new DurableEntryPoint<TInput, object?>(
async (i, c) => { await workflow(i, c); return null; },
lambdaClient);
}

/// <inheritdoc cref="DurableEntryPoint{TInput,TOutput}.InvokeAsync"/>
public Task<Stream> InvokeAsync(Stream input, ILambdaContext context)
=> _inner.InvokeAsync(input, context);
}
Loading
Loading