Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,27 @@ func TestFailureHang(t *testing.T) {
}
}

// TestFailure_SplitError runs a pipeline whose splittable DoFn uses a tracker
// that always fails TrySplit, and expects the pipeline to fail with that
// tracker's error message rather than succeed or hang.
func TestFailure_SplitError(t *testing.T) {
	initRunner(t)

	pipeline, scope := beam.NewPipelineWithRoot()
	source := beam.Create(scope, SourceConfig{NumElements: 100, InitialSplits: 1})
	emitted := beam.ParDo(scope, &slowFailSDF{}, source)
	beam.ParDo(scope, &int64Check{Name: "sdf_fail", Want: []int{}}, emitted)

	// Bound the run with a short deadline so that a hang caused by broken
	// split error handling surfaces as a quick test failure.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	const want = "intentional split error from tracker"
	_, err := executeWithT(ctx, t, pipeline)
	switch {
	case err == nil:
		t.Fatalf("expected pipeline failure, but got a success")
	case !strings.Contains(err.Error(), want):
		t.Fatalf("expected pipeline failure with %q, but was %v", want, err)
	}
}

func TestRunner_Passert(t *testing.T) {
initRunner(t)
tests := []struct {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ progress:
sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
slog.Warn("SDK Error from split, aborting splits and failing bundle", "bundle", rb, "error", err.Error())
if b.BundleErr != nil {
if b.BundleErr == nil {
b.BundleErr = err
}
Comment thread
shunping marked this conversation as resolved.
Outdated
return b.BundleErr
Expand Down
35 changes: 35 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func init() {
register.Function2x1(combineIntSum)

register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func(int64), error]((*intRangeFn)(nil))
register.DoFn4x1[context.Context, *sdf.LockRTracker, SourceConfig, func(int64), error]((*slowFailSDF)(nil))
register.Emitter1[int64]()
register.Emitter2[int64, int64]()
}
Expand Down Expand Up @@ -404,3 +405,37 @@ func (fn *selfCheckpointingDoFn) ProcessElement(rt *sdf.LockRTracker, _ []byte,
}
}
}

// errorSplitTracker embeds an *offsetrange.Tracker and declares its own
// TrySplit that always returns an error, so every split attempt made through
// this type fails while the remaining tracker methods come from the embedded
// value.
type errorSplitTracker struct {
*offsetrange.Tracker
}

// TrySplit ignores the requested fraction and always reports failure: both
// restriction results are nil and the error carries a fixed, recognizable
// message that tests can match on.
func (t *errorSplitTracker) TrySplit(fraction float64) (any, any, error) {
	splitErr := fmt.Errorf("intentional split error from tracker")
	return nil, nil, splitErr
}

// slowFailSDF is a stateless splittable DoFn used in tests. Its tracker is an
// errorSplitTracker, so split attempts fail, and its ProcessElement blocks on
// the context after each claimed position.
type slowFailSDF struct{}

// CreateInitialRestriction builds the starting restriction for an element:
// Start is 0 and End is the configured number of elements.
func (fn *slowFailSDF) CreateInitialRestriction(config SourceConfig) offsetrange.Restriction {
	var initial offsetrange.Restriction
	initial.Start = 0
	initial.End = config.NumElements
	return initial
}

// SplitRestriction divides rest into config.InitialSplits pieces by
// delegating to the restriction's EvenSplits method.
func (fn *slowFailSDF) SplitRestriction(config SourceConfig, rest offsetrange.Restriction) []offsetrange.Restriction {
	pieces := rest.EvenSplits(config.InitialSplits)
	return pieces
}

// RestrictionSize reports the size of rest as given by its Size method; the
// source configuration is not consulted.
func (fn *slowFailSDF) RestrictionSize(_ SourceConfig, rest offsetrange.Restriction) float64 {
	size := rest.Size()
	return size
}

// CreateTracker wraps a fresh offset range tracker for rest in an
// errorSplitTracker, then wraps that in a LockRTracker, so that TrySplit calls
// on the result reach the always-failing implementation.
func (fn *slowFailSDF) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
	base := offsetrange.NewTracker(rest)
	failing := &errorSplitTracker{Tracker: base}
	return sdf.NewLockRTracker(failing)
}

func (fn *slowFailSDF) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, config SourceConfig, emit func(int64)) error {
fmt.Println("DEBUG: slowFailSDF.ProcessElement invoked")
Comment thread
shunping marked this conversation as resolved.
Outdated
for i := rt.GetRestriction().(offsetrange.Restriction).Start; rt.TryClaim(i); i++ {
<-ctx.Done()
}
return nil
}
19 changes: 19 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type B struct {
OutputData engine.TentativeData

Resp chan *fnpb.ProcessBundleResponse
Done chan struct{}
BundleErr error
Comment thread
shunping marked this conversation as resolved.
Outdated
responded bool

Expand All @@ -80,6 +81,7 @@ func (b *B) Init() {
close(b.DataWait) // Can happen if there are no outputs for the bundle.
}
b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
b.Done = make(chan struct{})
}

// DataOrTimerDone indicates a final element has been received from a Data or Timer output.
Expand All @@ -102,7 +104,11 @@ func (b *B) Respond(resp *fnpb.InstructionResponse) {
return
}
b.responded = true
if b.Done != nil {
close(b.Done)
}
if resp.GetError() != "" {
slog.Error("DEBUG: Prism received bundle error from worker response", "bundle", resp.GetInstructionId())
b.BundleErr = fmt.Errorf("bundle %v %v failed:%v", resp.GetInstructionId(), b.PBDID, resp.GetError())
Comment thread
shunping marked this conversation as resolved.
Outdated
close(b.Resp)
return
Expand Down Expand Up @@ -143,6 +149,13 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
b.DataOrTimerDone()
}
return b.DataWait
case <-b.Done:
// The bundle completed/failed before req was sent.
outCap := b.OutputCount + len(b.HasTimers)
for i := 0; i < outCap; i++ {
b.DataOrTimerDone()
}
return b.DataWait
case wk.InstReqs <- req:
// desired outcome
}
Expand Down Expand Up @@ -181,6 +194,9 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
case <-ctx.Done():
b.DataOrTimerDone()
return b.DataWait
case <-b.Done:
b.DataOrTimerDone()
return b.DataWait
case wk.DataReqs <- elms:
}
}
Expand All @@ -202,6 +218,9 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan struct{} {
case <-ctx.Done():
b.DataOrTimerDone()
return b.DataWait
case <-b.Done:
b.DataOrTimerDone()
return b.DataWait
case wk.DataReqs <- &fnpb.Elements{
Timers: timers,
Data: []*fnpb.Elements_Data{
Expand Down
67 changes: 67 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,73 @@ func TestWorker_State_MultimapSideInput(t *testing.T) {
}
}

// TestBundle_ProcessOn_WorkerFailure checks that ProcessOn returns rather than
// deadlocking when the worker reports a failure mid-bundle and has stopped
// reading elements from the Data plane stream.
func TestBundle_ProcessOn_WorkerFailure(t *testing.T) {
	ctx, wk, clientConn := serveTestWorker(t)

	stream, err := fnpb.NewBeamFnDataClient(clientConn).Data(ctx)
	if err != nil {
		t.Fatal("couldn't create data client:", err)
	}

	instID := wk.NextInst()

	// Queue 15 large input blocks (512 KB each), all sharing one payload, to
	// saturate the 10-slot channel buffer and the gRPC flow control window so
	// that the Data sender inside worker.go is forced to block.
	const (
		blockCount = 15
		blockSize  = 512 * 1024
	)
	payload := make([]byte, blockSize)
	blocks := make([]*engine.Block, 0, blockCount)
	for len(blocks) < blockCount {
		blocks = append(blocks, &engine.Block{
			Kind:  engine.BlockData,
			Bytes: [][]byte{payload},
		})
	}

	b := &B{
		InstID:      instID,
		PBDID:       "teststageID",
		Input:       blocks,
		OutputCount: 1,
	}
	b.Init()
	wk.activeInstructions[instID] = b

	// Run ProcessOn in the background and signal when it returns.
	finished := make(chan struct{})
	go func() {
		b.ProcessOn(ctx, wk)
		close(finished)
	}()

	// Send the initial process bundle request trigger.
	trigger := &fnpb.InstructionRequest{InstructionId: instID}
	wk.InstReqs <- trigger

	// Consume only the first message to simulate a worker that has started
	// processing and then stops reading.
	if _, err := stream.Recv(); err != nil {
		t.Fatal("couldn't receive first data element:", err)
	}

	// Simulate worker failure by responding with an error on the Control
	// channel. Without the fix, ProcessOn's background goroutine deadlocks at
	// `wk.DataReqs <- elms` because the client stopped reading and the
	// buffer/flow-control is saturated.
	failure := &fnpb.InstructionResponse{
		InstructionId: instID,
		Error:         "Intentional worker failure",
	}
	wk.activeInstructions[instID].Respond(failure)

	// ProcessOn must return within the deadline; otherwise treat it as hung.
	deadline := time.After(10 * time.Second)
	select {
	case <-finished:
		// ProcessOn exited as expected.
	case <-deadline:
		t.Fatal("ProcessOn deadlocked / hung after worker failure!")
	}
}

func newWorker() *W {
mw := &MultiplexW{
pool: map[string]*W{},
Expand Down
Loading