Spark / SPARK-33013

The constraints may grow exponentially in the SQL optimizer rule 'InferFiltersFromConstraints', which leads to driver OOM


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Not A Problem
    • Affects Version/s: 3.0.1
    • Fix Version/s: None
    • Component/s: Optimizer, SQL
    • Labels: None

    Description

       

       

       Consider the case below:

      // Run in spark-shell, which pre-imports spark.implicits._ (needed for toDF and the 'a column symbols).
      Seq((1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)).toDF("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t").write.saveAsTable("test")
      val df = spark.table("test")
      // A single filter whose constraint references all 20 columns.
      val df2 = df.filter("a+b+c+d+e+f+g+h+i+j+k+l+m+n+o+p+q+r+s+t > 100")
      // Alias every column (a -> a1, ..., t -> t1).
      val df3 = df2.select('a as 'a1, 'b as 'b1, 'c as 'c1, 'd as 'd1, 'e as 'e1, 'f as 'f1, 'g as 'g1, 'h as 'h1, 'i as 'i1, 'j as 'j1, 'k as 'k1, 'l as 'l1, 'm as 'm1, 'n as 'n1, 'o as 'o1, 'p as 'p1, 'q as 'q1, 'r as 'r1, 's as 's1, 't as 't1)
      val df4 = df3.join(df2, df3("a1") === df2("a"))
      df4.explain(true)  // gets stuck here while the optimizer builds the constraint set
      

      If you run this in spark-shell, it gets stuck at "df4.explain(true)". The reason is that the optimizer rule 'InferFiltersFromConstraints' tries to infer all constraints from the plan. Here the plan has a constraint that references 20 columns, and each of those columns has an alias. For every alias, the rule adds a copy of the constraint with the column replaced by its alias while also keeping the original constraint, so each alias doubles the number of copies and the constraint set grows exponentially. Eventually this causes a driver OOM.
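      A back-of-the-envelope count for the repro above, looking only at the filter constraint a+b+...+t > 100 and assuming the 20 aliases are handled one at a time, each time keeping the existing constraints and adding the aliased copies. Let $N_k$ be the number of distinct variants of that constraint after the first $k$ aliases. Every variant after $k-1$ aliases still contains the $k$-th original column, so the $k$-th alias adds exactly one new variant per existing one:

      $$N_0 = 1, \qquad N_k = 2N_{k-1} \quad\Longrightarrow\quad N_{20} = 2^{20} = 1{,}048{,}576$$

      Each of these variants is a separate 20-term expression tree held in the driver's memory.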

      The related code:

        /**
         * Generates all valid constraints including a set of aliased constraints by replacing the
         * original constraint expressions with the corresponding alias.
         */
        protected def getAllValidConstraints(projectList: Seq[NamedExpression]): Set[Expression] = {
          var allConstraints = child.constraints.asInstanceOf[Set[Expression]]
          projectList.foreach {
            case a @ Alias(l: Literal, _) =>
              allConstraints += EqualNullSafe(a.toAttribute, l)
            case a @ Alias(e, _) =>
              // For every alias in `projectList`, replace the reference in constraints by its attribute.
              allConstraints ++= allConstraints.map(_ transform {
                case expr: Expression if expr.semanticEquals(e) =>
                  a.toAttribute
              })
              allConstraints += EqualNullSafe(e, a.toAttribute)
            case _ => // Don't change.
          }    
          allConstraints
        }
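      To see the growth without a Spark cluster, here is a minimal Scala sketch that mimics the alias branch of getAllValidConstraints on a toy expression tree. Expr, Col, Sum, Gt, EqNullSafe and ConstraintGrowth are hypothetical stand-ins, not Catalyst classes; the Literal branch is left out, and plain structural equality replaces semanticEquals.

```scala
// Hypothetical toy expression tree standing in for Catalyst's Expression.
sealed trait Expr
final case class Col(name: String) extends Expr
final case class Sum(terms: Vector[Expr]) extends Expr
final case class Gt(left: Expr, right: Int) extends Expr
final case class EqNullSafe(left: Expr, right: Expr) extends Expr

object ConstraintGrowth {
  // Replace every occurrence of `from` with `to` (a stand-in for Catalyst's `transform`).
  def replace(e: Expr, from: Expr, to: Expr): Expr = e match {
    case x if x == from   => to
    case Sum(ts)          => Sum(ts.map(replace(_, from, to)))
    case Gt(l, r)         => Gt(replace(l, from, to), r)
    case EqNullSafe(l, r) => EqNullSafe(replace(l, from, to), replace(r, from, to))
    case other            => other
  }

  // Mirrors the alias branch: keep every existing constraint AND add its aliased copy.
  def allValidConstraints(child: Set[Expr], aliases: Seq[(Col, Col)]): Set[Expr] = {
    var all = child
    aliases.foreach { case (orig, alias) =>
      all ++= all.map(replace(_, orig, alias))
      all += EqNullSafe(orig, alias)
    }
    all
  }

  def main(args: Array[String]): Unit = {
    val cols = ('a' to 't').map(c => Col(c.toString))      // the 20 columns a..t
    val child: Set[Expr] = Set(Gt(Sum(cols.toVector), 100)) // a+b+...+t > 100
    for (n <- Seq(4, 8, 12, 16)) {
      // Alias only the first n columns: a -> a1, b -> b1, ...
      val aliases = cols.take(n).map(c => (c, Col(c.name + "1")))
      val size = allValidConstraints(child, aliases).size
      println(s"$n aliases -> $size constraints")
    }
  }
}
```

      Running main prints 20, 264, 4108 and 65552 constraints for 4, 8, 12 and 16 aliases, i.e. 2^n variants of the filter plus n EqNullSafe constraints. It stops at 16 aliases to stay fast; with all 20 aliases the set would hold 2^20 + 20 = 1,048,596 constraints.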
      


          People

            Assignee: Unassigned
            Reporter: zhou xiang (constzhou)
            Votes: 0
            Watchers: 6
