diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 16657729042294fc9e3fe805f00be2cccc1ef27b..b3ef499845d4f900eba9f85da7cc7085cb334e26 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -431,67 +431,77 @@ protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object @Override public void closeOp(boolean abort) throws HiveException { + boolean loaded = cache.containsKey(cacheKey); - boolean spilled = false; - for (MapJoinTableContainer container: mapJoinTables) { - if (container != null) { - spilled = spilled || container.hasSpill(); - container.dumpMetrics(); + // if the table containers are loaded already, then do not clean as they are shared + if (loaded) { + boolean spilled = false; + for (MapJoinTableContainer container : mapJoinTables) { + if (container != null) { + spilled = spilled || container.hasSpill(); + container.dumpMetrics(); + } } - } - if (spilled) { - for (MapJoinTableContainer tableContainer : mapJoinTables) { - if (tableContainer != null) { - if (tableContainer instanceof HybridHashTableContainer) { + if (spilled) { + for (MapJoinTableContainer tableContainer : mapJoinTables) { + if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) { HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer; hybridHtContainer.dumpStats(); HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions(); - // Clear all in memory partitions first - for (int i = 0; i < hashPartitions.length; i++) { - if (!hashPartitions[i].isHashMapOnDisk()) { - hybridHtContainer.setTotalInMemRowCount( - hybridHtContainer.getTotalInMemRowCount() - - hashPartitions[i].getHashMapFromMemory().getNumValues()); - hashPartitions[i].getHashMapFromMemory().clear(); - } - } - assert hybridHtContainer.getTotalInMemRowCount() == 0; - - for (int i = 0; i < hashPartitions.length; i++) { - if (hashPartitions[i].isHashMapOnDisk()) { - // Recursively process on-disk triplets (hash partition, sidefile, matchfile) + // if abort is true just cleanup all partitions + if (abort) { + for (HashPartition hashPartition : hashPartitions) { try { - hybridMapJoinLeftover = true; - hashMapRowGetters[smallTable] = null; - continueProcess(hashPartitions[i], hybridHtContainer); + hashPartition.clear(); } catch (IOException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } catch (SerDeException e) { - e.printStackTrace(); + throw new HiveException(e); } } - hybridMapJoinLeftover = false; - currentSmallTable = null; + } else { + // Clear all in memory partitions first + for (int i = 0; i < hashPartitions.length; i++) { + if (!hashPartitions[i].isHashMapOnDisk()) { + hybridHtContainer.setTotalInMemRowCount( + hybridHtContainer.getTotalInMemRowCount() - + hashPartitions[i].getHashMapFromMemory().getNumValues()); + hashPartitions[i].getHashMapFromMemory().clear(); + } + } + assert hybridHtContainer.getTotalInMemRowCount() == 0; + + for (int i = 0; i < hashPartitions.length; i++) { + if (hashPartitions[i].isHashMapOnDisk()) { + // Recursively process on-disk triplets (hash partition, sidefile, matchfile) + try { + hybridMapJoinLeftover = true; + hashMapRowGetters[smallTable] = null; + continueProcess(hashPartitions[i], hybridHtContainer); + } catch (IOException | ClassNotFoundException | SerDeException e) { + throw new HiveException(e); + } + } + hybridMapJoinLeftover = false; + currentSmallTable = null; + } } } } } - } - if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) - && (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) - && mapJoinTables != null) { - for (MapJoinTableContainer tableContainer : mapJoinTables) { - if (tableContainer != null) { - tableContainer.clear(); + if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) + && (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) + && mapJoinTables != null) { + for (MapJoinTableContainer tableContainer : mapJoinTables) { + if (tableContainer != null) { + tableContainer.clear(); + } } } + + cache.release(cacheKey); } - cache.release(cacheKey); super.closeOp(abort); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java index f0df2d3d457640cc54f8a208c0706eaeddbc6ed8..18195be183a8caf5c2a204769f0beb2f33996da1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java @@ -53,4 +53,12 @@ * @return the last cached object with the key, null if none. */ public Future retrieveAsync(String key, Callable fn) throws HiveException; + + /** + * Check if key exists in the object cache. + * + * @param key - key to check in object cache + * @return true if key exists else false + */ + public boolean containsKey(String key); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java index a6f698d4d49ed74223a5297c513ac19312034adf..44891ff49f97be6f6c67de06c7f3426f329b112f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java @@ -91,4 +91,9 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution } }; } + + @Override + public boolean containsKey(String key) { + return true; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index cb9083dd0cde84a342681eca0e5e1656ab20d749..cb14abcad67e14dfd4ff0e8fccdb795b90ba4240 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -180,6 +180,33 @@ public VectorMapJoinRowBytesContainer getMatchfileRowBytesContainer() { public boolean isHashMapOnDisk() { return hashMapOnDisk; } + + public void clear() throws IOException { + if (hashMap != null) { + hashMap.clear(); + hashMap = null; + } + + if (hashMapLocalPath != null) { + Files.delete(hashMapLocalPath); + hashMapLocalPath = null; + } + + if (sidefileKVContainer != null) { + sidefileKVContainer.clear(); + sidefileKVContainer = null; + } + + if (matchfileObjContainer != null) { + matchfileObjContainer.clear(); + matchfileObjContainer = null; + } + + if (matchfileRowBytesContainer != null) { + matchfileRowBytesContainer.clear(); + matchfileRowBytesContainer = null; + } + } } public HybridHashTableContainer(Configuration hconf, long keyCount, long memUsage, long tableSize) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java index c0bcb2158bd13b1561a0c2bcab49b4d8c48974d7..0849bb4ef97d54838f21bd895e797b7db41b82b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java @@ -93,4 +93,9 @@ public T call() throws Exception { } }); } + + @Override + public boolean containsKey(String key) { + return registry.get(key) != null; + } }