Details
Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Fix Version/s: 3.0.3, 3.1.2
Description
In some corner cases, DSV2 is not updating the input metrics.
This is a very specific case: it happens when the number of records read is less than 1000 and hasNext is not called for the last element (because input.hasNext returns false, MetricsIterator.hasNext is not called), so the forced metrics update never happens.
The hasNext implementation of MetricsIterator only forces a metrics update when it observes that the underlying iterator is exhausted:

override def hasNext: Boolean = {
  if (iter.hasNext) {
    true
  } else {
    metricsHandler.updateMetrics(0, force = true)
    false
  }
}
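To illustrate why that forced update can be skipped, the plain-Scala sketch below mimics the pattern with simplified stand-ins (ToyMetricsHandler and ToyMetricsIterator are hypothetical names, not the actual Spark classes; the 1000-record batching threshold comes from the description above). A consumer with a limit stops pulling rows once it has enough, so the exhausted-iterator branch of hasNext, the only place the remaining rows are flushed, never runs:

// Stand-in for the metrics handler: rows are pushed to the task metrics
// every 1000 records, or immediately when force = true.
class ToyMetricsHandler {
  var reported = 0L
  private var pending = 0L
  def updateMetrics(numRows: Int, force: Boolean = false): Unit = {
    pending += numRows
    if (pending >= 1000 || force) {
      reported += pending
      pending = 0
    }
  }
}

// Simplified version of MetricsIterator: the only final flush happens in
// the else branch of hasNext.
class ToyMetricsIterator[T](iter: Iterator[T], handler: ToyMetricsHandler)
    extends Iterator[T] {
  override def hasNext: Boolean = {
    if (iter.hasNext) {
      true
    } else {
      handler.updateMetrics(0, force = true) // final flush of the remainder
      false
    }
  }
  override def next(): T = {
    handler.updateMetrics(1)
    iter.next()
  }
}

val handler = new ToyMetricsHandler
val rows = new ToyMetricsIterator(Iterator.range(0, 100), handler)
rows.take(10).foreach(_ => ()) // the consumer stops early, so hasNext never sees exhaustion
assert(handler.reported == 0)  // fewer than 1000 rows were read and none were reported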
You can reproduce this issue easily in spark-shell by running the code below:
import scala.collection.mutable
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

spark.conf.set("spark.sql.sources.useV1SourceList", "")

val dir = "Users/tmp1"
spark.range(0, 100).write.format("parquet").mode("overwrite").save(dir)
val df = spark.read.format("parquet").load(dir)

val bytesReads = new mutable.ArrayBuffer[Long]()
val recordsRead = new mutable.ArrayBuffer[Long]()

val bytesReadListener = new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    bytesReads += taskEnd.taskMetrics.inputMetrics.bytesRead
    recordsRead += taskEnd.taskMetrics.inputMetrics.recordsRead
  }
}
spark.sparkContext.addSparkListener(bytesReadListener)

try {
  df.limit(10).collect()
  assert(recordsRead.sum > 0)
  assert(bytesReads.sum > 0)
} finally {
  spark.sparkContext.removeSparkListener(bytesReadListener)
}
This code generally fails at assert(bytesReads.sum > 0), which confirms that the updateMetrics API is not being called.
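One possible direction for a fix, shown here only as a sketch against the toy classes above and not necessarily the change that was merged for this ticket, is to stop depending on the consumer making a final hasNext call and instead force the last metrics update from a hook that always runs, such as a task-completion listener:

import org.apache.spark.TaskContext

// Sketch only: reuses ToyMetricsHandler from the example above. The final
// flush is registered as a task-completion callback, so it fires even when
// the consumer (e.g. a limit) stops calling hasNext early.
class SafeMetricsIterator[T](iter: Iterator[T], handler: ToyMetricsHandler)
    extends Iterator[T] {

  Option(TaskContext.get()).foreach { ctx =>
    ctx.addTaskCompletionListener[Unit] { _ =>
      handler.updateMetrics(0, force = true)
    }
  }

  override def hasNext: Boolean = {
    if (iter.hasNext) {
      true
    } else {
      handler.updateMetrics(0, force = true)
      false
    }
  }

  override def next(): T = {
    handler.updateMetrics(1)
    iter.next()
  }
}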