diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 706f205..ee141f5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -2911,6 +2911,16 @@ public static double getHighestSamplePercentage (MapWork work) {
   }
 
   /**
+   * On Tez we don't create dummy files when getting/setting input paths;
+   * we let Tez handle the situation. We also set the paths in the AM, so
+   * we don't want to depend on the scratch dir and context.
+   */
+  public static List<Path> getInputPathsTez(JobConf job, MapWork work) throws Exception {
+    List<Path> paths = getInputPaths(job, work, null, null);
+    return paths;
+  }
+
+  /**
    * Computes a list of all input paths needed to compute the given MapWork. All aliases
    * are considered and a merged list of input paths is returned. If any input path points
    * to an empty table or partition a dummy file in the scratch dir is instead created and
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 883e716..642841f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -209,11 +209,6 @@ private Vertex createVertex(JobConf conf, MapWork mapWork,
     // set up the operator plan
     Utilities.setMapWork(conf, mapWork, mrScratchDir, false);
 
-    // setup input paths and split info
-    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
-        mrScratchDir, ctx);
-    Utilities.setInputPaths(conf, inputPaths);
-
     // create the directories FileSinkOperators need
     Utilities.createTmpDirs(conf, mapWork);
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
index eea8cdb..520590f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/BucketizedHiveInputFormat.java
@@ -28,7 +28,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -114,8 +116,23 @@ public RecordReader getRecordReader(InputSplit split, JobConf job,
 
     Path[] dirs = FileInputFormat.getInputPaths(job);
     if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
+      // on Tez we avoid duplicating the file info in FileInputFormat.
+      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        try {
+          List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
+          dirs = paths.toArray(new Path[paths.size()]);
+          if (dirs.length == 0) {
+            // if we still don't have any files, it's time to fail.
+            throw new IOException("No input paths specified in job");
+          }
+        } catch (Exception e) {
+          throw new IOException("Could not create input paths", e);
+        }
+      } else {
+        throw new IOException("No input paths specified in job");
+      }
     }
+
     JobConf newjob = new JobConf(job);
     ArrayList<InputSplit> result = new ArrayList<InputSplit>();
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index d6344e4..dc2fb96 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
@@ -271,6 +272,17 @@ public int hashCode() {
         mrwork.getAliasToWork();
     CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
         .getCombineFileInputFormat();
+
+    // on Tez we avoid duplicating the path info since the info will go
+    // over RPC
+    if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+      try {
+        List<Path> dirs = Utilities.getInputPathsTez(job, mrwork);
+        Utilities.setInputPaths(job, dirs);
+      } catch (Exception e) {
+        throw new IOException("Could not create input paths", e);
+      }
+    }
 
     InputSplit[] splits = null;
     if (combine == null) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 99172d4..647a9a6 100755
--- ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -300,7 +301,21 @@ private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, Job
     Path[] dirs = FileInputFormat.getInputPaths(job);
     if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
+      // on Tez we avoid duplicating the file info in FileInputFormat.
+      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        try {
+          List<Path> paths = Utilities.getInputPathsTez(job, mrwork);
+          dirs = paths.toArray(new Path[paths.size()]);
+          if (dirs.length == 0) {
+            // if we still don't have any files, it's time to fail.
+            throw new IOException("No input paths specified in job");
+          }
+        } catch (Exception e) {
+          throw new IOException("Could not create input paths", e);
+        }
+      } else {
+        throw new IOException("No input paths specified in job");
+      }
     }
 
     JobConf newjob = new JobConf(job);
     List<InputSplit> result = new ArrayList<InputSplit>();
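
Reviewer note: below is a minimal standalone sketch of the control flow this patch adds to split generation, for anyone who wants to poke at it outside the Hive tree. All names in it (TezInputPathFallback, InputPathSource, resolveInputPaths) are invented for illustration: String stands in for org.apache.hadoop.fs.Path, and the InputPathSource lambda stands in for the plan lookup that Utilities.getInputPathsTez(job, mrwork) performs. Only the decision logic mirrors the patch, deliberately restructured so the empty-result check sits outside the try block.

// TezInputPathFallback.java -- illustrative only; these names are not part
// of the patch. String stands in for org.apache.hadoop.fs.Path.
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class TezInputPathFallback {

  /** Stand-in for recomputing input paths from the serialized MapWork plan. */
  interface InputPathSource {
    List<String> inputPaths() throws Exception;
  }

  /**
   * Mirrors the patched split-generation logic: use the paths already set in
   * the job conf when present; on Tez, fall back to recomputing them from the
   * plan; on MR, fail exactly as before the patch.
   */
  static String[] resolveInputPaths(String engine, String[] confPaths,
      InputPathSource plan) throws IOException {
    if (confPaths.length > 0) {
      return confPaths; // MR case: paths were serialized into the job conf
    }
    if (!"tez".equals(engine)) {
      throw new IOException("No input paths specified in job");
    }
    String[] dirs;
    try {
      List<String> paths = plan.inputPaths();
      dirs = paths.toArray(new String[paths.size()]);
    } catch (Exception e) {
      throw new IOException("Could not create input paths", e);
    }
    if (dirs.length == 0) {
      // still nothing to read: fail the same way the non-Tez branch does
      throw new IOException("No input paths specified in job");
    }
    return dirs;
  }

  public static void main(String[] args) throws IOException {
    InputPathSource plan = () -> Arrays.asList("/warehouse/t1");
    // Tez: conf is empty, paths are recovered from the plan
    System.out.println(Arrays.toString(
        resolveInputPaths("tez", new String[0], plan)));
    // MR: conf already carries the paths, the plan is never consulted
    System.out.println(Arrays.toString(
        resolveInputPaths("mr", new String[] {"/warehouse/t2"}, plan)));
  }
}

One observation on the patch as written: in BucketizedHiveInputFormat and HiveInputFormat, the inner throw new IOException("No input paths specified in job") sits inside the try block, so the enclosing catch (Exception e) catches it and rewraps it as "Could not create input paths". If the original message is meant to surface on Tez, the empty-dirs check could move below the catch, as in the sketch above.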