  Spark / SPARK-26569

Fixed point for batch Operator Optimizations never reached when optimizing the logical plan


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Duplicate
    • Affects Version/s: 2.1.0
    • Fix Version/s: None
    • Component/s: SQL
    • Labels: None

       

    Description

      There is a fairly complicated Spark application using the Dataset API that runs once a day, and I noticed that it hangs once in a while.
      I added some logging and compared two driver logs, one from a successful run and one from a failed run. Here are the results of the investigation:

      1. Usually the app runs correctly, but sometimes it hangs after finishing job 1.

      2. According to the logging I added, the successful app always reaches the fixed point at iteration 7 of the batch Operator Optimizations, but the failed app never reaches that fixed point (a simplified sketch of the fixed-point loop follows the log excerpts below).

      2019-01-04,11:35:34,199 DEBUG org.apache.spark.sql.execution.SparkOptimizer: 
      === Result of Batch Operator Optimizations ===
      
      2019-01-04,14:00:42,847 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 6/100, for batch Operator Optimizations
      2019-01-04,14:00:42,851 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
       
      2019-01-04,14:00:42,852 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
       
      2019-01-04,14:00:42,903 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
       
      2019-01-04,14:00:42,939 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
       
      2019-01-04,14:00:42,951 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.PruneFilters ===
       
      2019-01-04,14:00:42,970 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 7/100, for batch Operator Optimizations
      2019-01-04,14:00:42,971 INFO org.apache.spark.sql.execution.SparkOptimizer: Fixed point reached for batch Operator Optimizations after 7 iterations.
      
      2019-01-04,14:13:15,616 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 45/100, for batch Operator Optimizations
      2019-01-04,14:13:15,619 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
       
      2019-01-04,14:13:15,620 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
       
      2019-01-04,14:13:59,529 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
       
      2019-01-04,14:13:59,806 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
       
      2019-01-04,14:13:59,845 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 46/100, for batch Operator Optimizations
      2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
       
      2019-01-04,14:13:59,849 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicate ===
       
      2019-01-04,14:14:45,340 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
       
      2019-01-04,14:14:45,631 INFO org.apache.spark.sql.execution.SparkOptimizer: 
      === Applying Rule org.apache.spark.sql.catalyst.optimizer.BooleanSimplification ===
       
      2019-01-04,14:14:45,678 INFO org.apache.spark.sql.execution.SparkOptimizer: iteration is 47/100, for batch Operator Optimizations
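
      For context, the Operator Optimizations batch is driven by Catalyst's RuleExecutor with a FixedPoint strategy (100 iterations by default), so the "iteration is N/100" lines above correspond roughly to a loop like the simplified sketch below. This is an illustration only, not the actual Spark code; executeBatch and its parameters are made up for the sketch.

      // Simplified sketch of a fixed-point rule batch (illustration only).
      // The batch ends early only when one full pass over its rules leaves the
      // plan unchanged; otherwise it keeps iterating until maxIterations is hit.
      def executeBatch[Plan](plan: Plan, rules: Seq[Plan => Plan], maxIterations: Int = 100): Plan = {
        var current = plan
        var iteration = 1
        var reachedFixedPoint = false
        while (!reachedFixedPoint && iteration <= maxIterations) {
          val result = rules.foldLeft(current) { (p, rule) => rule(p) }
          if (result == current) {
            reachedFixedPoint = true   // the successful run gets here at iteration 7
          } else {
            current = result           // the failed run keeps taking this branch
            iteration += 1
          }
        }
        current
      }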
      

      3. The difference between the two logical plans first appears at BooleanSimplification within an iteration; before this rule the two plans are identical:

      // only the head of the plan is shown
      Project [model#2486, version#12, device#11, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, startdate#2360, enddate#2361, status#2362]
      +- Join Inner, (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2358 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (date#30 >= startdate#2360)) && (date#30 <= enddate#2361)) && (model#2486 = model#2358)) && (version#12 = version#2359)))
         :- Project [device#11, version#12, date#30, imei#13, pType#14, releaseType#15, regionType#16, expType#17, tag#18, imeiCount#1586, UDF(model#10, device#11) AS model#2486]
         :  +- Filter ((((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
         :     +- Join Inner, ((((model#10 = model#1583) && (device#11 = device#1584)) && (version#12 = version#1585)) && (date#30 = date#1592))
      

      4. After BooleanSimplification there is only one difference: the Filter's constraint has the form (A || B) && (A || C) in the successful app but A || (B && C) in the failed app (see the sketch after the two plans below).

      // successful app's logical plan
      Filter ((((isnotnull(UDF(model#1583, device#11)) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#10, device#1584))) && ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
      
      // failed app's plan
      Filter (((((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (UDF(model#10, device#1584) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && UDF(model#1583, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))) && isnotnull(UDF(model#1583, device#11))) && isnotnull(UDF(model#10, device#1584))) && (((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || UDF(model#10, device#11) INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))) && isnotnull(UDF(model#10, device#11))))
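
      My understanding (an assumption from reading ExpressionSet, not something the log proves) is that constraint sets compare expressions by canonicalized structure rather than logical equivalence, so the two equivalent shapes above are not recognized as the same constraint. A minimal sketch using the Catalyst classes directly, with stand-in boolean attributes instead of the real predicates:

      import org.apache.spark.sql.catalyst.expressions._
      import org.apache.spark.sql.types.BooleanType

      // Stand-ins: a = imeiCount > 10000, b / c = the two INSET checks.
      val a = AttributeReference("a", BooleanType)()
      val b = AttributeReference("b", BooleanType)()
      val c = AttributeReference("c", BooleanType)()

      val cnfForm  = And(Or(a, b), Or(a, c)) // shape seen in the successful run
      val factored = Or(a, And(b, c))        // shape seen in the failed run

      // ExpressionSet deduplicates by canonicalized structure, so the factored
      // form is not treated as "already present" even though it is equivalent.
      val childConstraints = ExpressionSet(Seq(cnfForm))
      println(childConstraints.contains(factored)) // expected: false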
      

      5. Thereafter, the divergence shows up in the rule InferFiltersFromConstraints:

      // the code that produces the log below (logInfo added by me)
      case join @ Join(left, right, joinType, conditionOpt) =>
            // Only consider constraints that can be pushed down completely to either the left or the
            // right child
            val constraints = join.constraints.filter { c =>
              c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
            }
      
            // Remove those constraints that are already enforced by either the left or the right child
            val additionalConstraints = constraints -- (left.constraints ++ right.constraints)
      
            logInfo(
              s"""
                 | After remove constraints additional constraints is  ${additionalConstraints.toList.toString}
                 | left constraints is ${left.constraints.toList.toString()}
                 | right constraints is ${right.constraints.toList.toString()}
              """.stripMargin)
      
      // successful app's log
      2019-01-08,16:44:48,911 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
       After remove constraints additional constraints is  List(((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
       left constraints is List(isnotnull(model#2486), isnotnull(device#11), isnotnull(date#30), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12))
       right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(model#2358), isnotnull(version#2359))
      
      // failed app's log
      2019-01-08,16:55:11,614 INFO org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints:
       After remove constraints additional constraints is  List()
       left constraints is List(isnotnull(date#30), ((((((((((((((((((((((((((((((20190107 <=> date#30) || (20190106 <=> date#30)) || (20190105 <=> date#30)) || (20190104 <=> date#30)) || (20190103 <=> date#30)) || (20190102 <=> date#30)) || (20190101 <=> date#30)) || (20181231 <=> date#30)) || (20181230 <=> date#30)) || (20181229 <=> date#30)) || (20181228 <=> date#30)) || (20181227 <=> date#30)) || (20181226 <=> date#30)) || (20181225 <=> date#30)) || (20181224 <=> date#30)) || (20181223 <=> date#30)) || (20181222 <=> date#30)) || (20181221 <=> date#30)) || (20181220 <=> date#30)) || (20181219 <=> date#30)) || (20181218 <=> date#30)) || (20181217 <=> date#30)) || (20181216 <=> date#30)) || (20181215 <=> date#30)) || (20181214 <=> date#30)) || (20181213 <=> date#30)) || (20181212 <=> date#30)) || (20181211 <=> date#30)) || (20181210 <=> date#30)) || (20181209 <=> date#30)), isnotnull(version#12), isnotnull(device#11), isnotnull(model#2486), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))), ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)))))
       right constraints is List(isnotnull(enddate#2361), isnotnull(startdate#2360), isnotnull(version#2359), isnotnull(model#2358))
      

      The failed app plan's left child has an additional constraint:

      ((cast(cast(imeiCount#1586 as decimal(20,0)) as int) > 10000) || (model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael)) && model#2486 INSET (grus(grus),lyra(lyra),lavender(lavender),violet(violet),andromeda(andromeda),davinci(davinci),Cepheus(cepheus),onc(onc),aquila(aquila),tiare(tiare),raphael(raphael))))
      

      6. Soon after, the gap between the two apps' plans grows larger; one run succeeds and the other hangs. There seem to be two possible reasons:
      1. BooleanSimplification is not idempotent.
      2. InferFiltersFromConstraints does not behave correctly when a child's constraints contain A || (B && C) instead of (A || B) && (A || C).

      I'm not sure which is the root cause; could someone follow up on this question?
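
      To probe reason 1, one small check might be to apply BooleanSimplification twice to the same Filter and compare the results. This is only a sketch: it assumes the Catalyst classes are accessible on the test classpath and uses stand-in boolean attributes rather than the real plan.

      import org.apache.spark.sql.catalyst.expressions._
      import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
      import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation}
      import org.apache.spark.sql.types.BooleanType

      val a = AttributeReference("a", BooleanType)()
      val b = AttributeReference("b", BooleanType)()
      val c = AttributeReference("c", BooleanType)()

      // A condition in (A || B) && (A || C) form, the shape seen before the divergence.
      val plan  = Filter(And(Or(a, b), Or(a, c)), LocalRelation(a, b, c))
      val once  = BooleanSimplification(plan)
      val twice = BooleanSimplification(once)

      // If the rule were idempotent, the second application would change nothing.
      println(once == twice)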


            People

              Assignee: Unassigned
              Reporter: Chen Fan
              Votes: 1
              Watchers: 3

              Dates

                Created:
                Updated:
                Resolved: