Description
I don't have a minimal reproducible example for this, but the place where it shows up in our workflow is very simple.
The data in "COLUMN" are a few hundred million distinct strings (gets deduplicated in the plan also) and it is being compared against itself using intersect.
The code that is failing is essentially:
from pyspark.sql.types import StringType, StructField, StructType

values = [...]  # Python list containing many unique strings, none of which are None
df = spark.createDataFrame(
    spark.sparkContext.parallelize(
        [(value,) for value in values], numSlices=2 + len(values) // 10000
    ),
    schema=StructType([StructField("COLUMN", StringType())]),
)
df = df.distinct()
assert df.count() == df.intersect(df).count()
assert df.count() == df.intersectAll(df).count()
The issue is that both of the above asserts sometimes pass and sometimes fail (technically we have not seen the intersectAll assert pass yet, but we have only tried a few times). One striking observation: calling df.intersect(df).count() multiple times does not always return the same count. Sometimes it is exactly df.count(), sometimes it is ~1% lower, and how much lower it is appears random.
In particular, we have called df.intersect(df).count() twice in a row and gotten two different counts. This is very surprising given that df should be deterministic, and it suggests some kind of concurrency or inconsistent-hashing issue.
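A quick way to see the drift is to repeat the count and collect the results (a sketch of our check rather than a full repro, since it assumes the df built from our production values above):
expected = df.count()
# Repeat the same intersect count and record the distinct results.
# On a correct implementation this set should always be {expected}.
observed = {df.intersect(df).count() for _ in range(5)}
print(expected, observed)  # we have seen observed contain values below expected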
One other thing which is possibly noteworthy: df.join(df, df.columns, how="inner") does seem to reliably have the desired behavior (it does not drop any rows).
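For completeness, here is a sketch of that join-based check (same df as above; in our runs the assert below has always passed):
# Self inner join on every column; because df is already distinct, each row
# should match exactly once, so the row count should be preserved.
joined = df.join(df, on=df.columns, how="inner")
assert df.count() == joined.count()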
Here is the resulting plan from df.intersect(df):
== Parsed Logical Plan ==
'Intersect false
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144487]
   +- LogicalRDD [COLUMN#144487], false

== Analyzed Logical Plan ==
COLUMN: string
Intersect false
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144523]
   +- LogicalRDD [COLUMN#144523], false

== Optimized Logical Plan ==
Aggregate [COLUMN#144487], [COLUMN#144487]
+- Join LeftSemi, (COLUMN#144487 <=> COLUMN#144523)
   :- LogicalRDD [COLUMN#144487], false
   +- Aggregate [COLUMN#144523], [COLUMN#144523]
      +- LogicalRDD [COLUMN#144523], false

== Physical Plan ==
*(7) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
+- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#22790]
   +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
      +- *(6) SortMergeJoin [coalesce(COLUMN#144487, ), isnull(COLUMN#144487)], [coalesce(COLUMN#144523, ), isnull(COLUMN#144523)], LeftSemi
         :- *(2) Sort [coalesce(COLUMN#144487, ) ASC NULLS FIRST, isnull(COLUMN#144487) ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(coalesce(COLUMN#144487, ), isnull(COLUMN#144487), 200), true, [id=#22772]
         :     +- *(1) Scan ExistingRDD[COLUMN#144487]
         +- *(5) Sort [coalesce(COLUMN#144523, ) ASC NULLS FIRST, isnull(COLUMN#144523) ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(coalesce(COLUMN#144523, ), isnull(COLUMN#144523), 200), true, [id=#22782]
               +- *(4) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
                  +- Exchange hashpartitioning(COLUMN#144523, 200), true, [id=#22778]
                     +- *(3) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
                        +- *(3) Scan ExistingRDD[COLUMN#144523]
and here is the plan for df.intersectAll(df):
== Parsed Logical Plan ==
'IntersectAll true
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144487]
   +- LogicalRDD [COLUMN#144487], false

== Analyzed Logical Plan ==
COLUMN: string
IntersectAll true
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144533]
   +- LogicalRDD [COLUMN#144533], false

== Optimized Logical Plan ==
Project [COLUMN#144487]
+- Generate replicaterows(min_count#144566L, COLUMN#144487), [1], false, [COLUMN#144487]
   +- Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
      +- Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
         +- Aggregate [COLUMN#144487], [count(vcol1#144558) AS vcol1_count#144563L, count(vcol2#144561) AS vcol2_count#144565L, COLUMN#144487]
            +- Union
               :- Aggregate [COLUMN#144487], [true AS vcol1#144558, null AS vcol2#144561, COLUMN#144487]
               :  +- LogicalRDD [COLUMN#144487], false
               +- Aggregate [COLUMN#144533], [null AS vcol1#144559, true AS vcol2#144560, COLUMN#144533]
                  +- LogicalRDD [COLUMN#144533], false

== Physical Plan ==
*(7) Project [COLUMN#144487]
+- Generate replicaterows(min_count#144566L, COLUMN#144487), [COLUMN#144487], false, [COLUMN#144487]
   +- *(6) Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
      +- *(6) Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
         +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[count(vcol1#144558), count(vcol2#144561)], output=[vcol1_count#144563L, vcol2_count#144565L, COLUMN#144487])
            +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23310]
               +- *(5) HashAggregate(keys=[COLUMN#144487], functions=[partial_count(vcol1#144558), partial_count(vcol2#144561)], output=[COLUMN#144487, count#144569L, count#144570L])
                  +- Union
                     :- *(2) HashAggregate(keys=[COLUMN#144487], functions=[], output=[vcol1#144558, vcol2#144561, COLUMN#144487])
                     :  +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
                     :     +- *(1) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
                     :        +- *(1) Scan ExistingRDD[COLUMN#144487]
                     +- *(4) HashAggregate(keys=[COLUMN#144533], functions=[], output=[vcol1#144559, vcol2#144560, COLUMN#144533])
                        +- ReusedExchange [COLUMN#144533], Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
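One hypothesis consistent with the above is that the two branches of the plan re-evaluate the parallelized RDD independently and do not see identical data. We have not verified this, but if it is right, pinning the data should stabilize the counts. A diagnostic sketch (the StorageLevel choice is arbitrary):
from pyspark import StorageLevel

# Unverified hypothesis check: persist df so both sides of the intersect read
# the same materialized data instead of recomputing the underlying RDD.
df_pinned = df.persist(StorageLevel.MEMORY_AND_DISK)
df_pinned.count()  # force materialization before intersecting
print(df_pinned.intersect(df_pinned).count())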