Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Duplicate
- Affects Version/s: 3.0.0
- Fix Version/s: None
- Component/s: None
Description
We expect Spark to use the Parquet metadata to fetch the row count of a Parquet file. But when we execute the following code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}

object Test extends App {
  val sparkConf = new SparkConf()
    .setAppName("test-app")
    .setMaster("local[1]")
  val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  import sparkSession.implicits._

  val filePath = "./tempFile.parquet"
  (1 to 1000).toDF("c1")
    .repartition(10)
    .write
    .mode("overwrite")
    .parquet(filePath)

  val df = sparkSession.read.parquet(filePath)

  var rowsInHeavyComputation = 0
  def heavyComputation(row: Row): Row = {
    rowsInHeavyComputation += 1
    println(s"rowsInHeavyComputation = $rowsInHeavyComputation")
    Thread.sleep(50)
    row
  }

  implicit val encoder: ExpressionEncoder[Row] = RowEncoder(df.schema)
  val cnt = df
    .map(row => heavyComputation(row)) // map operation cannot change number of rows
    .count()
  println(s"counting done, cnt=$cnt")
}
we see
rowsInHeavyComputation = 1
rowsInHeavyComputation = 2
...
rowsInHeavyComputation = 999
rowsInHeavyComputation = 1000
counting done, cnt=1000
Expected result: Spark does not perform heavyComputation at all, since the row count is already available in the Parquet metadata.
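For reference, the row count the code above is expected to rely on is recorded in the Parquet footers and can be read without scanning any column data. A minimal sketch of reading it directly, assuming the parquet-hadoop classes that ship with Spark (the object name and path are illustrative):

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.ParquetFileReader

object FooterRowCount extends App {
  // Sum the per-row-group row counts recorded in the footers of all part files.
  // Only footer metadata is read, no column data.
  val conf = new Configuration()
  val dir = new Path("./tempFile.parquet")
  val fs = dir.getFileSystem(conf)

  val rowCountFromFooters = fs.listStatus(dir)
    .filter(_.getPath.getName.endsWith(".parquet"))
    .map { status =>
      val footer = ParquetFileReader.readFooter(conf, status.getPath, ParquetMetadataConverter.NO_FILTER)
      footer.getBlocks.asScala.map(_.getRowCount).sum
    }
    .sum

  println(s"rowCountFromFooters = $rowCountFromFooters")
}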
P.S. In our real application we:
- transform data from Parquet files
- return some examples (50 rows; Spark runs heavyComputation only for those 50 rows)
- return the row count of the whole DataFrame, and here Spark for some reason computes the whole DataFrame, even though there are only map operations and the initial row count could be taken from the Parquet metadata (see the sketch after this list)
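One possible workaround for this pattern, sketched against the repro code above (variable names are illustrative; it reuses sparkSession, filePath, heavyComputation and the implicit RowEncoder from that snippet):

// Take the examples from the transformed DataFrame, but take the count from the
// untransformed source, so heavyComputation is never triggered for counting.
val source = sparkSession.read.parquet(filePath)
val transformed = source.map(row => heavyComputation(row))

// Examples: heavyComputation runs only for the rows actually returned,
// matching the behaviour described in the second bullet above.
val examples = transformed.limit(50).collect()

// Count on the source scan: no map is involved, so no heavyComputation;
// a bare count() over a Parquet scan reads little or no column data.
val total = source.count()
println(s"examples=${examples.length}, total=$total")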
Attachments
Issue Links
- duplicates
  - SPARK-34952 DS V2 Aggregate push down (Resolved)
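For context on the duplicate: SPARK-34952 adds aggregate push down for DS V2 sources, which should allow a plain count over a Parquet scan to be answered from footer statistics. A rough sketch, assuming the spark.sql.parquet.aggregatePushdown flag introduced by that work (only available in later releases, not in 3.0.0, and only on the DS V2 read path):

// Sketch only: relies on configuration introduced by SPARK-34952 and its follow-ups.
// Parquet must also go through the DS V2 reader (i.e. be removed from
// spark.sql.sources.useV1SourceList) for the push down to apply.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.sql.sources.useV1SourceList", "")         // use the V2 Parquet source
  .config("spark.sql.parquet.aggregatePushdown", "true")   // push count/min/max down to Parquet statistics
  .getOrCreate()

val cnt = spark.read.parquet("./tempFile.parquet").count() // can be served from footer metadata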