Details
-
Bug
-
Status: Resolved
-
Major
-
Resolution: Fixed
-
2.1.2, 2.2.0
Description
this is a bug in binaryFiles - even though we give it the partitions, binaryFiles ignores it.
This is a bug introduced in spark 2.1 from spark 2.0, in file PortableDataStream.scala the argument “minPartitions” is no longer used (with the push to master on 11/7/6):
/** Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API which is set through setMaxSplitSize */ def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) { val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) val defaultParallelism = sc.defaultParallelism val files = listStatus(context).asScala val totalBytes = files.filterNot(.isDirectory).map(.getLen + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) super.setMaxSplitSize(maxSplitSize) }
The code previously, in version 2.0, was:
def setMinPartitions(context: JobContext, minPartitions: Int) {
val totalLen = listStatus(context).asScala.filterNot(.isDirectory).map(.getLen).sum
val maxSplitSize = math.ceil(totalLen / math.max(minPartitions, 1.0)).toLong
super.setMaxSplitSize(maxSplitSize)
}
The new code is very smart, but it ignores what the user passes in and uses the data size, which is kind of a breaking change in some sense
In our specific case this was a problem, because we initially read in just the files names and only after that the dataframe becomes very large, when reading in the images themselves – and in this case the new code does not handle the partitioning very well.
I’m not sure if it can be easily fixed because I don’t understand the full context of the change in spark (but at the very least the unused parameter should be removed to avoid confusion).