WholeStageCodegen after InMemoryTableScan task takes significant time and time increases based on the input size



    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Structured Streaming


      Scenario: I am doing a left outer join between a streaming DataFrame and a static DataFrame and writing the result to a Kafka target. The static DataFrame is created from a Hive source and the streaming DataFrame from a Kafka source. The two DataFrames are joined on an equality condition. Here is a sample program.


      package com.spark.exec;
      import org.apache.spark._
      import org.apache.spark.rdd._
      import org.apache.spark.storage.StorageLevel._
      import org.apache.spark.sql._
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.functions._
      import org.apache.spark.sql.functions.{ broadcast => infabroadcast }
      import java.io._
      import java.sql.Timestamp
      import scala.reflect.ClassTag
      import scala.collection.JavaConversions._
      import org.apache.spark.sql.streaming._
      import org.apache.spark.sql.streaming.Trigger._
      import java.util.UUID.randomUUID
      import org.apache.spark.storage.StorageLevel
      object Spark0 {
        def main(args: Array[String]): Unit = {
          val sqlContext = SparkSession.builder().enableHiveSupport().getOrCreate()
          import sqlContext.implicits._

          // Streaming side: read JSON records from the Kafka topic "source".
          val v1 = sqlContext.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:49092")
            .option("subscribe", "source")
            .load()
          val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType, true)))
          // Flatten the parsed struct so that "id" and "name" are top-level columns for the join.
          val stream = v1.selectExpr("cast (value as string) as json")
            .select(from_json($"json", schema) as "data")
            .select("data.*")

          // Static side: Hive lookup table, repartitioned and persisted.
          val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
            .repartition(18)
            .persist(StorageLevel.MEMORY_AND_DISK)

          val joinDF = stream.join(static, stream.col("id").equalTo(static.col("id")), "left_outer")
          val result = joinDF.selectExpr("to_json(struct(*)) AS value")

          // Write the joined rows to the Kafka topic "target" every 20 seconds.
          val checkpoint = "/tmp/" + randomUUID().toString
          result.writeStream.format("kafka")
            .option("kafka.bootstrap.servers", "localhost:49092")
            .option("topic", "target")
            .options(Map("batch.size" -> "16384", "metadata.fetch.timeout.ms" -> "10000", "linger.ms" -> "1000"))
            .option("checkpointLocation", checkpoint)
            .trigger(Trigger.ProcessingTime(20000L))
            .start()

          sqlContext.streams.active.foreach(_.awaitTermination())
        }
      }


      The static DataFrame is repartitioned and persisted:

      val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup").repartition(18).persist(StorageLevel.MEMORY_AND_DISK).toDF;

      In the Spark UI I observed that the WholeStageCodegen stage after InMemoryTableScan takes a significant amount of time in every batch, which degrades performance. The time grows with the size of the dataset in the Hive source (the static DataFrame), even though the data has already been persisted after repartitioning. What is WholeStageCodegen doing here that takes time proportional to the size of the Hive source dataset? Is this by design?

      My expectation is that once the DataFrame has been repartitioned and persisted in memory or on disk, each batch should only need to read the data from memory and pass it to the join.
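      One way to check this expectation is to look at the physical plan of a comparable join against a persisted DataFrame. The following is only a minimal sketch under stated assumptions, not the job from this report: it runs in local mode, replaces the Hive table and the Kafka micro-batch with small in-memory DataFrames, and the names CachedJoinPlan and cachedJoinPlan are made up for illustration. It relies on the fact that persist() is lazy, so the cache is filled by an explicit action before the join is planned. The printed plan would be expected to contain an InMemoryTableScan node for the persisted side, which shows which operators run on top of the cached data.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CachedJoinPlan {
  // Builds a small persisted lookup table, joins against it,
  // and returns the physical plan of the join as text.
  def cachedJoinPlan(spark: SparkSession): String = {
    import spark.implicits._

    // Hypothetical stand-in for default.hive_lookup.
    val lookup = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
      .repartition(18)
      .persist(StorageLevel.MEMORY_AND_DISK)
    // persist() is lazy; an action such as count() fills the cache.
    lookup.count()

    // Hypothetical stand-in for one micro-batch read from Kafka.
    val batch = Seq((1, "x"), (4, "y")).toDF("id", "name")

    val joined = batch.join(lookup, batch.col("id") === lookup.col("id"), "left_outer")
    joined.queryExecution.executedPlan.toString
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("cached-join-plan").getOrCreate()
    try println(cachedJoinPlan(spark))
    finally spark.stop()
  }
}
```

      In the streaming job itself, calling joinDF.explain() before starting the query prints the corresponding plan, and the Storage tab of the Spark UI shows whether the persisted DataFrame is actually held in memory or on disk.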



