diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
index 14e2679..37228b0 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
@@ -88,7 +88,9 @@ private HCatConstants() { // restrict instantiation
    * The desired number of input splits produced for each partition. When the
    * input files are large and few, we want to split them into many splits,
    * so as to increase the parallelizm of loading the splits. Try also two
-   * other parameters, mapred.min.split.size and mapred.max.split.size, to
+   * other parameters, mapred.min.split.size and mapred.max.split.size for
+   * hadoop 1.x, or mapreduce.input.fileinputformat.split.minsize and
+   * mapreduce.input.fileinputformat.split.maxsize in hadoop 2.x to
    * control the number of input splits.
    */
   public static final String HCAT_DESIRED_PARTITION_NUM_SPLITS =
diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java
index bf2b799..0c3d707 100644
--- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java
+++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/rcfile/RCFileMapReduceInputFormat.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -44,7 +45,9 @@
 
   @Override
   public List getSplits(JobContext job) throws IOException {
-    job.getConfiguration().setLong("mapred.min.split.size", SequenceFile.SYNC_INTERVAL);
+    job.getConfiguration().setLong(
+        ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"),
+        SequenceFile.SYNC_INTERVAL);
     return super.getSplits(job);
   }
 }
diff --git a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
index 5f835be..9dde771 100644
--- a/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
+++ b/hcatalog/core/src/test/java/org/apache/hive/hcatalog/rcfile/TestRCFileMapReduceInputFormat.java
@@ -228,7 +228,8 @@ private void writeThenReadByRecordReader(int intervalRecordCount,
     Configuration jonconf = new Configuration(cloneConf);
     jonconf.set("mapred.input.dir", testDir.toString());
     JobContext context = new Job(jonconf);
-    context.getConfiguration().setLong("mapred.max.split.size", maxSplitSize);
+    context.getConfiguration().setLong(
+        ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), maxSplitSize);
     List splits = inputFormat.getSplits(context);
     assertEquals("splits length should be " + splitNumber, splits.size(), splitNumber);
     int readCount = 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 647a9a6..6e8bfe2 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -284,7 +284,9 @@ private void addSplitsForGroup(List dirs, TableScanOperator tableScan, Job
       if (headerCount != 0 || footerCount != 0) {
         // Input file has header or footer, cannot be splitted.
-        conf.setLong("mapred.min.split.size", Long.MAX_VALUE);
+        conf.setLong(
+            ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"),
+            Long.MAX_VALUE);
       }
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index a425a01..5602cac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -77,8 +77,10 @@ VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
 
-  static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
-  static final String MAX_SPLIT_SIZE = "mapred.max.split.size";
+  static final String MIN_SPLIT_SIZE =
+      ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE");
+  static final String MAX_SPLIT_SIZE =
+      ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE");
   private static final long DEFAULT_MIN_SPLIT_SIZE = 16 * 1024 * 1024;
   private static final long DEFAULT_MAX_SPLIT_SIZE = 256 * 1024 * 1024;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
index 9d8009b..5edd265 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestRCFile.java
@@ -58,6 +58,7 @@
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -640,7 +641,9 @@ public void testSync() throws IOException {
     RCFileInputFormat inputFormat = new RCFileInputFormat();
     JobConf jobconf = new JobConf(cloneConf);
     jobconf.set("mapred.input.dir", testDir.toString());
-    jobconf.setLong("mapred.min.split.size", fileLen);
+    jobconf.setLong(
+        ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"),
+        fileLen);
     InputSplit[] splits = inputFormat.getSplits(jobconf, 1);
     RCFileRecordReader rr = new RCFileRecordReader(jobconf, (FileSplit)splits[0]);
     long lastSync = 0;
@@ -707,7 +710,9 @@ private void writeThenReadByRecordReader(int intervalRecordCount,
     RCFileInputFormat inputFormat = new RCFileInputFormat();
     JobConf jonconf = new JobConf(cloneConf);
     jonconf.set("mapred.input.dir", testDir.toString());
-    jonconf.setLong("mapred.min.split.size", minSplitSize);
+    jonconf.setLong(
+        ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"),
+        minSplitSize);
     InputSplit[] splits = inputFormat.getSplits(jonconf, splitNumber);
     assertEquals("splits length should be " + splitNumber, splits.length, splitNumber);
     int readCount = 0;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
index 380bdbe..6d0b924 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcSplitElimination.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
@@ -106,8 +107,8 @@ public void testSplitEliminationSmallMaxSplit() throws Exception {
         100000, CompressionKind.NONE, 10000, 10000);
     writeData(writer);
     writer.close();
-    conf.set("mapred.min.split.size", "1000");
-    conf.set("mapred.max.split.size", "5000");
+    conf.set(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"), "1000");
+    conf.set(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), "5000");
     InputFormat in = new OrcInputFormat();
     FileInputFormat.setInputPaths(conf, testFilePath.toString());
@@ -184,8 +185,8 @@ public void testSplitEliminationLargeMaxSplit() throws Exception {
         100000, CompressionKind.NONE, 10000, 10000);
     writeData(writer);
     writer.close();
-    conf.set("mapred.min.split.size", "1000");
-    conf.set("mapred.max.split.size", "150000");
+    conf.set(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"), "1000");
+    conf.set(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), "150000");
     InputFormat in = new OrcInputFormat();
     FileInputFormat.setInputPaths(conf, testFilePath.toString());
@@ -273,8 +274,8 @@ public void testSplitEliminationComplexExpr() throws Exception {
         100000, CompressionKind.NONE, 10000, 10000);
     writeData(writer);
     writer.close();
-    conf.set("mapred.min.split.size", "1000");
-    conf.set("mapred.max.split.size", "150000");
+    conf.set(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMINSPLITSIZE"), "1000");
+    conf.set(ShimLoader.getHadoopShims().getHadoopConfNames().get("MAPREDMAXSPLITSIZE"), "150000");
     InputFormat in = new OrcInputFormat();
     FileInputFormat.setInputPaths(conf, testFilePath.toString());
diff --git a/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java b/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
index 51c8051..7748c01 100644
--- a/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
+++ b/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
@@ -767,8 +767,8 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     ret.put("HADOOPMAPREDINPUTDIRRECURSIVE", "mapred.input.dir.recursive");
     ret.put("MAPREDMAXSPLITSIZE", "mapred.max.split.size");
     ret.put("MAPREDMINSPLITSIZE", "mapred.min.split.size");
-    ret.put("MAPREDMINSPLITSIZEPERNODE", "mapred.min.split.size.per.rack");
-    ret.put("MAPREDMINSPLITSIZEPERRACK", "mapred.min.split.size.per.node");
+    ret.put("MAPREDMINSPLITSIZEPERRACK", "mapred.min.split.size.per.rack");
+    ret.put("MAPREDMINSPLITSIZEPERNODE", "mapred.min.split.size.per.node");
     ret.put("HADOOPNUMREDUCERS", "mapred.reduce.tasks");
     ret.put("HADOOPJOBNAME", "mapred.job.name");
     ret.put("HADOOPSPECULATIVEEXECREDUCERS", "mapred.reduce.tasks.speculative.execution");
diff --git a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
index e4e56b7..7c59993 100644
--- a/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
+++ b/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
@@ -404,8 +404,8 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     ret.put("HADOOPMAPREDINPUTDIRRECURSIVE", "mapred.input.dir.recursive");
     ret.put("MAPREDMAXSPLITSIZE", "mapred.max.split.size");
     ret.put("MAPREDMINSPLITSIZE", "mapred.min.split.size");
-    ret.put("MAPREDMINSPLITSIZEPERNODE", "mapred.min.split.size.per.rack");
-    ret.put("MAPREDMINSPLITSIZEPERRACK", "mapred.min.split.size.per.node");
+    ret.put("MAPREDMINSPLITSIZEPERNODE", "mapred.min.split.size.per.node");
+    ret.put("MAPREDMINSPLITSIZEPERRACK", "mapred.min.split.size.per.rack");
     ret.put("HADOOPNUMREDUCERS", "mapred.reduce.tasks");
     ret.put("HADOOPJOBNAME", "mapred.job.name");
     ret.put("HADOOPSPECULATIVEEXECREDUCERS", "mapred.reduce.tasks.speculative.execution");
diff --git a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
index 5df5ed5..4cf3beb 100644
--- a/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
+++ b/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
@@ -568,8 +568,8 @@ public FileSystem createProxyFileSystem(FileSystem fs, URI uri) {
     ret.put("HADOOPMAPREDINPUTDIRRECURSIVE", "mapreduce.input.fileinputformat.input.dir.recursive");
     ret.put("MAPREDMAXSPLITSIZE", "mapreduce.input.fileinputformat.split.maxsize");
     ret.put("MAPREDMINSPLITSIZE", "mapreduce.input.fileinputformat.split.minsize");
-    ret.put("MAPREDMINSPLITSIZEPERNODE", "mapreduce.input.fileinputformat.split.minsize.per.rack");
-    ret.put("MAPREDMINSPLITSIZEPERRACK", "mapreduce.input.fileinputformat.split.minsize.per.node");
+    ret.put("MAPREDMINSPLITSIZEPERNODE", "mapreduce.input.fileinputformat.split.minsize.per.node");
+    ret.put("MAPREDMINSPLITSIZEPERRACK", "mapreduce.input.fileinputformat.split.minsize.per.rack");
     ret.put("HADOOPNUMREDUCERS", "mapreduce.job.reduces");
     ret.put("HADOOPJOBNAME", "mapreduce.job.name");
     ret.put("HADOOPSPECULATIVEEXECREDUCERS", "mapreduce.reduce.speculative");