diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 636f079..c6645d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -28,7 +28,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -71,7 +74,7 @@ protected List> childOperators; protected List> parentOperators; protected String operatorId; - protected AtomicBoolean abortOp; + protected final AtomicBoolean abortOp; private transient ExecMapperContext execContext; private transient boolean rootInitializeCalled = false; protected final transient Collection> asyncInitOperations = new HashSet<>(); @@ -390,24 +393,58 @@ private void completeInitialization(Collection> fs) throws HiveExcepti Object[] os = new Object[fs.size()]; int i = 0; Throwable asyncEx = null; + + // Wait for all futures to complete. Check for an abort while waiting for each future. If any of the futures is cancelled / aborted - cancel all subsequent futures. + + boolean cancelAll = false; for (Future f : fs) { - if (abortOp.get() || asyncEx != null) { - // We were aborted, interrupted or one of the operations failed; terminate all. - f.cancel(true); + // If aborted - break out of the loop, and cancel all subsequent futures. + if (cancelAll) { + break; + } + if (abortOp.get()) { + cancelAll = true; + break; } else { - try { - os[i++] = f.get(); - } catch (CancellationException ex) { - asyncEx = new InterruptedException("Future was canceled"); - } catch (Throwable t) { - f.cancel(true); - asyncEx = t; + // Wait for the current future. 
+ while (true) { + if (abortOp.get()) { + cancelAll = true; + break; + } else { + try { + // Await future result with a timeout to check the abort field occasionally. + // It's possible that the interrupt which comes in along with an abort, is suppressed + // by some other operator. + Object futureResult = f.get(200l, TimeUnit.MILLISECONDS); + os[i++] = futureResult; + break; + } catch (TimeoutException e) { + // Expected if the operation takes time. Continue the loop, and wait for op completion. + } catch (InterruptedException e) { + asyncEx = e; + cancelAll = true; + break; + } catch (ExecutionException e) { + asyncEx = e; + cancelAll = true; + break; + } + } } + } } - if (asyncEx != null) { - throw new HiveException("Async initialization failed", asyncEx); + + if (cancelAll || asyncEx != null) { + for (Future f : fs) { + // It's ok to send a cancel to an already completed future. Is a no-op + f.cancel(true); + } + throw new HiveException("Async Initialization failed. abortRequested=" + abortOp.get(), asyncEx); } + + completeInitializationOp(os); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index dc63d7b..f4a9cac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -85,7 +85,6 @@ MRInputLegacy legacyMRInput; MultiMRInput mainWorkMultiMRInput; private final ExecMapperContext execContext; - private boolean abort; private MapWork mapWork; List mergeWorkList; List cacheKeys; @@ -360,18 +359,17 @@ private KeyValueReader getKeyValueReader(Collection keyValueRead void run() throws Exception { while (sources[position].pushRecord()) { if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) { - if (abort && Thread.interrupted()) { - throw new HiveException("Processing thread interrupted"); - } + checkAbortCondition(); nRows = 0; } } } + @Override public void abort() { 
// this will stop run() from pushing records - abort = true; + super.abort(); // this will abort initializeOp() if (mapOp != null) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java index bb56e1c..6fad405 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java @@ -62,7 +62,6 @@ private String cacheKey; private MergeFileWork mfWork; MRInputLegacy mrInput = null; - private boolean abort = false; private final Object[] row = new Object[2]; ObjectCache cache; @@ -158,7 +157,7 @@ void run() throws Exception { @Override void abort() { - abort = true; + super.abort(); } @Override diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index 2f08529..e665ba7 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -58,6 +58,7 @@ public static final Logger l4j = LoggerFactory.getLogger(RecordProcessor.class); + protected volatile boolean abort = false; // used to log memory usage periodically protected boolean isLogInfoEnabled = false; @@ -108,8 +109,6 @@ void init(MRTaskReporter mrReporter, */ abstract void run() throws Exception; - abstract void abort(); - abstract void close(); protected void createOutputMap() { @@ -148,4 +147,19 @@ public Object call() { return null; } } + + void abort() { + this.abort = true; + } + + protected void checkAbortCondition() throws HiveException { + if (abort || Thread.currentThread().isInterrupted()) { + // TODO Ideally, the interrupt status should not be cleared. 
+ + // Retrieve and clear the interrupt status + boolean interruptState = Thread.interrupted(); + // TODO Ideally, this should be throwing an InterruptedException + throw new HiveException("Processing thread aborted. Interrupt state: " + interruptState); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 0579dbc..415df92 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -83,7 +83,7 @@ private byte bigTablePosition = 0; - private boolean abort; + private int nRows = 0; public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { @@ -262,9 +262,7 @@ void run() throws Exception { // run the operator pipeline while (sources[bigTablePosition].pushRecord()) { if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) { - if (abort && Thread.interrupted()) { - throw new HiveException("Processing thread interrupted"); - } + checkAbortCondition(); nRows = 0; } } @@ -273,7 +271,7 @@ void run() throws Exception { @Override public void abort() { // this will stop run() from pushing records - abort = true; + super.abort(); // this will abort initializeOp() if (reducer != null) {