diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 1a836bdc6648ed68c41623d3994b8eab5e9c9373..6c09ea7dd83098b7d7f3ec7118f6ef8c6bbd30c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import static org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
+import static org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -37,18 +40,18 @@
 import org.apache.hadoop.hive.ql.HashTableLoaderFactory;
 import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionHandler;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
+import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
-import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.BytesBytesMultiHashMap;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
-import org.apache.hadoop.hive.ql.exec.persistence.KeyValueContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.UnwrapRowContainer;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -63,9 +66,6 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hive.common.util.ReflectionUtil;
 
-import static org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer.HashPartition;
-import static org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer.KeyValueHelper;
-
 /**
  * Map side Join operator implementation.
  */
@@ -414,59 +414,61 @@ protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object
 
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    for (MapJoinTableContainer tableContainer : mapJoinTables) {
-      if (tableContainer != null) {
-        tableContainer.dumpMetrics();
-
-        if (tableContainer instanceof HybridHashTableContainer) {
-          // TODO: most of the below code should be moved inside HybridHashTableContainer.
-          // Ideally, even the instanceof should not exist; instead, an API on MJTC.
-          HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
-          hybridHtContainer.dumpStats();
-
-          HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
-          // Clear all in memory partitions first
-          for (int i = 0; i < hashPartitions.length; i++) {
-            if (!hashPartitions[i].isHashMapOnDisk()) {
-              hybridHtContainer.setTotalInMemRowCount(
-                  hybridHtContainer.getTotalInMemRowCount() -
-                      hashPartitions[i].getHashMapFromMemory().getNumValues());
-              hashPartitions[i].getHashMapFromMemory().discardData();
+    if (!abort) {
+      for (MapJoinTableContainer tableContainer : mapJoinTables) {
+        if (tableContainer != null) {
+          tableContainer.dumpMetrics();
+
+          if (tableContainer instanceof HybridHashTableContainer) {
+            // TODO: most of the below code should be moved inside HybridHashTableContainer.
+            // Ideally, even the instanceof should not exist; instead, an API on MJTC.
+            HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
+            hybridHtContainer.dumpStats();
+
+            HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions();
+            // Clear all in memory partitions first
+            for (int i = 0; i < hashPartitions.length; i++) {
+              if (!hashPartitions[i].isHashMapOnDisk()) {
+                hybridHtContainer.setTotalInMemRowCount(
+                    hybridHtContainer.getTotalInMemRowCount() -
+                    hashPartitions[i].getHashMapFromMemory().getNumValues());
+                hashPartitions[i].getHashMapFromMemory().discardData();
+              }
             }
-          }
-          assert hybridHtContainer.getTotalInMemRowCount() == 0;
-
-          for (int i = 0; i < hashPartitions.length; i++) {
-            if (hashPartitions[i].isHashMapOnDisk()) {
-              // Recursively process on-disk triplets (hash partition, sidefile, matchfile)
-              try {
-                hybridMapJoinLeftover = true;
-                hashMapRowGetters[smallTable] = null;
-                continueProcess(hashPartitions[i], hybridHtContainer);
-              } catch (IOException e) {
-                e.printStackTrace();
-              } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-              } catch (SerDeException e) {
-                e.printStackTrace();
+            assert hybridHtContainer.getTotalInMemRowCount() == 0;
+
+            for (int i = 0; i < hashPartitions.length; i++) {
+              if (hashPartitions[i].isHashMapOnDisk()) {
+                // Recursively process on-disk triplets (hash partition, sidefile, matchfile)
+                try {
+                  hybridMapJoinLeftover = true;
+                  hashMapRowGetters[smallTable] = null;
+                  continueProcess(hashPartitions[i], hybridHtContainer);
+                } catch (IOException e) {
+                  e.printStackTrace();
+                } catch (ClassNotFoundException e) {
+                  e.printStackTrace();
+                } catch (SerDeException e) {
+                  e.printStackTrace();
+                }
              }
+              hybridMapJoinLeftover = false;
+              currentSmallTable = null;
            }
-            hybridMapJoinLeftover = false;
-            currentSmallTable = null;
          }
        }
      }
-    }
-    if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
-        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
-        && mapJoinTables != null) {
-      for (MapJoinTableContainer tableContainer : mapJoinTables) {
-        if (tableContainer != null) {
-          tableContainer.clear();
+      if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
+          && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
+          && mapJoinTables != null) {
+        for (MapJoinTableContainer tableContainer : mapJoinTables) {
+          if (tableContainer != null) {
+            tableContainer.clear();
+          }
        }
      }
+      mapJoinTables = null;
    }
-    mapJoinTables = null;
     cache.release(cacheKey);
     super.closeOp(abort);
   }
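Note on the MapJoinOperator change above: the whole hash-table drain (discarding in-memory partitions and recursively reprocessing spilled ones via continueProcess()) is now guarded by if (!abort), while cache.release(cacheKey) and super.closeOp(abort) still run on both paths. A condensed, self-contained sketch of that shape follows; drainHashTables() and releaseCache() are illustrative stand-ins, not methods from the patch.

```java
// Illustrative only: the guard pattern closeOp() follows after this patch.
public class CloseOnAbortSketch {
  private Object[] mapJoinTables = new Object[2];

  public void closeOp(boolean abort) {
    if (!abort) {
      // Normal close: dump metrics, discard in-memory partitions, and
      // reprocess partitions spilled to disk. None of this is useful
      // when the task is being torn down after a failure.
      drainHashTables();
      mapJoinTables = null;
    }
    // Runs on both paths, so an aborted task still frees its slot
    // in the per-task object cache.
    releaseCache();
  }

  private void drainHashTables() { /* stand-in for the loop in the patch */ }
  private void releaseCache() { /* stand-in for cache.release(cacheKey) */ }
}
```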
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 5856cfd1cfe6a577f36228dc5cf5ad6e25e1c59e..fe310bc3492b65b9d29d257d4d34b49f4ba6db6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@ -28,6 +28,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -68,6 +69,7 @@
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
+  protected AtomicBoolean abortOp;
 
   private transient ExecMapperContext execContext;
   private transient boolean rootInitializeCalled = false;
@@ -106,6 +108,7 @@ private Operator(String name) {
     initOperatorId();
     childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
     parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+    abortOp = new AtomicBoolean(false);
   }
 
   public Operator() {
@@ -383,7 +386,11 @@ private void completeInitialization(Collection<Future<?>> fs) throws HiveExcepti
     int i = 0;
     for (Future<?> f : fs) {
       try {
-        os[i++] = f.get();
+        if (abortOp.get()) {
+          f.cancel(true);
+        } else {
+          os[i++] = f.get();
+        }
       } catch (Exception e) {
         throw new HiveException(e);
       }
@@ -442,6 +449,10 @@ protected void initializeChildren(Configuration hconf) throws HiveException {
     }
   }
 
+  public void abort() {
+    abortOp.set(true);
+  }
+
   /**
    * Pass the execContext reference to every child operator
   */
@@ -612,6 +623,8 @@ public void close(boolean abort) throws HiveException {
       LOG.info(id + " finished. closing... ");
     }
 
+    abort |= abortOp.get();
+
     // call the operator specific close routine
     closeOp(abort);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 44d54188684992729a0e1b6d9c7cd4f5789d57da..20f34e95baff0645d606610fe0336452d9386723 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -71,7 +71,6 @@
  */
 public class MapRecordProcessor extends RecordProcessor {
 
-  private MapOperator mapOp;
   private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
   public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
 
@@ -311,7 +310,18 @@ private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> merg
 
   @Override
   void run() throws Exception {
-    while (sources[position].pushRecord()) {}
+    while (!abort && sources[position].pushRecord()) {}
+  }
+
+  @Override
+  public void abort() {
+    // this will stop run() from pushing records
+    abort = true;
+
+    // this will abort initializeOp()
+    if (mapOp != null) {
+      mapOp.abort();
+    }
   }
 
   @Override
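Note on the two files above: abort() is invoked from a different thread than the one driving initialize/run/close, which is why Operator guards the flag with an AtomicBoolean, while the record processors use a plain abort field that their own run() loops poll between records. A minimal, self-contained sketch of this cooperative-cancellation pattern; class and method names here are illustrative, not from the patch.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch: one thread sets a thread-safe flag, the worker
// thread polls it between records and stops at a safe point.
public class AbortFlagSketch {
  private final AtomicBoolean abortOp = new AtomicBoolean(false);

  // Corresponds to Operator.abort(); may be called from another thread.
  public void abort() {
    abortOp.set(true);
  }

  // Corresponds to the run() loops: stop between records once aborted.
  public void run() {
    while (!abortOp.get() && pushRecord()) {
      // keep pushing records until the source drains or abort is seen
    }
  }

  private boolean pushRecord() {
    return false; // stand-in for sources[position].pushRecord()
  }
}
```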
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
index c563d9d78cdf46829689dfebe77869aaa0ba1502..9098868e48f79a72d461f8b6c384d611d37df492 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
@@ -16,8 +16,14 @@
  * limitations under the License.
  */
 package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.ObjectCache;
@@ -26,7 +32,6 @@
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
@@ -34,15 +39,8 @@
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.Callable;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 
 /**
  * Process input from tez LogicalInput and write output
@@ -108,6 +106,7 @@ void init(MRTaskReporter mrReporter,
    */
   abstract void run() throws Exception;
 
+  abstract void abort();
 
   abstract void close();
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 23ef420ad4da1886127ff705cb2998c546119dc2..21eaf36ae7eef374c79ff70115d8fcc8c2ffe10d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@ -246,12 +246,25 @@ void run() throws Exception {
 
     for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
       l4j.info("Starting Output: " + outputEntry.getKey());
-      outputEntry.getValue().start();
-      ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      if (!abort) {
+        outputEntry.getValue().start();
+        ((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
+      }
     }
 
     // run the operator pipeline
-    while (sources[bigTablePosition].pushRecord()) {
+    while (!abort && sources[bigTablePosition].pushRecord()) {
+    }
+  }
+
+  @Override
+  public void abort() {
+    // this will stop run() from pushing records
+    abort = true;
+
+    // this will abort initializeOp()
+    if (reducer != null) {
+      reducer.abort();
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 39f9db67aff7d217ad4fb06aafd7e09d69524d59..f8c531470e43b76b22906687d9e9f8aab842e4ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -173,6 +173,10 @@ protected void initializeAndRunProcessor(Map<String, LogicalInput> inputs,
     }
   }
 
+  public void abort() {
+    rproc.abort();
+  }
+
   /**
    * KVOutputCollector. OutputCollector that writes using KVWriter.
    * Must be initialized before it is used.
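Taken together, the patch threads cancellation top-down: TezProcessor.abort() calls RecordProcessor.abort(), which sets the processor's abort flag and calls abort() on the root MapOperator or reducer; the operator's abortOp flag then stops the push loops, cancels pending asynchronous initialization, and short-circuits closeOp(). The standalone harness below (not Hive code; all names illustrative) demonstrates the future-cancellation branch added to Operator.completeInitialization().

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone harness: once the abort flag is set, outstanding
// async-initialization futures are cancelled with interruption
// instead of being awaited.
public class CancelPendingInit {
  public static void main(String[] args) throws Exception {
    AtomicBoolean abortOp = new AtomicBoolean(false);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<String>> fs = Arrays.asList(
        pool.submit(() -> "hash-table-1"), // finishes quickly
        pool.submit(() -> { Thread.sleep(60000); return "hash-table-2"; })); // slow loader

    abortOp.set(true); // e.g. the framework decided to kill this task attempt

    Object[] os = new Object[fs.size()];
    int i = 0;
    for (Future<String> f : fs) {
      if (abortOp.get()) {
        f.cancel(true); // interrupt the loader instead of blocking on get()
      } else {
        os[i++] = f.get();
      }
    }
    pool.shutdownNow();
    System.out.println("cancelled pending init, loaded " + i + " tables");
  }
}
```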