diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
index 0b8eae8..9184b8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java
@@ -70,6 +70,7 @@
   transient List<Object>[] nextKeyWritables;
   transient RowContainer<List<Object>>[] nextGroupStorage;
   transient RowContainer<List<Object>>[] candidateStorage;
+  protected static final int CHECK_INTERRUPTION_AFTER_ROWS = 1000;
   transient String[] tagToAlias;
   private transient boolean[] fetchDone;
 
@@ -354,6 +355,14 @@ private void putDummyOrEmpty(Byte i) {
     return smallestOne == null ? null : result;
   }
 
+  protected void checkAbortCondition() throws HiveException {
+    if (abortOp.get() || Thread.currentThread().isInterrupted()) {
+      // Not clearing the interrupt status.
+      boolean interruptState = Thread.currentThread().isInterrupted();
+      throw new HiveException("Processing thread aborted. Interrupt state: " + interruptState);
+    }
+  }
+
   private void fetchNextGroup(Byte t) throws HiveException {
     if (foundNextKeyGroup[t]) {
       // first promote the next group to be the current group if we reached a
@@ -374,11 +383,16 @@ private void fetchNextGroup(Byte t) throws HiveException {
 
     // for tables other than the big table, we need to fetch more data until we reach a new group
     // or are done.
+    int nRows = 0;
     while (!foundNextKeyGroup[t]) {
       if (fetchDone[t]) {
         break;
       }
       fetchOneRow(t);
+      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
+        checkAbortCondition();
+        nRows = 0;
+      }
     }
     if (!foundNextKeyGroup[t] && fetchDone[t]) {
       this.nextKeyWritables[t] = null;
@@ -430,12 +444,18 @@ private void joinFinalLeftData() throws HiveException {
     boolean allFetchDone = allFetchDone();
     // if all left data in the small tables is less than or equal to the left data
     // in the big table, let them catch up
+    int nRows = 0;
     while (bigTblRowContainer != null && bigTblRowContainer.rowCount() > 0 && !allFetchDone) {
       joinOneGroup();
       bigTblRowContainer = this.candidateStorage[this.posBigTable];
       allFetchDone = allFetchDone();
+      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
+        checkAbortCondition();
+        nRows = 0;
+      }
     }
 
+    nRows = 0;
     while (!allFetchDone) {
       List<Byte> ret = joinOneGroup();
       // if we are in close op phase, we have definitely exhausted the big table input
@@ -460,6 +480,10 @@
       reportProgress();
       numMapRowsRead++;
       allFetchDone = allFetchDone();
+      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
+        checkAbortCondition();
+        nRows = 0;
+      }
     }
 
     boolean dataInCache = true;
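
A minimal standalone sketch of the polling pattern this patch introduces, for anyone who wants to see the control flow in isolation. It is not part of the patch: the operator's abort flag is modeled here as a plain AtomicBoolean and HiveException is replaced by RuntimeException so the example compiles on its own; both substitutions are assumptions made for illustration.

import java.util.concurrent.atomic.AtomicBoolean;

public class AbortCheckSketch {
  static final int CHECK_INTERRUPTION_AFTER_ROWS = 1000;
  // Stand-in for the operator's abort flag, set from another thread when
  // the task is cancelled (an assumption for this self-contained example).
  static final AtomicBoolean abortOp = new AtomicBoolean(false);

  // Mirrors checkAbortCondition(): fail fast if an abort was requested or
  // the processing thread was interrupted. The interrupt status is
  // deliberately not cleared, so code further up the stack can still see it.
  static void checkAbortCondition() {
    if (abortOp.get() || Thread.currentThread().isInterrupted()) {
      boolean interruptState = Thread.currentThread().isInterrupted();
      throw new RuntimeException("Processing thread aborted. Interrupt state: " + interruptState);
    }
  }

  public static void main(String[] args) {
    int nRows = 0;
    for (int row = 0; row < 1_000_000; row++) {
      // ... fetch and process one row here ...

      // Poll for abort only once per CHECK_INTERRUPTION_AFTER_ROWS rows so
      // the atomic read and interrupt check stay off the per-row hot path.
      if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) {
        checkAbortCondition();
        nRows = 0;
      }
    }
    System.out.println("processed all rows without an abort request");
  }
}

The interval is the design point: a long-running merge-join loop that never polls for interruption cannot be cancelled until it exhausts its input, while checking on every row puts an atomic read on the hot path; checking once every thousand rows bounds the cancellation latency at negligible cost.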