diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 8751502..efdd22d 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -41,6 +42,7 @@
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -252,6 +254,25 @@ protected void init(JobConf job) {
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 
+  private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, JobConf conf,
+      InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
+      TableDesc table, List<InputSplit> result) throws IOException {
+
+    Utilities.copyTableJobPropertiesToConf(table, conf);
+
+    // tableScan is null when the paths do not resolve to a single TableScanOperator.
+    if (tableScan != null) {
+      pushFilters(conf, tableScan);
+    }
+
+    FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
+    conf.setInputFormat(inputFormat.getClass());
+    InputSplit[] iss = inputFormat.getSplits(conf, splits);
+    for (InputSplit is : iss) {
+      result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+    }
+  }
+
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
@@ -262,36 +283,63 @@ protected void init(JobConf job) {
       throw new IOException("No input paths specified in job");
     }
     JobConf newjob = new JobConf(job);
-    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+    List<InputSplit> result = new ArrayList<InputSplit>();
+
+    List<Path> currentDirs = new ArrayList<Path>();
+    Class<? extends InputFormat> currentInputFormatClass = null;
+    TableDesc currentTable = null;
+    TableScanOperator currentTableScan = null;
 
     // for each dir, get the InputFormat, and do getSplits.
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
-      // create a new InputFormat instance if this is the first time to see this
-      // class
-      Class inputFormatClass = part.getInputFileFormatClass();
-      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
+      Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
+      TableDesc table = part.getTableDesc();
+      TableScanOperator tableScan = null;
+
+      List<String> aliases =
+          mrwork.getPathToAliases().get(dir.toUri().toString());
 
       // Make filter pushdown information available to getSplits.
-      ArrayList<String> aliases =
-        mrwork.getPathToAliases().get(dir.toUri().toString());
       if ((aliases != null) && (aliases.size() == 1)) {
         Operator<? extends OperatorDesc> op = mrwork.getAliasToWork().get(aliases.get(0));
         if ((op != null) && (op instanceof TableScanOperator)) {
-          TableScanOperator tableScan = (TableScanOperator) op;
-          pushFilters(newjob, tableScan);
+          tableScan = (TableScanOperator) op;
         }
       }
 
-      FileInputFormat.setInputPaths(newjob, dir);
-      newjob.setInputFormat(inputFormat.getClass());
-      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
-      for (InputSplit is : iss) {
-        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+      // Batch consecutive paths that share an InputFormat, TableDesc and
+      // TableScanOperator into a single getSplits() call.
+      if (!currentDirs.isEmpty() &&
+          inputFormatClass.equals(currentInputFormatClass) &&
+          table.equals(currentTable) &&
+          tableScan == currentTableScan) {
+        currentDirs.add(dir);
+        continue;
       }
+
+      if (!currentDirs.isEmpty()) {
+        LOG.info("Generating splits");
+        addSplitsForGroup(currentDirs, currentTableScan, newjob,
+            getInputFormatFromCache(currentInputFormatClass, job),
+            currentInputFormatClass, currentDirs.size() * (numSplits / dirs.length),
+            currentTable, result);
+      }
+
+      currentDirs.clear();
+      currentDirs.add(dir);
+      currentTableScan = tableScan;
+      currentTable = table;
+      currentInputFormatClass = inputFormatClass;
     }
 
+    // Flush the final group of paths.
+    LOG.info("Generating splits");
+    addSplitsForGroup(currentDirs, currentTableScan, newjob,
+        getInputFormatFromCache(currentInputFormatClass, job),
+        currentInputFormatClass, currentDirs.size() * (numSplits / dirs.length),
+        currentTable, result);
+
     LOG.info("number of splits " + result.size());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new HiveInputSplit[result.size()]);
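
For context on the shape of the change: the rewritten getSplits() no longer calls the underlying InputFormat once per directory. It accumulates consecutive directories whose input format class, TableDesc and TableScanOperator all match, then hands the whole run to the new addSplitsForGroup() helper in one call, with the split hint scaled by the group size. A minimal, self-contained sketch of that grouping pattern follows, assuming nothing from Hive; Entry, splitKey, processGroup and groupAndProcess are hypothetical names used only for illustration, and the three compared values are collapsed into one key string.

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ConsecutiveGrouping {

  // Stand-in for one input directory plus the values the patched loop
  // compares: (inputFormatClass, TableDesc, TableScanOperator).
  record Entry(String path, String splitKey) {}

  // Stand-in for addSplitsForGroup(): one expensive call per batch.
  static void processGroup(List<String> paths, String key) {
    System.out.println("getSplits(" + key + ") over " + paths);
  }

  static void groupAndProcess(List<Entry> entries) {
    List<String> current = new ArrayList<>();
    String currentKey = null;

    for (Entry e : entries) {
      // Same key as the running group: accumulate and move on,
      // mirroring the "continue" in the patched loop.
      if (!current.isEmpty() && Objects.equals(e.splitKey(), currentKey)) {
        current.add(e.path());
        continue;
      }
      // Key changed: flush the previous group before starting a new one.
      if (!current.isEmpty()) {
        processGroup(current, currentKey);
      }
      current.clear();
      current.add(e.path());
      currentKey = e.splitKey();
    }
    // Flush the trailing group, as the patch does after its for-loop.
    if (!current.isEmpty()) {
      processGroup(current, currentKey);
    }
  }

  public static void main(String[] args) {
    groupAndProcess(List.of(
        new Entry("/warehouse/t1/p=1", "orc"),
        new Entry("/warehouse/t1/p=2", "orc"),
        new Entry("/warehouse/t2/p=1", "text"),
        new Entry("/warehouse/t1/p=3", "orc")));
    // Prints three groups: only consecutive paths sharing a key are batched.
  }
}

Note that only runs of consecutive equal keys are merged: the pass keeps a single running group rather than a map keyed by (format, table, scan), which preserves path order and adds no memory overhead, at the cost of splitting interleaved paths into separate groups. The patched getSplits() makes the same trade.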