Spark / SPARK-21443

Very long planning duration for queries with lots of operations


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: SQL, Structured Streaming

    Description

      Creating a streaming query with a large number of operations and fields (100+) results in a very long query planning phase. In the example below, the planning phase took about 37 seconds (36,924 ms), while the actual batch execution took only 1.3 seconds.
      After some investigation, I found that the root cause is two optimizer rules that appear to account for most of the planning time: InferFiltersFromConstraints and PruneFilters.
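      One way to carry out this kind of investigation is to time every rule application and sum the results per rule name, then sort. The sketch below is a simplified, self-contained model and not Catalyst code: `TimedRule` and `RuleTimer` are hypothetical names, and a query plan is reduced to an `Int` so the example runs without Spark.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an optimizer rule: a name plus a plan transformation.
// The "plan" is reduced to an Int so the sketch runs without Spark.
final case class TimedRule(name: String, transform: Int => Int)

object RuleTimer {
  // Applies the rules in order and records the wall-clock time (in nanoseconds)
  // spent in each one. If the same rule name appears several times, as optimizer
  // rules do across fixed-point iterations, its times are summed.
  def run(plan: Int, rules: Seq[TimedRule]): (Int, Map[String, Long]) = {
    val timings = mutable.LinkedHashMap.empty[String, Long]
    val result = rules.foldLeft(plan) { (current, rule) =>
      val start = System.nanoTime()
      val next = rule.transform(current)
      val elapsed = System.nanoTime() - start
      timings(rule.name) = timings.getOrElse(rule.name, 0L) + elapsed
      next
    }
    (result, timings.toMap)
  }

  // Rule names ordered from most to least expensive.
  def slowestFirst(timings: Map[String, Long]): Seq[String] =
    timings.toSeq.sortBy { case (_, nanos) => -nanos }.map(_._1)
}
```

      With a real optimizer, the rule at the head of `slowestFirst` is the first candidate to investigate, which is how rules such as the two named above would stand out.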

      I would suggest the following:

      1. Fix the inefficient optimizer rules.
      2. Add warn-level logging when a rule takes more than xx ms.
      3. Allow users to remove specific optimizer rules (the inverse of spark.experimental.extraOptimizations, which only adds rules).
      4. (Optional) Reuse query plans where possible.
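      Suggestions 2 and 3 could look roughly like the following. This is a hypothetical, self-contained sketch and not an existing Spark API: `OptRule`, `GuardedRuleRunner`, `excludedRules` and `warnThresholdMs` are illustrative names, and a plan is reduced to a `String` so the example runs without Spark. Excluded rules are skipped by name, and any rule that exceeds the threshold produces a warning message instead of running silently.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for an optimizer rule; the "plan" is reduced to a String
// so the sketch runs without Spark.
final case class OptRule(name: String, transform: String => String)

// Hypothetical runner illustrating suggestions 2 and 3:
//  - rules whose names appear in `excludedRules` are skipped entirely;
//  - any rule that runs longer than `warnThresholdMs` yields a warning message,
//    which a real implementation would emit at WARN level through its logger.
final class GuardedRuleRunner(excludedRules: Set[String], warnThresholdMs: Long) {
  def run(plan: String, rules: Seq[OptRule]): (String, Vector[String]) = {
    var current = plan
    var warnings = Vector.empty[String]
    for (rule <- rules if !excludedRules.contains(rule.name)) {
      val start = System.nanoTime()
      current = rule.transform(current)
      val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
      if (elapsedMs > warnThresholdMs) {
        warnings :+= s"Rule ${rule.name} took $elapsedMs ms (threshold: $warnThresholdMs ms)"
      }
    }
    (current, warnings)
  }
}
```

      Keeping exclusion by name means a user could disable a problematic rule through configuration without recompiling, while the warning makes the slow rule visible in the logs before anyone has to profile the planner.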

      The issue can be reproduced with the script below, which simulates the scenario:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.execution.streaming.MemoryStream
      import org.apache.spark.sql.functions.expr
      import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
      import org.apache.spark.sql.streaming.{StreamingQueryListener, Trigger}

      // Row of the static lookup table that the stream is joined against
      case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())

      // Streaming event; the auxiliary constructor derives the name and productId from the id
      case class Events(eventId: Long, eventName: String, productId: Long) {
        def this(id: Long) = this(id, s"event$id", id % 100)
      }

      object SparkTestFlow {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession
            .builder
            .appName("TestFlow")
            .master("local[8]")
            .getOrCreate()

          // Print the progress report (including durationMs) of every trigger that processed data
          spark.streams.addListener(new StreamingQueryListener {
            override def onQueryStarted(event: QueryStartedEvent): Unit = {}
            override def onQueryProgress(event: QueryProgressEvent): Unit = {
              if (event.progress.numInputRows > 0) {
                println(event.progress.toString())
              }
            }
            override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
          })

          import spark.implicits._
          // MemoryStream needs an implicit SQLContext in addition to the Encoder from spark.implicits._
          implicit val sqlContext = spark.sqlContext

          val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L * i))
          val lookupTable = spark.createDataFrame(seq)

          val inputData = MemoryStream[Events]
          inputData.addData((1L to 100L).map(i => new Events(i)))

          val events = inputData.toDF()
            .withColumn("w1", expr("0"))
            .withColumn("x1", expr("0"))
            .withColumn("y1", expr("0"))
            .withColumn("z1", expr("0"))

          val numberOfSelects = 40 // set to 100+ and the planning takes forever
          // Each step re-selects every existing column and appends four derived columns,
          // stacking numberOfSelects - 1 projections on top of `events`
          val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events) { (df, i) =>
            val arr = df.columns ++ Array(
              s"w${i - 1} + rand() as w$i",
              s"x${i - 1} + rand() as x$i",
              s"y${i - 1} + 2 as y$i",
              s"z${i - 1} + 1 as z$i")
            df.selectExpr(arr: _*)
          }

          // Stream-static inner join followed by a filter
          val withJoinAndFilter = dfWithSelectsExpr
            .join(lookupTable, expr("productId = pid"))
            .filter("productId < 50")

          val query = withJoinAndFilter.writeStream
            .outputMode("append")
            .format("console")
            .trigger(Trigger.ProcessingTime(2000)) // one trigger every 2000 ms
            .start()

          query.processAllAvailable()
          spark.stop()
        }
      }
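      The `foldLeft` in the script is what makes the plan large: it stacks `numberOfSelects - 1` projections on top of `events`, and each one re-lists every column produced so far while adding only four new aliased columns. The self-contained sketch below (no Spark needed; `PlanGrowth` and its methods are hypothetical helpers) replays the same fold on column names alone, counting columns as written by the script, before any optimizer rule rewrites the plan.

```scala
object PlanGrowth {
  // Columns of `events` before the fold: the three Events fields plus w1, x1, y1 and z1.
  val initialColumns: Vector[String] =
    Vector("eventId", "eventName", "productId", "w1", "x1", "y1", "z1")

  // Mirrors the script's foldLeft on column names only: step i keeps every
  // existing column and appends w_i, x_i, y_i and z_i. Returns the column list
  // of each stacked projection, innermost first.
  def projections(numberOfSelects: Int): Vector[Vector[String]] =
    (2 to numberOfSelects)
      .scanLeft(initialColumns) { (cols, i) => cols ++ Vector(s"w$i", s"x$i", s"y$i", s"z$i") }
      .toVector
      .tail

  // Width of the final projection: 7 + 4 * (numberOfSelects - 1).
  def finalColumnCount(numberOfSelects: Int): Int =
    projections(numberOfSelects).lastOption.map(_.size).getOrElse(initialColumns.size)

  // Total number of column expressions listed across all stacked projections.
  def totalProjectedExpressions(numberOfSelects: Int): Int =
    projections(numberOfSelects).map(_.size).sum
}
```

      With `numberOfSelects = 40` this gives 39 projections, a final width of 7 + 4 × 39 = 163 columns (167 once the join adds the four `Product` columns) and 3,393 column expressions across all projections. At 100 it reaches 403 columns and 20,493 expressions: the expression count grows roughly quadratically with `numberOfSelects`, which gives a sense of how much a rule that walks the whole plan has to traverse.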
      

      The query progress output shows:

      "durationMs" : {
          "addBatch" : 1310,
          "getBatch" : 6,
          "getOffset" : 0,
          "queryPlanning" : 36924,
          "triggerExecution" : 38297,
          "walCommit" : 33
      }
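      From these numbers, planning accounts for 36924 / 38297 ≈ 96% of the trigger's total time, while addBatch (the actual execution) accounts for about 3.4%. A small helper such as the hypothetical `planningShare` below could compute this share from a `durationMs`-style map. It takes a plain Scala `Map[String, Long]` so it runs without Spark; inside a `StreamingQueryListener`, `event.progress.durationMs` is a `java.util.Map[String, java.lang.Long]` and would first need converting.

```scala
object PlanningShare {
  // Fraction of the trigger's wall-clock time spent in query planning, computed from
  // a durationMs-style map. Returns None if either entry is missing or the total is 0.
  def planningShare(durationMs: Map[String, Long]): Option[Double] =
    for {
      planning <- durationMs.get("queryPlanning")
      total <- durationMs.get("triggerExecution")
      if total > 0
    } yield planning.toDouble / total
}
```

      A check like this could, for example, back the warn-level logging proposed in suggestion 2 at the query level.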
      

People

    • Assignee: Unassigned
    • Reporter: Eyal Zituny (eyalzit)