diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 91c15ed..05ae4d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -147,7 +147,16 @@ protected void initializeAndRunProcessor(Map inputs, throws Exception { Throwable originalThrowable = null; try { - // Outputs will be started later by the individual Processors. + TezCacheAccess cacheAccess = TezCacheAccess.createInstance(jobConf); + // Start the actual Inputs. After MRInput initialization. + for (Map.Entry inputEntry : inputs.entrySet()) { + if (!cacheAccess.isInputCached(inputEntry.getKey())) { + LOG.info("Input: " + inputEntry.getKey() + " is not cached"); + inputEntry.getValue().start(); + } else { + LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start"); + } + } MRTaskReporter mrReporter = new MRTaskReporter(getContext()); rproc.init(jobConf, getContext(), mrReporter, inputs, outputs);