diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 601ad08..8941333 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
@@ -260,25 +261,6 @@ private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exce
     return partValues;
   }
 
-  private boolean getNextPath() throws Exception {
-    while (iterPath.hasNext()) {
-      currPath = iterPath.next();
-      currDesc = iterPartDesc.next();
-      if (isNonNativeTable) {
-        return true;
-      }
-      FileSystem fs = currPath.getFileSystem(job);
-      if (fs.exists(currPath)) {
-        for (FileStatus fStat : listStatusUnderPath(fs, currPath)) {
-          if (fStat.getLen() > 0) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
-  }
-
   /**
    * Set context for this fetch operator in to the jobconf.
    * This helps InputFormats make decisions based on the scope of the complete
@@ -356,7 +338,10 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
   }
 
   protected FetchInputFormatSplit[] getNextSplits() throws Exception {
-    while (getNextPath()) {
+    while (iterPath.hasNext()) {
+      currPath = iterPath.next();
+      currDesc = iterPartDesc.next();
+
       // not using FileInputFormat.setInputPaths() here because it forces a connection to the
       // default file system - which may or may not be online during pure metadata operations
       job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
@@ -369,7 +354,13 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
 
       Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
       InputFormat inputFormat = getInputFormatFromCache(formatter, job);
-      InputSplit[] splits = inputFormat.getSplits(job, 1);
+      InputSplit[] splits;
+      try {
+        splits = inputFormat.getSplits(job, 1);
+      } catch (InvalidInputException e) {
+        continue;
+      }
+
       FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
       for (int i = 0; i < splits.length; i++) {
         inputSplits[i] = new FetchInputFormatSplit(splits[i], inputFormat);
@@ -649,31 +640,6 @@ private boolean needConversion(TableDesc tableDesc, List<PartitionDesc> partDesc
     return false;
   }
 
-  /**
-   * Lists status for all files under a given path. Whether or not this is recursive depends on the
-   * setting of job configuration parameter mapred.input.dir.recursive.
-   *
-   * @param fs
-   *          file system
-   *
-   * @param p
-   *          path in file system
-   *
-   * @return list of file status entries
-   */
-  private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException {
-    boolean recursive = job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false);
-    // If this is in acid format always read it recursively regardless of what the jobconf says.
-    if (!recursive && !AcidUtils.isAcid(p, job)) {
-      return fs.listStatus(p, FileUtils.HIDDEN_FILES_PATH_FILTER);
-    }
-    List<FileStatus> results = new ArrayList<FileStatus>();
-    for (FileStatus stat : fs.listStatus(p, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
-      FileUtils.listStatusRecursively(fs, stat, results);
-    }
-    return results.toArray(new FileStatus[results.size()]);
-  }
-
   // for split sampling. shrinkedLength is checked against IOContext.getCurrentBlockStart,
   // which is from RecordReader.getPos(). So some inputformats which does not support getPos()
   // like HiveHBaseTableInputFormat cannot be used with this (todo)
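
Standalone sketch of the pattern the patch adopts (this class, its name, and the setInputPaths call are illustrative only and not part of the patch): rather than pre-checking each path with FileSystem.exists()/listStatus() as the removed getNextPath()/listStatusUnderPath() did, ask the InputFormat for splits and treat InvalidInputException as "nothing to read under this path", then move on to the next one.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobConf;

/** Hypothetical helper illustrating the "skip paths that yield no input" pattern. */
class SkipMissingPaths {
  static List<InputSplit> splitsForPaths(JobConf job, InputFormat<?, ?> inputFormat,
      List<Path> paths) throws Exception {
    List<InputSplit> result = new ArrayList<InputSplit>();
    for (Path p : paths) {
      // The real patch sets "mapred.input.dir" directly to avoid touching the default FS.
      FileInputFormat.setInputPaths(job, p);
      try {
        for (InputSplit split : inputFormat.getSplits(job, 1)) {
          result.add(split);
        }
      } catch (InvalidInputException e) {
        // Path is missing or unreadable from the InputFormat's point of view:
        // skip it, the way the patched getNextSplits() does with "continue".
        continue;
      }
    }
    return result;
  }
}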