Spark / SPARK-37290

Exponential planning time in case of non-deterministic function


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 3.1.2
    • Fix Version/s: 3.1.3, 3.3.0, 3.2.2
    • Component/s: SQL
    • Labels: None

    Description

      We are experiencing exponential growth of planning time for some DataFrame queries that include non-deterministic functions. I was able to create a small example program that can be pasted into the Spark shell to reproduce the issue:

      val adselect_raw = spark.createDataFrame(Seq(("imp-1",1),("imp-2",2)))
          .cache()
      val adselect = adselect_raw.select(
              expr("uuid()").alias("userUuid"),
              expr("_1").alias("impressionUuid"),
              expr("_1").alias("accessDateTime"),
              expr("_1").alias("publisher"),
              expr("_1").alias("site"),
              expr("_1").alias("placement"),
              expr("_1").alias("advertiser"),
              expr("_1").alias("campaign"),
              expr("_1").alias("lineItem"),
              expr("_1").alias("creative"),
              expr("_1").alias("browserLanguage"),
              expr("_1").alias("geoLocode"),
              expr("_1").alias("osFamily"),
              expr("_1").alias("osName"),
              expr("_1").alias("browserName"),
              expr("_1").alias("referrerDomain"),
              expr("_1").alias("placementIabCategory"),
              expr("_1").alias("placementDeviceGroup"),
              expr("_1").alias("placementDevice"),
              expr("_1").alias("placementVideoType"),
              expr("_1").alias("placementSection"),
              expr("_1").alias("placementPlayer"),
              expr("_1").alias("demandType"),
              expr("_1").alias("techCosts"),
              expr("_1").alias("mediaCosts"),
              expr("_1").alias("directSPrice"),
              expr("_1").alias("network"),
              expr("_1").alias("deviceSetting"),
              expr("_1").alias("placementGroup"),
              expr("_1").alias("postalCode"),
              expr("_1").alias("householdId")
          )
      
      val adcount_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
      val adcount = adcount_raw.select(
              expr("_1").alias("impressionUuid"),
              expr("_2").alias("accessDateTime")
          )
      
      val result = adselect.join(adcount, Seq("impressionUuid"))
      result.explain()
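
      On affected versions, the result.explain() call above is where the slowdown shows up. A rough way to make it visible in the shell (assuming wall-clock time around explain() is a fair proxy for the planning time; this timing wrapper is not part of the original report) is:

      // Hypothetical timing wrapper around explain(), for illustration only.
      val t0 = System.nanoTime()
      result.explain()
      println(f"explain() took ${(System.nanoTime() - t0) / 1e9}%.1f s")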
      

      Reducing the program any further (for example, by removing the join or the cache) made the problem no longer reproducible.

      The problem occurs at planning time. Debugging led me to the function UnaryNode.getAllValidConstraints, where the local variable allConstraints grows with an apparently exponential number of entries for the non-deterministic function uuid() from the example above. Every time another column of the large select is processed in the foreach loop, the number of entries for the uuid() column in the ExpressionSet doubles:

      trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
        override def getAllValidConstraints(projectList: Seq[NamedExpression]): ExpressionSet = {
          var allConstraints = child.constraints
          projectList.foreach {
            case a @ Alias(l: Literal, _) =>
              allConstraints += EqualNullSafe(a.toAttribute, l)
            case a @ Alias(e, _) =>
              // KK: Since ExpressionSet handles each non-deterministic expression as a
              // separate entry, each "uuid()" entry in allConstraints is re-added over
              // and over again in every iteration, doubling the set each time.
              allConstraints ++= allConstraints.map(_ transform {
                case expr: Expression if expr.semanticEquals(e) =>
                  a.toAttribute
              })
              allConstraints += EqualNullSafe(e, a.toAttribute)
            case _ => // Don't change.
          }

          allConstraints
        }
      }
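
      The doubling can be modeled outside of Spark. The following sketch (plain Scala, not Spark code, with made-up constraint strings) mimics an ExpressionSet in which entries containing uuid() never compare equal, so the allConstraints ++= allConstraints.map(...) step above duplicates all of them in every iteration:

      // Minimal model of the blow-up: one loop iteration per projected column,
      // mirroring the foreach over projectList above.
      var constraints: Vector[String] = Vector("uuid() IS NOT NULL")
      for (col <- 1 to 20) {
        // Deterministic entries would be deduplicated by ExpressionSet, but
        // every uuid() entry counts as a distinct expression and is kept.
        constraints = constraints ++ constraints.map(c => s"copy($c)")
        println(s"after column $col: ${constraints.size} uuid() constraints")
      }
      // Prints 2, 4, 8, ...; with the ~30 columns of the example above, the
      // set would grow towards 2^30 entries during planning.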
      

      As a workaround, we moved the uuid() column to the end of the select list in our code, which solved the issue: by the time the non-deterministic column is processed in the foreach loop, all other columns have already been handled, so there are no further iterations left to double its entries.
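
      In the example above, that corresponds to reordering the select so that the non-deterministic column comes last (a sketch, abbreviated to a few of the 30 columns):

      val adselect = adselect_raw.select(
              expr("_1").alias("impressionUuid"),
              expr("_1").alias("accessDateTime"),
              // ... remaining deterministic columns ...
              expr("uuid()").alias("userUuid")   // non-deterministic column moved last
          )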

          People

            Assignee: Kaya Kupferschmidt (kupferk)
            Reporter: Kaya Kupferschmidt (kupferk)
