diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index fbc5ea4..dc0b85e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -25,6 +25,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.Future; +import com.esotericsoftware.kryo.KryoException; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; @@ -513,6 +514,11 @@ public void closeOp(boolean abort) throws HiveException { if (hashPartitions[i].isHashMapOnDisk()) { try { continueProcess(i); // Re-process spilled data + } catch (KryoException ke) { + LOG.error("Processing the spilled data failed due to Kryo error!"); + LOG.error("Cleaning up all spilled data!"); + cleanupGraceHashJoin(); + throw new HiveException(ke); } catch (Exception e) { throw new HiveException(e); } @@ -657,6 +663,19 @@ protected void reProcessBigTable(int partitionId) throws HiveException { } /** + * Clean up data participating in the join, i.e. in-mem and on-disk files for small table(s) and big table + */ + private void cleanupGraceHashJoin() { + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + if (pos != conf.getPosBigTable()) { + LOG.info("Cleaning up small table data at pos: " + pos); + HybridHashTableContainer container = (HybridHashTableContainer) mapJoinTables[pos]; + container.clear(); + } + } + } + + /** + * Implements the getName function for the Node Interface. + * + * @return the name of the operator