Details
Bug
Status: Resolved
Major
Resolution: Incomplete
2.3.1
None
Description
Scenario: I am doing a left outer join between a streaming DataFrame and a static DataFrame and writing the result to a Kafka target. The static DataFrame is created from a Hive source and the streaming DataFrame is created from a Kafka source. The two DataFrames are joined on an equality condition. Here is a sample program.
package com.spark.exec;

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{ broadcast => infabroadcast }
import java.io._
import java.sql.Timestamp
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.Trigger._
import java.util.UUID.randomUUID
import org.apache.spark.storage.StorageLevel

object Spark0 {
  def main(s: Array[String]) {
    val sqlContext = SparkSession.builder().enableHiveSupport().getOrCreate()
    import sqlContext.implicits._
    import org.apache.spark.sql.functions.{stddev_samp, var_samp}

    val v1 = sqlContext.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:49092")
      .option("subscribe", "source")
      .load().toDF();

    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true)))

    val stream = v1.selectExpr("cast (value as string) as json")
      .select(from_json($"json", schema=schema) as "data")
      .select("data.*")

    val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup")
      .repartition(18)
      .persist(StorageLevel.MEMORY_AND_DISK)
      .toDF;

    val joinDF = stream.join(static, stream.col("id").equalTo(static.col("id")), "left_outer")
    val result = joinDF.selectExpr("to_json(struct(*)) AS value")

    val UUID = randomUUID().toString
    val checkpoint = "/tmp/" + UUID

    result.writeStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:49092")
      .option("topic", "target")
      .options(Map(
        Tuple2("batch.size", "16384"),
        Tuple2("metadata.fetch.timeout.ms", "10000"),
        Tuple2("linger.ms", "1000")))
      .option("checkpointLocation", checkpoint)
      .trigger(Trigger.ProcessingTime(20000L))
      .start()

    val activeStreams = sqlContext.streams.active
    activeStreams.foreach( stream => stream.awaitTermination())
  }
}
repartition and persist are applied to the static DataFrame:
val static = sqlContext.sql("SELECT hive_lookup.c0 as id, hive_lookup.c1 as name FROM default.hive_lookup").repartition(18).persist(StorageLevel.MEMORY_AND_DISK).toDF;
What I observed in the Spark UI is that the WholeStageCodegen stage after the InMemoryTableScan task takes a significant amount of time in every batch, which degrades performance. The time increases with the size of the dataset in the Hive source (the static DataFrame). We have already persisted the data after repartitioning. What is WholeStageCodegen doing here that takes a significant amount of time depending on the size of the Hive source dataset? Is this happening as per design?
The expectation is that once the DataFrame has been repartitioned and persisted in memory or on disk, we should only need to read the data from memory and pass it to the join.
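One thing that may be worth checking is whether the persisted DataFrame is actually materialized before the streaming query starts, since persist() is lazy and the cache is only populated by the first action that touches it. The following is a minimal sketch of such a check, not a diagnosis of this issue. It assumes local mode and uses a small in-memory DataFrame as a stand-in for default.hive_lookup; the object name PersistCheck and the method loadStatic are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical helper, not part of the reported program.
object PersistCheck {

  // Builds a stand-in for the static lookup table, persists it, and forces
  // materialization. Returns the row count and the reported storage level.
  def loadStatic(spark: SparkSession): (Long, StorageLevel) = {
    import spark.implicits._

    // Assumption: three in-memory rows replace the Hive table default.hive_lookup.
    val static = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
      .repartition(18)
      .persist(StorageLevel.MEMORY_AND_DISK)

    // persist() only marks the plan for caching; count() is an action,
    // so it runs the repartition once and populates the cache.
    val rows = static.count()

    // Prints the physical plan; a cached DataFrame is read through InMemoryTableScan.
    static.explain()

    (rows, static.storageLevel)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local master for the sketch; the original job uses enableHiveSupport().
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("persist-check")
      .getOrCreate()

    val (rows, level) = loadStatic(spark)
    println(s"rows=$rows, storageLevel=$level")

    spark.stop()
  }
}
```

In the reported program the equivalent step would be calling an action such as count() on the static DataFrame before start(), and then comparing the time shown for the stage after InMemoryTableScan across batches in the Spark UI. Whether this changes the observed timings here is not known.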