// Patch summary: on a HiveException from loader.load(), generateMapMetaData() now clears all
// map-join table containers and rethrows; closeOp() skips reprocessing of spilled partitions
// when abort is true, wraps failures of continueProcess() in HiveException, and clears the
// containers and removes the cache key whenever a spill occurred. Also adds
// ObjectCache.remove(key) and a clear() method for hash partitions.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index f2b800a79aa08b3a40445259c6ec6bc76e35a2d4..1cfc411790af69817769a728412101d9c6765253 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -284,7 +284,17 @@ public void generateMapMetaData() throws HiveException { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE); loader.init(mapContext, mrContext, hconf, this); - loader.load(mapJoinTables, mapJoinTableSerdes); + try { + loader.load(mapJoinTables, mapJoinTableSerdes); + } catch (HiveException e) { + if (isLogInfoEnabled) { + LOG.info("Exception loading hash tables. Clearing partially loaded hash table containers."); + } + + // there could be some spilled partitions which need to be cleaned up + clearAllTableContainers(); + throw e; + } hashTblInitedOnce = true; @@ -433,7 +443,7 @@ protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object @Override public void closeOp(boolean abort) throws HiveException { boolean spilled = false; - for (MapJoinTableContainer container: mapJoinTables) { + for (MapJoinTableContainer container : mapJoinTables) { if (container != null) { spilled = spilled || container.hasSpill(); container.dumpMetrics(); @@ -442,79 +452,93 @@ public void closeOp(boolean abort) throws HiveException { // For Hybrid Grace Hash Join, we need to see if there is any spilled data to be processed next if (spilled) { - if (hashMapRowGetters == null) { - hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length]; - } - int numPartitions = 0; - // Find out number of partitions for each small table (should be same across tables) - for (byte pos = 0; pos < mapJoinTables.length; pos++) { - if (pos != conf.getPosBigTable()) { - firstSmallTable = (HybridHashTableContainer)mapJoinTables[pos]; - numPartitions = 
firstSmallTable.getHashPartitions().length; - break; + if (!abort) { + if (hashMapRowGetters == null) { + hashMapRowGetters = new ReusableGetAdaptor[mapJoinTables.length]; } - } - assert numPartitions != 0 : "Number of partitions must be greater than 0!"; - - if (firstSmallTable.hasSpill()) { - spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length]; - hybridMapJoinLeftover = true; - - // Clear all in-memory partitions first + int numPartitions = 0; + // Find out number of partitions for each small table (should be same across tables) for (byte pos = 0; pos < mapJoinTables.length; pos++) { - MapJoinTableContainer tableContainer = mapJoinTables[pos]; - if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) { - HybridHashTableContainer hybridHtContainer = (HybridHashTableContainer) tableContainer; - hybridHtContainer.dumpStats(); - - HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions(); - // Clear all in memory partitions first - for (int i = 0; i < hashPartitions.length; i++) { - if (!hashPartitions[i].isHashMapOnDisk()) { - hybridHtContainer.setTotalInMemRowCount( - hybridHtContainer.getTotalInMemRowCount() - - hashPartitions[i].getHashMapFromMemory().getNumValues()); - hashPartitions[i].getHashMapFromMemory().clear(); + if (pos != conf.getPosBigTable()) { + firstSmallTable = (HybridHashTableContainer) mapJoinTables[pos]; + numPartitions = firstSmallTable.getHashPartitions().length; + break; + } + } + assert numPartitions != 0 : "Number of partitions must be greater than 0!"; + + if (firstSmallTable.hasSpill()) { + spilledMapJoinTables = new MapJoinBytesTableContainer[mapJoinTables.length]; + hybridMapJoinLeftover = true; + + // Clear all in-memory partitions first + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + MapJoinTableContainer tableContainer = mapJoinTables[pos]; + if (tableContainer != null && tableContainer instanceof HybridHashTableContainer) { + HybridHashTableContainer 
hybridHtContainer = (HybridHashTableContainer) tableContainer; + hybridHtContainer.dumpStats(); + + HashPartition[] hashPartitions = hybridHtContainer.getHashPartitions(); + // Clear all in-memory partitions first + for (int i = 0; i < hashPartitions.length; i++) { + if (!hashPartitions[i].isHashMapOnDisk()) { + hybridHtContainer.setTotalInMemRowCount( + hybridHtContainer.getTotalInMemRowCount() - + hashPartitions[i].getHashMapFromMemory().getNumValues()); + hashPartitions[i].getHashMapFromMemory().clear(); + } } + assert hybridHtContainer.getTotalInMemRowCount() == 0; } - assert hybridHtContainer.getTotalInMemRowCount() == 0; } - } - // Reprocess the spilled data - for (int i = 0; i < numPartitions; i++) { - HashPartition[] hashPartitions = firstSmallTable.getHashPartitions(); - if (hashPartitions[i].isHashMapOnDisk()) { - try { - continueProcess(i); // Re-process spilled data - } catch (IOException e) { - e.printStackTrace(); - } catch (SerDeException e) { - e.printStackTrace(); - } catch (ClassNotFoundException e) { - e.printStackTrace(); - } - for (byte pos = 0; pos < order.length; pos++) { - if (pos != conf.getPosBigTable()) - spilledMapJoinTables[pos] = null; + // Reprocess the spilled data + for (int i = 0; i < numPartitions; i++) { + HashPartition[] hashPartitions = firstSmallTable.getHashPartitions(); + if (hashPartitions[i].isHashMapOnDisk()) { + try { + continueProcess(i); // Re-process spilled data + } catch (Exception e) { + throw new HiveException(e); + } + for (byte pos = 0; pos < order.length; pos++) { + if (pos != conf.getPosBigTable()) + spilledMapJoinTables[pos] = null; + } } } } } + + if (isLogInfoEnabled) { + LOG.info("spilled: " + spilled + " abort: " + abort + ". Clearing spilled partitions."); + } + + // spilled tables are always loaded (no sharing), so clear them + clearAllTableContainers(); + cache.remove(cacheKey); } + // in the mapreduce case, we always need to clear up as mapreduce doesn't have an object registry. 
if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null) - && (this.getExecContext().getLocalWork().getInputFileChangeSensitive()) - && mapJoinTables != null) { + && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())) { + if (isLogInfoEnabled) { + LOG.info("MR: Clearing all map join table containers."); + } + clearAllTableContainers(); + } + + super.closeOp(abort); + } + + private void clearAllTableContainers() { + if (mapJoinTables != null) { for (MapJoinTableContainer tableContainer : mapJoinTables) { if (tableContainer != null) { tableContainer.clear(); } } } - cache.release(cacheKey); - this.loader = null; - super.closeOp(abort); } /** diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java index f0df2d3d457640cc54f8a208c0706eaeddbc6ed8..440e0a16517ff97ee2d24fdbe8851d2b99802039 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCache.java @@ -53,4 +53,11 @@ * @return the last cached object with the key, null if none. */ public Future retrieveAsync(String key, Callable fn) throws HiveException; + + /** + * Removes the specified key from the object cache. 
+ * + * @param key - key to be removed + */ + public void remove(String key); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java index a6f698d4d49ed74223a5297c513ac19312034adf..bf4ae8d9e0ae729a999dfab3a5ded0d54f7ad8a5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ObjectCache.java @@ -91,4 +91,9 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution } }; } + + @Override + public void remove(String key) { + // nothing to do + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 3f6d61ebce1daa485d40ac3bf4ead2beb807065a..412226e53ecfc805bdb88b2f19721b2bbd02cef0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -183,6 +183,36 @@ public VectorMapJoinRowBytesContainer getMatchfileRowBytesContainer() { public boolean isHashMapOnDisk() { return hashMapOnDisk; } + + public void clear() { + if (hashMap != null) { + hashMap.clear(); + hashMap = null; + } + + if (hashMapLocalPath != null) { + try { + Files.delete(hashMapLocalPath); + } catch (Throwable ignored) { // failure to delete the file at hashMapLocalPath is ignored + } + hashMapLocalPath = null; + } + + if (sidefileKVContainer != null) { + sidefileKVContainer.clear(); + sidefileKVContainer = null; + } + + if (matchfileObjContainer != null) { + matchfileObjContainer.clear(); + matchfileObjContainer = null; + } + + if (matchfileRowBytesContainer != null) { + matchfileRowBytesContainer.clear(); + matchfileRowBytesContainer = null; + } + } } public HybridHashTableContainer(Configuration hconf, long keyCount, long memoryAvailable, @@ -546,12 +576,11 @@ public int 
getToSpillPartitionId() { return 
toSpillPartitionId; } - /* Clean up in memory hashtables */ @Override public void clear() { for (HashPartition hp : hashPartitions) { - if (hp.hashMap != null) { - hp.hashMap.clear(); + if (hp != null) { + hp.clear(); } } memoryUsed = 0; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 6a81f1184b9f150d4ed177b149dc1d91908759c5..536b92c5dd03abe9ff57bf64d87be0f3ef34aa7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -37,7 +37,6 @@ import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -181,10 +180,6 @@ public void load(MapJoinTableContainer[] mapJoinTables, } tableContainer.seal(); mapJoinTables[pos] = tableContainer; - } catch (IOException e) { - throw new HiveException(e); - } catch (SerDeException e) { - throw new HiveException(e); } catch (Exception e) { throw new HiveException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java index c0bcb2158bd13b1561a0c2bcab49b4d8c48974d7..64295d4cfa650d39554e51492964d817fd51e320 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ObjectCache.java @@ -93,4 +93,10 @@ public T call() throws Exception { } }); } + + @Override + public void remove(String key) { + LOG.info("Removing key: " + key); + registry.delete(key); + } } diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java index c8359d3eb65d9621af7e953ce9bb4e6e79c59774..1c91be6a01e8e257993a477304f75351f99156a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinRowBytesContainer.java @@ -290,7 +290,7 @@ public int currentLength() { return currentLength; } - public void clear() throws IOException { + public void clear() { if (fileInputStream != null) { try { fileInputStream.close();