commit 261adf5eec42df34d994cbd3c74a0305d2136866
Author: Mithun RK
Date:   Tue Feb 21 13:46:32 2017 -0800

    YHIVE-978: Leverage new Tez API to compute the correct progress

    (cherry picked from commit e89f886)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 4242262aa0..38f47f0006 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +69,77 @@
   private static final String CLASS_NAME = TezProcessor.class.getName();
   private final PerfLogger perfLogger = SessionState.getPerfLogger();
 
+  /**
+   * Reflection-based wrapper around {@code org.apache.tez.common.ProgressHelper}.
+   * The class is looked up by name at runtime; if it cannot be loaded or instantiated,
+   * a warning is logged and later calls on this wrapper only log a warning and return.
+   * NOTE(review): presumably reflection keeps this working against Tez builds that
+   * predate TEZ-3437 and lack ProgressHelper -- confirm.
+   */
+  private static class ReflectiveProgressHelper {
+
+    Configuration conf;
+    Class<?> progressHelperClass = null;
+    Object progressHelper = null;
+
+    /**
+     * Tries to load ProgressHelper and build it via its (Map, ProcessorContext, String)
+     * constructor. Any failure is logged at WARN and not rethrown.
+     */
+    ReflectiveProgressHelper(Configuration conf,
+                             Map<?, ?> inputs,
+                             ProcessorContext processorContext,
+                             String processorName) {
+      this.conf = conf;
+      try {
+        progressHelperClass = this.conf.getClassByName("org.apache.tez.common.ProgressHelper");
+        progressHelper = progressHelperClass.getDeclaredConstructor(Map.class, ProcessorContext.class, String.class)
+                                            .newInstance(inputs, processorContext, processorName);
+        LOG.debug("ProgressHelper initialized!");
+      }
+      catch(Exception ex) {
+        LOG.warn("Could not find ProgressHelper. " + ex);
+      }
+    }
+
+    /** True only when both the class lookup and the instantiation succeeded. */
+    private boolean isValid() {
+      return progressHelperClass != null && progressHelper != null;
+    }
+
+    /** Reflectively invokes scheduleProgressTaskService(delay, period) on the helper; failures are logged. */
+    void scheduleProgressTaskService(long delay, long period) {
+      if (!isValid()) {
+        LOG.warn("ProgressHelper uninitialized. Bailing on scheduleProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("scheduleProgressTaskService", long.class, long.class)
+            .invoke(progressHelper, delay, period);
+        LOG.debug("scheduleProgressTaskService() called!");
+      } catch (Exception exception) {
+        LOG.warn("Could not scheduleProgressTaskService.", exception);
+      }
+    }
+
+    /** Reflectively invokes shutDownProgressTaskService() on the helper; failures are logged. */
+    void shutDownProgressTaskService() {
+      if (!isValid()) {
+        LOG.warn("ProgressHelper uninitialized. Bailing on shutDownProgressTaskService()");
+        return;
+      }
+      try {
+        progressHelperClass.getDeclaredMethod("shutDownProgressTaskService").invoke(progressHelper);
+        LOG.debug("shutDownProgressTaskService() called!");
+      }
+      catch (Exception exception) {
+        LOG.warn("Could not shutDownProgressTaskService.", exception);
+      }
+    }
+  }
+
   protected ProcessorContext processorContext;
+  private ReflectiveProgressHelper progressHelper;
 
   protected static final NumberFormat taskIdFormat = NumberFormat.getInstance();
   protected static final NumberFormat jobIdFormat = NumberFormat.getInstance();
@@ -91,6 +160,9 @@ public void close() throws IOException {
     // we have to close in the processor's run method, because tez closes inputs
     // before calling close (TEZ-955) and we might need to read inputs
     // when we flush the pipeline.
+    if (progressHelper != null) {
+      progressHelper.shutDownProgressTaskService();
+    }
   }
 
   @Override
@@ -158,6 +230,11 @@ public void run(Map inputs, Map out
     if (aborted.get()) {
       return;
     }
+
+    // leverage TEZ-3437: Improve synchronization and the progress report behavior, need Tez 0.7.1.15 or above.
+    progressHelper = new ReflectiveProgressHelper(jobConf, inputs, getContext(), this.getClass().getSimpleName());
+
+
     // There should be no blocking operation in RecordProcessor creation,
     // otherwise the abort operation will not register since they are synchronized on the same
     // lock.
@@ -168,6 +245,7 @@ public void run(Map inputs, Map out
       }
     }
 
+    progressHelper.scheduleProgressTaskService(0, 100);
     if (!aborted.get()) {
       initializeAndRunProcessor(inputs, outputs);
     }