TezProcessor: tolerate abort() arriving before or during run().

Summary of the hunks below:
  * Import AtomicBoolean and add a private final `aborted` flag.
  * run() returns immediately if the flag is already set, assigns `rproc`
    inside synchronized (this), and re-checks the flag before calling
    initializeAndRunProcessor(inputs, outputs).
  * abort() sets the flag, reads `rproc` into a local inside
    synchronized (this), and only calls abort() on it when it is non-null.
  * Whitespace-only fix: add the missing space after the comma in the
    "mapred.task.partition" setInt call.

diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index f8c5314..9baa0c1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -21,6 +21,7 @@
 import java.text.NumberFormat;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -50,6 +51,7 @@
   protected boolean isMap = false;
 
   protected RecordProcessor rproc = null;
+  private final AtomicBoolean aborted = new AtomicBoolean(false);
 
   protected JobConf jobConf;
 
@@ -115,26 +117,34 @@ private void setupMRLegacyConfigs(ProcessorContext processorContext) {
     String taskAttemptIdStr = taskAttemptIdBuilder.toString();
     this.jobConf.set("mapred.task.id", taskAttemptIdStr);
     this.jobConf.set("mapreduce.task.attempt.id", taskAttemptIdStr);
-    this.jobConf.setInt("mapred.task.partition",processorContext.getTaskIndex());
+    this.jobConf.setInt("mapred.task.partition", processorContext.getTaskIndex());
   }
 
   @Override
   public void run(Map inputs, Map outputs)
       throws Exception {
 
+    if (aborted.get()) {
+      return;
+    }
+
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_RUN_PROCESSOR);
     // in case of broadcast-join read the broadcast edge inputs
     // (possibly asynchronously)
 
     LOG.info("Running task: " + getContext().getUniqueIdentifier());
 
+    synchronized (this) {
     if (isMap) {
       rproc = new MapRecordProcessor(jobConf, getContext());
     } else {
       rproc = new ReduceRecordProcessor(jobConf, getContext());
     }
+    }
 
+    if (!aborted.get()) {
     initializeAndRunProcessor(inputs, outputs);
+    }
   }
 
   protected void initializeAndRunProcessor(Map inputs,
@@ -174,7 +184,14 @@ protected void initializeAndRunProcessor(Map inputs,
   }
 
   public void abort() {
-    rproc.abort();
+    aborted.set(true);
+    RecordProcessor rProcLocal;
+    synchronized (this) {
+      rProcLocal = rproc;
+    }
+    if (rProcLocal != null) {
+      rProcLocal.abort();
+    }
   }
 
   /**