commit c944e217e4d6f73f2f5fc427f4ba439f1d0db761 Author: Mithun RK Date: Tue Feb 21 13:46:32 2017 -0800 YHIVE-978: Leverage new Tez API to compute the correct progress (cherry picked from commit e89f886) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 4242262aa0..6da5556cf3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.util.StringUtils; +import org.apache.tez.common.ProgressHelper; import org.apache.tez.common.TezUtils; import org.apache.tez.mapreduce.processor.MRTaskReporter; import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; @@ -71,6 +72,7 @@ private final PerfLogger perfLogger = SessionState.getPerfLogger(); protected ProcessorContext processorContext; + private ProgressHelper progressHelper; protected static final NumberFormat taskIdFormat = NumberFormat.getInstance(); protected static final NumberFormat jobIdFormat = NumberFormat.getInstance(); @@ -91,6 +93,9 @@ public void close() throws IOException { // we have to close in the processor's run method, because tez closes inputs // before calling close (TEZ-955) and we might need to read inputs // when we flush the pipeline. + if (progressHelper != null) { + progressHelper.shutDownProgressTaskService(); + } } @Override @@ -158,6 +163,10 @@ public void run(Map inputs, Map out if (aborted.get()) { return; } + + // leverage TEZ-3437: Improve synchronization and the progress report behavior, need Tez 0.7.1.15 or above. + progressHelper = new ProgressHelper(inputs, getContext(), this.getClass().getSimpleName()); + // There should be no blocking operation in RecordProcessor creation, // otherwise the abort operation will not register since they are synchronized on the same // lock. @@ -168,6 +177,7 @@ public void run(Map inputs, Map out } } + progressHelper.scheduleProgressTaskService(0, 100); if (!aborted.get()) { initializeAndRunProcessor(inputs, outputs); }