diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java index 52d0996..d23bcc9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java @@ -25,11 +25,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -72,6 +74,15 @@ private final MapWork work; private final SplitGrouper splitGrouper = new SplitGrouper(); + private static final String MIN_SPLIT_SIZE; + @SuppressWarnings("unused") + private static final String MAX_SPLIT_SIZE; + + static { + final HadoopShims SHIMS = ShimLoader.getHadoopShims(); + MIN_SPLIT_SIZE = SHIMS.getHadoopConfNames().get("MAPREDMINSPLITSIZE"); + MAX_SPLIT_SIZE = SHIMS.getHadoopConfNames().get("MAPREDMAXSPLITSIZE"); + } public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOException, SerDeException { @@ -97,6 +108,7 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE } + @SuppressWarnings("unchecked") @Override public List initialize() throws Exception { // Setup the map work for this thread. Pruning modified the work instance to potentially remove @@ -123,6 +135,18 @@ public HiveSplitGenerator(InputInitializerContext initializerContext) throws IOE int taskResource = getContext().getVertexTaskResource().getMemory(); int availableSlots = totalResource / taskResource; + if (conf.getLong(MIN_SPLIT_SIZE, 1) <= 1) { + // broken configuration from mapred-default.xml + final long blockSize = + conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); + final long minGrouping = + conf.getLong(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE, + TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_MIN_SIZE_DEFAULT); + final long preferredSplitSize = Math.min(blockSize/2, minGrouping); + jobConf.setLong(MIN_SPLIT_SIZE, preferredSplitSize); + LOG.info("The preferred split size is " + preferredSplitSize); + } + // Create the un-grouped splits float waves = conf.getFloat(TezMapReduceSplitsGrouper.TEZ_GROUPING_SPLIT_WAVES,