Spark / SPARK-26683

Incorrect value of "internal.metrics.input.recordsRead" when reading from a temp Hive table backed by an HDFS file


Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Incomplete
    • Affects Version/s: 2.3.1
    • Fix Version/s: None
    • Component/s: Spark Core

    Description

      Issue description

      In summary: when a persisted DataFrame is used in two concurrent threads, the SparkListenerStageCompleted event reports a wrong value for internal.metrics.input.recordsRead.

       

      Issue Details 

      The Spark code I have written has two source temp Hive tables. When the first temp table is read, its DataFrame is persisted; the DataFrame for the second temp table is not persisted. We then run two pipelines asynchronously. In the first pipeline, the persisted DataFrame is written to a Hive target table. In the second pipeline, the persisted DataFrame is combined with the non-persisted DataFrame using a UNION, and the result is written to a separate Hive table.

      Because the first DataFrame is persisted, we expect its records to be counted in recordsRead exactly once. Instead, we see a higher value for the metric.

      For example, if the persisted DataFrame has 2 rows, the metric consistently reports 3 rows.
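      The metric in question is the input recordsRead value that SparkListenerStageCompleted carries for each stage. To compare the expected and reported counts without scanning the full JSON dump, a listener can read that value through the public StageInfo API. This is a minimal sketch, not a fix: the class RecordsReadListener and its helper methods are hypothetical names, and it assumes the Spark 2.x API in which StageInfo.taskMetrics holds the stage's aggregated task metrics (a null value is treated as zero).

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Hypothetical helper (not part of Spark): keeps the input recordsRead
// metric of every completed stage so it can be inspected after a run.
class RecordsReadListener extends SparkListener {
  // stageId -> recordsRead. Events arrive on Spark's listener-bus thread
  // while callers may read from another thread, hence the concurrent map.
  // A retried stage overwrites the value recorded for its earlier attempt.
  private val perStage = new ConcurrentHashMap[Int, Long]()

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val info = event.stageInfo
    // taskMetrics aggregates the metrics of the stage's tasks;
    // a missing (null) value is treated as zero.
    val read = Option(info.taskMetrics).map(_.inputMetrics.recordsRead).getOrElse(0L)
    perStage.put(info.stageId, read)
    println(s"stage ${info.stageId} (${info.name}): recordsRead=$read")
  }

  // Snapshot of the per-stage values collected so far.
  def recordsReadByStage: Map[Int, Long] = perStage.asScala.toMap

  // Sum of recordsRead over all completed stages seen so far.
  def totalRecordsRead: Long = perStage.values.asScala.sum
}
```

      In spark-shell it would be registered the same way as the listener in the reproduction, with sc.addSparkListener(new RecordsReadListener), before the pipelines are started.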

       

      Steps to reproduce the issue:

      1. Create the directory /tmp/INFA_UNION1 and copy input1.txt into it.
      2. Create the directory /tmp/INFA_UNION2 and copy input2.txt into it.
      3. Run the following in spark-shell:

      scala> :load asyncfactory.scala

      scala> :paste -raw

       

      package org.apache.spark
      
      import org.apache.spark.scheduler._
      import org.apache.spark.util.JsonProtocol
      import org.json4s.jackson.JsonMethods._
      
      // Declared in package org.apache.spark because JsonProtocol is private[spark].
      // Prints every stage-submitted and stage-completed event as one line of JSON.
      // The mode parameter is not used by this listener.
      class InfaListener(mode: String = "ACCUMULATOR") extends org.apache.spark.scheduler.SparkListener {
      
        def onEvent(event: SparkListenerEvent): Unit = {
          val jv = JsonProtocol.sparkEventToJson(event)
          println(compact(jv))
        }
      
        override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = onEvent(stageCompleted)
        override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = onEvent(stageSubmitted)
      }
      
      

       

      scala> :paste

      import org.apache.spark.InfaListener
      // InfaDataFrame, itoDF, async, asBlock and stop (and presumably the DataFrame and
      // MEMORY_AND_DISK imports) come from the attached asyncfactory.scala loaded earlier;
      // that file is not reproduced here.
      implicit def df2idf(d:DataFrame):InfaDataFrame = new InfaDataFrame(d);
      val sqlc = spark.sqlContext
      val sc = spark.sparkContext
      val lis = new InfaListener("TAG")
      sc.addSparkListener(lis)
      
      sqlc.sql("DROP TABLE IF EXISTS `default`.`read1`")
      sqlc.sql("CREATE TABLE `default`.`read1` (`col0` STRING) LOCATION '/tmp/INFA_UNION1'")
      sqlc.sql("DROP TABLE IF EXISTS `default`.`read2`")
      sqlc.sql("CREATE TABLE `default`.`read2` (`col0` STRING) LOCATION '/tmp/INFA_UNION2'")
      sqlc.sql("DROP TABLE IF EXISTS `default`.`write1`")
      sqlc.sql("CREATE TABLE `default`.`write1` (`col0` STRING)")
      sqlc.sql("DROP TABLE IF EXISTS `default`.`write2`")
      sqlc.sql("CREATE TABLE `default`.`write2` (`col0` STRING)")
      
      // Source DataFrame over read1: persisted, then wrapped in a trivial filter.
      val v0 = sqlc.sql("SELECT `read1`.`col0` as a0 FROM `default`.`read1`").itoDF.persist(MEMORY_AND_DISK).where(lit(true));
      // Asynchronous pipeline writing write1: UNION of the persisted v0 with read2.
      async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write1` SELECT tbl0.c0 as a0 FROM tbl0"), v0.unionAll(sqlc.sql("SELECT `read2`.`col0` as a0 FROM `default`.`read2`").itoDF).itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl0")));
      // Asynchronous pipeline writing write2: the persisted v0 alone.
      async(asBlock(sqlc.sql("INSERT OVERWRITE TABLE `default`.`write2` SELECT tbl1.c0 as a0 FROM tbl1"), v0.itoDF("TGT_").itoDF("c").createOrReplaceTempView("tbl1")));
      stop;
      

      NOTE: The code above reads from the two directories /tmp/INFA_UNION1 and /tmp/INFA_UNION2. The attached input files must be copied into these directories after they are created.
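      The reproduction relies on the async, asBlock and stop helpers from the attached asyncfactory.scala, which is not included in this issue. For readers without the attachment, the "start both pipelines concurrently, then wait" step can presumably be approximated with plain Scala futures. This is a minimal sketch under the assumption that async runs its block on another thread and stop waits for all such blocks; ConcurrentBlocks and runConcurrently are hypothetical names, not part of Spark or of the attachment.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Hypothetical stand-in for the async/stop helpers in asyncfactory.scala.
object ConcurrentBlocks {
  // Starts every block on the global execution context, then blocks the
  // calling thread until all of them have finished. If any block throws,
  // the exception is rethrown to the caller.
  def runConcurrently(blocks: (() => Unit)*): Unit = {
    val running: Seq[Future[Unit]] = blocks.map(block => Future(block()))
    Await.result(Future.sequence(running), Duration.Inf)
  }
}
```

      Under this assumption, each async(asBlock(...)) call in the reproduction would become one thunk passed to runConcurrently (creating the temp view, then running the INSERT), and stop would correspond to runConcurrently returning.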

       

      Attachments

        1. asyncfactory.scala
          2 kB
          Amar Agrawal
        2. input1.txt
          0.0 kB
          Amar Agrawal
        3. input2.txt
          0.0 kB
          Amar Agrawal

          People

            Assignee: Unassigned
            Reporter: Amar Agrawal
            Votes: 1
            Watchers: 2
