diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index 16657729042294fc9e3fe805f00be2cccc1ef27b..9becc815a21bd4365c685ec0e9c57f17f044972a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -287,7 +287,17 @@ public void generateMapMetaData() throws HiveException {
     loader.init(mapContext, mrContext, hconf, this);
     long memUsage = (long)(MapJoinMemoryExhaustionHandler.getMaxHeapSize()
         * conf.getHashTableMemoryUsage());
-    loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
+    try {
+      loader.load(mapJoinTables, mapJoinTableSerdes, memUsage);
+    } catch (HiveException e) {
+      if (isLogInfoEnabled) {
+        LOG.info("Exception loading hash tables. Clearing partially loaded hash table containers.");
+      }
+
+      // there could be some spilled partitions which need to be cleaned up
+      clearAllTableContainers();
+      throw e;
+    }

     hashTblInitedOnce = true;
@@ -433,7 +443,7 @@ protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object
   public void closeOp(boolean abort) throws HiveException {
     boolean spilled = false;
-    for (MapJoinTableContainer container: mapJoinTables) {
+    for (MapJoinTableContainer container : mapJoinTables) {
       if (container != null) {
         spilled = spilled || container.hasSpill();
         container.dumpMetrics();
@@ -441,9 +451,19 @@ public void closeOp(boolean abort) throws HiveException {
       }
     }

     if (spilled) {
-      for (MapJoinTableContainer tableContainer : mapJoinTables) {
-        if (tableContainer != null) {
-          if (tableContainer instanceof HybridHashTableContainer) {
+      if (abort) {
+        if (isLogInfoEnabled) {
+          LOG.info("spilled: " + spilled + " abort: " + abort + ". Clearing spilled partitions.");
+        }
+        clearAllTableContainers();
+        cache.release(cacheKey);
+      } else {
+        if (isLogInfoEnabled) {
+          LOG.info("spilled: " + spilled + " abort: " + abort + ". Finishing spilled partitions.");
+        }
+
+        for (MapJoinTableContainer tableContainer : mapJoinTables) {
+          if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) {
             HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer;
             hybridHtContainer.dumpStats();
@@ -453,7 +473,7 @@ public void closeOp(boolean abort) throws HiveException {
             if (!hashPartitions[i].isHashMapOnDisk()) {
               hybridHtContainer.setTotalInMemRowCount(
                   hybridHtContainer.getTotalInMemRowCount() -
-                  hashPartitions[i].getHashMapFromMemory().getNumValues());
+                      hashPartitions[i].getHashMapFromMemory().getNumValues());
               hashPartitions[i].getHashMapFromMemory().clear();
             }
           }
@@ -466,12 +486,8 @@ public void closeOp(boolean abort) throws HiveException {
                 hybridMapJoinLeftover = true;
                 hashMapRowGetters[smallTable] = null;
                 continueProcess(hashPartitions[i], hybridHtContainer);
-              } catch (IOException e) {
-                e.printStackTrace();
-              } catch (ClassNotFoundException e) {
-                e.printStackTrace();
-              } catch (SerDeException e) {
-                e.printStackTrace();
+              } catch (IOException | ClassNotFoundException | SerDeException e) {
+                throw new HiveException(e);
               }
             }
             hybridMapJoinLeftover = false;
@@ -479,20 +495,32 @@ public void closeOp(boolean abort) throws HiveException {
             }
           }
         }
+
+        // spilled tables are always loaded (no sharing), so clear them
+        clearAllTableContainers();
       }
     }

+    // in the mapreduce case, we need to always clean up, as mapreduce doesn't have an object registry.
     if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
-        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
-        && mapJoinTables != null) {
+        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())) {
+      if (isLogInfoEnabled) {
+        LOG.info("MR: Clearing all map join table containers.");
+      }
+      clearAllTableContainers();
+    }
+
+    super.closeOp(abort);
+  }
+
+  private void clearAllTableContainers() {
+    if (mapJoinTables != null) {
       for (MapJoinTableContainer tableContainer : mapJoinTables) {
         if (tableContainer != null) {
           tableContainer.clear();
         }
       }
     }
-    cache.release(cacheKey);
-    super.closeOp(abort);
   }

   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index cb9083dd0cde84a342681eca0e5e1656ab20d749..e392ecd0f361eca431c6bec00fcc51c591da6853 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -180,6 +180,36 @@ public VectorMapJoinRowBytesContainer getMatchfileRowBytesContainer() {
     public boolean isHashMapOnDisk() {
       return hashMapOnDisk;
     }
+
+    public void clear() {
+      if (hashMap != null) {
+        hashMap.clear();
+        hashMap = null;
+      }
+
+      if (hashMapLocalPath != null) {
+        try {
+          Files.delete(hashMapLocalPath);
+        } catch (Throwable ignored) {
+        }
+        hashMapLocalPath = null;
+      }
+
+      if (sidefileKVContainer != null) {
+        sidefileKVContainer.clear();
+        sidefileKVContainer = null;
+      }
+
+      if (matchfileObjContainer != null) {
+        matchfileObjContainer.clear();
+        matchfileObjContainer = null;
+      }
+
+      if (matchfileRowBytesContainer != null) {
+        matchfileRowBytesContainer.clear();
+        matchfileRowBytesContainer = null;
+      }
+    }
   }

   public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsage, long tableSize)
@@ -486,12 +516,11 @@ public int getToSpillPartitionId() {
     return toSpillPartitionId;
   }

-  /* Clean up in memory hashtables */
   @Override
   public void clear() {
     for (HashPartition hp : hashPartitions) {
-      if (hp.hashMap != null) {
-        hp.hashMap.clear();
+      if (hp != null) {
+        hp.clear();
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index ba5a7972890df843e9814dd4ca698d7721fae394..e4f2ec500d849f85d99a819cd8927d71ae2c6d89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;

-import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;

@@ -36,7 +35,6 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
@@ -135,10 +133,6 @@ public void load(MapJoinTableContainer[] mapJoinTables,
       }
       tableContainer.seal();
       mapJoinTables[pos] = tableContainer;
-    } catch (IOException e) {
-      throw new HiveException(e);
-    } catch (SerDeException e) {
-      throw new HiveException(e);
     } catch (Exception e) {
       throw new HiveException(e);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
index c8359d3eb65d9621af7e953ce9bb4e6e79c59774..1c91be6a01e8e257993a477304f75351f99156a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java
@@ -290,7 +290,7 @@ public int currentLength() {
     return currentLength;
   }

-  public void clear() throws IOException {
+  public void clear() {
     if (fileInputStream != null) {
       try {
         fileInputStream.close();
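
Note on the pattern: the MapJoinOperator changes above implement clear-on-failure semantics. If hash table loading throws, or the operator closes with abort set, every table container is cleared via the new clearAllTableContainers(), which releases in-memory hash maps and (through HashPartition.clear()) deletes spilled partition files. Below is a minimal sketch of that pattern; Container, ContainerLoader, and all other names are hypothetical stand-ins for illustration, not the actual Hive classes.

import java.io.IOException;

// Sketch: clear partially loaded containers before propagating a load failure.
class ClearOnFailureExample {

  interface Container {
    // Releases in-memory structures and deletes any spill files.
    void clear();
  }

  interface ContainerLoader {
    void load(Container[] containers) throws IOException;
  }

  private final Container[] containers = new Container[4];

  void loadTables(ContainerLoader loader) throws IOException {
    try {
      loader.load(containers);
    } catch (IOException e) {
      // Some containers may already hold partially loaded or spilled
      // partitions; clear them all before rethrowing, mirroring what
      // clearAllTableContainers() does in the patch.
      clearAll();
      throw e;
    }
  }

  private void clearAll() {
    for (Container c : containers) {
      if (c != null) {
        c.clear();
      }
    }
  }
}

The patch also stops swallowing reload errors: the separate IOException, ClassNotFoundException, and SerDeException handlers that previously only called e.printStackTrace() are collapsed into one multi-catch that rethrows the cause wrapped in a HiveException, so a failed reload of a spilled partition now fails the query instead of continuing silently.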