  1. Spark
  2. SPARK-26691

WholeStageCodegen after InMemoryTableScan task takes significant time and time increases based on the input size

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Structured Streaming

    Description

      Scenario: I am doing a left outer join between a streaming DataFrame and a static DataFrame and writing the result to a Kafka target. The static DataFrame is created from a Hive source and the streaming DataFrame is created from a Kafka source. The two DataFrames are joined on an equality condition. Here is a sample program.

       

      package com.spark.exec;
      
      import org.apache.spark._
      import org.apache.spark.rdd._
      import org.apache.spark.storage.StorageLevel._
      import org.apache.spark.sql._
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.functions.{ broadcast => infabroadcast }
      import java.io._
      import java.sql.Timestamp
      import scala.reflect.ClassTag
      import scala.collection.JavaConversions._
      import org.apache.spark.sql.streaming._
      import org.apache.spark.sql.streaming.Trigger._
      import java.util.UUID.randomUUID
      import org.apache.spark.storage.StorageLevel
      
      
      object Spark0 {

        def main(s: Array[String]): Unit = {
          // Hive-enabled session (despite the name, this is a SparkSession).
          val sqlContext = SparkSession.builder().enableHiveSupport().getOrCreate()
          import sqlContext.implicits._
          import org.apache.spark.sql.functions.{stddev_samp, var_samp}

          // Streaming side: raw Kafka records from the "source" topic.
          val v1 = sqlContext.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:49092")
            .option("subscribe", "source")
            .load().toDF()

          val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true)))

          // Parse the Kafka value as JSON and flatten it into (id, name) columns.
          val stream = v1.selectExpr("cast (value as string) as json")
            .select(from_json($"json", schema=schema) as "data")
            .select("data.*")

          // Static side: Hive lookup table, repartitioned and persisted.
          val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup").repartition(18).persist(StorageLevel.MEMORY_AND_DISK).toDF

          // Stream-static left outer join on id, serialized back to JSON.
          val joinDF = stream.join(static, stream.col("id").equalTo(static.col("id")), "left_outer")
          val result = joinDF.selectExpr("to_json(struct(*)) AS value")

          // A fresh checkpoint directory is used on every run.
          val UUID = randomUUID().toString
          val checkpoint = "/tmp/" + UUID
          result.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:49092")
            .option("topic", "target")
            .options(Map(Tuple2("batch.size", "16384"), Tuple2("metadata.fetch.timeout.ms", "10000"), Tuple2("linger.ms", "1000")))
            .option("checkpointLocation", checkpoint)
            .trigger(Trigger.ProcessingTime(20000L))
            .start()

          // Block until every active streaming query terminates.
          val activeStreams = sqlContext.streams.active
          activeStreams.foreach(stream => stream.awaitTermination())
        }
      }
      

       

      The repartition and persist functions are applied to the static DataFrame:

      val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup").repartition(18).persist(StorageLevel.MEMORY_AND_DISK).toDF;

      What I observed in the Spark UI is that the WholeStageCodegen step after InMemoryTableScan takes a significant amount of time in every batch, which degrades performance. The time increases with the size of the dataset in the Hive source (the static DataFrame), even though we have already persisted the data after repartitioning. What is WholeStageCodegen doing here that takes a significant amount of time depending on the size of the Hive source dataset? Is this happening by design?

      The expectation is that once the DataFrame has been partitioned and persisted in memory or on disk, we should only need to read the data from memory and pass it to the join.
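      One way to check what a batch actually does with the persisted lookup is to materialize the cache up front and print the physical plan of the join. The following is a minimal sketch under stated assumptions, not a fix: it assumes a local SparkSession, replaces the Hive table and the Kafka stream with tiny in-memory stand-ins, and the names `PlanCheck`, `cachedLookup`, and `probe` are hypothetical and do not appear in the report.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.storage.StorageLevel

object PlanCheck {

  // Hypothetical stand-in for default.hive_lookup, prepared the same way
  // as the static DataFrame in the report (repartition + persist).
  def cachedLookup(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
      .repartition(18)
      .persist(StorageLevel.MEMORY_AND_DISK)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session for illustration; the report uses enableHiveSupport().
    val spark = SparkSession.builder().master("local[2]").appName("plan-check").getOrCreate()
    import spark.implicits._

    val lookup = cachedLookup(spark)

    // persist() is lazy: run an action so the cache is populated
    // before any join reads from it.
    val rows = lookup.count()

    // Hypothetical stand-in for the rows of one micro-batch.
    val probe = Seq((1, "x"), (4, "y")).toDF("id", "name")
    val joined = probe.join(lookup, probe.col("id") === lookup.col("id"), "left_outer")

    // Print the physical plan; look for InMemoryTableScan and the operators above it.
    joined.explain()

    println(s"cached rows = $rows, storage level = ${lookup.storageLevel}")
    spark.stop()
  }
}
```

      If the printed plan shows operators such as an Exchange or a Sort above InMemoryTableScan on the static side, that work is a candidate for the per-batch cost described above, since it sits on top of the cached data rather than inside it.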

       

      Attachments

        1. DataScale_LkpPolicy_FirstRow_SF50_JobDetail.png
          267 kB
          Vikash Kumar
        2. DataScale_LkpPolicy_FirstRow_SF1_JobDetail.png
          337 kB
          Vikash Kumar
        3. WholeStageCodegen.PNG
          29 kB
          Vikash Kumar

        Activity

          People

            Assignee: Unassigned
            Reporter: Vikash Kumar (krvikash)
            Votes: 0
            Watchers: 1

            Dates

              Created:
              Updated:
              Resolved: