  Spark / SPARK-34369

Track number of pairs processed out of Join


Details

    • Type: New Feature
    • Status: In Progress
    • Priority: Major
    • Resolution: Unresolved
    • Affects Version/s: 3.2.0
    • Fix Version/s: None
    • Component/s: Web UI
    • Labels: None

    Description

      Users often face a scenario where even a modest skew in a join leads to tasks that appear to be stuck, because a join is O(n^2) in the number of rows sharing a key: it considers every pair of rows with matching keys. When this happens, users tend to assume Spark has deadlocked. If the join has a bound condition, the "number of output rows" metric may look typical, and other metrics (e.g. shuffle read) may look modest as well. In those cases it is very hard to understand what the problem is, and there is no conclusive proof short of taking a heap dump and inspecting internal data structures.

      It would be much better if Spark had a metric (which we propose be titled "number of matched pairs", as a companion to "number of output rows") that shows the user how many pairs are being processed in the join. It would be updated in the live UI as metrics are collected during heartbeats, so the user could easily see what is going on.
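
      As a rough, runnable illustration of the intended semantics (this is not the proposed implementation; it is a spark-shell sketch that uses an RDD join and a named accumulator to play the role of the metric):

          // hypothetical stand-in for the proposed "number of matched pairs" metric
          val matchedPairs = spark.sparkContext.longAccumulator("number of matched pairs")

          val left  = spark.sparkContext.parallelize(Seq((1, 20), (1, 20), (2, 20)))
          val right = spark.sparkContext.parallelize(Seq((1, 2), (1, 30)))

          // every key-matched pair is examined, whether or not the bound
          // condition (c1 > c2) lets it through
          val joined = left.join(right).filter { case (_, (c1, c2)) =>
            matchedPairs.add(1)
            c1 > c2
          }

          println(joined.count())      // number of output rows:   2
          println(matchedPairs.value)  // number of matched pairs: 4

      (Accumulator updates made inside a transformation can over-count if tasks are retried; the real metric would presumably live inside the join operators themselves, following the existing "number of output rows" pattern.)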

      The metric would also help when a stuck executor has some other cause (e.g. network issues), simply by ruling this theory out. For example, you may have 100k records with the same key on each side of a join. That probably won't show up as extreme skew in the task input data, but it becomes 10 billion join pairs that Spark works through in a single task.

      To further demonstrate the usefulness of this metric, run the steps below (for example in spark-shell).

          // table1 side: keys 0-19 with 10,000 rows each, plus 300,000 rows with key 77
          val df1 = spark.range(0, 200000).map { x => (x % 20, 20) }.toDF("b", "c")
          val df2 = spark.range(0, 300000).map { x => (77L, 20) }.toDF("b", "c")  // 77L so the union's column types line up

          // table2 side: keys 1-200,000 with one row each, plus 300,000 more rows with key 77
          val df3 = spark.range(0, 200000).map(x => (x + 1, x + 2)).toDF("b", "c")
          val df4 = spark.range(0, 300000).map(x => (77L, x + 2)).toDF("b", "c")

          val df5 = df1.union(df2)
          val df6 = df3.union(df4)

          df5.createOrReplaceTempView("table1")
          df6.createOrReplaceTempView("table2")
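
      As a sanity check (not part of the proposal), the pair counts reported below can be predicted from per-key histograms, since an equi-join examines (left rows with key k) x (right rows with key k) candidate pairs for each key k:

          // per-key row counts on each side of the join
          val leftHist  = spark.table("table1").groupBy("b").count().withColumnRenamed("count", "lcnt")
          val rightHist = spark.table("table2").groupBy("b").count().withColumnRenamed("count", "rcnt")

          // sum over the shared keys of lcnt * rcnt
          leftHist.join(rightHist, "b")
            .selectExpr("sum(lcnt * rcnt) AS matched_pairs")
            .show()

      Key 77 alone contributes 300,000 * 300,001 = 90,000,300,000 pairs, and keys 1-19 contribute 19 * 10,000 more, i.e. 90,000,490,000 in total, which is exactly what the proposed metric reports below.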

      InnerJoin

      sql("select p.*, f.* from table2 p join table1 f on f.b = p.b and f.c > p.c").count

      number of output rows: 5,580,000

      number of matched pairs: 90,000,490,000

      FullOuterJoin

      spark.sql("select p.*, f.* from table2 p full outer join table1 f on f.b = p.b and f.c > p.c").count

      number of output rows: 6,099,964

      number of matched pairs: 90,000,490,000

      LeftOuterJoin

      sql("select p.*, f.* from table2 p left outer join table1 f on f.b = p.b and f.c > p.c").count

      number of output rows: 6,079,964

      number of matched pairs: 90,000,490,000

      RightOuterJoin

      spark.sql("select p.*, f.* from table2 p right outer join table1 f on f.b = p.b and f.c > p.c").count

      number of output rows: 5,600,000

      number of matched pairs: 90,000,490,000

      LeftSemiJoin

      spark.sql("select * from table2 p left semi join table1 f on f.b = p.b and f.c > p.c").count

      number of output rows: 36

      number of matched pairs: 89,994,910,036

      CrossJoin

      spark.sql("select p., f. from table2 p cross join table1 f on f.b = p.b and f.c > p.c").count

      number of output rows: 5,580,000

      number of matched pairs: 90,000,490,000

      LeftAntiJoin

      spark.sql("select * from table2 p anti join table1 f on f.b = p.b and f.c > p.c").count

      number of output rows: 499,964

      number of matched pairs: 89,994,910,036

      Note that the semi and anti joins report fewer matched pairs than the other join types. This is likely because those joins can stop scanning a key group as soon as a probe row finds its first pair satisfying the condition, skipping the remaining pairs.


          People

            Assignee: Unassigned
            Reporter: Srinivas Rishindra Pothireddi (sririshindra)
            Votes: 0
            Watchers: 3
