Index: ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java	(revision 1036137)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java	(working copy)
@@ -264,7 +264,7 @@
       }

      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
-          hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage);
+          hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage, hconf);

      mapJoinTables.put(pos, hashTable);
     }
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java	(revision 1036137)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java	(working copy)
@@ -90,13 +90,18 @@
     mapJoinTables = new HashMap<Byte, HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>>();
     rowContainerMap = new HashMap<Byte, MapJoinRowContainer<ArrayList<Object>>>();

     // initialize the hash tables for other tables
+    int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
+    float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
+    float hashTableMaxMemoryUsage = HiveConf.getFloatVar(hconf,
+        HiveConf.ConfVars.HIVEHASHTABLEMAXMEMORYUSAGE);
     for (int pos = 0; pos < numAliases; pos++) {
       if (pos == posBigTable) {
         continue;
       }
-      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
-
+      HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable = new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>(
+          hashTableThreshold, hashTableLoadFactor, hashTableMaxMemoryUsage, hconf);
       mapJoinTables.put(Byte.valueOf((byte) pos), hashTable);
       MapJoinRowContainer<ArrayList<Object>> rowContainer = new MapJoinRowContainer<ArrayList<Object>>();
       rowContainerMap.put(Byte.valueOf((byte) pos), rowContainer);
Index: ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
===================================================================
--- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java	(revision 1036137)
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java	(working copy)
@@ -18,6 +18,8 @@

 package org.apache.hadoop.hive.ql.exec.persistence;

+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -33,9 +35,15 @@

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;

 /**
@@ -65,6 +73,7 @@
   private long maxMemory;
   private long currentMemory;
   private NumberFormat num;
+  private CompressionCodec codec;

   /**
    * Constructor.
    *
@@ -72,7 +81,7 @@
    * @param threshold
    *          User specified threshold to store new values into persistent storage.
    */
-  public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) {
+  public HashMapWrapper(int threshold, float loadFactor, float memoryUsage, Configuration hconf) {
     maxMemoryUsage = memoryUsage;
     mHash = new HashMap<K, V>(threshold, loadFactor);
     memoryMXBean = ManagementFactory.getMemoryMXBean();
@@ -80,14 +89,23 @@
     LOG.info("maximum memory: " + maxMemory);
     num = NumberFormat.getInstance();
     num.setMinimumFractionDigits(2);
+
+    if (hconf != null) {
+      JobConf job = new JobConf(hconf, HashMapWrapper.class);
+      Class<? extends CompressionCodec> codecClass = FileOutputFormat.getOutputCompressorClass(job,
+          GzipCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, job);
+    } else {
+      codec = null;
+    }
   }

   public HashMapWrapper(int threshold) {
-    this(threshold, LOADFACTOR, MEMORYUSAGE);
+    this(threshold, LOADFACTOR, MEMORYUSAGE, null);
   }

   public HashMapWrapper() {
-    this(THRESHOLD, LOADFACTOR, MEMORYUSAGE);
+    this(THRESHOLD, LOADFACTOR, MEMORYUSAGE, null);
   }

   public V get(K key) {
@@ -111,8 +129,15 @@
    * @return persistent cache file
    */
   public long flushMemoryCacheToPersistent(File file) throws IOException {
-    ObjectOutputStream outputStream = null;
-    outputStream = new ObjectOutputStream(new FileOutputStream(file));
+    DataOutputStream dataOutputStream = null;
+    if (codec != null) {
+      dataOutputStream = new DataOutputStream(codec.createOutputStream(new FileOutputStream(file)));
+    } else {
+      dataOutputStream = new DataOutputStream(new FileOutputStream(file));
+    }
+
+    // get object output stream
+    ObjectOutputStream outputStream = new ObjectOutputStream(dataOutputStream);
     outputStream.writeObject(mHash);
     outputStream.flush();
     outputStream.close();
@@ -121,8 +146,15 @@
   public void initilizePersistentHash(String fileName) throws IOException, ClassNotFoundException {
-    ObjectInputStream inputStream = null;
-    inputStream = new ObjectInputStream(new FileInputStream(fileName));
+    DataInputStream dataInputStream = null;
+    if (codec != null) {
+      dataInputStream = new DataInputStream(codec.createInputStream(new FileInputStream(fileName)));
+    } else {
+      dataInputStream = new DataInputStream(new FileInputStream(fileName));
+    }
+
+    // get object input stream
+    ObjectInputStream inputStream = new ObjectInputStream(dataInputStream);
     HashMap<K, V> hashtable = (HashMap<K, V>) inputStream.readObject();
     this.setMHash(hashtable);
@@ -155,7 +187,7 @@
     return mHash.size();
   }

-  public boolean isAbort(long numRows,LogHelper console) {
+  public boolean isAbort(long numRows, LogHelper console) {
     System.gc();
     System.gc();
     int size = mHash.size();
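Note (illustration only, not part of the patch): the sketch below shows the stream-wrapping pattern the patch applies in flushMemoryCacheToPersistent/initilizePersistentHash, where a Hadoop CompressionCodec (GzipCodec when no output compressor is configured) sits between the File stream and the Object stream so the spilled hash table is compressed on disk and decompressed on reload. The class name CodecSpillDemo, the sample map contents, and the temp-file name are invented for the example; only the codec API calls mirror the patch.

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecSpillDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Instantiate the codec the same way the patch does, via ReflectionUtils,
    // so the codec picks up the configuration.
    CompressionCodec codec = ReflectionUtils.newInstance(GzipCodec.class, conf);

    HashMap<String, Integer> table = new HashMap<String, Integer>();
    table.put("k1", 1);
    table.put("k2", 2);

    File file = File.createTempFile("hashtable", ".gz");

    // Write: FileOutputStream -> codec.createOutputStream -> ObjectOutputStream,
    // so the serialized HashMap is compressed on its way to disk.
    ObjectOutputStream out = new ObjectOutputStream(
        codec.createOutputStream(new FileOutputStream(file)));
    out.writeObject(table);
    out.close();

    // Read: the mirror image, decompressing before deserialization.
    ObjectInputStream in = new ObjectInputStream(
        codec.createInputStream(new FileInputStream(file)));
    @SuppressWarnings("unchecked")
    HashMap<String, Integer> restored = (HashMap<String, Integer>) in.readObject();
    in.close();

    System.out.println("restored " + restored.size() + " entries, file size = " + file.length());
  }
}

The design keeps the old behavior reachable: the legacy HashMapWrapper constructors pass a null Configuration, which leaves codec null and makes the flush/load paths fall back to a plain, uncompressed DataOutputStream/DataInputStream around the file.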