diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index d7f1b42fa280d8f946d3872d1a86d3b294249876..3a6c5f1007ba01a5f8b12bd501e4a26dbe7950f5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -68,6 +69,7 @@ protected List> childOperators; protected List> parentOperators; protected String operatorId; + protected AtomicBoolean abortOp; private transient ExecMapperContext execContext; private transient boolean rootInitializeCalled = false; @@ -106,6 +108,7 @@ private Operator(String name) { initOperatorId(); childOperators = new ArrayList>(); parentOperators = new ArrayList>(); + abortOp = new AtomicBoolean(false); } public Operator() { @@ -383,7 +386,11 @@ private void completeInitialization(Collection> fs) throws HiveExcepti int i = 0; for (Future f : fs) { try { - os[i++] = f.get(); + if (abortOp.get()) { + f.cancel(true); + } else { + os[i++] = f.get(); + } } catch (Exception e) { throw new HiveException(e); } @@ -442,6 +449,10 @@ protected void initializeChildren(Configuration hconf) throws HiveException { } } + public void abort() { + abortOp.set(true); + } + /** * Pass the execContext reference to every child operator */ @@ -612,6 +623,8 @@ public void close(boolean abort) throws HiveException { LOG.info(id + " finished. closing... "); } + abort |= abortOp.get(); + // call the operator specific close routine closeOp(abort); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java index 44d54188684992729a0e1b6d9c7cd4f5789d57da..2172fdb6c3cac1354c4bcd75150c82ddfcddc778 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -70,11 +71,11 @@ * Just pump the records through the query plan. */ public class MapRecordProcessor extends RecordProcessor { - + public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); + protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapOperator mapOp; private final List mergeMapOpList = new ArrayList(); - public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class); private MapRecordSource[] sources; private final Map multiMRInputMap = new HashMap(); private int position; @@ -82,11 +83,11 @@ MultiMRInput mainWorkMultiMRInput; private final ExecMapperContext execContext; private boolean abort; - protected static final String MAP_PLAN_KEY = "__MAP_PLAN__"; private MapWork mapWork; List mergeWorkList; List cacheKeys; ObjectCache cache; + private int nRows; private static Map connectOps = new TreeMap(); @@ -101,6 +102,7 @@ public MapRecordProcessor(final JobConf jconf, final ProcessorContext context) t execContext = new ExecMapperContext(jconf); execContext.setJc(jconf); cacheKeys = new ArrayList(); + nRows = 0; } @Override @@ -311,7 +313,25 @@ private DummyStoreOperator getJoinParentOp(Operator merg @Override void run() throws Exception { - while (sources[position].pushRecord()) {} + while (sources[position].pushRecord()) { + if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) { + if (abort && Thread.interrupted()) { + throw new HiveException("Processing thread interrupted"); + } + nRows = 0; + } + } + } + + @Override + public void abort() { + // this will stop run() from pushing records + abort = true; + + // this will abort initializeOp() + if (mapOp != null) { + mapOp.abort(); + } } @Override diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java index d82a0488b12c35f285bd4264b5d38f1f31dc0c00..7c0eb8986a2b97723e14c45af31bf1008fd0a76c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java @@ -148,13 +148,18 @@ void run() throws Exception { while (reader.next()) { boolean needMore = processRow(reader.getCurrentKey(), reader.getCurrentValue()); - if (!needMore) { + if (!needMore || abort) { break; } } } @Override + void abort() { + abort = true; + } + + @Override void close() { if (cache != null && cacheKey != null) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java index c563d9d78cdf46829689dfebe77869aaa0ba1502..0859dc4071e7a4b34b80ce1c40dc619fb9b91a47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java @@ -16,8 +16,14 @@ * limitations under the License. */ package org.apache.hadoop.hive.ql.exec.tez; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Callable; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.exec.ObjectCache; @@ -26,7 +32,6 @@ import org.apache.hadoop.hive.ql.log.PerfLogger; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; -import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputCollector; import org.apache.tez.mapreduce.processor.MRTaskReporter; @@ -34,21 +39,15 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.net.URLClassLoader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.Callable; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; /** * Process input from tez LogicalInput and write output * It has different subclasses for map and reduce processing */ public abstract class RecordProcessor { + protected static final int CHECK_INTERRUPTION_AFTER_ROWS = 1000; protected final JobConf jconf; protected Map inputs; @@ -108,6 +107,7 @@ void init(MRTaskReporter mrReporter, */ abstract void run() throws Exception; + abstract void abort(); abstract void close(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 23ef420ad4da1886127ff705cb2998c546119dc2..63c427f96bdd50b6bc0935d8b68d3e5cdcc5c060 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector; import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -83,6 +84,7 @@ private byte bigTablePosition = 0; private boolean abort; + private int nRows = 0; public ReduceRecordProcessor(final JobConf jconf, final ProcessorContext context) throws Exception { super(jconf, context); @@ -246,12 +248,31 @@ void run() throws Exception { for (Entry outputEntry : outputs.entrySet()) { l4j.info("Starting Output: " + outputEntry.getKey()); - outputEntry.getValue().start(); - ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); + if (!abort) { + outputEntry.getValue().start(); + ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize(); + } } // run the operator pipeline while (sources[bigTablePosition].pushRecord()) { + if (nRows++ == CHECK_INTERRUPTION_AFTER_ROWS) { + if (abort && Thread.interrupted()) { + throw new HiveException("Processing thread interrupted"); + } + nRows = 0; + } + } + } + + @Override + public void abort() { + // this will stop run() from pushing records + abort = true; + + // this will abort initializeOp() + if (reducer != null) { + reducer.abort(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 39f9db67aff7d217ad4fb06aafd7e09d69524d59..f8c531470e43b76b22906687d9e9f8aab842e4ea 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -173,6 +173,10 @@ protected void initializeAndRunProcessor(Map inputs, } } + public void abort() { + rproc.abort(); + } + /** * KVOutputCollector. OutputCollector that writes using KVWriter. * Must be initialized before it is used.