Details
Question
Status: Open
Major
Resolution: Unresolved
3.2.0
None
None
files:
- ORC with snappy compression
- 232 GB files on disk
- 1800 files on disk (pretty sure no individual file is over 200MB)
- 9 partitions on disk
cluster:
- EMR 6.6.0 (spark 3.2.0)
- cluster: 288 vCPU (executors), 1.1TB memory (executors)
OS info:
LSB Version: :core-4.1-amd64:core-4.1-noarch:cxx-4.1-amd64:cxx-4.1-noarch:desktop-4.1-amd64:desktop-4.1-noarch:languages-4.1-amd64:languages-4.1-noarch:printing-4.1-amd64:printing-4.1-noarch
Distributor ID: Amazon
Description: Amazon Linux release 2 (Karoo)
Release: 2
Codename: Karoo
Description
Why does `spark.sql.files.maxPartitionBytes` estimate the number of partitions based on file size on disk instead of the uncompressed file size?
For example, I have a dataset that is 213GB on disk. When I read it into my application, I get 2050 partitions with the default maxPartitionBytes value of 128MB. My application is a simple broadcast index join that adds one column to the dataframe and writes it out. There is no shuffle.
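For context, here is a rough sketch of how I understand Spark 3.2 picks the split size for file scans (my reading of `FilePartition.maxSplitBytes`, not an authoritative reimplementation). It only looks at on-disk byte counts, which is the behavior I am asking about. The parallelism of 288 is an assumption taken from my executor vCPU count, GB is treated as GiB, and the 4MB value is the default of `spark.sql.files.openCostInBytes`.

```python
MIB = 1024 * 1024


def max_split_bytes(total_bytes, num_files, parallelism,
                    max_partition_bytes=128 * MIB, open_cost_bytes=4 * MIB):
    """Approximate the split size Spark uses when planning a file scan.

    Every file is padded with an "open cost", the padded total is divided
    by the parallelism, and the result is capped at maxPartitionBytes.
    Only on-disk (compressed) sizes are used.
    """
    padded_total = total_bytes + num_files * open_cost_bytes
    bytes_per_core = padded_total // parallelism
    return min(max_partition_bytes, max(open_cost_bytes, bytes_per_core))


# Numbers from this issue; parallelism=288 is an assumption.
total_on_disk = 213 * 1024 * MIB
split = max_split_bytes(total_on_disk, num_files=1800, parallelism=288)

# Lower bound on the partition count if the data were one contiguous file.
min_partitions = (total_on_disk + split - 1) // split
```

With these inputs the split size stays at the 128MB cap, and the lower bound on the partition count is 1704. The 2050 partitions I actually see are plausibly higher because each of the 1800 files is split on its own and leaves a smaller tail chunk.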
Initially the sizes of the input/output records seem OK, but I still get a large amount of memory "spill" on the executors. I believe this is because the data is highly compressed, so each partition becomes too big once it is deserialized to work on in memory.
(If I repartition immediately after reading, the first stage still spills memory to disk, so that is neither the right solution nor what I'm interested in.)
Instead, I tried lowering maxPartitionBytes by the (average) compression ratio of my files (about 7x, so let's round up to 8), which means setting maxPartitionBytes=16MB. With this setting, Spark reads the files in 12-28 MB chunks. It now creates 14316 partitions on the initial file read and completes with no spill.
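The arithmetic behind that setting can be sketched as follows. The helper name `scaled_max_partition_bytes` is hypothetical, and rounding the ratio up to the next power of two is my own convention for illustration (it turns 7 into 8), not something Spark does.

```python
MIB = 1024 * 1024


def scaled_max_partition_bytes(compression_ratio,
                               target_in_memory_bytes=128 * MIB):
    """Shrink the on-disk split size so the in-memory partition stays
    near the target size.

    The ratio is rounded up to the next power of two (assumption made
    for this sketch only).
    """
    factor = 1
    while factor < compression_ratio:
        factor *= 2
    return target_in_memory_bytes // factor


setting = scaled_max_partition_bytes(7)  # ratio of about 7x rounds up to 8

# In a live session the value would then be applied before the read, e.g.:
# spark.conf.set("spark.sql.files.maxPartitionBytes", str(setting))
```

This reproduces the 16MB value I used. It still depends on knowing the compression ratio ahead of time, which is exactly the part I don't know how to avoid.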
Is there something I'm missing here? Is this just intended behavior? How can I tune my partition size correctly for my application when I do not know how much the data will be compressed ahead of time?
Attachments
Issue Links
- is related to SPARK-24914 totalSize is not a good estimate for broadcast joins (In Progress)