diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
index 96283cd..63d4989 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
@@ -269,9 +269,9 @@ private boolean hasFilter(int alias) {
   public void closeOp(boolean abort) throws HiveException {
     try {
       if (mapJoinTables == null) {
-      if (isLogDebugEnabled) {
-        LOG.debug("mapJoinTables is null");
-      }
+        if (isLogDebugEnabled) {
+          LOG.debug("mapJoinTables is null");
+        }
       } else {
         flushToFile();
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
index 92625f2..e97a9f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
@@ -26,10 +26,12 @@
 import java.util.ConcurrentModificationException;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -97,11 +99,12 @@ public MapJoinPersistableTableContainer load(ObjectInputStream in)
    * Loads the table container from a folder. Only used on Spark path.
    * @param fs FileSystem of the folder.
    * @param folder The folder to load table container.
+   * @param hconf The Hive configuration.
    * @return Loaded table.
    */
   @SuppressWarnings("unchecked")
-  public MapJoinPersistableTableContainer load(
-      FileSystem fs, Path folder) throws HiveException {
+  public MapJoinTableContainer load(
+      FileSystem fs, Path folder, Configuration hconf) throws HiveException {
     try {
       if (!fs.isDirectory(folder)) {
         throw new HiveException("Error, not a directory: " + folder);
@@ -116,7 +119,10 @@ public MapJoinPersistableTableContainer load(
       Writable keyContainer = keySerDe.getSerializedClass().newInstance();
       Writable valueContainer = valueSerDe.getSerializedClass().newInstance();
 
-      MapJoinPersistableTableContainer tableContainer = null;
+      MapJoinTableContainer tableContainer = null;
+
+      boolean useOptimizedContainer = HiveConf.getBoolVar(
+          hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
 
       for (FileStatus fileStatus: fileStatuses) {
         Path filePath = fileStatus.getPath();
@@ -131,18 +137,16 @@ public MapJoinPersistableTableContainer load(
           String name = in.readUTF();
           Map<String, String> metaData = (Map<String, String>) in.readObject();
           if (tableContainer == null) {
-            tableContainer = create(name, metaData);
+            tableContainer = useOptimizedContainer ?
+                new MapJoinBytesTableContainer(hconf, valueContext, -1, 0) :
+                create(name, metaData);
           }
-          int numKeys = in.readInt();
-          for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
-            MapJoinKeyObject key = new MapJoinKeyObject();
-            key.read(keyContext, in, keyContainer);
-            if (tableContainer.get(key) == null) {
-              tableContainer.put(key, new MapJoinEagerRowContainer());
-            }
-            MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) tableContainer.get(key);
-            values.read(valueContext, in, valueContainer);
-            tableContainer.put(key, values);
+          if (useOptimizedContainer) {
+            loadOptimized((MapJoinBytesTableContainer) tableContainer,
+                in, keyContainer, valueContainer);
+          } else {
+            loadNormal((MapJoinPersistableTableContainer) tableContainer,
+                in, keyContainer, valueContainer);
           }
         } finally {
           if (in != null) {
@@ -152,6 +156,9 @@ public MapJoinPersistableTableContainer load(
           }
         }
       }
+      if (tableContainer != null) {
+        tableContainer.seal();
+      }
       return tableContainer;
     } catch (IOException e) {
       throw new HiveException("IO error while trying to create table container", e);
@@ -160,6 +167,34 @@ public MapJoinPersistableTableContainer load(
     }
   }
 
+  private void loadNormal(MapJoinPersistableTableContainer container,
+      ObjectInputStream in, Writable keyContainer, Writable valueContainer) throws Exception {
+    int numKeys = in.readInt();
+    for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+      MapJoinKeyObject key = new MapJoinKeyObject();
+      key.read(keyContext, in, keyContainer);
+      if (container.get(key) == null) {
+        container.put(key, new MapJoinEagerRowContainer());
+      }
+      MapJoinEagerRowContainer values = (MapJoinEagerRowContainer) container.get(key);
+      values.read(valueContext, in, valueContainer);
+      container.put(key, values);
+    }
+  }
+
+  private void loadOptimized(MapJoinBytesTableContainer container, ObjectInputStream in,
+      Writable key, Writable value) throws Exception {
+    int numKeys = in.readInt();
+    for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
+      key.readFields(in);
+      long numRows = in.readLong();
+      for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+        value.readFields(in);
+        container.putRow(keyContext, key, valueContext, value);
+      }
+    }
+  }
+
   public void persist(ObjectOutputStream out, MapJoinPersistableTableContainer tableContainer)
       throws HiveException {
     int numKeys = tableContainer.size();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
index 1d674e9..10e3497 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -35,6 +36,8 @@
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -43,6 +46,7 @@
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkBucketMapJoinContext;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 
 /**
@@ -93,10 +97,28 @@ public void load(MapJoinTableContainer[] mapJoinTables,
     }
     FileSystem fs = FileSystem.get(baseDir.toUri(), hconf);
     BucketMapJoinContext mapJoinCtx = localWork.getBucketMapjoinContext();
+    boolean firstContainer = true;
+    boolean useOptimizedContainer = HiveConf.getBoolVar(
+        hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE);
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable() || mapJoinTables[pos] != null) {
         continue;
       }
+      if (useOptimizedContainer) {
+        MapJoinObjectSerDeContext keyCtx = mapJoinTableSerdes[pos].getKeyContext();
+        ObjectInspector keyOI = keyCtx.getSerDe().getObjectInspector();
+        if (!MapJoinBytesTableContainer.isSupportedKey(keyOI)) {
+          if (firstContainer) {
+            LOG.warn("Not using optimized table container. " +
+                "Only a subset of mapjoin keys is supported.");
+            useOptimizedContainer = false;
+            HiveConf.setBoolVar(hconf, HiveConf.ConfVars.HIVEMAPJOINUSEOPTIMIZEDTABLE, false);
+          } else {
+            throw new HiveException("Only a subset of mapjoin keys is supported.");
+          }
+        }
+      }
+      firstContainer = false;
       String bigInputPath = currentInputPath;
       if (currentInputPath != null && mapJoinCtx != null) {
         if (!desc.isBucketMapJoin()) {
@@ -124,14 +146,14 @@ private MapJoinTableContainer load(FileSystem fs, Path path,
       MapJoinTableContainerSerDe mapJoinTableSerde) throws HiveException {
     LOG.info("\tLoad back all hashtable files from tmp folder uri:" + path);
     if (!SparkUtilities.isDedicatedCluster(hconf)) {
-      return mapJoinTableSerde.load(fs, path);
+      return mapJoinTableSerde.load(fs, path, hconf);
     }
     MapJoinTableContainer mapJoinTable = SmallTableCache.get(path);
     if (mapJoinTable == null) {
       synchronized (path.toString().intern()) {
         mapJoinTable = SmallTableCache.get(path);
         if (mapJoinTable == null) {
-          mapJoinTable = mapJoinTableSerde.load(fs, path);
+          mapJoinTable = mapJoinTableSerde.load(fs, path, hconf);
           SmallTableCache.cache(path, mapJoinTable);
         }
       }
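For reference: loadOptimized mirrors loadNormal read-for-read (an int key count, then per key the serialized key writable, a long row count, and that many value writables), so the small-table files written on the HashTableSinkOperator side need no format change; only the in-memory container they are loaded into differs. A minimal reader sketch under that assumption follows; readSmallTableFile and its writable parameters are illustrative names introduced here, not Hive API.

import java.io.ObjectInputStream;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;

// Sketch of how one small-table file is consumed by the load paths above.
// keyWritable/valueWritable stand in for instances of the key/value
// SerDes' getSerializedClass().
private static void readSmallTableFile(FileSystem fs, Path filePath,
    Writable keyWritable, Writable valueWritable) throws Exception {
  ObjectInputStream in = new ObjectInputStream(fs.open(filePath));
  try {
    String name = in.readUTF();                   // container implementation name
    @SuppressWarnings("unchecked")
    Map<String, String> metaData = (Map<String, String>) in.readObject();
    int numKeys = in.readInt();
    for (int k = 0; k < numKeys; k++) {
      keyWritable.readFields(in);                 // serialized join key
      long numRows = in.readLong();               // rows stored under this key
      for (long r = 0L; r < numRows; r++) {
        valueWritable.readFields(in);             // one serialized row
        // Optimized path: hand the raw writables to
        // MapJoinBytesTableContainer.putRow(keyContext, key, valueContext, value).
        // Normal path: deserialize into a MapJoinEagerRowContainer keyed by a
        // MapJoinKeyObject.
      }
    }
  } finally {
    in.close();
  }
}

The fallback in HashTableLoader is deliberately asymmetric: if the very first small table has a key type MapJoinBytesTableContainer cannot index, the loader logs a warning and clears HIVEMAPJOINUSEOPTIMIZEDTABLE (hive.mapjoin.optimized.hashtable) so this table and all later ones, including the serde's load calls, take the normal path; if an unsupported key appears only after earlier tables were already loaded into optimized containers, the two formats cannot be mixed within one map join, so it throws instead.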