Closed
@@ -81,12 +81,17 @@ case object AllTuples extends Distribution {
*
* @param requireAllClusterKeys When true, `Partitioning` which satisfies this distribution,
* must match all `clustering` expressions in the same ordering.
* @param allowNullKeySpreading When true, the default partitioning may spread rows whose
* clustering keys contain NULL values. This is a permission for
* consumers that do not require NULL-key co-location; ordinary
* [[HashPartitioning]] can still satisfy this distribution.
*/
case class ClusteredDistribution(
clustering: Seq[Expression],
requireAllClusterKeys: Boolean = SQLConf.get.getConf(
SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_DISTRIBUTION),
- requiredNumPartitions: Option[Int] = None) extends Distribution {
+ requiredNumPartitions: Option[Int] = None,
+ allowNullKeySpreading: Boolean = false) extends Distribution {
Contributor

Worth a Scaladoc on this field describing the contract: it is a permission, not a requirement. An ordinary HashPartitioning still satisfies this distribution when the flag is true; the flag only weakens the default partitioning that createPartitioning produces. It is also the consumer-side knob: the partitioning-side marker (NullAwareHashPartitioning today, or a flag on HashPartitioning per the comment below) is what tells downstream operators that they must re-shuffle to get a strict ClusteredDistribution.

Member Author

Added
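
A minimal sketch of the contract discussed in this thread, using simplified stand-in types rather than the real Catalyst classes. `Clustered`, `Hash`, `NullAwareHash`, and the string keys are illustrative assumptions. Only the shape of the rule comes from the diff: strict hash partitioning satisfies both kinds of distribution, the null-aware layout satisfies only the opted-in kind, and the flag merely picks the default partitioning.

```scala
// Simplified stand-ins for the classes in this PR; NOT the real Catalyst API.
object PermissionSketch {
  final case class Clustered(keys: Seq[String], allowNullKeySpreading: Boolean = false)

  sealed trait Part { def keys: Seq[String] }
  final case class Hash(keys: Seq[String], n: Int) extends Part
  final case class NullAwareHash(keys: Seq[String], n: Int) extends Part

  // Every partitioning key must be one of the required clustering keys.
  private def keysMatch(p: Part, d: Clustered): Boolean =
    p.keys.forall(k => d.keys.contains(k))

  def satisfies(p: Part, d: Clustered): Boolean = p match {
    // Strict co-location is good enough whether or not spreading is allowed.
    case _: Hash => keysMatch(p, d)
    // The relaxed layout is acceptable only to consumers that opted in.
    case _: NullAwareHash => d.allowNullKeySpreading && keysMatch(p, d)
  }

  // The flag only changes which partitioning is produced by default.
  def createPartitioning(d: Clustered, n: Int): Part =
    if (d.allowNullKeySpreading) NullAwareHash(d.keys, n) else Hash(d.keys, n)
}
```

In this model the flag never rejects an existing `Hash` layout, which is what "permission, not requirement" means here.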

require(
clustering != Nil,
"The clustering expressions of a ClusteredDistribution should not be Nil. " +
@@ -97,7 +102,11 @@ case class ClusteredDistribution(
assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
s"the actual number of partitions is $numPartitions.")
- HashPartitioning(clustering, numPartitions)
+ if (allowNullKeySpreading) {
+ NullAwareHashPartitioning(clustering, numPartitions)
+ } else {
+ HashPartitioning(clustering, numPartitions)
+ }
}

/**
@@ -282,7 +291,7 @@ trait HashPartitioningLike extends Expression with Partitioning with Unevaluable
expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
case (l, r) => l.semanticEquals(r)
}
- case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
+ case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _, _) =>
if (requireAllClusterKeys) {
// Checks `HashPartitioning` is partitioned on exactly same clustering keys of
// `ClusteredDistribution`.
@@ -324,6 +333,45 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
newChildren: IndexedSeq[Expression]): HashPartitioning = copy(expressions = newChildren)
}

/**
* Represents a hash partitioning for equi-join inputs where rows with a NULL join key do not need
* to be co-located. Non-NULL join keys preserve the same partitioning contract as
* [[HashPartitioning]], while rows with any NULL join key may be spread across partitions. As a
* result, this partitioning intentionally does not satisfy a strict [[ClusteredDistribution]].
*/
case class NullAwareHashPartitioning(expressions: Seq[Expression], numPartitions: Int)
Contributor

Design alternative worth considering: a spreadNullKeys: Boolean = false field on HashPartitioning instead of a parallel type hierarchy.

The marker this carries is one bit ("NULL keys may be spread, so I don't deliver strict same-key co-location"). Encoding it as a parallel type means duplicating hashKeyPositions, canCreatePartitioning, createPartitioning, numPartitions, and (modulo the helper just extracted) isCompatibleWith in NullAwareHashShuffleSpec. It also means reproducing CoalescedHashPartitioning as CoalescedNullAwareHashPartitioning and adding a new arm to every dispatcher (ShuffleExchangeExec.prepareShuffleDependency's part and getPartitionKeyExtractor, AQEShuffleReadExec.outputPartitioning).

With a flag:

  • HashPartitioning.satisfies0 matches a strict ClusteredDistribution only when !spreadNullKeys; when spreadNullKeys is set, it matches only distributions with allowNullKeySpreading = true.
  • HashShuffleSpec carries the flag; one extra clause in isCompatibleWith.
  • CoalescedHashPartitioning already wraps a HashPartitioning, so it inherits the flag transparently. No new coalesced class.
  • Dispatchers branch on h.spreadNullKeys instead of branching on type, so every existing case h: HashPartitioning => site (BucketingUtils, V1Writes, EnsureRequirements, AQEUtils, basicPhysicalOperators, etc.) keeps working unchanged.

The one argument for distinct types is EXPLAIN-string visibility, which a one-line toString change on the flagged variant would cover.

Separately, on this class's Scaladoc: worth calling out that NullAwareHashPartitioning intentionally does NOT satisfy a strict ClusteredDistribution (NULL clustering keys aren't co-located). That is the non-obvious downstream contract: it is what forces a downstream GROUP BY, window, or strict equi-join to re-shuffle.

Member Author

Yes, this is a viable alternative design. The pros and cons are:

Pros:

  • Much less duplicated code.
  • Existing HashPartitioning plumbing can often be reused directly.
  • CoalescedHashPartitioning and HashShuffleSpec can carry the flag instead of requiring parallel classes.
  • Fewer pattern-match branches across the codebase.

Cons:

  • The semantic distinction becomes easier to overlook.
  • A HashPartitioning with spreadNullKeys = true is no longer “ordinary hash partitioning” in the old sense.
  • Every place that reasons about HashPartitioning now has to remember to inspect the flag before assuming strict same-key co-location.
  • That is subtle and potentially error-prone because HashPartitioning is already widely used.
  • The class name no longer advertises the weaker contract; you would need careful toString, docs, and audits to preserve the same clarity.

I'm a bit concerned about the cons since HashPartitioning is widely used in the codebase and the change could have a bigger blast radius than just adding another NullAwareHashPartitioning.
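
To make the trade-off in this thread concrete, here is a rough sketch of the flag-based alternative next to the pitfall raised in the cons. All types are simplified, hypothetical stand-ins (string keys, no Catalyst dependency). The `spreadNullKeys` name comes from the review comment, and the rule that an unflagged partitioning still satisfies a relaxed distribution is an assumption taken from the `allowNullKeySpreading` Scaladoc.

```scala
// Hypothetical model of the flag-based design; NOT code from this PR.
object FlagSketch {
  final case class Clustered(keys: Seq[String], allowNullKeySpreading: Boolean = false)

  final case class Hash(keys: Seq[String], n: Int, spreadNullKeys: Boolean = false) {
    def satisfies(d: Clustered): Boolean = {
      val keysOk = keys.forall(k => d.keys.contains(k))
      // A flagged partitioning can only serve consumers that allow spreading.
      keysOk && (!spreadNullKeys || d.allowNullKeySpreading)
    }

    // Keeps the weaker contract visible in EXPLAIN-style output.
    override def toString: String = {
      val suffix = if (spreadNullKeys) ", spreadNullKeys" else ""
      "hashpartitioning(" + keys.mkString(", ") + ", " + n + suffix + ")"
    }
  }

  // The coalesced wrapper reads the flag through `from`; no parallel class is needed.
  final case class Coalesced(from: Hash, numCoalesced: Int) {
    def spreadNullKeys: Boolean = from.spreadNullKeys
  }

  // The pitfall: a type-only match that assumes strict co-location.
  def assumesStrictCoLocation(p: Any): Boolean = p match {
    case _: Hash => true // wrong when spreadNullKeys = true
    case _ => false
  }

  // The audited version has to remember to inspect the flag.
  def deliversStrictCoLocation(p: Any): Boolean = p match {
    case h: Hash => !h.spreadNullKeys
    case _ => false
  }
}
```

The first matcher is the shape of the existing `case h: HashPartitioning =>` sites. It keeps compiling after the flag is added, which is both the appeal of the design and the blast-radius concern.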

extends HashPartitioningLike {

override def satisfies0(required: Distribution): Boolean = {
(required match {
case UnspecifiedDistribution => true
case AllTuples => numPartitions == 1
case _ => false
}) || {
// Stateful operators require strict NULL-key co-location and therefore cannot consume
// null-aware hash partitioning as a compatible clustered layout.
required match {
case c @ ClusteredDistribution(
requiredClustering, requireAllClusterKeys, _, allowNullKeySpreading)
if allowNullKeySpreading =>
if (requireAllClusterKeys) {
c.areAllClusterKeysMatched(expressions)
} else {
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ => false
}
}
}

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
NullAwareHashShuffleSpec(this, distribution)

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): NullAwareHashPartitioning =
copy(expressions = newChildren)
}

case class CoalescedBoundary(startReducerIndex: Int, endReducerIndex: Int)

/**
@@ -345,6 +393,47 @@ case class CoalescedHashPartitioning(from: HashPartitioning, partitions: Seq[Coa
copy(from = from.copy(expressions = newChildren))
}

/**
* Represents a null-aware hash partitioning whose reducer ranges have been coalesced into fewer
* partitions. It preserves the same relaxed NULL-key co-location contract as
* [[NullAwareHashPartitioning]].
*/
case class CoalescedNullAwareHashPartitioning(
Contributor

Missing Scaladoc here (and on NullAwareHashShuffleSpec below). CoalescedHashPartitioning documents what it represents — worth matching that here.

Member Author

Added

from: NullAwareHashPartitioning,
partitions: Seq[CoalescedBoundary]) extends HashPartitioningLike {

override def expressions: Seq[Expression] = from.expressions

override def satisfies0(required: Distribution): Boolean = {
Contributor

This body is identical to NullAwareHashPartitioning.satisfies0 at line 340: the same outer UnspecifiedDistribution / AllTuples / _ => false match, the same inner ClusteredDistribution match guarded on allowNullKeySpreading, and the same requireAllClusterKeys branching. It is the same kind of duplication that this PR addresses elsewhere by extracting HashShuffleSpecCompatibility.isCompatible (lines 944-955).

Two cleaner shapes:

  • Lift the body into a private helper, e.g. private def nullAwareSatisfies0(exprs, n, required), shared by both classes.
  • Or just delegate: boundaries don't change satisfaction semantics for the allowNullKeySpreading contract, so CoalescedNullAwareHashPartitioning.satisfies0(required) is essentially from.satisfies0(required), except for the AllTuples case, where numPartitions differs. That single divergence is easy to handle inline.

Side note: both overrides skip the StatefulOpClusteredDistribution case that HashPartitioningLike.satisfies0 handles. Currently unreachable (streaming joins use StatefulOpClusteredDistribution, not ClusteredDistribution, so they never opt into allowNullKeySpreading), but a one-line comment that the omission is deliberate would help the next reader.
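
One possible shape for the shared helper suggested above, written against simplified stand-in types so it runs without Spark. The helper name `nullAwareSatisfies0` is taken from the comment; `Dist`, `Clustered`, the string keys, and the tuple boundaries are illustrative assumptions, not the PR's code.

```scala
// Hypothetical model of the de-duplicated satisfies0; NOT the real Catalyst API.
object SharedSatisfiesSketch {
  sealed trait Dist
  case object Unspecified extends Dist
  case object AllTuples extends Dist
  final case class Clustered(
      keys: Seq[String],
      requireAllClusterKeys: Boolean = false,
      allowNullKeySpreading: Boolean = false) extends Dist

  // The rule depends only on the expressions and the partition count.
  private def nullAwareSatisfies0(exprs: Seq[String], n: Int, required: Dist): Boolean =
    required match {
      case Unspecified => true
      case AllTuples => n == 1
      case c: Clustered if c.allowNullKeySpreading =>
        if (c.requireAllClusterKeys) exprs == c.keys
        else exprs.forall(k => c.keys.contains(k))
      // Strict clustered (and stateful-operator) distributions are deliberately rejected.
      case _ => false
    }

  final case class NullAwareHash(exprs: Seq[String], numPartitions: Int) {
    def satisfies0(required: Dist): Boolean =
      nullAwareSatisfies0(exprs, numPartitions, required)
  }

  final case class CoalescedNullAwareHash(from: NullAwareHash, boundaries: Seq[(Int, Int)]) {
    val numPartitions: Int = boundaries.length
    // Same rule, evaluated with the coalesced partition count.
    def satisfies0(required: Dist): Boolean =
      nullAwareSatisfies0(from.exprs, numPartitions, required)
  }
}
```

Passing the partition count explicitly covers the `AllTuples` divergence without a second copy of the match.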

(required match {
case UnspecifiedDistribution => true
case AllTuples => numPartitions == 1
case _ => false
}) || {
required match {
case c @ ClusteredDistribution(
requiredClustering, requireAllClusterKeys, _, allowNullKeySpreading)
if allowNullKeySpreading =>
if (requireAllClusterKeys) {
c.areAllClusterKeysMatched(expressions)
} else {
expressions.forall(x => requiredClustering.exists(_.semanticEquals(x)))
}
case _ => false
}
}
}

override def createShuffleSpec(distribution: ClusteredDistribution): ShuffleSpec =
CoalescedHashShuffleSpec(from.createShuffleSpec(distribution), partitions)

override val numPartitions: Int = partitions.length

override protected def withNewChildrenInternal(
newChildren: IndexedSeq[Expression]): CoalescedNullAwareHashPartitioning =
copy(from = from.copy(expressions = newChildren))
}

/**
* Represents a partitioning where rows are split across partitions based on transforms defined by
* `expressions`.
@@ -482,7 +571,7 @@ case class KeyedPartitioning(

def groupedSatisfies(required: Distribution): Boolean = {
required match {
- case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
+ case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _, _) =>
if (requireAllClusterKeys) {
// Checks whether this partitioning is partitioned on exactly same clustering keys of
// `ClusteredDistribution`.
@@ -657,7 +746,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
// `RangePartitioning(a, b, c)` satisfies `OrderedDistribution(a, b)`.
val minSize = Seq(requiredOrdering.size, ordering.size).min
requiredOrdering.take(minSize) == ordering.take(minSize)
- case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
+ case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _, _) =>
val expressions = ordering.map(_.child)
if (requireAllClusterKeys) {
// Checks `RangePartitioning` is partitioned on exactly same clustering keys of
@@ -782,7 +871,7 @@ case class ShufflePartitionIdPassThrough(
super.satisfies0(required) || {
required match {
// TODO(SPARK-53428): Support Direct Passthrough Partitioning in the Streaming Joins
- case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _) =>
+ case c @ ClusteredDistribution(requiredClustering, requireAllClusterKeys, _, _) =>
val partitioningExpressions = expr.child :: Nil
if (requireAllClusterKeys) {
c.areAllClusterKeysMatched(partitioningExpressions)
@@ -863,6 +952,25 @@ case class RangeShuffleSpec(
}
}

private object HashShuffleSpecCompatibility {
def isCompatible(
leftDistribution: ClusteredDistribution,
leftNumPartitions: Int,
leftExpressions: Seq[Expression],
leftHashKeyPositions: Seq[mutable.BitSet],
rightDistribution: ClusteredDistribution,
rightNumPartitions: Int,
rightExpressions: Seq[Expression],
rightHashKeyPositions: Seq[mutable.BitSet]): Boolean = {
leftDistribution.clustering.length == rightDistribution.clustering.length &&
leftNumPartitions == rightNumPartitions &&
leftExpressions.length == rightExpressions.length &&
leftHashKeyPositions.zip(rightHashKeyPositions).forall { case (left, right) =>
left.intersect(right).nonEmpty
}
}
}

case class HashShuffleSpec(
partitioning: HashPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {
@@ -895,14 +1003,26 @@ case class HashShuffleSpec(
// 3. both partitioning have the same number of expressions
// 4. each pair of partitioning expression from both sides has overlapping positions in their
// corresponding distributions.
- distribution.clustering.length == otherDistribution.clustering.length &&
- partitioning.numPartitions == otherPartitioning.numPartitions &&
- partitioning.expressions.length == otherPartitioning.expressions.length && {
- val otherHashKeyPositions = otherHashSpec.hashKeyPositions
- hashKeyPositions.zip(otherHashKeyPositions).forall { case (left, right) =>
- left.intersect(right).nonEmpty
- }
- }
+ HashShuffleSpecCompatibility.isCompatible(
+ distribution,
+ partitioning.numPartitions,
+ partitioning.expressions,
+ hashKeyPositions,
+ otherDistribution,
+ otherPartitioning.numPartitions,
+ otherPartitioning.expressions,
+ otherHashSpec.hashKeyPositions)
+ case otherNullAwareSpec @ NullAwareHashShuffleSpec(otherPartitioning, otherDistribution)
+ if distribution.allowNullKeySpreading && otherDistribution.allowNullKeySpreading =>
+ HashShuffleSpecCompatibility.isCompatible(
+ distribution,
+ partitioning.numPartitions,
+ partitioning.expressions,
+ hashKeyPositions,
+ otherDistribution,
+ otherPartitioning.numPartitions,
+ otherPartitioning.expressions,
+ otherNullAwareSpec.hashKeyPositions)
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ =>
@@ -923,7 +1043,73 @@ case class HashShuffleSpec(

override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
val exprs = hashKeyPositions.map(v => clustering(v.head))
- HashPartitioning(exprs, partitioning.numPartitions)
+ if (distribution.allowNullKeySpreading) {
+ NullAwareHashPartitioning(exprs, partitioning.numPartitions)
+ } else {
+ HashPartitioning(exprs, partitioning.numPartitions)
+ }
}

override def numPartitions: Int = partitioning.numPartitions
}

/**
* Shuffle specification for [[NullAwareHashPartitioning]]. It is compatible only with shuffle
* layouts whose distributions explicitly allow NULL-key spreading.
*/
case class NullAwareHashShuffleSpec(
partitioning: NullAwareHashPartitioning,
distribution: ClusteredDistribution) extends ShuffleSpec {

lazy val hashKeyPositions: Seq[mutable.BitSet] = {
val distKeyToPos = mutable.Map.empty[Expression, mutable.BitSet]
distribution.clustering.zipWithIndex.foreach { case (distKey, distKeyPos) =>
distKeyToPos.getOrElseUpdate(distKey.canonicalized, mutable.BitSet.empty).add(distKeyPos)
}
partitioning.expressions.map(k => distKeyToPos.getOrElse(k.canonicalized, mutable.BitSet.empty))
}

override def isCompatibleWith(other: ShuffleSpec): Boolean = other match {
case SinglePartitionShuffleSpec =>
partitioning.numPartitions == 1
case otherSpec @ NullAwareHashShuffleSpec(otherPartitioning, otherDistribution) =>
HashShuffleSpecCompatibility.isCompatible(
distribution,
partitioning.numPartitions,
partitioning.expressions,
hashKeyPositions,
otherDistribution,
otherPartitioning.numPartitions,
otherPartitioning.expressions,
otherSpec.hashKeyPositions)
case otherHashSpec @ HashShuffleSpec(otherPartitioning, otherDistribution)
if distribution.allowNullKeySpreading && otherDistribution.allowNullKeySpreading =>
HashShuffleSpecCompatibility.isCompatible(
distribution,
partitioning.numPartitions,
partitioning.expressions,
hashKeyPositions,
otherDistribution,
otherPartitioning.numPartitions,
otherPartitioning.expressions,
otherHashSpec.hashKeyPositions)
case ShuffleSpecCollection(specs) =>
specs.exists(isCompatibleWith)
case _ =>
false
}

override def canCreatePartitioning: Boolean = {
if (SQLConf.get.getConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION)) {
distribution.areAllClusterKeysMatched(partitioning.expressions)
} else {
true
}
}

override def createPartitioning(clustering: Seq[Expression]): Partitioning = {
val exprs = hashKeyPositions.map(v => clustering(v.head))
NullAwareHashPartitioning(exprs, partitioning.numPartitions)
}

override def numPartitions: Int = partitioning.numPartitions
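
For readers unfamiliar with the `hashKeyPositions` bookkeeping, the following is a small standalone sketch of the same idea over plain strings. It mirrors the structure of `hashKeyPositions` and `HashShuffleSpecCompatibility.isCompatible` from the diff, but the string keys (in place of canonicalized expressions) and the flattened parameter list are simplifying assumptions.

```scala
import scala.collection.mutable

// Simplified model of the position-overlap check; NOT the real ShuffleSpec API.
object KeyPositionsSketch {
  // For each partitioning key, the positions at which it occurs in the clustering keys.
  def hashKeyPositions(
      clustering: Seq[String],
      partitionKeys: Seq[String]): Seq[mutable.BitSet] = {
    val keyToPos = mutable.Map.empty[String, mutable.BitSet]
    clustering.zipWithIndex.foreach { case (key, pos) =>
      keyToPos.getOrElseUpdate(key, mutable.BitSet.empty).add(pos)
    }
    partitionKeys.map(k => keyToPos.getOrElse(k, mutable.BitSet.empty))
  }

  // Two sides are co-partitioned when they agree on clustering width, partition count,
  // key count, and each pair of keys shares at least one clustering position.
  def isCompatible(
      leftClustering: Seq[String], leftKeys: Seq[String], leftNum: Int,
      rightClustering: Seq[String], rightKeys: Seq[String], rightNum: Int): Boolean = {
    val left = hashKeyPositions(leftClustering, leftKeys)
    val right = hashKeyPositions(rightClustering, rightKeys)
    leftClustering.length == rightClustering.length &&
      leftNum == rightNum &&
      leftKeys.length == rightKeys.length &&
      left.zip(right).forall { case (l, r) => l.intersect(r).nonEmpty }
  }
}
```

For a join on `a = c AND b = d`, hashing the left side by `a` and the right side by `d` is rejected in this model because the two keys sit at different clustering positions.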
@@ -967,6 +967,20 @@ object SQLConf {
.checkValue(_ > 0, "The value of spark.sql.shuffle.partitions must be positive")
.createWithDefault(200)

val SHUFFLE_SPREAD_NULL_JOIN_KEYS_ENABLED =
buildConf("spark.sql.shuffle.spreadNullJoinKeys.enabled")
.doc("When true, Spark may spread rows with NULL equi-join keys across shuffle partitions " +
"for shuffled LEFT, RIGHT, and FULL OUTER equi-joins on nullable keys to reduce " +
"shuffle skew. Null-aware join output partitioning does not satisfy a strict " +
"ClusteredDistribution, so downstream grouping, windowing, or equi-joins may require " +
"an extra shuffle. If one input is already hash partitioned, only the other input may " +
"be reshuffled into the null-aware layout, so the pre-shuffled input can keep its NULL " +
"skew.")
.version("4.1.0")
.withBindingPolicy(ConfigBindingPolicy.SESSION)
.booleanConf
.createWithDefault(false)
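
The skew this config targets can be illustrated with a toy partition-id function. This is only a sketch under stated assumptions: the diff does not show how Spark actually assigns NULL-key rows, so the round-robin-by-row-index rule, the `Option[Int]` keys, and every name below are hypothetical.

```scala
// Toy model of NULL-key spreading; NOT Spark's actual shuffle implementation.
object NullSpreadSketch {
  // Strict layout: every NULL key hashes to the same partition.
  def strictPartition(key: Option[Int], numPartitions: Int): Int = key match {
    case Some(k) => java.lang.Math.floorMod(k, numPartitions)
    case None => 0
  }

  // Relaxed layout: non-NULL keys are unchanged, NULL keys are spread (assumed round-robin).
  def spreadingPartition(key: Option[Int], rowIndex: Int, numPartitions: Int): Int =
    key match {
      case Some(_) => strictPartition(key, numPartitions)
      case None => java.lang.Math.floorMod(rowIndex, numPartitions)
    }

  // Number of rows that land in each partition.
  def partitionSizes(ids: Seq[Int], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    ids.foreach(i => counts(i) += 1)
    counts.toVector
  }
}
```

Because non-NULL keys keep their partition, matching rows from both join sides still meet. NULL keys never match in an equi-join, so spreading them does not change the join result in this model.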

val SHUFFLE_ORDER_INDEPENDENT_CHECKSUM_ENABLED =
buildConf("spark.sql.shuffle.orderIndependentChecksum.enabled")
.doc("Whether to calculate order independent checksum for the shuffle data or not. If " +
@@ -453,6 +453,66 @@ class ShuffleSpecSuite extends SparkFunSuite with SQLHelper {
)
}

test("compatibility: NullAwareHashShuffleSpec") {
val spreadAB = ClusteredDistribution(Seq($"a", $"b"), allowNullKeySpreading = true)
val spreadCD = ClusteredDistribution(Seq($"c", $"d"), allowNullKeySpreading = true)
val regularAB = ClusteredDistribution(Seq($"a", $"b"))

val nullAwareAB = NullAwareHashShuffleSpec(
NullAwareHashPartitioning(Seq($"a", $"b"), 10), spreadAB)
val nullAwareCD = NullAwareHashShuffleSpec(
NullAwareHashPartitioning(Seq($"c", $"d"), 10), spreadCD)
val regularABSpec = HashShuffleSpec(
HashPartitioning(Seq($"a", $"b"), 10), regularAB)
val spreadABHashSpec = HashShuffleSpec(
HashPartitioning(Seq($"a", $"b"), 10), spreadAB)

checkCompatible(nullAwareAB, nullAwareCD, expected = true)
checkCompatible(nullAwareAB, SinglePartitionShuffleSpec, expected = false)
checkCompatible(
NullAwareHashShuffleSpec(NullAwareHashPartitioning(Seq($"a", $"b"), 1), spreadAB),
SinglePartitionShuffleSpec,
expected = true)
checkCompatible(nullAwareAB, regularABSpec, expected = false)
checkCompatible(nullAwareAB, spreadABHashSpec, expected = true)
checkCompatible(spreadABHashSpec, nullAwareAB, expected = true)
}

test("canCreatePartitioning: NullAwareHashShuffleSpec") {
val spreadDistribution =
ClusteredDistribution(Seq($"a", $"b"), allowNullKeySpreading = true)
val partialSpec = NullAwareHashShuffleSpec(
NullAwareHashPartitioning(Seq($"a"), 10), spreadDistribution)
val fullSpec = NullAwareHashShuffleSpec(
NullAwareHashPartitioning(Seq($"a", $"b"), 10), spreadDistribution)

withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false") {
assert(partialSpec.canCreatePartitioning)
}
withSQLConf(SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "true") {
assert(!partialSpec.canCreatePartitioning)
assert(fullSpec.canCreatePartitioning)
}
}

test("createPartitioning: NullAwareHashShuffleSpec") {
checkCreatePartitioning(
NullAwareHashShuffleSpec(
NullAwareHashPartitioning(Seq($"a"), 10),
ClusteredDistribution(Seq($"a", $"b"), allowNullKeySpreading = true)),
ClusteredDistribution(Seq($"c", $"d"), allowNullKeySpreading = true),
NullAwareHashPartitioning(Seq($"c"), 10)
)

checkCreatePartitioning(
HashShuffleSpec(
HashPartitioning(Seq($"a"), 10),
ClusteredDistribution(Seq($"a", $"b"), allowNullKeySpreading = true)),
ClusteredDistribution(Seq($"c", $"d"), allowNullKeySpreading = true),
NullAwareHashPartitioning(Seq($"c"), 10)
)
}

test("createPartitioning: other specs") {
val distribution = ClusteredDistribution(Seq($"a", $"b"))
checkCreatePartitioning(SinglePartitionShuffleSpec,