Skip to content

[SPARK-56903][SQL] Spread NULL outer join keys across shuffle partitions#55927

Closed
sunchao wants to merge 14 commits into
apache:masterfrom
sunchao:dev/chao/codex/null-aware-outer-join-apache
Closed

[SPARK-56903][SQL] Spread NULL outer join keys across shuffle partitions#55927
sunchao wants to merge 14 commits into
apache:masterfrom
sunchao:dev/chao/codex/null-aware-outer-join-apache

Conversation

@sunchao
Copy link
Copy Markdown
Member

@sunchao sunchao commented May 17, 2026

What changes were proposed in this pull request?

This PR reduces shuffle skew for null-heavy shuffled outer equi-joins.

For LEFT OUTER, RIGHT OUTER, and FULL OUTER joins, preserved rows with a NULL
shuffle key may not need to stay concentrated on one reducer. Today those rows can all
collapse into the same shuffle partition, which creates avoidable skew on NULL-heavy inputs.

This change adds a feature-flagged null-aware shuffle partitioning mode for shuffled outer
joins:

  • Non-NULL shuffle keys keep the existing hash partitioning behavior.
  • Rows with any NULL shuffle key are spread across reducers instead of collapsing into one
    partition.
  • The behavior is disabled by default behind
    spark.sql.shuffle.spreadNullJoinKeys.enabled.
  • The optimization is considered only for LEFT OUTER, RIGHT OUTER, and FULL OUTER
    equi-joins whose preserved side has nullable join keys.

Spreading remains result-safe for null-safe equality (<=>) outer joins:

  • For ordinary extracted <=> join keys, Spark rewrites them into non-null shuffle-key
    expressions using coalesce(...) and isnull(...), so there are no NULL shuffle keys for
    this feature to redistribute.
  • The only remaining corner is NullType, where the shuffle key can still be NULL. In that
    case, shuffled join execution already treats the row as unmatched, so redistributing those
    rows does not change query results.

The implementation wires this through the planner and runtime pieces that need to understand
the new partitioning contract:

  • ClusteredDistribution can opt into null-aware spreading.
  • New null-aware partitioning and shuffle-spec variants preserve compatibility checks without
    pretending to satisfy ordinary clustered distributions.
  • Shuffle execution spreads unmatched NULL keys while preserving retry safety.
  • AQE/coalesced shuffle reads preserve the new partitioning shape.

When the feature flag is enabled, the null-aware join output partitioning intentionally does not
satisfy a strict ClusteredDistribution. That can require an extra downstream shuffle for
grouping, windowing, or another equi-join on the same key. Also, if one side is already hash
partitioned, only the other side may be reshuffled into the null-aware layout, so the
pre-shuffled side can keep its NULL skew.

This PR intentionally stays scoped to outer joins. Left anti joins may also have skewed
preserved-side NULL rows for ordinary = predicates and are worth evaluating separately, but
they need their own correctness and planning review rather than being folded into this patch.

Why are the changes needed?

Outer joins can preserve large numbers of unmatched rows from the outer side. When many of those
rows have NULL shuffle keys, sending them all to one reducer creates skew even though they do
not require one shared reducer for correctness.

Example:

SELECT *
FROM fact f
LEFT OUTER JOIN dim d
  ON f.k = d.k

If fact.k contains many NULL values, those rows must remain in the result as unmatched
left-side rows, but they do not need to be grouped together for correctness. Spreading them
reduces needless reducer concentration while leaving normal key matching unchanged.

Does this PR introduce any user-facing change?

Yes, in execution behavior only. Query results are unchanged, but when the feature flag is
enabled, shuffle partitioning for eligible NULL-heavy outer equi-joins becomes less skewed.

How was this patch tested?

  • Added and updated unit tests covering outer-join planning, FULL OUTER JOIN result correctness
    with NULL keys, null-safe outer-join behavior, shuffle-level NULL spreading, retry
    determinism, shuffle-spec compatibility, and AQE preservation of null-aware coalesced reads.
  • Ran focused plan-stability verification for the affected TPC-DS cases locally.
  • Ran git diff --check.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Codex GPT-5

Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the CI happy, @sunchao .

[info] *** 18 TESTS FAILED ***
[error] Failed: Total 6146, Failed 18, Errors 0, Passed 6128, Ignored 4
[error] Failed tests:
[error] 	org.apache.spark.sql.TPCDSV1_4_PlanStabilitySuite
[error] 	org.apache.spark.sql.TPCDSV2_7_PlanStabilityWithStatsSuite
[error] 	org.apache.spark.sql.TPCDSV1_4_PlanStabilityWithStatsSuite
[error] 	org.apache.spark.sql.TPCDSV2_7_PlanStabilitySuite
[error] (sql / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 3764 s (01:02:44.0), completed May 17, 2026, 4:30:13 AM

@sunchao
Copy link
Copy Markdown
Member Author

sunchao commented May 17, 2026

Thanks @dongjoon-hyun , fixed.

@sunchao
Copy link
Copy Markdown
Member Author

sunchao commented May 17, 2026

Also cc @cloud-fan @viirya @peter-toth to review

keys.exists(_.exists(_.isInstanceOf[IsNull]))
}

private lazy val canSpreadNullJoinKeys: Boolean = {
Copy link
Copy Markdown
Contributor

@peter-toth peter-toth May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this robust enough? What if someone crafts a null handling join condition by hand?

Actually, this looks good.

Actually, why this is needed at all and when can't we spread nulls?
<=> is translated to 2 key pairs Coalesce(a.k, default), Coalesce(b.k, default)) and (IsNull(a.k), IsNull(b.k)), so null never shows up in shuffle keys. The join type check seems fair enough.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For most types the coalesce key is non-null, but Literal.default(NullType) is itself null, so it seems the extracted shuffle key can still contain nulls even though those rows remain matchable under <=>.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without spreading, NullType <=> keys all hash to the same value (Murmur3Hash(null) is deterministic) → all NULL rows collocate on one reducer. The executor then runs:

  • SortMergeJoinExec.scala:1116: while (advancedStreamed() && streamedRowKey.anyNull) — skip every NULL-keyed streamed row.
  • SortMergeJoinExec.scala:1529: in full-outer, leftRowKey.anyNull triggers padding emission, never a match.

So even with NULL rows colocated, the executor's anyNull guard prevents NULL=NULL from matching. The <=> semantics the user wanted (NULL matches NULL) is never delivered for NullType — the rewrite was supposed to convert NULLs
to non-null sentinels so the executor's guard wouldn't fire, but for NullType the sentinel itself is NULL, so the guard fires anyway and the join produces only padding (full outer) or nothing (inner).

With spreading, NULL rows scatter across reducers. Each reducer's executor sees some NULL rows from both sides. The anyNull guard fires the same way. Same padding emission, same lack of matching.

Output is identical with or without spreading — both produce the broken-but-self-consistent "NULL=NULL doesn't match" behavior for NullType.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm that is a good point. It seems the check is indeed unnecessary then, let me remove it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't forget to update the PR description and let's leave some comments here why spreading nulls is safe in <=> outer joins.
I wonder if left anti join could also benefit from the feature.

Copy link
Copy Markdown
Member Author

@sunchao sunchao May 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR description.

As for left anti join, yes, ordinary shuffled left anti equi-joins with = could likely benefit for the same reason as outer joins: preserved left-side rows with NULL join keys are guaranteed not to match, so concentrating them on one reducer is unnecessary. I kept this PR scoped to outer joins for now to avoid broadening the change.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Sure, handling outer joins in this PR is a nice improvement.

Copy link
Copy Markdown
Contributor

@peter-toth peter-toth left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @sunchao, let me do another round:

Summary

Adds an opt-in null-aware shuffle partitioning for outer equi-joins to break up the skew that occurs when many preserved-side rows share a NULL join key. With the flag on, only NULL-keyed rows are redistributed; non-NULL keys keep the existing hash contract.

Prior state. Outer-join inputs ship through HashPartitioning. For null-heavy keys, Murmur3Hash(null) is fixed, so all NULL-keyed rows from one side land on a single reducer; the unmatched-NULLs become a hot partition that pre-shuffle skew handling cannot help — the imbalance is in the hash distribution itself, not in any data sketch the skew detector measures.

Design approach. Limit the new behavior to LEFT/RIGHT/FULL OUTER (where preserved-NULL rows can never match under = semantics) and gate it behind spark.sql.shuffle.spreadNullJoinKeys.enabled. When opted in, the join asks for ClusteredDistribution(_, allowNullKeySpreading = true), the planner satisfies that via a new NullAwareHashPartitioning, and the partitioner spreads any anyNull() row round-robin within a map task while keeping non-NULL rows on their normal hash partition. <=> correctness is preserved because the existing coalesce(...)/isnull(...) rewrite eliminates NULLs from the shuffle keys before this code path sees them; the only residual case (NullType) is unaffected because the executor's anyNull skip in SortMergeJoinExec already blocks NULL=NULL matches independent of where the rows physically land.

Key design decisions.

  • NullAwareHashPartitioning does not satisfy an ordinary ClusteredDistribution — only one with allowNullKeySpreading = true. This is the load-bearing invariant: downstream operators that need NULL co-location (aggregates, non-spreading joins) correctly trigger a re-shuffle. The AdaptiveQueryExecSuite diff confirms the cost — optimizeOutRepartition no longer fires for repartitions sitting above a null-aware-shuffled join.
  • HashShuffleSpec and NullAwareHashShuffleSpec are made mutually compatible only when both distributions opt in. Without this, the asymmetric case where one input already arrives as HashPartitioning(k) would force a redundant left-side shuffle.
  • Stateful per-row partition assignment reuses the SPARK-23207 mechanism: sortBeforeRepartition + isOrderSensitive. The path is structurally parallel to RoundRobinPartitioning, so retry semantics are inherited rather than re-derived.
  • The NullType shuffle-key guard added in earlier revisions was deliberately removed in commit 7c760ec after analysis showed SortMergeJoinExec's anyNull skip already makes spreading result-equivalent in that corner.

Implementation sketch. Partitioning hierarchy (partitioning.scala: new NullAwareHashPartitioning, CoalescedNullAwareHashPartitioning, NullAwareHashShuffleSpec, plus a HashShuffleSpecCompatibility helper to deduplicate the now-shared compatibility check). Shuffle path (ShuffleExchangeExec: new partitioner case spreads NULLs round-robin per map task; deterministic-local-sort guard widened to cover the new partitioning). Join trait (ShuffledJoin: gate-controlled opt-in via the feature flag). AQE path (AQEShuffleReadExec wraps coalesced specs into CoalescedNullAwareHashPartitioning). Config: SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED with ConfigBindingPolicy.SESSION.


override def expressions: Seq[Expression] = from.expressions

override def satisfies0(required: Distribution): Boolean = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This body is identical to NullAwareHashPartitioning.satisfies0 at line 340 — same outer UnspecifiedDistribution/AllTuples/_ => false match, same ClusteredDistribution inner match guarded on allowNullKeySpreading, same requireAllClusterKeys branching. This is the same kind of duplication addressed elsewhere in this PR by extracting HashShuffleSpecCompatibility.isCompatible (lines 944-955).

Two cleaner shapes:

  • Lift the inner block to a private helper, e.g. private def nullAwareSatisfies0(exprs, n, required) shared by both classes.
  • Or just delegate: since boundaries don't change satisfaction semantics for the allowNullKeySpreading contract, CoalescedNullAwareHashPartitioning.satisfies0(required) is essentially from.satisfies0(required) except for the AllTuples case where numPartitions differs — that single divergence is easy to handle inline.

Side note: both overrides skip the StatefulOpClusteredDistribution case that HashPartitioningLike.satisfies0 handles. Currently unreachable (streaming joins use StatefulOpClusteredDistribution, not ClusteredDistribution, so they never opt into allowNullKeySpreading), but a one-line comment that the omission is deliberate would help the next reader.

trait ShuffledJoin extends JoinCodegenSupport {
def isSkewJoin: Boolean

private lazy val canSpreadNullJoinKeys: Boolean = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The gate opts in based on join type alone, ignoring whether the shuffle keys are actually nullable. For an outer join on non-nullable keys (e.g. f.k = d.k where both k are NOT NULL — common after a NOT NULL filter or on schema-non-null columns), the new path:

  1. Adds a per-row joinKeys.anyNull() check in ShuffleExchangeExec.getPartitionKeyExtractor that always returns false.
  2. Produces NullAwareHashPartitioning as the join's output partitioning, which doesn't satisfy ordinary ClusteredDistribution. The AdaptiveQueryExecSuite diff in this PR (optimizeOutRepartition = false cases around lines 2079-2127) shows the cost — a downstream df.repartition($"b") is no longer collapsed even though the underlying NULL-skew problem can never have existed.

Two options worth considering:

  • Gate also on leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable) so a non-nullable-key outer join falls back to plain HashPartitioning.
  • If the simpler shape is preferred, add a sentence to the lazy val's comment explicitly calling out the trade-off (skew reduction vs. potentially unnecessary downstream re-shuffle / lost optimizeOutRepartition) so future readers don't read it as an oversight.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Updated

}

test("null-aware hash shuffle preserves retry determinism with local sorting") {
withSQLConf(SQLConf.SORT_BEFORE_REPARTITION.key -> "true") {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retry-determinism for the new partitioning has two paths and only one is covered: with SORT_BEFORE_REPARTITION = true the local sort makes row order deterministic; with false, the code in ShuffleExchangeExec (line ~497-498) instead relies on isOrderSensitive = true to propagate the parent's determinism level. Only the sorted half is exercised here.

A second test running the same ShuffleExchangeExec(NullAwareHashPartitioning, ...) with SORT_BEFORE_REPARTITION = false and asserting that outputDeterministicLevel inherits the parent's level would catch a future regression that drops isNullAwareHashPartitioning from the isOrderSensitive clause (which would silently make retries unsafe for the unsorted path). Mirroring the structure of the existing test is enough — no new infrastructure required.

Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary

Prior state and problem. For shuffled outer equi-joins on ordinary =, NULL-keyed rows on the preserved side must appear in the output (that's the outer-join contract) but can never satisfy a.k = b.k (NULL = anything is unknown, not true). So those rows need to be emitted, but the join machinery doesn't need to match them with anything on the other side — they're carry-through rows.

This contrasts with what HashPartitioning actually does today: it sends every row with the same hash to the same reducer, including NULL keys. Murmur3Hash(null) is deterministic, so on NULL-heavy inputs the entire NULL cohort collapses onto one reducer, producing a textbook skewed-reducer pattern in the join's output stage even though no correctness contract requires those rows to be co-located.

Worth noting why this is uniquely an outer-join problem rather than a general equi-join one: for inner joins, InferFiltersFromConstraints pushes IsNotNull(k) to both sides, so NULL-keyed rows are filtered out before the shuffle and the skew never materializes. For outer joins, that pushdown is blocked on the preserved side(s) — left outer can push IsNotNull to the right but not the left, right outer is symmetric, full outer can push to neither. The skew lives exactly where the pushdown can't reach. (Same structural reason applies to left anti, which @peter-toth raised — worth framing the follow-up that way.)

Design approach. Recognize that what equi-joins actually need from their input distribution is strictly weaker than what aggregate / window / stateful joins need:

  • Aggregate / window / StatefulOpClusteredDistribution / bucketed writes — need "same-key co-location," NULLs included, because they consume the NULL group as a coherent unit.
  • Equi-join — only needs "same-non-NULL-key co-location," because the NULL cohort never participates in matching.

The PR introduces a layout that delivers the weaker contract: non-NULL keys hash as before; NULL keys are scattered round-robin within each map task. Implementation is gated behind spark.sql.shuffle.spreadNullJoinKeys.enabled (default off) and engaged by ShuffledJoin only for outer joins.

Result-safety for <=> (the concern @peter-toth dug into at length, worth restating). ExtractEquiJoinKeys rewrites a <=> b to (coalesce(a, default(T)), isNull(a)) shuffle-key pairs, so for any concrete type the shuffle keys are non-null and the new spreading path never triggers. The only residual is NullType, where Literal.default(NullType) is itself null — but for that case the executor's anyNull guard in SortMergeJoinExec / ShuffledHashJoinExec already classifies the rows as unmatched, so spreading them does not change the result. The earlier-revision <=> gate on canSpreadNullJoinKeys was correctly removed once that argument was nailed down.

Key design decisions made by this PR.

  • How to mark "this layout has weaker NULL co-location" so downstream operators see it. The PR's choice: a parallel type hierarchy — NullAwareHashPartitioning, CoalescedNullAwareHashPartitioning, NullAwareHashShuffleSpec — paralleling the existing HashPartitioning family. Downstream operators that require strict ClusteredDistribution correctly fail satisfies against the new type and re-shuffle.
  • Layout-marking is required either way (a downstream GROUP BY k looking at the join's outputPartitioning has to be able to tell the difference) — but a flag on HashPartitioning would deliver the same property with substantially less code. The PR currently duplicates hashKeyPositions, canCreatePartitioning, createPartitioning, and numPartitions between HashShuffleSpec and NullAwareHashShuffleSpec, and CoalescedNullAwareHashPartitioning is structurally identical to CoalescedHashPartitioning modulo the type of from. The compatibility helper HashShuffleSpecCompatibility shows the duplication was felt; a spreadNullKeys: Boolean = false field on HashPartitioning (and on HashShuffleSpec) would let the existing classes carry the property and eliminate the duplication entirely. EXPLAIN visibility — the one argument for distinct types — costs one line in toString. (Detailed inline below.)
  • Scope to outer joins. Correct, and for the structural reason above (where preserved-side IsNotNull pushdown is blocked) — though that reason isn't in the code or PR description today; the comment in canSpreadNullJoinKeys only addresses the <=> corner. Worth saying it: NULL keys on the preserved side must be emitted but can never satisfy =, so their reducer placement is purely a layout choice.
  • Round-robin within each map task, seeded by XORShiftRandom(partitionId). Mirrors RoundRobinPartitioning exactly, including the sortBeforeRepartition / isOrderSensitive retry-determinism plumbing. The XOR-shift seed avoids the correlated-start hazard a plain partitionId % numPartitions would have.

Implementation sketch.

  • Catalyst (partitioning.scala): ClusteredDistribution gains opt-in allowNullKeySpreading; NullAwareHashPartitioning / CoalescedNullAwareHashPartitioning parallel their non-null-aware peers; NullAwareHashShuffleSpec participates in compatibility checks (a regular HashShuffleSpec and a NullAwareHashShuffleSpec are mutually compatible iff both distributions have the opt-in flag set).
  • Planner (ShuffledJoin.scala): when canSpreadNullJoinKeys holds, both sides request the opted-in ClusteredDistribution.
  • Shuffle write (ShuffleExchangeExec.scala): the partition extractor for NullAwareHashPartitioning checks anyNull per row and round-robins NULL-keyed rows via a per-task counter; the existing round-robin retry-determinism plumbing is extended to cover the new stateful extractor.
  • AQE (AQEShuffleReadExec.scala): coalesced reads preserve the partitioning shape by producing CoalescedNullAwareHashPartitioning.

Behavioral changes worth calling out (and worth adding to the PR description so users enabling the flag aren't surprised):

  • df.repartition($"k") JOIN ... ON k = ... on an outer join no longer optimizes out the user-requested top shuffle. The inner join produces NullAwareHashPartitioning, which is intentionally not equivalent to the user-requested HashPartitioning. This is what the AQE test diffs reflect (optimizeOutRepartition: true → false).
  • When one side of an outer join is already shuffled as HashPartitioning, only the other side gets re-shuffled as NullAwareHashPartitioning. The already-shuffled side keeps its NULL skew; the optimization is one-sided in that case.
  • An outer join's downstream GROUP BY k / OVER (PARTITION BY k) / JOIN ... ON k = ... will now re-shuffle (NullAwareHashPartitioning does not satisfy strict ClusteredDistribution). Correct, but an extra exchange relative to today's plan.

Suggested improvements (most as inline comments below):

  1. Static nullability gate. canSpreadNullJoinKeys should also check leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable). Outer joins on PK/FK / NOT-NULL / post-IsNotNull keys gain nothing from the null-aware path and pay both runtime per-row cost and the downstream re-shuffle cost above. The analyzer already tracks nullability — using it here makes the whole mechanism a no-op when there's no NULL to spread.
  2. Collapse the parallel type hierarchy into a flag on HashPartitioning (see inline) — removes ~85 lines of duplication, keeps every existing case h: HashPartitioning => pattern match working.
  3. Reframe canSpreadNullJoinKeys comment and PR-description "Why" around the structural reason — preserved-side rows must be emitted but can never satisfy =, and on outer joins IsNotNull pushdown can't reach the preserved side. The <=> and NullType notes become a corollary rather than the lead.
  4. Single key evaluation in getPartitionKeyExtractor — today the join key expressions are evaluated twice for non-NULL rows.
  5. Test coverage. (a) ShuffledHashJoinExec also extends ShuffledJoin and supports outer joins, but the new tests only cover SortMergeJoinExec. (b) The NullType <=> corner — the basis of the safety argument for <=> — isn't directly exercised. (c) ExtractEquiJoinKeys.unapply(join).foreach { case ... => withSQLConf { ... } } silently passes if unapply returns None; six new tests use this pattern.
  6. Documentation. NullAwareHashPartitioning Scaladoc should call out that it does NOT satisfy a strict ClusteredDistribution; CoalescedNullAwareHashPartitioning, NullAwareHashShuffleSpec, and ClusteredDistribution.allowNullKeySpreading have no Scaladoc.

SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION),
requiredNumPartitions: Option[Int] = None) extends Distribution {
requiredNumPartitions: Option[Int] = None,
allowNullKeySpreading: Boolean = false) extends Distribution {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth a Scaladoc on this field describing the contract: it's a permission, not a requirement (an ordinary HashPartitioning still satisfies this distribution when the flag is true; the flag only weakens what the default partitioning produced by createPartitioning looks like). And it's the consumer-side knob — the partitioning-side marker (NullAwareHashPartitioning today, or a flag on HashPartitioning per the comment below) is what tells downstream operators they need to re-shuffle for strict ClusteredDistribution.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

* to be co-located. Non-NULL join keys preserve the same partitioning contract as
* [[HashPartitioning]], while rows with any NULL join key may be spread across partitions.
*/
case class NullAwareHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design alternative worth considering: a spreadNullKeys: Boolean = false field on HashPartitioning instead of a parallel type hierarchy.

The marker this carries is one bit ("NULL keys may be spread, so I don't deliver strict same-key co-location"). Encoding it as a parallel type means duplicating hashKeyPositions, canCreatePartitioning, createPartitioning, numPartitions, and (modulo the helper just extracted) isCompatibleWith in NullAwareHashShuffleSpec, plus reproducing CoalescedHashPartitioning as CoalescedNullAwareHashPartitioning, plus a new arm in every dispatcher (ShuffleExchangeExec.prepareShuffleDependency's part and getPartitionKeyExtractor, AQEShuffleReadExec.outputPartitioning).

With a flag:

  • HashPartitioning.satisfies0 only matches strict ClusteredDistribution when !spreadNullKeys, only matches allowNullKeySpreading=true distributions when spreadNullKeys.
  • HashShuffleSpec carries the flag; one extra clause in isCompatibleWith.
  • CoalescedHashPartitioning already wraps a HashPartitioning — it inherits the flag transparently. No new coalesced class.
  • Dispatchers branch on h.spreadNullKeys instead of branching on type, so every existing case h: HashPartitioning => site (BucketingUtils, V1Writes, EnsureRequirements, AQEUtils, basicPhysicalOperators, etc.) keeps working unchanged.

The one argument for distinct types is EXPLAIN-string visibility — a one-line toString fix on the flagged variant.

Separately on this class's Scaladoc: worth calling out that NullAwareHashPartitioning intentionally does NOT satisfy a strict ClusteredDistribution (NULL clustering keys aren't co-located). That's the non-obvious downstream contract — it's what forces downstream GROUP BY / window / strict equi-join to re-shuffle.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea this is an alternative design. The pros and cons are:

Pros:

  • Much less duplicated code.
  • Existing HashPartitioning plumbing can often be reused directly.
  • CoalescedHashPartitioning and HashShuffleSpec can carry the flag instead of requiring parallel classes.
  • Fewer pattern-match branches across the codebase.

Cons:

  • The semantic distinction becomes easier to overlook.
  • A HashPartitioning with spreadNullKeys = true is no longer “ordinary hash partitioning” in the old sense.
  • Every place that reasons about HashPartitioning now has to remember to inspect the flag before assuming strict same-key co-location.
  • That is subtle and potentially error-prone because HashPartitioning is already widely used.
  • The class name no longer advertises the weaker contract; you would need careful toString, docs, and audits to preserve the same clarity.

I'm a bit concerned about the cons since HashPartitioning is widely used in the codebase and the change could have a bigger blast radius than just adding another NullAwareHashPartitioning.

copy(from = from.copy(expressions = newChildren))
}

case class CoalescedNullAwareHashPartitioning(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing Scaladoc here (and on NullAwareHashShuffleSpec below). CoalescedHashPartitioning documents what it represents — worth matching that here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

Comment on lines +32 to +39
private lazy val canSpreadNullJoinKeys: Boolean = {
// Null-safe equality usually rewrites to non-null shuffle keys. The NullType corner can still
// produce NULL shuffle keys, but shuffled join execution already treats those rows as
// unmatched, so spreading them does not change the result.
val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter
conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) &&
isOuterJoin
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two improvements on this gate:

(1) Static nullability check. Outer joins on non-nullable keys (PK/FK / NOT-NULL columns / post-IsNotNull filtered keys) gain nothing from the null-aware path but still pay both the runtime per-row anyNull check and the downstream re-shuffle cost from outputPartitioning no longer satisfying strict ClusteredDistribution. The analyzer already tracks Expression.nullable — use it here to make the mechanism a no-op when there's no NULL to spread.

(2) Reframe the comment around the structural reason. The current comment only addresses the <=> corner. The real "why this PR exists" story is the preserved-side / pushdown-asymmetry argument — worth leading with that, with the <=> and NullType notes as a corollary.

Suggested change
private lazy val canSpreadNullJoinKeys: Boolean = {
// Null-safe equality usually rewrites to non-null shuffle keys. The NullType corner can still
// produce NULL shuffle keys, but shuffled join execution already treats those rows as
// unmatched, so spreading them does not change the result.
val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter
conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) &&
isOuterJoin
}
private lazy val canSpreadNullJoinKeys: Boolean = {
// NULL keys on the preserved side of an outer join must be emitted but can never
// satisfy `a.k = b.k` under three-valued logic, so their reducer placement is a
// pure layout choice. Inner joins don't have this problem because
// InferFiltersFromConstraints pushes IsNotNull(key) to both sides; for outer joins
// that pushdown is blocked on the preserved side(s) -- which is exactly where
// NULL-key skew can land.
//
// For null-safe equality (`<=>`), ExtractEquiJoinKeys rewrites to (coalesce, isNull)
// shuffle keys, which are non-null for any concrete type. The NullType corner can
// still produce NULL shuffle keys, but shuffled join execution already treats those
// rows as unmatched, so spreading them does not change the result.
val isOuterJoin = joinType == LeftOuter || joinType == RightOuter || joinType == FullOuter
conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) &&
isOuterJoin &&
(leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable))
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Comment on lines +408 to +425
case h: NullAwareHashPartitioning =>
val partitionIdProjection =
UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
val joinKeyProjection = UnsafeProjection.create(h.expressions, outputAttributes)
var nullKeyPartition =
new XORShiftRandom(TaskContext.get().partitionId()).nextInt(h.numPartitions)
row => {
val joinKeys = joinKeyProjection(row)
if (joinKeys.anyNull()) {
// NULL join keys cannot match under ordinary equi-join semantics. Spread them
// round-robin within each map task so identical rows do not collapse to one reducer.
val partition = nullKeyPartition
nullKeyPartition = (nullKeyPartition + 1) % h.numPartitions
partition
} else {
partitionIdProjection(row).getInt(0)
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Join keys are evaluated twice for non-NULL rows on this path: once via joinKeyProjection(row) to call anyNull(), again via partitionIdProjection(row).getInt(0) which re-evaluates the same expressions to compute the hash. For most expression shapes that's a tight loop, but redundant.

Could evaluate the keys once, check anyNull on the materialized row, then hash directly from that row.

Combined with the static-nullability gate at ShuffledJoin.canSpreadNullJoinKeys (which skips this path entirely when keys are statically non-nullable), the residual overhead becomes "check the null bitset once per row when at least one key is nullable" — about as low as this gets without adaptive observation of actual NULL frequency.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated it to only evaluate the join keys once but the logic becomes more complicated. Please take another look!

val join = Join(nullableLeft.logicalPlan, nullableRight.logicalPlan,
LeftOuter, Some(joinCondition), JoinHint.NONE)

ExtractEquiJoinKeys.unapply(join).foreach {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExtractEquiJoinKeys.unapply(join).foreach { case ... => withSQLConf { ... } } silently passes if unapply returns None — all assertions live inside the foreach body, so a regression in ExtractEquiJoinKeys would make these tests report success without exercising anything. Prefer:

val (_, leftKeys, rightKeys, boundCondition, _, _, _, _) =
  ExtractEquiJoinKeys.unapply(join).getOrElse(fail("Failed to extract equi-join keys"))
withSQLConf(...) {
  ...
}

Applies to all six new tests in this file using this pattern.

Separately: all six new tests use SortMergeJoinExec. ShuffledHashJoinExec also extends ShuffledJoin and supports LeftOuter / RightOuter / FullOuter (with the matching build side), so it picks up the same canSpreadNullJoinKeys behavior — worth at least one end-to-end test on that path too. And the NullType <=> corner case (which the safety argument for <=> rests on) isn't directly exercised by any of the new tests.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Status

6 of 7 prior findings addressed, 1 declined with reasoning (collapse parallel hierarchy into a flag on HashPartitioning — author's blast-radius concern is reasonable, happy to defer to the parallel-type design), 4 new (1 substantive, 3 minor).

Addressed:

  • Scaladoc on allowNullKeySpreading, NullAwareHashShuffleSpec, CoalescedNullAwareHashPartitioning, and the carve-out note on NullAwareHashPartitioning that it does NOT satisfy a strict ClusteredDistribution.
  • Single-eval refactor in ShuffleExchangeExec.getPartitionKeyExtractor (join keys evaluated once via joinKeyProjection, then reused for anyNull() and the hash).
  • Static nullability gate (leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable)) in ShuffledJoin.canSpreadNullJoinKeys.
  • extractJoinParts test helper now fails loudly via getOrElse(fail(...)).
  • ShuffledHashJoinExec and NullType <=> test coverage added.

Verified the NullType <=> safety claim end-to-end: ExtractEquiJoinKeys rewrites a <=> b into (coalesce(a, default), isnull(a)); for NullType the coalesce key is still NULL, so streamedRowKey.anyNull in SortMergeJoinExec correctly classifies the row as unmatched and spreading does not change results. The new NullType test asserts exactly this.

New findings: inline below, plus one PR-description nit:

  • The "How was this patch tested?" section still says "Regenerated the affected TPC-DS plan-stability outputs after the expected physical-plan change." But the patch contains no TPC-DS golden file changes — which is correct, since the flag defaults off and existing plans are unaffected. Worth updating the description so reviewers don't go looking for those regenerated files.

Approving — the remaining items are cleanup, not blockers.


def partitionIdExpression: Expression = Pmod(
new CollationAwareMurmur3Hash(expressions), Literal(numPartitions)
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After the single-eval refactor in ShuffleExchangeExec (the case h: NullAwareHashPartitioning => branch in getPartitionKeyExtractor), this method is no longer called anywhere. ShuffleExchangeExec builds an equivalent Pmod(CollationAwareMurmur3Hash(boundJoinKeys), Literal(n)) inline against the projected key row instead of calling h.partitionIdExpression. Grep confirms no external callers. Safe to delete the three-line def.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Removed


withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "5",
SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED.key -> "true",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enabling SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED globally inside the "SPARK-33551: Do not use AQE shuffle read for repartition" test reshapes what that test verifies. The four checkBHJ / checkSMJ assertions below flip from optimizeOutRepartition = true to false precisely because the outer join's NullAwareHashPartitioning is intentionally not equivalent to the user's repartition HashPartitioning — i.e., the test no longer exercises the original SPARK-33551 regression scenario (optimize-out behavior on outer-join + repartition under the default flag).

Two options:

  1. Split: keep the existing test with the flag off (preserving SPARK-33551 coverage), and add a new test asserting the flag-on inversion. This documents that the new feature deliberately changes the optimize-out outcome.
  2. Branch in place: run the same checkBHJ / checkSMJ calls twice within the test — once with the flag off (original assertions) and once with the flag on (new assertions).

Either is fine; the current form silently retires SPARK-33551's regression coverage.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the test suite to address this

// `HashPartitioning.partitionIdExpression` to produce partitioning key.
new PartitionIdPassthrough(n)
case NullAwareHashPartitioning(_, n) =>
new PartitionIdPassthrough(n)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the parallel HashPartitioning case immediately above carries the comment "the partitioning key is already a valid partition ID, as we use HashPartitioning.partitionIdExpression to produce partitioning key." Worth matching here so a reader who jumps to this case sees why PartitionIdPassthrough is the right partitioner. Something like:

// The NullAware extractor below produces partition IDs directly:
// `Pmod(hash, n)` for non-NULL keys, a round-robin counter for NULL keys.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Added the comment above in the code

@viirya
Copy link
Copy Markdown
Member

viirya commented May 21, 2026

Thanks @sunchao for the iterations — design is in good shape and the safety reasoning around <=> / NullType is solid. Two follow-ups I'd suggest before merge, plus one optional readability nit.

1. Narrow canSpreadNullJoinKeys to the preserved side

ShuffledJoin.canSpreadNullJoinKeys currently opts in whenever either side has nullable join keys. But the skew it targets only ever appears on the preserved side:

  • LeftOuter: only left-side NULL-keyed rows are carried through (right-side NULLs are filtered by = and never emitted).
  • RightOuter: symmetric, only right.
  • FullOuter: both sides.

When only the non-preserved side has nullable keys, opting in has zero skew benefit but still pays:

  • per-row anyNull() branch in getPartitionKeyExtractor on the hot path,
  • a local sort under sortBeforeRepartition=true (defensive, since the round-robin path won't actually fire if there are no NULLs, but still paid).

Note the downstream re-shuffle cost (the one reflected by the optimizeOutRepartition test diffs) is not affected here, because ShuffledJoin.outputPartitioning only exposes the preserved side's partitioning (left.outputPartitioning for LeftOuter, etc.). So this is purely a "no-benefit but small extra cost" case.

Suggested tightening — keeps both sides opting in together, no change to shuffle-spec compatibility rules:

private lazy val canSpreadNullJoinKeys: Boolean = {
  // The skew this targets only appears on the preserved side of an outer join:
  // non-preserved NULL-keyed rows are filtered out by `=` and never emitted, so their
  // nullability cannot create reducer skew. This also mirrors which side's
  // outputPartitioning is exposed downstream by ShuffledJoin.
  val preservedSideHasNullableKeys = joinType match {
    case LeftOuter => leftKeys.exists(_.nullable)
    case RightOuter => rightKeys.exists(_.nullable)
    case FullOuter => leftKeys.exists(_.nullable) || rightKeys.exists(_.nullable)
    case _ => false
  }
  conf.getConf(SQLConf.SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED) && preservedSideHasNullableKeys
}

I considered the more aggressive variant — letting the two sides opt in independently (allowNullKeySpreading=true only on the preserved side) — but that would require extending HashShuffleSpecNullAwareHashShuffleSpec compatibility to accept the asymmetric one-side-opted-in case, which is a much larger blast radius. The "both sides together, gated by preserved-side nullability" form above gets the practical benefit without touching compatibility rules.

2. Expand the config doc with the trade-offs

The current doc only states the benefit:

"When true, Spark may spread rows with NULL equi-join keys across shuffle partitions for ordinary shuffled outer joins to reduce shuffle skew."

A user enabling the flag may be surprised by behavior the doc doesn't mention:

  • Only LEFT/RIGHT/FULL OUTER equi-joins on nullable keys are affected.
  • The join's output partitioning intentionally does not satisfy a strict ClusteredDistribution, so a downstream GROUP BY k / OVER (PARTITION BY k) / another JOIN ... ON k = ... will introduce an additional shuffle that today's plan would have avoided. The AdaptiveQueryExecSuite diffs (optimizeOutRepartition: true → false) are exactly this cost.
  • When one side is already pre-shuffled as HashPartitioning, only the other side is reshuffled as NullAwareHashPartitioning; the pre-shuffled side keeps its NULL skew.

Worth folding these into the .doc(...) so the cost is visible at SET / EXPLAIN time, not only by reading the partitioning source.

Also: the PR description's "How was this patch tested?" still says "Regenerated the affected TPC-DS plan-stability outputs after the expected physical-plan change." but the patch contains no golden-file changes (correctly, since the flag defaults off). Worth removing so future archeologists don't go looking.

3. (Optional) Document the cross-file invariants of the null-aware family

The correctness of this feature rests on four invariants that are currently maintained in four separate places:

  1. Consumer opt-in only: only ClusteredDistribution(allowNullKeySpreading = true) may be satisfied by the null-aware family — enforced in NullAwareHashPartitioning.satisfies0 / CoalescedNullAwareHashPartitioning.satisfies0.
  2. No strict-cluster satisfaction: the family must not satisfy StatefulOpClusteredDistribution, ordinary ClusteredDistribution, or AllTuples with n > 1 — enforced by deliberately not delegating to super.satisfies0 in NullAwareHashPartitioning.
  3. Non-NULL partition-id parity with HashPartitioning: for any row with no NULL key, ShuffleExchangeExec's null-aware extractor must produce the same partition id as HashPartitioning.partitionIdExpression would. This is what makes a HashShuffleSpec input compatible with a NullAwareHashShuffleSpec input when both distributions opt in. Both sites currently use Pmod(CollationAwareMurmur3Hash(keys), numPartitions); any future change to the hash on either side must be mirrored on the other.
  4. Symmetric spec compatibility: HashShuffleSpecNullAwareHashShuffleSpec are mutually compatible iff both distributions have allowNullKeySpreading = true.

Invariant (3) is the one that's currently fully implicit — there is no comment or test that ties the two hash sites together, but breaking the parity would silently misalign non-NULL rows from a HashPartitioning input against a NullAwareHashPartitioning input under the "compatible" rule (4). Worth at least a one-line comment at the CollationAwareMurmur3Hash site in ShuffleExchangeExec pointing back to HashPartitioning.partitionIdExpression, and optionally a contract block on NullAwareHashPartitioning listing the four invariants. I'd be ok with just the one-line cross-reference if a full contract block feels too heavy for this codebase's conventions.

Items 1 and 2 are merge-blockers in my view (small but user-observable); item 3 is a maintainability nit you can take or leave.

@sunchao
Copy link
Copy Markdown
Member Author

sunchao commented May 21, 2026

Thanks @viirya ! Addressed your comments

@viirya
Copy link
Copy Markdown
Member

viirya commented May 21, 2026

Thanks for the quick turnaround @sunchao — items 1 and 2 are addressed nicely. Two small notes that are not merge blockers, just opportunistic cleanup:

On item 1: the new joinType match arms are self-explanatory, but the why — "the skew only appears on the preserved side because non-preserved NULL-keyed rows are filtered out by = and never emitted" — is not in the code. The existing // Null-safe equality usually rewrites to non-null shuffle keys... comment above canSpreadNullJoinKeys explains the <=> / NullType corner but no longer covers why we look at the preserved side specifically. A one-line lead-in before the match would close the loop. Nit.

On item 3: the new comment at case NullAwareHashPartitioning is good, but the invariant I was trying to nail down is one level up — the non-NULL partition-id has to stay byte-for-byte identical to what HashPartitioning.partitionIdExpression would produce, otherwise the symmetric HashShuffleSpecNullAwareHashShuffleSpec compatibility in partitioning.scala silently misaligns non-NULL rows between the two layouts. The risk surface is "future change to the hash on either side." A back-reference would catch it:

case h: NullAwareHashPartitioning =>
  // Non-NULL keys must produce the same partition id as HashPartitioning.partitionIdExpression
  // — this is what makes HashShuffleSpec compatible with NullAwareHashShuffleSpec when both
  // distributions opt in. Keep the hash function in sync with HashPartitioning.
  val joinKeyProjection = ...

(Also noticed you removed NullAwareHashPartitioning.partitionIdExpression — good catch, that was indeed dead code.)

Both are nits; LGTM either way.

Copy link
Copy Markdown
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM, too.

@sunchao sunchao closed this in 421b800 May 22, 2026
@sunchao
Copy link
Copy Markdown
Member Author

sunchao commented May 22, 2026

Merged to master, thanks all for the review!!!

@sunchao
Copy link
Copy Markdown
Member Author

sunchao commented May 22, 2026

@dongjoon-hyun should I also backport this to branch-4.x? and branch-4.2?

@dongjoon-hyun
Copy link
Copy Markdown
Member

dongjoon-hyun commented May 22, 2026

@dongjoon-hyun should I also backport this to branch-4.x? and branch-4.2?

To @sunchao , only for branch-4.x (Apache Spark 4.3) for Improvement. branch-4.2 is under QA stage already.

Screenshot 2026-05-22 at 09 44 59

sunchao added a commit that referenced this pull request May 22, 2026
### What changes were proposed in this pull request?

This PR reduces shuffle skew for null-heavy shuffled outer equi-joins.

For `LEFT OUTER`, `RIGHT OUTER`, and `FULL OUTER` joins, preserved rows with a `NULL`
shuffle key may not need to stay concentrated on one reducer. Today those rows can all
collapse into the same shuffle partition, which creates avoidable skew on NULL-heavy inputs.

This change adds a feature-flagged null-aware shuffle partitioning mode for shuffled outer
joins:

- Non-NULL shuffle keys keep the existing hash partitioning behavior.
- Rows with any `NULL` shuffle key are spread across reducers instead of collapsing into one
  partition.
- The behavior is disabled by default behind
  `spark.sql.shuffle.spreadNullJoinKeys.enabled`.
- The optimization is considered only for `LEFT OUTER`, `RIGHT OUTER`, and `FULL OUTER`
  equi-joins whose preserved side has nullable join keys.

Spreading remains result-safe for null-safe equality (`<=>`) outer joins:

- For ordinary extracted `<=>` join keys, Spark rewrites them into non-null shuffle-key
  expressions using `coalesce(...)` and `isnull(...)`, so there are no `NULL` shuffle keys for
  this feature to redistribute.
- The only remaining corner is `NullType`, where the shuffle key can still be `NULL`. In that
  case, shuffled join execution already treats the row as unmatched, so redistributing those
  rows does not change query results.

The implementation wires this through the planner and runtime pieces that need to understand
the new partitioning contract:

- `ClusteredDistribution` can opt into null-aware spreading.
- New null-aware partitioning and shuffle-spec variants preserve compatibility checks without
  pretending to satisfy ordinary clustered distributions.
- Shuffle execution spreads unmatched `NULL` keys while preserving retry safety.
- AQE/coalesced shuffle reads preserve the new partitioning shape.

When the feature flag is enabled, the null-aware join output partitioning intentionally does not
satisfy a strict `ClusteredDistribution`. That can require an extra downstream shuffle for
grouping, windowing, or another equi-join on the same key. Also, if one side is already hash
partitioned, only the other side may be reshuffled into the null-aware layout, so the
pre-shuffled side can keep its NULL skew.

This PR intentionally stays scoped to outer joins. Left anti joins may also have skewed
preserved-side `NULL` rows for ordinary `=` predicates and are worth evaluating separately, but
they need their own correctness and planning review rather than being folded into this patch.

### Why are the changes needed?

Outer joins can preserve large numbers of unmatched rows from the outer side. When many of those
rows have `NULL` shuffle keys, sending them all to one reducer creates skew even though they do
not require one shared reducer for correctness.

Example:

```sql
SELECT *
FROM fact f
LEFT OUTER JOIN dim d
  ON f.k = d.k
```

If `fact.k` contains many `NULL` values, those rows must remain in the result as unmatched
left-side rows, but they do not need to be grouped together for correctness. Spreading them
reduces needless reducer concentration while leaving normal key matching unchanged.

### Does this PR introduce _any_ user-facing change?

Yes, in execution behavior only. Query results are unchanged, but when the feature flag is
enabled, shuffle partitioning for eligible NULL-heavy outer equi-joins becomes less skewed.

### How was this patch tested?

- Added and updated unit tests covering outer-join planning, FULL OUTER JOIN result correctness
  with `NULL` keys, null-safe outer-join behavior, shuffle-level `NULL` spreading, retry
  determinism, shuffle-spec compatibility, and AQE preservation of null-aware coalesced reads.
- Ran focused plan-stability verification for the affected TPC-DS cases locally.
- Ran `git diff --check`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Codex GPT-5

Closes #55927 from sunchao/dev/chao/codex/null-aware-outer-join-apache.

Authored-by: Chao Sun <chao@openai.com>
Signed-off-by: Chao Sun <chao@openai.com>
@sunchao
Copy link
Copy Markdown
Member Author

sunchao commented May 22, 2026

Thanks, cherry-picked to branch-4.x.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants