diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 00552a8..3d21215 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -29,6 +29,8 @@ import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; @@ -71,7 +73,7 @@ protected List> childOperators; protected List> parentOperators; protected String operatorId; - protected AtomicBoolean abortOp; + protected final AtomicBoolean abortOp; private transient ExecMapperContext execContext; private transient boolean rootInitializeCalled = false; protected final transient Collection> asyncInitOperations = new HashSet<>(); @@ -391,17 +393,28 @@ private void completeInitialization(Collection> fs) throws HiveExcepti int i = 0; Throwable asyncEx = null; for (Future f : fs) { - if (abortOp.get() || asyncEx != null) { - // We were aborted, interrupted or one of the operations failed; terminate all. - f.cancel(true); - } else { - try { - os[i++] = f.get(); - } catch (CancellationException ex) { - asyncEx = new InterruptedException("Future was canceled"); - } catch (Throwable t) { + while (true) { + if (abortOp.get() || asyncEx != null) { + // We were aborted, interrupted or one of the operations failed; terminate all. f.cancel(true); - asyncEx = t; + break; + } else { + try { + // Await future result with a timeout to check the abort field occasionally. + // It's possible that the interrupt which comes in along with an abort, is suppressed + // by some other operator. + os[i++] = f.get(200l, TimeUnit.MILLISECONDS); + break; + } catch (TimeoutException ex) { + // Expected if the operation takes time. Continue the loop, and wait for op completion. + } catch (CancellationException ex) { + asyncEx = new InterruptedException("Future was canceled"); + break; + } catch (Throwable t) { + f.cancel(true); + asyncEx = t; + break; + } } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index dc63d7b..541ee45 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -85,7 +85,7 @@ MRInputLegacy legacyMRInput; MultiMRInput mainWorkMultiMRInput; private final ExecMapperContext execContext; - private boolean abort; + private volatile boolean abort; private MapWork mapWork; List mergeWorkList; List cacheKeys; @@ -360,8 +360,12 @@ private KeyValueReader getKeyValueReader(Collection keyValueRead void run() throws Exception { while (sources[position].pushRecord()) { if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) { - if (abort && Thread.interrupted()) { - throw new HiveException("Processing thread interrupted"); + if (abort) { + // TODO Ideally, the interrupt status should not be cleared. + + // Retrieve and clear the interrupt status + boolean interruptState = Thread.interrupted(); + throw new HiveException("Processing thread aborted. Interrupt state: " + interruptState); } nRows = 0; }