diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index 82d4b93..97602a6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -108,7 +108,7 @@ // input is too large // to fit in memory - AbstractRowContainer>[] storage; // map b/w table alias + protected AbstractRowContainer>[] storage; // map b/w table alias // to RowContainer int joinEmitInterval = -1; int joinCacheSize = 0; @@ -184,7 +184,11 @@ public CommonJoinOperator(CommonJoinOperator clone) { return joinOutputObjectInspector; } - Configuration hconf; + protected Configuration hconf; + + protected Configuration getHadoopConf() { + return hconf; + } @Override @SuppressWarnings("unchecked") diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index 31dbf41..d7aff23 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -22,16 +22,26 @@ import java.util.Map; import java.util.Map.Entry; +import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinSingleKey; import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -44,12 +54,29 @@ import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.util.ReflectionUtils; public class JoinUtil { + public interface HasFilterPredicate { + + boolean hasFilter(int pos); + + } + + protected static MapJoinMetaData metadata = new MapJoinMetaData(); + public static MapJoinMetaData getMetadata() { + return metadata; + } + + public static void setMetadata(MapJoinMetaData value) { + metadata = value; + } + + public static List[] getObjectInspectorsFromEvaluators( List[] exprEntries, ObjectInspector[] inputObjInspector, @@ -380,4 +407,130 @@ public static RowContainer getRowContainer(Configuration hconf, rc.setTableDesc(tblDesc); return rc; } + + public static boolean 
loadHashTables( + Log LOG, + Configuration hconf, + ExecMapperContext execContext, + String dumpFilePrefix, + HashMapWrapper[] mapJoinTables, + boolean hashTblInitedOnce) throws HiveException { + + if (!execContext.getLocalWork().getInputFileChangeSensitive()) { + if (hashTblInitedOnce) { + return true; + } + } + + String baseDir = null; + + String currentInputFile = execContext.getCurrentInputFile(); + LOG.info("******* Load from HashTable File: input : " + currentInputFile); + + String fileName = execContext.getLocalWork().getBucketFileName(currentInputFile); + + try { + if (ShimLoader.getHadoopShims().isLocalMode(hconf)) { + baseDir = execContext.getLocalWork().getTmpFileURI(); + } else { + Path[] localArchives; + String stageID = execContext.getLocalWork().getStageID(); + String suffix = Utilities.generateTarFileName(stageID); + FileSystem localFs = FileSystem.getLocal(hconf); + localArchives = DistributedCache.getLocalCacheArchives(hconf); + Path archive; + for (int j = 0; j < localArchives.length; j++) { + archive = localArchives[j]; + if (!archive.getName().endsWith(suffix)) { + continue; + } + Path archiveLocalLink = archive.makeQualified(localFs); + baseDir = archiveLocalLink.toUri().getPath(); + } + } + for (byte pos = 0; pos < mapJoinTables.length; pos++) { + HashMapWrapper hashtable = mapJoinTables[pos]; + if (hashtable == null) { + continue; + } + String filePath = Utilities.generatePath(baseDir, dumpFilePrefix, pos, fileName); + Path path = new Path(filePath); + LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString()); + hashtable.initilizePersistentHash(path.toUri().getPath()); + } + } catch (Exception e) { + LOG.error("Load Distributed Cache Error", e); + throw new HiveException(e); + } + + return true; + } + + public static HashMapWrapper[] buildMapJoinTables( + int tagLength, byte posBigTable, int numAliases) { + HashMapWrapper[] mapJoinTables = + new HashMapWrapper[tagLength]; + // initialize the hash tables for other tables + for (int pos = 0; pos < numAliases; pos++) { + if (pos == posBigTable) { + continue; + } + + HashMapWrapper hashTable = + new HashMapWrapper(); + mapJoinTables[pos] = hashTable; + } + return mapJoinTables; + } + + public static MapJoinRowContainer>[] buildRowcontainerMap(int tagLength, + byte posBigTable, int numAliases) { + MapJoinRowContainer>[] rowContainerMap = new MapJoinRowContainer[tagLength]; + // initialize the hash tables for other tables + for (int pos = 0; pos < numAliases; pos++) { + if (pos == posBigTable) { + continue; + } + + MapJoinRowContainer> rowContainer = new MapJoinRowContainer>(); + rowContainerMap[pos] = rowContainer; + } + return rowContainerMap; + } + + public static void generateMapMetaData(MapJoinDesc conf, int metadataKeyTag, + int posBigTable, Byte[] order, Configuration hconf, HasFilterPredicate filterMap) + throws SerDeException { + // generate the meta data for key + // index for key is -1 + TableDesc keyTableDesc = conf.getKeyTblDesc(); + SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), + null); + keySerializer.initialize(null, keyTableDesc.getProperties()); + metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx( + ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(), + ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf)); + + for (int pos = 0; pos < order.length; pos++) { + if (pos == posBigTable) { + continue; + } + TableDesc valueTableDesc; + if (conf.getNoOuterJoin()) 
{ + valueTableDesc = conf.getValueTblDescs().get(pos); + } else { + valueTableDesc = conf.getValueFilteredTblDescs().get(pos); + } + SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), + null); + valueSerDe.initialize(null, valueTableDesc.getProperties()); + + ObjectInspector inspector = valueSerDe.getObjectInspector(); + metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils + .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE), + valueSerDe, valueTableDesc, filterMap.hasFilter(pos), hconf)); + } + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 4da1be8..c4d16f9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -24,25 +24,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue; import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapJoinDesc; -import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.util.ReflectionUtils; /** * Map side Join operator implementation. @@ -54,11 +43,6 @@ protected transient HashMapWrapper[] mapJoinTables; - protected static MapJoinMetaData metadata = new MapJoinMetaData(); - public static MapJoinMetaData getMetadata() { - return metadata; - } - private static final transient String[] FATAL_ERR_MSG = { null, // counter value 0 means no error "Mapside join exceeds available memory. 
" @@ -89,22 +73,10 @@ protected void initializeOp(Configuration hconf) throws HiveException { metadataKeyTag = -1; - int tagLen = conf.getTagLength(); - - mapJoinTables = new HashMapWrapper[tagLen]; - rowContainerMap = new MapJoinRowContainer[tagLen]; - // initialize the hash tables for other tables - for (int pos = 0; pos < numAliases; pos++) { - if (pos == posBigTable) { - continue; - } - - HashMapWrapper hashTable = new HashMapWrapper(); - - mapJoinTables[pos] = hashTable; - MapJoinRowContainer> rowContainer = new MapJoinRowContainer>(); - rowContainerMap[pos] = rowContainer; - } + mapJoinTables = JoinUtil.buildMapJoinTables( + conf.getTagLength(), posBigTable, numAliases); + rowContainerMap = JoinUtil.buildRowcontainerMap( + conf.getTagLength(), posBigTable, numAliases); hashTblInitedOnce = false; } @@ -116,87 +88,24 @@ protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) { } public void generateMapMetaData() throws HiveException, SerDeException { - // generate the meta data for key - // index for key is -1 - TableDesc keyTableDesc = conf.getKeyTblDesc(); - SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), - null); - keySerializer.initialize(null, keyTableDesc.getProperties()); - metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx( - ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(), - ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf)); - - for (int pos = 0; pos < order.length; pos++) { - if (pos == posBigTable) { - continue; - } - TableDesc valueTableDesc; - if (conf.getNoOuterJoin()) { - valueTableDesc = conf.getValueTblDescs().get(pos); - } else { - valueTableDesc = conf.getValueFilteredTblDescs().get(pos); - } - SerDe valueSerDe = (SerDe) ReflectionUtils.newInstance(valueTableDesc.getDeserializerClass(), - null); - valueSerDe.initialize(null, valueTableDesc.getProperties()); - - ObjectInspector inspector = valueSerDe.getObjectInspector(); - metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils - .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE), - valueSerDe, valueTableDesc, hasFilter(pos), hconf)); - } + JoinUtil.generateMapMetaData(conf, metadataKeyTag, posBigTable, order, hconf, + new JoinUtil.HasFilterPredicate() { + + @Override + public boolean hasFilter(int pos) { + return MapJoinOperator.this.hasFilter(pos); + } + }); } private void loadHashTable() throws HiveException { - if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) { - if (hashTblInitedOnce) { - return; - } else { - hashTblInitedOnce = true; - } - } - - String baseDir = null; - - String currentInputFile = getExecContext().getCurrentInputFile(); - LOG.info("******* Load from HashTable File: input : " + currentInputFile); - - String fileName = getExecContext().getLocalWork().getBucketFileName(currentInputFile); - - try { - if (ShimLoader.getHadoopShims().isLocalMode(hconf)) { - baseDir = this.getExecContext().getLocalWork().getTmpFileURI(); - } else { - Path[] localArchives; - String stageID = this.getExecContext().getLocalWork().getStageID(); - String suffix = Utilities.generateTarFileName(stageID); - FileSystem localFs = FileSystem.getLocal(hconf); - localArchives = DistributedCache.getLocalCacheArchives(this.hconf); - Path archive; - for (int j = 0; j < localArchives.length; j++) { - archive = localArchives[j]; - if (!archive.getName().endsWith(suffix)) { - continue; - } - Path archiveLocalLink 
= archive.makeQualified(localFs); - baseDir = archiveLocalLink.toUri().getPath(); - } - } - for (byte pos = 0; pos < mapJoinTables.length; pos++) { - HashMapWrapper hashtable = mapJoinTables[pos]; - if (hashtable == null) { - continue; - } - String filePath = Utilities.generatePath(baseDir, conf.getDumpFilePrefix(), pos, fileName); - Path path = new Path(filePath); - LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path.toString()); - hashtable.initilizePersistentHash(path.toUri().getPath()); - } - } catch (Exception e) { - LOG.error("Load Distributed Cache Error", e); - throw new HiveException(e); - } + hashTblInitedOnce = JoinUtil.loadHashTables(LOG, + getHadoopConf(), + getExecContext(), + conf.getDumpFilePrefix(), + mapJoinTables, + hashTblInitedOnce); } // Load the hash table diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 29de38d..5a5bb33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -62,6 +62,21 @@ protected List> childOperators; protected List> parentOperators; protected String operatorId; + + private transient Operator vectorOperator; + + public Operator getVectorOperator() { + return vectorOperator; + } + + /** + * the method is intentionally named 'assign' not 'set' in order to avoid serialization + * @param vectorOperator + */ + public void assignVectorOperator(Operator vectorOperator) { + this.vectorOperator = vectorOperator; + } + /** * List of counter names associated with the operator. It contains the * following default counters NUM_INPUT_ROWS NUM_OUTPUT_ROWS TIME_TAKEN diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index e579c00..95674f8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FetchOperator; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner; import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack; import org.apache.hadoop.hive.ql.exec.Operator; @@ -64,7 +65,6 @@ import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.exec.vector.VectorExecMapper; import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; @@ -107,7 +107,7 @@ import org.apache.log4j.varia.NullAppender; /** - * ExecDriver is the central class in co-ordinating execution of any map-reduce task. + * ExecDriver is the central class in co-ordinating execution of any map-reduce task. 
* It's main responsabilities are: * * - Converting the plan (MapredWork) into a MR Job (JobConf) @@ -584,6 +584,8 @@ private boolean validateVectorPath() { try { vectorOp = VectorMapOperator.vectorizeOperator(op, vc); } catch (Exception e) { + System.out.println("Cannot vectorize: vectorize: " + e); + e.printStackTrace(); LOG.debug("Cannot vectorize the plan", e); return false; } @@ -595,6 +597,8 @@ private boolean validateVectorPath() { try { validateVectorOperator(vectorOp); } catch (HiveException e) { + System.out.println("Cannot vectorize: validate: " + e); + e.printStackTrace(); LOG.debug("Cannot vectorize the plan", e); return false; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java index d774226..775d634 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.MapJoinMetaData; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -94,7 +94,7 @@ public boolean equals(Object obj) { public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { try { // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(Integer.valueOf(metadataTag)); + HashTableSinkObjectCtx ctx = JoinUtil.getMetadata().get(Integer.valueOf(metadataTag)); Writable val = ctx.getSerDe().getSerializedClass().newInstance(); val.readFields(in); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java index 791bb3f..55b5f33 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -80,7 +80,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { try { // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get( + HashTableSinkObjectCtx ctx = JoinUtil.getMetadata().get( Integer.valueOf(metadataTag)); Writable val = ctx.getSerDe().getSerializedClass().newInstance(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java index 58a9dc0..d5f4d6e 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java @@ -26,8 +26,7 @@ import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; -import org.apache.hadoop.hive.ql.exec.MapJoinMetaData; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.io.ShortWritable; @@ -91,7 +90,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept metadataTag = in.readInt(); // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get( + HashTableSinkObjectCtx ctx = JoinUtil.getMetadata().get( Integer.valueOf(metadataTag)); int sz = in.readInt(); MapJoinRowContainer res = new MapJoinRowContainer(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java index 4bff936..9f51409 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx; +import org.apache.hadoop.hive.ql.exec.JoinUtil; import org.apache.hadoop.hive.ql.exec.MapJoinMetaData; -import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; @@ -80,7 +80,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { try { // get the tableDesc from the map stored in the mapjoin operator - HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get( + HashTableSinkObjectCtx ctx = JoinUtil.getMetadata().get( Integer.valueOf(metadataTag)); Writable val = ctx.getSerDe().getSerializedClass().newInstance(); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java index 8b4c615..10c0c6f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.Arrays; + import org.apache.hadoop.io.Writable; /** @@ -26,33 +28,33 @@ * repeats, i.e. has all values the same, so only the first * one is set. This is used to accelerate query performance * by handling a whole vector in O(1) time when applicable. - * + * * The fields are public by design since this is a performance-critical * structure that is used in the inner loop of query execution. */ public abstract class ColumnVector { - + /* - * If hasNulls is true, then this array contains true if the value - * is null, otherwise false. The array is always allocated, so a batch can be re-used + * If hasNulls is true, then this array contains true if the value + * is null, otherwise false. 
The array is always allocated, so a batch can be re-used * later and nulls added. */ - public boolean[] isNull; - + public boolean[] isNull; + // If the whole column vector has no nulls, this is true, otherwise false. public boolean noNulls; - - /* - * True if same value repeats for whole column vector. + + /* + * True if same value repeats for whole column vector. * If so, vector[0] holds the repeating value. */ - public boolean isRepeating; + public boolean isRepeating; public abstract Writable getWritableObject(int index); /** * Constructor for super-class ColumnVector. This is not called directly, * but used to initialize inherited fields. - * + * * @param len Vector length */ public ColumnVector(int len) { @@ -60,5 +62,19 @@ public ColumnVector(int len) { noNulls = true; isRepeating = false; } + + /** + * Resets the column to default state + * - fills the isNull array with false + * - sets noNulls to true + * - sets isRepeating to false + */ + public void reset() { + if (false == noNulls) { + Arrays.fill(isNull, false); + } + noNulls = true; + isRepeating = false; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java new file mode 100644 index 0000000..c44de40 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public interface VectorColumnAssign { + + void assignVectorValue(VectorizedRowBatch inBatch, int batchIndex, int valueColumn, int destIndex) + throws HiveException; + void assignObjectValue(Object val, int destIndex) throws HiveException; + + void reset(); +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java new file mode 100644 index 0000000..c5c8a48 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.sql.Timestamp; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +/** + * VectorColumnAssignFactory. + * + */ +public class VectorColumnAssignFactory { + + private static abstract class VectorColumnAssignVectorBase + implements VectorColumnAssign { + protected VectorizedRowBatch outBatch; + protected T outCol; + + protected void copyValue(T in, int srcIndex, int destIndex) throws HiveException { + throw new HiveException("Internal error: should not reach here"); + } + + @SuppressWarnings("unchecked") + @Override + public void assignVectorValue(VectorizedRowBatch inBatch, int batchIndex, + int valueColumnIndex, int destIndex) throws HiveException { + T in = (T) inBatch.cols[valueColumnIndex]; + if (in.isRepeating) { + if (in.noNulls) { + copyValue(in, 0, destIndex); + } + else { + assignNull(destIndex); + } + } + else { + int srcIndex = inBatch.selectedInUse ? 
inBatch.selected[batchIndex] : batchIndex; + if (in.noNulls || !in.isNull[srcIndex]) { + copyValue(in, srcIndex, destIndex); + } + else { + assignNull(destIndex); + } + } + } + + public VectorColumnAssign init(VectorizedRowBatch out, T cv) { + this.outBatch = out; + this.outCol = cv; + return this; + } + + protected void assignNull(int index) { + VectorizedBatchUtil.SetNullColIsNullValue(outCol, index); + } + + @Override + public void reset() { + } + + @Override + public void assignObjectValue(Object value, int destIndex) throws HiveException { + throw new HiveException("Internal error: should not reach here"); + } + } + + private static abstract class VectorLongColumnAssign + extends VectorColumnAssignVectorBase { + protected void assignLong(long value, int destIndex) { + outCol.vector[destIndex] = value; + } + } + + private static abstract class VectorDoubleColumnAssign + extends VectorColumnAssignVectorBase { + + protected void assignDouble(double value, int destIndex) { + outCol.vector[destIndex] = value; + } + } + + private static abstract class VectorBytesColumnAssign + extends VectorColumnAssignVectorBase { + byte[] pad = new byte[BytesColumnVector.DEFAULT_BUFFER_SIZE]; + int padUsed = 0; + + protected void assignBytes(byte[] buffer, int start, int length, int destIndex) { + if (padUsed + length <= pad.length) { + System.arraycopy(buffer, start, + pad, padUsed, length); + outCol.vector[destIndex] = pad; + outCol.start[destIndex] = padUsed; + outCol.length[destIndex] = length; + padUsed += length; + } + else { + outCol.vector[destIndex] = Arrays.copyOfRange(buffer, + start, length); + outCol.start[destIndex] = 0; + outCol.length[destIndex] = length; + } + } + + @Override + public void reset() { + super.reset(); + padUsed = 0; + } + } + + + public static VectorColumnAssign[] buildAssigners(VectorizedRowBatch outputBatch) + throws HiveException { + VectorColumnAssign[] vca = new VectorColumnAssign[outputBatch.cols.length]; + for(int i=0; i> vectorizedDummies; + private final ExecMapperContext execContext = new ExecMapperContext(); @Override @@ -106,9 +108,12 @@ public void configure(JobConf job) { //The following code is for mapjoin //initialize all the dummy ops - l4j.info("Initializing dummy operator"); + l4j.info("vectorizing dummy operators"); List> dummyOps = localWork.getDummyParentOp(); - for (Operator dummyOp : dummyOps){ + + vectorizedDummies = mo.vectorizeDummies(dummyOps); + + for (Operator dummyOp : vectorizedDummies){ dummyOp.setExecContext(execContext); dummyOp.initialize(jc,null); } @@ -197,9 +202,7 @@ public void close() { //for close the local work if(localWork != null){ - List> dummyOps = localWork.getDummyParentOp(); - - for (Operator dummyOp : dummyOps){ + for (Operator dummyOp : vectorizedDummies){ dummyOp.close(abort); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java new file mode 100644 index 0000000..1a394fa --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@ -0,0 +1,400 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; +import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey; +import org.apache.hadoop.hive.ql.exec.persistence.AbstractRowContainer; +import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinDoubleKeys; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinSingleKey; +import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + +public class VectorMapJoinOperator extends Operator + implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(VectorMapJoinOperator.class.getName()); + + private transient VectorizationContext vContext; + + private transient HashMapWrapper[] mapJoinTables; + private transient boolean hashTblInitedOnce; + private transient boolean firstRow; + + private transient Map filterExpressions; + private transient VectorExpression[] keyExpressions; + private transient VectorHashKeyWrapperBatch keyWrapperBatch; + private transient VectorExpression[] bigTableFilterExpressions; + private transient Map valueExpressions; + + private transient VectorizedRowBatch outputBatch; + private transient VectorExpressionWriter[] keyOutputWriters; + private transient VectorExpression[] bigTableValueExpressions; + + interface VectorMapJoinKeyEvaluator { + AbstractMapJoinKey setObj(VectorHashKeyWrapper vectorHashKeyWrapper) throws HiveException; + } + + private transient VectorMapJoinKeyEvaluator keyEvaluator; + private transient AbstractRowContainer>[] rowContainerMap; + + private transient VectorColumnAssign[] outVCA; + private byte posBigTable; + private Byte[] order; + private int numAliases; + private AbstractRowContainer>[] storage; + private transient RowContainer> emptyList = null; + + private transient 
Configuration hconf; + private transient int metadataKeyTag; + private transient int[][] filterMaps; + private transient boolean noOuterJoin; + private transient int tagLen; + private transient List[] joinCurrentRow; + + public VectorMapJoinOperator(VectorizationContext ctxt, OperatorDesc conf) { + super(); + this.vContext = ctxt; + this.conf = (MapJoinDesc) conf; + + } + + @Override + public void initializeOp(Configuration hconf) throws HiveException { + this.hconf = hconf; + + order = conf.getTagOrder(); + numAliases = conf.getExprs().size(); + posBigTable = (byte) conf.getPosBigTable(); + metadataKeyTag = -1; + filterMaps = conf.getFilterMap(); + tagLen = conf.getTagLength(); + noOuterJoin = conf.isNoOuterJoin(); + + LOG.info(String.format("initializeOp: inputObjInspectors: length:%d [0]:%s", + inputObjInspectors.length, + inputObjInspectors[0] == null ? "null" : inputObjInspectors[0].getClass().getName())); + + filterMaps = conf.getFilterMap(); + + // Map that contains the rows for each alias + storage = new AbstractRowContainer[tagLen]; + emptyList = new RowContainer>(1, hconf, reporter); + + vContext.setOperatorType(OperatorType.FILTER); + filterExpressions = vContext.getMapVectorExpressions(conf.getFilters()); + bigTableFilterExpressions = filterExpressions.get(posBigTable); + vContext.setOperatorType(OperatorType.MAPJOIN); + + List keyDesc = conf.getKeys().get(posBigTable); + keyExpressions = vContext.getVectorExpressions(keyDesc); + keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc); + + valueExpressions = vContext.getMapVectorExpressions(conf.getExprs()); + bigTableValueExpressions = valueExpressions.get(posBigTable); + + rowContainerMap = JoinUtil.buildRowcontainerMap( + conf.getTagLength(), posBigTable, numAliases); + + outputBatch = VectorizedRowBatch.buildBatch(vContext, valueExpressions, + order, conf.getOutputColumnNames()); + + // This only builds the outVCAs for the big table alias + // The hash tables aliases outVCAs will be built on first call to + // this.cleanUpInputFileChangedOp + + outVCA = VectorColumnAssignFactory.buildAssigners(outputBatch); + + keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); + + if (keyExpressions.length < 1) { + throw new HiveException("Cross products are not supported"); + } + else if (keyExpressions.length == 1) { + keyEvaluator = new VectorMapJoinKeyEvaluator () { + private final MapJoinSingleKey keyObject = new MapJoinSingleKey(); + + @Override + public AbstractMapJoinKey setObj(VectorHashKeyWrapper kw) throws HiveException { + Object value = keyWrapperBatch.getWritableKeyValue(kw, 0, keyOutputWriters[0]); + keyObject.setObj(value); + return keyObject; + } + }; + } + else if (keyExpressions.length == 2) { + keyEvaluator = new VectorMapJoinKeyEvaluator () { + private final MapJoinDoubleKeys keyObject = new MapJoinDoubleKeys(); + + @Override + public AbstractMapJoinKey setObj(VectorHashKeyWrapper kw) throws HiveException { + Object v1 = keyWrapperBatch.getWritableKeyValue(kw, 0, keyOutputWriters[0]); + Object v2 = keyWrapperBatch.getWritableKeyValue(kw, 1, keyOutputWriters[1]); + keyObject.setObj1(v1); + keyObject.setObj2(v2); + return keyObject; + } + }; + } + else { + keyEvaluator = new VectorMapJoinKeyEvaluator () { + private final MapJoinObjectKey keyObject = new MapJoinObjectKey( + new Object[keyExpressions.length]); + + @Override + public AbstractMapJoinKey setObj(VectorHashKeyWrapper kw) throws HiveException { + Object[] values = keyObject.getObj(); + for(int i=0; i[] 
joinValues = new List[tagLen]; + + int totalSz = JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), + order, posBigTable); + + List[] joinValuesObjectInspectors = + JoinUtil.getObjectInspectorsFromEvaluators(joinValues, + inputObjInspectors, posBigTable, tagLen); + List[] joinValuesStandardObjectInspectors = + JoinUtil.getStandardObjectInspectors(joinValuesObjectInspectors,posBigTable, tagLen); + + int outColIndex = 0; + for (Byte pos:order) { + if (pos == posBigTable) { + outColIndex += bigTableValueExpressions.length; + } + else { + for(ObjectInspector objInspector: joinValuesStandardObjectInspectors[pos]) { + outVCA[outColIndex] = VectorColumnAssignFactory.buildObjectAssign( + outputBatch, outColIndex, objInspector); + ++outColIndex; + } + } + } + } + loadHashTable(); + } + + @Override + public void processOp(Object row, int tag) throws HiveException { + if (firstRow) { + firstRow = false; + } + + byte alias = (byte) tag; + VectorizedRowBatch inBatch = (VectorizedRowBatch) row; + + if (null != bigTableValueExpressions) { + for(VectorExpression ve: bigTableValueExpressions) { + ve.evaluate(inBatch); + } + } + + keyWrapperBatch.evaluateBatch(inBatch); + VectorHashKeyWrapper[] keyValues = keyWrapperBatch.getVectorHashKeyWrappers(); + + for(int batchIndex = 0; batchIndex < inBatch.size; ++batchIndex) { + AbstractMapJoinKey joinKey = keyEvaluator.setObj(keyValues[batchIndex]); + boolean joinNeeded = false; + Arrays.fill(storage, emptyList); + if (!joinKey.hasAnyNulls(null)) { + for(byte pos: order) { + if (pos != alias) { + MapJoinObjectValue o = mapJoinTables[pos].get(joinKey); + if (o == null) { + joinNeeded = false; + break; + } + joinNeeded = true; + MapJoinRowContainer> rc = (MapJoinRowContainer>) + rowContainerMap[pos]; + rc.reset(o.getObj()); + storage[pos] = rc; + } + } + } + if (joinNeeded) { + assignRowToOutputBatch(inBatch, batchIndex); + } + } + } + + private void forwardBatch() throws HiveException { + forward(outputBatch, null); + outputBatch.reset(); + for(VectorColumnAssign vca: outVCA) { + vca.reset(); + } + } + + + private void assignRowToOutputBatch(VectorizedRowBatch inBatch, int batchIndex) + throws HiveException { + iterateAliasAndOutputRow(inBatch, batchIndex, 0); + } + + private void iterateAliasAndOutputRow(VectorizedRowBatch inBatch, + int batchIndex, int aliasOrder) throws HiveException { + if (order.length == aliasOrder) { + outputCurrentJoinRow(inBatch, batchIndex); + } + else + { + byte pos = order[aliasOrder]; + if (posBigTable == pos) { + joinCurrentRow[aliasOrder] = null; + iterateAliasAndOutputRow(inBatch, batchIndex, aliasOrder+1); + } + else { + MapJoinRowContainer> rowContainer = + (MapJoinRowContainer>) storage[pos]; + for(ArrayList val : rowContainer.getList()) { + joinCurrentRow[pos] = val; + iterateAliasAndOutputRow(inBatch, batchIndex, aliasOrder+1); + } + } + } + } + + private void outputCurrentJoinRow(VectorizedRowBatch inBatch, int batchIndex) throws HiveException { + int outColumn = 0; + for (byte pos:order) { + if (pos == posBigTable) { + for(VectorExpression ve: bigTableValueExpressions) { + outVCA[outColumn].assignVectorValue(inBatch, batchIndex, + ve.getOutputColumn(), outputBatch.size); + ++outColumn; + } + } + else { + List valList = joinCurrentRow[pos]; + for(Object val:valList) { + outVCA[outColumn].assignObjectValue(val, outputBatch.size); + outColumn++; + } + } + } + ++outputBatch.size; + if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) { + forwardBatch(); + } + } + + @Override + public void closeOp(boolean abort) throws 
HiveException { + if (!abort && 0 < outputBatch.size) { + forwardBatch(); + } + } + + private void loadHashTable() throws HiveException { + + hashTblInitedOnce = JoinUtil.loadHashTables(LOG, + getHadoopConf(), + getExecContext(), + conf.getDumpFilePrefix(), + mapJoinTables, + hashTblInitedOnce); + } + + protected Configuration getHadoopConf() { + return hconf; + } + + @Override + public OperatorType getType() { + return OperatorType.MAPJOIN; + } + + /** + * Returns the name of the operator + */ + @Override + public String getName() { + return getOperatorName(); + } + + static public String getOperatorName() { + return "MAPJOIN"; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java index 41d2001..fb1e498 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java @@ -33,8 +33,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapredWork; @@ -495,6 +495,15 @@ public void setChildren(Configuration hconf) throws HiveException { boolean recursive = true; switch (op.getType()) { + case HASHTABLEDUMMY: + // The hashtable dummy is it's own vectorized counterpart + // the children are already vectorized, the recursive vectorization will + // not vectorize them again + vectorOp = op.cloneOp(); + break; + case MAPJOIN: + vectorOp = new VectorMapJoinOperator(vectorizationContext, op.getConf()); + break; case GROUPBY: vectorOp = new VectorGroupByOperator(vectorizationContext, op.getConf()); recursive = false; @@ -525,10 +534,15 @@ public void setChildren(Configuration hconf) throws HiveException { List> vectorizedChildren = new ArrayList>(children.size()); for (Operator childOp : children) { - Operator vectorizedChild = - vectorizeOperator(childOp, vectorizationContext); - List> parentList = - new ArrayList>(); + Operator vectorizedChild = childOp.getVectorOperator(); + List> parentList; + if (vectorizedChild == null) { + vectorizedChild = vectorizeOperator(childOp, vectorizationContext); + parentList = new ArrayList>(); + } + else { + parentList = vectorizedChild.getParentOperators(); + } parentList.add(vectorOp); vectorizedChild.setParentOperators(parentList); vectorizedChildren.add(vectorizedChild); @@ -553,6 +567,8 @@ public void setChildren(Configuration hconf) throws HiveException { } } + op.assignVectorOperator(vectorOp); + return vectorOp; } @@ -772,4 +788,18 @@ public OperatorType getType() { return null; } + public List> vectorizeDummies( + List> dummyOps) + throws HiveException, CloneNotSupportedException { + + List> vectorizedDummies = + new ArrayList>(dummyOps.size()); + + for (Operator dummyOp: dummyOps) { + Operator vectorOp = vectorizeOperator(dummyOp, vectorizationContext); + vectorizedDummies.add(vectorOp); + } + return vectorizedDummies; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index 9c90230..0b0de14 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -207,7 +207,7 @@ public void setOperatorType(OperatorType opType) { } private VectorExpression getVectorExpression(ExprNodeColumnDesc - exprDesc) { + exprDesc) throws HiveException { int columnNum = getInputColumnIndex(exprDesc.getColumn()); VectorExpression expr = null; switch (opType) { @@ -218,8 +218,11 @@ private VectorExpression getVectorExpression(ExprNodeColumnDesc case SELECT: case GROUPBY: case REDUCESINK: + case MAPJOIN: expr = new IdentityExpression(columnNum, exprDesc.getTypeString()); break; + default: + throw new HiveException("Unsupported opType:" + opType); } return expr; } @@ -301,7 +304,7 @@ private ExprNodeDesc foldConstantsForUnaryExpression(ExprNodeDesc exprDesc) thro private VectorExpression getConstantVectorExpression(ExprNodeConstantDesc exprDesc) throws HiveException { String type = exprDesc.getTypeString(); - String colVectorType = this.getOutputColType(type, "constant"); + String colVectorType = this.getOutputColType(type); int outCol = ocm.allocateOutputColumn(colVectorType); if (type.equalsIgnoreCase("long") || type.equalsIgnoreCase("int") || type.equalsIgnoreCase("short") || type.equalsIgnoreCase("byte")) { @@ -470,14 +473,14 @@ private VectorExpression getVectorExpression(GenericUDFBridge udf, /* Return a unary string vector expression. This is used for functions like * UPPER() and LOWER(). */ - private VectorExpression getUnaryStringExpression(String vectorExprClassName, + private VectorExpression getUnaryStringExpression(String vectorExprClassName, String resultType, // result type name List childExprList) throws HiveException { - + /* Create an instance of the class vectorExprClassName for the input column or expression result * and return it. */ - + ExprNodeDesc childExpr = childExprList.get(0); int inputCol; VectorExpression v1 = null; @@ -517,23 +520,23 @@ private VectorExpression getLikeExpression(List childExpr) throws VectorExpression expr = null; int inputCol; ExprNodeConstantDesc constDesc; - + if ((leftExpr instanceof ExprNodeColumnDesc) && (rightExpr instanceof ExprNodeConstantDesc) ) { ExprNodeColumnDesc leftColDesc = (ExprNodeColumnDesc) leftExpr; constDesc = (ExprNodeConstantDesc) rightExpr; inputCol = getInputColumnIndex(leftColDesc.getColumn()); - expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, - new Text((byte[]) getScalarValue(constDesc))); + expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, + new Text((byte[]) getScalarValue(constDesc))); } else if ((leftExpr instanceof ExprNodeGenericFuncDesc) && (rightExpr instanceof ExprNodeConstantDesc)) { v1 = getVectorExpression(leftExpr); inputCol = v1.getOutputColumn(); constDesc = (ExprNodeConstantDesc) rightExpr; - expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, - new Text((byte[]) getScalarValue(constDesc))); + expr = (VectorExpression) new FilterStringColLikeStringScalar(inputCol, + new Text((byte[]) getScalarValue(constDesc))); } - // TODO add logic to handle cases where left input is an expression. + // TODO add logic to handle cases where left input is an expression. if (expr == null) { throw new HiveException("Vector LIKE filter expression could not be initialized"); } @@ -709,7 +712,7 @@ private VectorExpression getBinaryArithmeticExpression(String method, scalarType, method); try { expr = (VectorExpression) Class.forName(className). 
- getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc), + getDeclaredConstructors()[0].newInstance(getScalarValue(constDesc), inputCol2, outputCol); } catch (Exception ex) { throw new HiveException(ex); @@ -870,7 +873,7 @@ private VectorExpression getVectorBinaryComparisonFilterExpression(String } catch (Exception ex) { throw new HiveException(ex); } - } else if ((leftExpr instanceof ExprNodeConstantDesc) && + } else if ((leftExpr instanceof ExprNodeConstantDesc) && (rightExpr instanceof ExprNodeColumnDesc)) { ExprNodeConstantDesc constDesc = (ExprNodeConstantDesc) leftExpr; ExprNodeColumnDesc rightColDesc = (ExprNodeColumnDesc) rightExpr; @@ -1147,7 +1150,7 @@ private void validateInputType(String inputType) throws HiveException { } } - private String getOutputColType(String inputType, String method) throws HiveException { + private String getOutputColType(String inputType) throws HiveException { validateInputType(inputType); if (inputType.equalsIgnoreCase("float") || inputType.equalsIgnoreCase("double")) { return "double"; @@ -1278,10 +1281,12 @@ private VectorizedRowBatch allocateRowBatch(int rowCount) throws HiveException { return map; } - public ColumnVector allocateColumnVector(String type, int defaultSize) { - if (type.equalsIgnoreCase("double")) { + public ColumnVector allocateColumnVector(String type, int defaultSize) throws HiveException { + validateInputType(type); + String vectorType = getOutputColType(type); + if (vectorType.equalsIgnoreCase("double")) { return new DoubleColumnVector(defaultSize); - } else if (type.equalsIgnoreCase("string")) { + } else if (vectorType.equalsIgnoreCase("string")) { return new BytesColumnVector(defaultSize); } else { return new LongColumnVector(defaultSize); @@ -1294,5 +1299,16 @@ public void addToColumnMap(String columnName, int outputColumn) { columnMap.put(columnName, outputColumn); } } + + public Map getMapVectorExpressions( + Map> expressions) throws HiveException { + Map result = new HashMap(); + if (null != expressions) { + for(T key: expressions.keySet()) { + result.put(key, getVectorExpressions(expressions.get(key))); + } + } + return result; + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java index ff13f89..aaaf6b1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java @@ -20,7 +20,10 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.io.NullWritable; @@ -172,5 +175,42 @@ public void write(DataOutput arg0) throws IOException { public void setValueWriters(VectorExpressionWriter[] valueWriters) { this.valueWriters = valueWriters; } + + public static VectorizedRowBatch buildBatch( + VectorizationContext vContext, + Map valueExpressions, + Byte[] order, + List outputColumnNames) throws HiveException { + int outColIndex = 0; + VectorizedRowBatch batch = new VectorizedRowBatch(outputColumnNames.size()); + for(byte alias:order) { + VectorExpression[] exprs = valueExpressions.get(alias); + for(VectorExpression ve:exprs) { + String colName = outputColumnNames.get(outColIndex); + 
vContext.addToColumnMap(colName, outColIndex); + batch.cols[outColIndex] = vContext.allocateColumnVector(ve.getOutputType(), + VectorizedRowBatch.DEFAULT_SIZE); + ++outColIndex; + } + } + batch.reset(); + return batch; + } + + /** + * Resets the row batch to default state + * - sets selectedInUse to false + * - sets size to 0 + * - sets endOfFile to false + * - resets each column + */ + public void reset() { + selectedInUse = false; + size = 0; + endOfFile = false; + for (ColumnVector vc : cols) { + vc.reset(); + } + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java index 9e189c9..85cf6ac 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java @@ -75,7 +75,7 @@ public VectorExpressionWriter init(ExprNodeDesc nodeDesc) throws HiveException { } if (null == objectInspector) { throw new HiveException(String.format( - "Failed to initialize VectorExpressionWriter for expr: %s", + "Failed to initialize VectorExpressionWriter for expr: %s", nodeDesc.getExprString())); } return this; diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java index f15ce48..5396d6e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/HashTableDummyDesc.java @@ -34,4 +34,10 @@ public void setTbl(TableDesc tbl) { this.tbl = tbl; } + @Override + public Object clone() { + HashTableDummyDesc c = new HashTableDummyDesc(); + c.setTbl((TableDesc)tbl.clone()); + return c; + } }
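
Reviewer note (not part of the patch): a minimal sketch of how the new VectorColumnAssign / VectorColumnAssignFactory API introduced above is expected to be driven by a vectorized operator, mirroring the output-batch flush logic in VectorMapJoinOperator. It assumes the code runs inside an Operator subclass (so forward() is available) with an already-built outputBatch and inBatch; someOutputColumn and someWritable are hypothetical placeholders for a vector expression's output column index and a deserialized small-table value.

    // Build one assigner per output column of the already-allocated output batch.
    VectorColumnAssign[] assigners = VectorColumnAssignFactory.buildAssigners(outputBatch);

    for (int batchIndex = 0; batchIndex < inBatch.size; ++batchIndex) {
      // Big-table columns are copied vector-to-vector from the input batch...
      assigners[0].assignVectorValue(inBatch, batchIndex, someOutputColumn, outputBatch.size);
      // ...while materialized small-table values arrive as plain objects/writables.
      assigners[1].assignObjectValue(someWritable, outputBatch.size);

      if (++outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
        forward(outputBatch, null);   // hand the full batch to the child operators
        outputBatch.reset();          // clears selectedInUse, size, endOfFile and each ColumnVector
        for (VectorColumnAssign vca : assigners) {
          vca.reset();                // e.g. rewinds the shared byte pad in the bytes assigner
        }
      }
    }

The per-assigner reset() after each forward is the reason VectorColumnAssign exposes reset() at all: the bytes assigner reuses a single pad buffer across rows, so it must be rewound whenever the output batch is recycled.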