Spark / SPARK-21443

Very long planning duration for queries with lots of operations



    • Type: Bug
    • Status: Resolved
    • Priority: Minor
    • Resolution: Incomplete
    • Affects Version/s: 2.2.0
    • Fix Version/s: None
    • Component/s: SQL, Structured Streaming


      Creating a streaming query with a large number of operations and fields (100+) results in a very long query planning phase. In the example below, the planning phase took about 35 seconds while the actual batch execution took only 1.3 seconds.
      After some investigation, I found that the root cause is two optimizer rules that seem to take most of the planning time: InferFiltersFromConstraints and PruneFilters.

      I would suggest the following:

      1. Fix the inefficient optimizer rules.
      2. Add warn-level logging when a rule takes more than xx ms.
      3. Allow individual optimizer rules to be removed (the opposite of spark.experimental.extraOptimizations).
      4. Reuse query plans where possible (optional).
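Suggestions 2 and 3 can be illustrated with a small, Spark-free sketch. The `Rule` and `TimedRuleRunner` types below are hypothetical stand-ins invented for this example; they are not Spark's optimizer classes, and the 100 ms default threshold is an arbitrary placeholder for the "xx ms" above.

```scala
// Hypothetical stand-ins for illustration only; NOT Spark's Rule or RuleExecutor.
final case class Rule[P](name: String, run: P => P)

final class TimedRuleRunner[P](
    rules: Seq[Rule[P]],
    excluded: Set[String] = Set.empty[String], // suggestion 3: rules to skip by name
    warnThresholdMs: Long = 100L,              // suggestion 2: assumed default threshold
    warn: String => Unit = (msg: String) => Console.err.println("WARN " + msg)) {

  /** Applies the non-excluded rules in order; returns the result and per-rule elapsed ms. */
  def execute(plan: P): (P, Seq[(String, Long)]) = {
    val active = rules.filterNot(r => excluded.contains(r.name))
    val timings = Seq.newBuilder[(String, Long)]
    val result = active.foldLeft(plan) { (current, rule) =>
      val start = System.nanoTime()
      val next = rule.run(current)
      val elapsedMs = (System.nanoTime() - start) / 1000000L
      // Warn only when a single rule application exceeds the threshold.
      if (elapsedMs > warnThresholdMs) warn("Rule " + rule.name + " took " + elapsedMs + " ms")
      timings += (rule.name -> elapsedMs)
      next
    }
    (result, timings.result())
  }
}
```

With this shape, a slow rule shows up in the warning log by name, and passing its name in `excluded` skips it without touching the remaining rules.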

      This issue can be reproduced with the script below, which simulates the scenario:

      import org.apache.spark.sql.SparkSession
      import org.apache.spark.sql.execution.streaming.MemoryStream
      import org.apache.spark.sql.functions.expr
      import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
      import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}

      case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())

      case class Events(eventId: Long, eventName: String, productId: Long) {
        def this(id: Long) = this(id, s"event$id", id % 100)
      }

      object SparkTestFlow {
        def main(args: Array[String]): Unit = {
          // The builder chain was truncated in the report; app name and local master are assumed.
          val spark = SparkSession.builder().appName("SparkTestFlow").master("local[*]").getOrCreate()

          spark.sqlContext.streams.addListener(new StreamingQueryListener {
            override def onQueryStarted(event: QueryStartedEvent): Unit = {}
            override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
            override def onQueryProgress(event: QueryProgressEvent): Unit = {
              // Body truncated in the report; printing the progress (which includes durationMs) is assumed.
              if (event.progress.numInputRows > 0) println(event.progress)
            }
          })

          import spark.implicits._
          implicit val sclContext = spark.sqlContext // MemoryStream needs an implicit SQLContext

          // Static lookup table joined against the stream.
          val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L * i))
          val lookupTable = spark.createDataFrame(seq)

          val inputData = MemoryStream[Events]
          inputData.addData((1L to 100L).map(i => new Events(i)))

          val events = inputData.toDF()
            .withColumn("w1", expr("0"))
            .withColumn("x1", expr("0"))
            .withColumn("y1", expr("0"))
            .withColumn("z1", expr("0"))

          val numberOfSelects = 40 // set to 100+ and the planning takes forever
          val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events) { (df, i) =>
            val arr = df.columns ++ Array(s"w${i - 1} + rand() as w$i", s"x${i - 1} + rand() as x$i", s"y${i - 1} + 2 as y$i", s"z${i - 1} + 1 as z$i")
            df.selectExpr(arr: _*) // truncated in the report; projecting arr is assumed
          }

          val withJoinAndFilter = dfWithSelectsExpr
            .join(lookupTable, expr("productId = pid"))
            .filter("productId < 50")

          // Sink and trigger were truncated in the report; a console sink and a 2-second trigger are assumed.
          val query = withJoinAndFilter.writeStream
            .format("console")
            .trigger(ProcessingTime("2 seconds"))
            .start()

          query.awaitTermination()
        }
      }
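To get a feel for the size of the plan the script builds, the column bookkeeping can be replayed without Spark. This is only a sketch. It assumes each fold step projects every existing column plus the four new expressions in `arr`, and the helper names `columnsAfter` and `projectionCount` are invented for this illustration.

```scala
// Replays the column bookkeeping of the repro without Spark.
// Assumption: each fold step keeps all existing columns and appends four derived ones.
def columnsAfter(numberOfSelects: Int): Vector[String] = {
  // Three fields of Events plus the four withColumn seeds.
  val base = Vector("eventId", "eventName", "productId", "w1", "x1", "y1", "z1")
  (2 to numberOfSelects).foldLeft(base) { (cols, i) =>
    cols ++ Vector(s"w$i", s"x$i", s"y$i", s"z$i")
  }
}

// The fold runs for i = 2..numberOfSelects, so one projection fewer than numberOfSelects.
def projectionCount(numberOfSelects: Int): Int = math.max(numberOfSelects - 1, 0)

val n = 40
println("projections=" + projectionCount(n) + " columns=" + columnsAfter(n).size)
// projections=39 columns=163
```

Under the same assumption, `numberOfSelects = 100` gives 99 stacked projections and 403 columns, all of which sit below the join and the filter that the optimizer then has to reason about.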

      The query progress output will show:

      "durationMs" : {
          "addBatch" : 1310,
          "getBatch" : 6,
          "getOffset" : 0,
          "queryPlanning" : 36924,
          "triggerExecution" : 38297,
          "walCommit" : 33
      }

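As a quick check on how lopsided these numbers are, the share of the trigger spent in each phase can be computed directly. The values are copied from the durationMs output above; the helper names `share` and `pct` are made up for this sketch.

```scala
import java.util.Locale

// Values copied from the durationMs output above (milliseconds).
val durationMs: Map[String, Long] = Map(
  "addBatch" -> 1310L,
  "getBatch" -> 6L,
  "getOffset" -> 0L,
  "queryPlanning" -> 36924L,
  "triggerExecution" -> 38297L,
  "walCommit" -> 33L
)

val total = durationMs("triggerExecution")

/** Fraction of the whole trigger spent in the given phase. */
def share(phase: String): Double = durationMs(phase).toDouble / total

/** Formats a fraction as a percentage with one decimal, independent of the default locale. */
def pct(x: Double): String = "%.1f%%".formatLocal(Locale.ROOT, x * 100)

val planningShare = share("queryPlanning")
val addBatchShare = share("addBatch")
println("queryPlanning: " + pct(planningShare)) // queryPlanning: 96.4%
println("addBatch: " + pct(addBatchShare))      // addBatch: 3.4%
```

In this run, planning accounts for roughly 96% of the trigger, while executing the batch accounts for about 3%.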



            Assignee: Unassigned
            Reporter: Eyal Zituny (eyalzit)

