Details
Description
If coalesce transformation with small number of output partitions (in my case 16) is applied to large Parquet file (in my has about 150Gb with 215k partitions), then it case OutOfMemory exceptions 250Gb is not enough) and open file limit exhaustion (with limit set to 8k).
The source of the problem is in SqlNewHad\oopRDD.compute method:
val reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
Created Parquet file reader is intended to be closed at task completion time. This reader contains a lot of references to parquet.bytes.BytesInput object which in turn contains reference sot large byte arrays (some of them are several megabytes).
As far as in case of CoalescedRDD task is completed only after processing larger number of parquet files, it cause file handles exhaustion and memory overflow.
Attachments
Issue Links
- links to