Details
- Type: Bug
- Status: Resolved
- Priority: Major
- Resolution: Not A Problem
- Affects Version(s): 2.2.0, 2.2.1
- Fix Version(s): None
- Component(s): None
Description
Running in (py)spark 2.2.
Marking this as PySpark, but I have not confirmed whether it is Spark-wide; I've observed it in PySpark, which is my preferred API.
df = spark.sql( """ select <redacted> from <inputtbl> where <conditions """ ) df.coalesce(32).write.parquet(...)
The above code will use only 32 tasks to read and process all of the original input data. Compare this with
df = spark.sql( """ select <redacted> from <inputtbl> where <conditions """ ).cache() df.count() df.coalesce(32).write.parquet(...)
where this will use the full complement of tasks available to the cluster for the initial filter, followed by a shuffle to coalesce and write. The latter execution path is far more efficient, particularly at large volumes where filtering removes most records, and it should be the default.

Note that in the real setting in which I am running this, I'm operating a 20-node cluster with 16 cores and 56 GB RAM per machine, and processing well over a TB of raw data in <inputtbl>. At the scale I am testing, the latter version of the code generates approximately 300,000 read tasks when not constrained by the former's execution plan.