diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 601ad08..b345d8a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -63,6 +63,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
@@ -260,25 +261,6 @@ private StructObjectInspector getPartitionKeyOI(TableDesc tableDesc) throws Exce
     return partValues;
   }
 
-  private boolean getNextPath() throws Exception {
-    while (iterPath.hasNext()) {
-      currPath = iterPath.next();
-      currDesc = iterPartDesc.next();
-      if (isNonNativeTable) {
-        return true;
-      }
-      FileSystem fs = currPath.getFileSystem(job);
-      if (fs.exists(currPath)) {
-        for (FileStatus fStat : listStatusUnderPath(fs, currPath)) {
-          if (fStat.getLen() > 0) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
-  }
-
   /**
    * Set context for this fetch operator in to the jobconf.
    * This helps InputFormats make decisions based on the scope of the complete
@@ -356,7 +338,9 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
   }
 
   protected FetchInputFormatSplit[] getNextSplits() throws Exception {
-    while (getNextPath()) {
+    while (iterPath.hasNext()) {
+      currPath = iterPath.next();
+      currDesc = iterPartDesc.next();
       // not using FileInputFormat.setInputPaths() here because it forces a connection to the
       // default file system - which may or may not be online during pure metadata operations
       job.set("mapred.input.dir", StringUtils.escapeString(currPath.toString()));
@@ -368,6 +352,6 @@ public boolean doNext(WritableComparable key, Writable value) throws IOException
       Class formatter = currDesc.getInputFileFormatClass();
       Utilities.copyTableJobPropertiesToConf(currDesc.getTableDesc(), job);
       InputFormat inputFormat = getInputFormatFromCache(formatter, job);
       InputSplit[] splits = inputFormat.getSplits(job, 1);
       FetchInputFormatSplit[] inputSplits = new FetchInputFormatSplit[splits.length];
       for (int i = 0; i < splits.length; i++) {
@@ -427,7 +410,9 @@ public boolean pushRow() throws IOException, HiveException {
     }
     InspectableObject row = getNextRow();
     if (row != null) {
-      pushRow(row);
+      if (row.o != null && row.oi != null) {
+        pushRow(row);
+      }
     } else {
       flushRow();
     }
@@ -517,6 +502,10 @@ public InspectableObject getNextRow() throws IOException {
           currRecReader = null;
         }
       }
+    } catch (InvalidInputException e) {
+      inspectable.o = null;
+      inspectable.oi = null;
+      return inspectable;
     } catch (Exception e) {
       throw new IOException(e);
     }
@@ -649,31 +638,6 @@ private boolean needConversion(TableDesc tableDesc, List<PartitionDesc> partDesc
     return false;
   }
 
-  /**
-   * Lists status for all files under a given path. Whether or not this is recursive depends on the
-   * setting of job configuration parameter mapred.input.dir.recursive.
-   *
-   * @param fs
-   *          file system
-   *
-   * @param p
-   *          path in file system
-   *
-   * @return list of file status entries
-   */
-  private FileStatus[] listStatusUnderPath(FileSystem fs, Path p) throws IOException {
-    boolean recursive = job.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, false);
-    // If this is in acid format always read it recursively regardless of what the jobconf says.
-    if (!recursive && !AcidUtils.isAcid(p, job)) {
-      return fs.listStatus(p, FileUtils.HIDDEN_FILES_PATH_FILTER);
-    }
-    List<FileStatus> results = new ArrayList<FileStatus>();
-    for (FileStatus stat : fs.listStatus(p, FileUtils.HIDDEN_FILES_PATH_FILTER)) {
-      FileUtils.listStatusRecursively(fs, stat, results);
-    }
-    return results.toArray(new FileStatus[results.size()]);
-  }
-
   // for split sampling. shrinkedLength is checked against IOContext.getCurrentBlockStart,
   // which is from RecordReader.getPos(). So some inputformats which does not support getPos()
   // like HiveHBaseTableInputFormat cannot be used with this (todo)
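What the patch changes, in short: FetchOperator used to call getNextPath() to probe the FileSystem (fs.exists() plus a full listStatusUnderPath() listing) for every candidate path before computing splits. Those per-partition round trips are gone; getNextSplits() now feeds each path straight to the InputFormat, and for FileInputFormat-based formats a missing path surfaces as an InvalidInputException from getSplits(), which getNextRow() catches and converts into an InspectableObject with null o/oi fields so pushRow() simply emits nothing.

Below is a minimal standalone sketch of that pattern, not part of the patch; the class name and input path are made up for illustration, and TextInputFormat stands in for whatever InputFormat the table actually uses.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class EmptyInputDemo {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf();
    // Equivalent to the job.set("mapred.input.dir", ...) call in the patch.
    FileInputFormat.setInputPaths(job, new Path("/tmp/path-that-does-not-exist"));

    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(job);

    InputSplit[] splits;
    try {
      splits = inputFormat.getSplits(job, 1);
    } catch (InvalidInputException e) {
      // Missing input: treat it as "no rows" rather than a fatal error,
      // the same way the patched getNextRow() swallows the exception.
      splits = new InputSplit[0];
    }
    System.out.println("number of splits: " + splits.length);
  }
}

Note the three-way protocol this sets up around pushRow(): a null return from getNextRow() still means "input exhausted, flush"; a non-null InspectableObject whose o/oi fields are null now means "input disappeared, emit nothing"; and a fully populated row is pushed as before.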