This is the mapreduce part of the latest HADOOP-13786 patch. It adds
- the notion of a committer factory, which is given the job configuration and destination path and returns the committer to use for that job.
- The ability to declare a committer factory for org.apache.hadoop.mapreduce.lib.output.FileOutputFormat in the option mapreduce.outputcommitter.factory.class
- The ability to register the default committer to use for different filesystem schemes (searched for if mapreduce.outputcommitter.factory.class is unset). This lets you define different committers for s3a, wasb, etc.
- The default committer factory, which creates FileOutputCommitter instances.
- Another committer factory which creates a named instance of PathOutputCommitter (see MAPREDUCE-6956), through the property mapreduce.outputcommitter.named.classname.
- Static methods to make this straightforward to use
- FileOutputFormat wired up to use this factory mechanism.
- Tests for all this
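The config -&gt; factory -&gt; committer lookup described above can be sketched with simplified, self-contained stand-in types. These are not the actual Hadoop classes: the real ones are org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter and the factory classes in this patch, and the method names and exact lookup order here are illustrative assumptions, not the patch's API.

```java
// Stand-in sketch of the committer-factory indirection. Not the real
// Hadoop classes; names and lookup order are illustrative assumptions.

import java.util.HashMap;
import java.util.Map;

abstract class PathOutputCommitterSketch {
    abstract String describe();
}

// Stand-in for the classic FileOutputCommitter.
class FileOutputCommitterSketch extends PathOutputCommitterSketch {
    String describe() { return "classic ls/rename/merge committer"; }
}

abstract class CommitterFactorySketch {
    // Given the job configuration and destination path, return the committer.
    abstract PathOutputCommitterSketch createCommitter(
        Map<String, String> conf, String destPath);
}

// Stand-in for the default factory, which creates FileOutputCommitters.
class DefaultFactorySketch extends CommitterFactorySketch {
    PathOutputCommitterSketch createCommitter(
            Map<String, String> conf, String destPath) {
        return new FileOutputCommitterSketch();
    }
}

public class FactoryLookupSketch {
    // Assumed lookup order: explicit per-job factory option first, then a
    // per-filesystem-scheme option, then the default factory.
    static CommitterFactorySketch resolveFactory(
            Map<String, String> conf, String destPath) {
        String explicit = conf.get("mapreduce.outputcommitter.factory.class");
        if (explicit != null) {
            return instantiate(explicit);
        }
        String scheme = destPath.substring(0, destPath.indexOf(':'));
        String perScheme =
            conf.get("mapreduce.outputcommitter.factory.scheme." + scheme);
        if (perScheme != null) {
            return instantiate(perScheme);
        }
        return new DefaultFactorySketch();
    }

    static CommitterFactorySketch instantiate(String classname) {
        try {
            return (CommitterFactorySketch) Class.forName(classname)
                .getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        PathOutputCommitterSketch committer =
            resolveFactory(conf, "hdfs://nn/out")
                .createCommitter(conf, "hdfs://nn/out");
        System.out.println(committer.describe());
    }
}
```

With an empty configuration this falls through to the default factory and so, as before, you get the classic file-output commit behaviour; setting either property swaps in a different factory without the output format having to know about it.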
As a result of this, you can now define a committer factory/committer class for file output on an explicit per-job or implicit per-destination basis. Because these are subclasses of PathOutputCommitter, they don't have to go near the complexity of FileOutputCommitter and its existing ls/rename/merge strategy, which is high-performance and reliable for "real" filesystems, but not for object stores or other destinations.
Because the factories are used in FileOutputFormat, all file output formats which don't provide their own committer get the feature too. Those which do provide their own committer (e.g. ParquetOutputFormat) don't pick up this new feature. It's left to applications to sort that out by calling the PathOutputCommitterFactory directly (which I have been doing downstream).
The per-destination logic is used in HADOOP-13786 for committer factories, the most interesting being the "DynamicCommitterFactory", which chooses the actual committer based on the settings of the destination bucket. That's why the config -> factory instance -> committer instance indirection is important: it offloads the more complex decision making to the factories. The per-fs-scheme logic allows us to declare different factories for different filesystems, with the factories making the final call:
mapreduce.outputcommitter.factory.scheme.s3a = org.apache.hadoop.fs.s3a.commit.DynamicCommitterFactory
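In mapred-site.xml form (assuming the standard Hadoop XML configuration syntax), that per-scheme binding would look something like:

```xml
<!-- Bind a committer factory to every job writing to an s3a:// destination.
     The factory then picks the actual committer from the bucket's settings. -->
<property>
  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
  <value>org.apache.hadoop.fs.s3a.commit.DynamicCommitterFactory</value>
</property>
```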
At the same time, jobs can override this with their own decisions, most simply with a factory which just instantiates a committer class from the property mapreduce.outputcommitter.named.classname. This gives the MRv2 output formats the same feature that MRv1 has long had: the ability to declare a new output committer for file outputs:
mapreduce.outputcommitter.factory.class = org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory
mapreduce.outputcommitter.named.classname = YOUR-COMMITTER-CLASSNAME
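The same two job-level properties as a configuration fragment (again assuming standard Hadoop XML syntax; the committer classname is the placeholder from above, to be replaced with your own implementation):

```xml
<!-- Per-job override: use the named-committer factory, which instantiates
     whatever PathOutputCommitter subclass the second property names. -->
<property>
  <name>mapreduce.outputcommitter.factory.class</name>
  <value>org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory</value>
</property>
<property>
  <name>mapreduce.outputcommitter.named.classname</name>
  <value>YOUR-COMMITTER-CLASSNAME</value>
</property>
```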
This does not have any impact on normal commit operations, as the default factory creates FileOutputCommitters, as before. It merely offers the ability to change committers, and makes it easy to do this for specific filesystems.
This has now been tested all the way through Spark: with a change to its Parquet commit logic (SPARK-22217) and a special PathOutputCommitter, this feature lets us write data directly to S3 through Hadoop MR and Spark, with ORC, Parquet and other formats. It is also lined up to support any other committers/filesystem committers people write.
Testing: unit tests here, functional ITests in hadoop-aws, downstream tests elsewhere.
Things to consider/refine:
- Config option names?
- Best way to document this. I've got an empty option in mapred-default.xml; otherwise it's all in the javadocs.
- Should the factory code all go into its own package, e.g. org.apache.hadoop.mapreduce.lib.output.factory?