  1. Spark
  2. SPARK-40181

DataFrame.intersect and .intersectAll are inconsistently dropping rows

Details

    • Type: Bug
    • Status: Open
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.0.1
    • Fix Version/s: None
    • Component/s: PySpark, SQL
    • Labels: None

    Description

      I don't have a minimal reproducible example for this, but the place where it shows up in our workflow is very simple.

      The "COLUMN" column holds a few hundred million distinct strings (the plan also deduplicates them), and the DataFrame is compared against itself using intersect.

      The code that is failing is essentially:

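      from pyspark.sql.types import StringType, StructField, StructType

      # `spark` is an existing SparkSession.
      values = [...]  # Python list containing many unique strings, none of which are None

      # One single-column row per value, at roughly 10,000 values per partition.
      df = spark.createDataFrame(
          spark.sparkContext.parallelize(
              [(value,) for value in values], numSlices=2 + len(values) // 10000
          ),
          schema=StructType([StructField("COLUMN", StringType())]),
      )
      df = df.distinct()

      # Expected to always hold, but these are the assertions that fail.
      assert df.count() == df.intersect(df).count()
      assert df.count() == df.intersectAll(df).count()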

      The issue is that both of the above asserts sometimes pass and sometimes fail. (Strictly, we have not yet seen the intersectAll assert pass, but we have only tried it a few times.) Strikingly, repeated calls to df.intersect(df).count() do not always return the same count: sometimes it is exactly df.count(), and sometimes it is about 1% lower, by an amount that appears random.

      In particular, we called df.intersect(df).count() twice in a row and got two different counts. That is very surprising given that df should be deterministic, and it suggests there may be some kind of concurrency or inconsistent-hashing issue.
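
To quantify how unstable the count is, the repeated calls described above can be wrapped in a small helper. This is only a sketch: `count_histogram` and `is_stable` are hypothetical names, not part of PySpark, and the helper takes any zero-argument callable so it can be exercised without a cluster.

```python
from collections import Counter
from typing import Callable, Dict


def count_histogram(count_fn: Callable[[], int], runs: int = 5) -> Dict[int, int]:
    """Call count_fn `runs` times and tally how often each result appears."""
    return dict(Counter(count_fn() for _ in range(runs)))


def is_stable(histogram: Dict[int, int]) -> bool:
    """True when every run returned the same count."""
    return len(histogram) == 1


# Usage against the DataFrame from the report (needs a live SparkSession):
#   hist = count_histogram(lambda: df.intersect(df).count(), runs=5)
#   print(df.count(), hist, is_stable(hist))
```

A histogram with more than one key confirms that the same query returned different counts across runs.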

      One other possibly noteworthy observation: df.join(df, df.columns, how="inner") does seem to reliably produce the desired result, without dropping any rows.
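
The join-based comparison can be sketched as follows, together with a diagnostic that lists which rows went missing. Both function names are hypothetical; the code uses only `DataFrame.join`, `DataFrame.intersect`, and `DataFrame.columns`, and it assumes the data contains no NULLs (as the report states), because joining on column names compares with `=` rather than the null-safe `<=>`.

```python
def self_intersect_via_join(df):
    """Inner join of df with itself on all columns, as described in the report.

    Joining on column names compares with `=`, so rows containing NULLs
    would not match; the report states the data has none.
    """
    return df.join(df, on=df.columns, how="inner")


def rows_dropped_by_intersect(df):
    """Rows of df that are absent from df.intersect(df).

    For a correct intersect on NULL-free data this is empty, so any rows
    returned are the ones that went missing in that evaluation.
    """
    return df.join(df.intersect(df), on=df.columns, how="left_anti")
```

Since the counts vary between runs, the set of rows returned by `rows_dropped_by_intersect(df)` may also differ from one evaluation to the next.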

      Here is the resulting plan from df.intersect(df):

      == Parsed Logical Plan ==
      'Intersect false
      :- Deduplicate [COLUMN#144487]
      :  +- LogicalRDD [COLUMN#144487], false
      +- Deduplicate [COLUMN#144487]
         +- LogicalRDD [COLUMN#144487], false

      == Analyzed Logical Plan ==
      COLUMN: string
      Intersect false
      :- Deduplicate [COLUMN#144487]
      :  +- LogicalRDD [COLUMN#144487], false
      +- Deduplicate [COLUMN#144523]
         +- LogicalRDD [COLUMN#144523], false

      == Optimized Logical Plan ==
      Aggregate [COLUMN#144487], [COLUMN#144487]
      +- Join LeftSemi, (COLUMN#144487 <=> COLUMN#144523)
         :- LogicalRDD [COLUMN#144487], false
         +- Aggregate [COLUMN#144523], [COLUMN#144523]
            +- LogicalRDD [COLUMN#144523], false

      == Physical Plan ==
      *(7) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
      +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#22790]
         +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
            +- *(6) SortMergeJoin [coalesce(COLUMN#144487, ), isnull(COLUMN#144487)], [coalesce(COLUMN#144523, ), isnull(COLUMN#144523)], LeftSemi
               :- *(2) Sort [coalesce(COLUMN#144487, ) ASC NULLS FIRST, isnull(COLUMN#144487) ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(coalesce(COLUMN#144487, ), isnull(COLUMN#144487), 200), true, [id=#22772]
               :     +- *(1) Scan ExistingRDD[COLUMN#144487]
               +- *(5) Sort [coalesce(COLUMN#144523, ) ASC NULLS FIRST, isnull(COLUMN#144523) ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(coalesce(COLUMN#144523, ), isnull(COLUMN#144523), 200), true, [id=#22782]
                     +- *(4) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
                        +- Exchange hashpartitioning(COLUMN#144523, 200), true, [id=#22778]
                           +- *(3) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
                              +- *(3) Scan ExistingRDD[COLUMN#144523]

      And here is the plan from df.intersectAll(df):

      == Parsed Logical Plan ==
      'IntersectAll true
      :- Deduplicate [COLUMN#144487]
      :  +- LogicalRDD [COLUMN#144487], false
      +- Deduplicate [COLUMN#144487]
         +- LogicalRDD [COLUMN#144487], false

      == Analyzed Logical Plan ==
      COLUMN: string
      IntersectAll true
      :- Deduplicate [COLUMN#144487]
      :  +- LogicalRDD [COLUMN#144487], false
      +- Deduplicate [COLUMN#144533]
         +- LogicalRDD [COLUMN#144533], false

      == Optimized Logical Plan ==
      Project [COLUMN#144487]
      +- Generate replicaterows(min_count#144566L, COLUMN#144487), [1], false, [COLUMN#144487]
         +- Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
            +- Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
               +- Aggregate [COLUMN#144487], [count(vcol1#144558) AS vcol1_count#144563L, count(vcol2#144561) AS vcol2_count#144565L, COLUMN#144487]
                  +- Union
                     :- Aggregate [COLUMN#144487], [true AS vcol1#144558, null AS vcol2#144561, COLUMN#144487]
                     :  +- LogicalRDD [COLUMN#144487], false
                     +- Aggregate [COLUMN#144533], [null AS vcol1#144559, true AS vcol2#144560, COLUMN#144533]
                        +- LogicalRDD [COLUMN#144533], false

      == Physical Plan ==
      *(7) Project [COLUMN#144487]
      +- Generate replicaterows(min_count#144566L, COLUMN#144487), [COLUMN#144487], false, [COLUMN#144487]
         +- *(6) Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
            +- *(6) Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
               +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[count(vcol1#144558), count(vcol2#144561)], output=[vcol1_count#144563L, vcol2_count#144565L, COLUMN#144487])
                  +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23310]
                     +- *(5) HashAggregate(keys=[COLUMN#144487], functions=[partial_count(vcol1#144558), partial_count(vcol2#144561)], output=[COLUMN#144487, count#144569L, count#144570L])
                        +- Union
                           :- *(2) HashAggregate(keys=[COLUMN#144487], functions=[], output=[vcol1#144558, vcol2#144561, COLUMN#144487])
                           :  +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
                           :     +- *(1) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
                           :        +- *(1) Scan ExistingRDD[COLUMN#144487]
                           +- *(4) HashAggregate(keys=[COLUMN#144533], functions=[], output=[vcol1#144559, vcol2#144560, COLUMN#144533])
                              +- ReusedExchange [COLUMN#144533], Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
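
For reference, the semantics that the two optimized plans encode can be modeled in plain Python. This is an illustrative sketch, not Spark's implementation: the function names are hypothetical, rows are modeled as hashable Python values, and Python's `None == None` stands in for the null-safe `<=>` comparison. The intersect plan is a left semi join followed by deduplication; the intersectAll plan counts each key on both sides, takes the smaller count, and replicates the row that many times.

```python
from collections import Counter
from typing import Hashable, Iterable, List


def intersect_distinct(left: Iterable[Hashable], right: Iterable[Hashable]) -> List[Hashable]:
    """Model of the intersect plan: left semi join, then deduplicate."""
    right_keys = set(right)
    seen = set()
    out = []
    for row in left:
        # Left semi join keeps left rows with a match on the right;
        # the outer Aggregate keeps one copy of each.
        if row in right_keys and row not in seen:
            seen.add(row)
            out.append(row)
    return out


def intersect_all(left: Iterable[Hashable], right: Iterable[Hashable]) -> List[Hashable]:
    """Model of the intersectAll plan: per-key min of counts, then replicate."""
    left_counts, right_counts = Counter(left), Counter(right)
    out = []
    for row, n_left in left_counts.items():
        min_count = min(n_left, right_counts.get(row, 0))  # min_count in the plan
        out.extend([row] * min_count)                      # replicaterows
    return out
```

Under this model, intersecting a duplicate-free input with itself returns every row exactly once for both operations, which is the property the failing asserts rely on.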

          People

            Assignee: Unassigned
            Reporter: Luke (luke.hartman)