diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java index d320b47..91db713 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java @@ -183,7 +183,7 @@ public CommonJoinOperator(CommonJoinOperator clone) { return joinOutputObjectInspector; } - Configuration hconf; + protected Configuration hconf; @Override @SuppressWarnings("unchecked") @@ -407,9 +407,9 @@ protected long getNextSize(long sz) { // // for MapJoin, filter tag is pre-calculated in MapredLocalTask and stored with value. // when reading the hashtable, MapJoinObjectValue calculates alias filter and provide it to join - protected ArrayList getFilteredValue(byte alias, Object row) throws HiveException { + protected List getFilteredValue(byte alias, Object row) throws HiveException { boolean hasFilter = hasFilter(alias); - ArrayList nr = JoinUtil.computeValues(row, joinValues[alias], + List nr = JoinUtil.computeValues(row, joinValues[alias], joinValuesObjectInspectors[alias], hasFilter); if (hasFilter) { short filterTag = JoinUtil.isFiltered(row, joinFilters[alias], @@ -434,7 +434,7 @@ private void createForwardJoinObject(boolean[] skip) throws HiveException { } } if (forward) { - forward(forwardCache, null); + internalForward(forwardCache, null); countAfterReport = 0; } } @@ -639,6 +639,10 @@ public void endGroup() throws HiveException { checkAndGenObject(); } + protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException { + forward(row, outputOI); + } + private void genUniqueJoinObject(int aliasNum, int forwardCachePos) throws HiveException { AbstractRowContainer> alias = storage[order[aliasNum]]; @@ -649,7 +653,7 @@ private void genUniqueJoinObject(int aliasNum, int forwardCachePos) forwardCache[p++] = row.get(j); } if (aliasNum == numAliases - 1) { - forward(forwardCache, outputObjInspector); + internalForward(forwardCache, outputObjInspector); countAfterReport = 0; } else { genUniqueJoinObject(aliasNum + 1, p); @@ -668,7 +672,7 @@ private void genAllOneUniqueJoinObject() } } - forward(forwardCache, outputObjInspector); + internalForward(forwardCache, outputObjInspector); countAfterReport = 0; } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java index 86db044..9c37018 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java @@ -75,7 +75,7 @@ public void processOp(Object row, int tag) throws HiveException { // get alias alias = (byte) tag; - ArrayList nr = getFilteredValue(alias, row); + List nr = getFilteredValue(alias, row); if (handleSkewJoin) { skewJoinKeyContext.handleSkew(tag); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java index fa9ee35..1e0314d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/JoinUtil.java @@ -199,14 +199,15 @@ public static MapJoinKey computeMapJoinKeys(MapJoinKey key, Object row, * Return the value as a standard object. StandardObject can be inspected by a * standard ObjectInspector. * If it would be tagged by filter, reserve one more slot for that. 
+ * outValues can be passed in to avoid allocation */ - public static ArrayList computeValues(Object row, + public static List computeValues(Object row, List valueFields, List valueFieldsOI, boolean hasFilter) throws HiveException { // Compute the values int reserve = hasFilter ? valueFields.size() + 1 : valueFields.size(); - ArrayList nr = new ArrayList(reserve); + List nr = new ArrayList(reserve); for (int i = 0; i < valueFields.size(); i++) { nr.add(ObjectInspectorUtils.copyToStandardObject(valueFields.get(i) .evaluate(row), valueFieldsOI.get(i), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 153b8ea..6c053a2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -22,7 +22,7 @@ import java.io.FileInputStream; import java.io.ObjectInputStream; import java.io.Serializable; -import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,7 +56,7 @@ "Mapside join exceeds available memory. " + "Please try removing the mapjoin hint."}; - private transient MapJoinTableContainer[] mapJoinTables; + protected transient MapJoinTableContainer[] mapJoinTables; private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; private transient boolean hashTblInitedOnce; private transient MapJoinKey key; @@ -86,7 +86,7 @@ protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) { public void generateMapMetaData() throws HiveException, SerDeException { // generate the meta data for key // index for key is -1 - + TableDesc keyTableDesc = conf.getKeyTblDesc(); SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(), null); @@ -154,7 +154,7 @@ private void loadHashTable() throws HiveException { LOG.info("\tLoad back 1 hashtable file from tmp file uri:" + path); ObjectInputStream in = new ObjectInputStream(new BufferedInputStream( new FileInputStream(path.toUri().getPath()), 4096)); - try{ + try{ mapJoinTables[pos] = mapJoinTableSerdes[pos].load(in); } finally { in.close(); @@ -180,6 +180,11 @@ public void cleanUpInputFileChangedOp() throws HiveException { } } + protected MapJoinKey computeMapJoinKey(Object row, byte alias) throws HiveException { + return JoinUtil.computeMapJoinKeys(key, row, joinKeys[alias], + joinKeysObjectInspectors[alias]); + } + @Override public void processOp(Object row, int tag) throws HiveException { try { @@ -191,8 +196,7 @@ public void processOp(Object row, int tag) throws HiveException { alias = (byte)tag; // compute keys and values as StandardObjects - key = JoinUtil.computeMapJoinKeys(key, row, joinKeys[alias], - joinKeysObjectInspectors[alias]); + key = computeMapJoinKey(row, alias); boolean joinNeeded = false; for (byte pos = 0; pos < order.length; pos++) { if (pos != alias) { @@ -213,7 +217,7 @@ public void processOp(Object row, int tag) throws HiveException { } } if (joinNeeded) { - ArrayList value = getFilteredValue(alias, row); + List value = getFilteredValue(alias, row); // Add the value to the ArrayList storage[alias].add(value); // generate the output records diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index 54f2644..1f955d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; @@ -118,6 +119,7 @@ public OpTuple(Class descClass, Class> opClass) { vectorOpvec = new ArrayList(); vectorOpvec.add(new OpTuple(SelectDesc.class, VectorSelectOperator.class)); vectorOpvec.add(new OpTuple(GroupByDesc.class, VectorGroupByOperator.class)); + vectorOpvec.add(new OpTuple(MapJoinDesc.class, VectorMapJoinOperator.class)); vectorOpvec.add(new OpTuple(ReduceSinkDesc.class, VectorReduceSinkOperator.class)); vectorOpvec.add(new OpTuple(FileSinkDesc.class, VectorFileSinkOperator.class)); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index cde1a59..653f40a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -262,7 +262,7 @@ public void processOp(Object row, int tag) throws HiveException { // compute keys and values as StandardObjects ArrayList key = JoinUtil.computeKeys(row, joinKeys[alias], joinKeysObjectInspectors[alias]); - ArrayList value = getFilteredValue(alias, row); + List value = getFilteredValue(alias, row); //have we reached a new key group? diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java index 8b4c615..48b87ea 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.util.Arrays; + import org.apache.hadoop.io.Writable; /** @@ -26,33 +28,33 @@ * repeats, i.e. has all values the same, so only the first * one is set. This is used to accelerate query performance * by handling a whole vector in O(1) time when applicable. - * + * * The fields are public by design since this is a performance-critical * structure that is used in the inner loop of query execution. */ public abstract class ColumnVector { - + /* - * If hasNulls is true, then this array contains true if the value - * is null, otherwise false. The array is always allocated, so a batch can be re-used + * If hasNulls is true, then this array contains true if the value + * is null, otherwise false. The array is always allocated, so a batch can be re-used * later and nulls added. */ - public boolean[] isNull; - + public boolean[] isNull; + // If the whole column vector has no nulls, this is true, otherwise false. public boolean noNulls; - - /* - * True if same value repeats for whole column vector. + + /* + * True if same value repeats for whole column vector. * If so, vector[0] holds the repeating value. */ - public boolean isRepeating; + public boolean isRepeating; public abstract Writable getWritableObject(int index); /** * Constructor for super-class ColumnVector. This is not called directly, * but used to initialize inherited fields. 
- * + * * @param len Vector length */ public ColumnVector(int len) { @@ -60,5 +62,19 @@ public ColumnVector(int len) { noNulls = true; isRepeating = false; } -} + + /** + * Resets the column to default state + * - fills the isNull array with false + * - sets noNulls to true + * - sets isRepeating to false + */ + public void reset() { + if (false == noNulls) { + Arrays.fill(isNull, false); + } + noNulls = true; + isRepeating = false; + } + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java new file mode 100644 index 0000000..6a44c27 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssign.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public interface VectorColumnAssign { + + void assignVectorValue(VectorizedRowBatch inBatch, int batchIndex, int valueColumn, int destIndex) + throws HiveException; + void assignObjectValue(Object val, int destIndex) throws HiveException; + + void reset(); +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java new file mode 100644 index 0000000..d1a75df --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java @@ -0,0 +1,369 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +/** + * This class is used as a static factory for VectorColumnAssign. + * Is capable of building assigners from expression nodes or from object inspectors. + */ +public class VectorColumnAssignFactory { + + private static abstract class VectorColumnAssignVectorBase + implements VectorColumnAssign { + protected VectorizedRowBatch outBatch; + protected T outCol; + + protected void copyValue(T in, int srcIndex, int destIndex) throws HiveException { + throw new HiveException("Internal error: should not reach here"); + } + + @SuppressWarnings("unchecked") + @Override + public void assignVectorValue(VectorizedRowBatch inBatch, int batchIndex, + int valueColumnIndex, int destIndex) throws HiveException { + T in = (T) inBatch.cols[valueColumnIndex]; + if (in.isRepeating) { + if (in.noNulls) { + copyValue(in, 0, destIndex); + } + else { + assignNull(destIndex); + } + } + else { + int srcIndex = inBatch.selectedInUse ? 
inBatch.selected[batchIndex] : batchIndex; + if (in.noNulls || !in.isNull[srcIndex]) { + copyValue(in, srcIndex, destIndex); + } + else { + assignNull(destIndex); + } + } + } + + public VectorColumnAssign init(VectorizedRowBatch out, T cv) { + this.outBatch = out; + this.outCol = cv; + return this; + } + + protected void assignNull(int index) { + VectorizedBatchUtil.SetNullColIsNullValue(outCol, index); + } + + @Override + public void reset() { + } + + @Override + public void assignObjectValue(Object value, int destIndex) throws HiveException { + throw new HiveException("Internal error: should not reach here"); + } + } + + private static abstract class VectorLongColumnAssign + extends VectorColumnAssignVectorBase { + protected void assignLong(long value, int destIndex) { + outCol.vector[destIndex] = value; + } + } + + private static abstract class VectorDoubleColumnAssign + extends VectorColumnAssignVectorBase { + + protected void assignDouble(double value, int destIndex) { + outCol.vector[destIndex] = value; + } + } + + private static abstract class VectorBytesColumnAssign + extends VectorColumnAssignVectorBase { + byte[] pad = new byte[BytesColumnVector.DEFAULT_BUFFER_SIZE]; + int padUsed = 0; + + protected void assignBytes(byte[] buffer, int start, int length, int destIndex) { + if (padUsed + length <= pad.length) { + System.arraycopy(buffer, start, + pad, padUsed, length); + outCol.vector[destIndex] = pad; + outCol.start[destIndex] = padUsed; + outCol.length[destIndex] = length; + padUsed += length; + } + else { + outCol.vector[destIndex] = Arrays.copyOfRange(buffer, + start, length); + outCol.start[destIndex] = 0; + outCol.length[destIndex] = length; + } + } + + @Override + public void reset() { + super.reset(); + padUsed = 0; + } + } + + + public static VectorColumnAssign[] buildAssigners(VectorizedRowBatch outputBatch) + throws HiveException { + VectorColumnAssign[] vca = new VectorColumnAssign[outputBatch.cols.length]; + for(int i=0; i columnMap, + List outputColumnNames) throws HiveException { + StructObjectInspector soi = (StructObjectInspector) outputOI; + VectorColumnAssign[] vcas = new VectorColumnAssign[outputColumnNames.size()]; + for (int i=0; i outputVectorAssigners; + + // These members are used as out-of-band params + // for the inner-loop supper.processOp callbacks + // + private transient int batchIndex; + private transient VectorHashKeyWrapper[] keyValues; + + public VectorMapJoinOperator() { + super(); + } + + private interface MapJoinKeyEvaluator { + MapJoinKey evaluate(VectorHashKeyWrapper kw) throws HiveException; + } + + public VectorMapJoinOperator (VectorizationContext vContext, OperatorDesc conf) + throws HiveException { + this(); + + MapJoinDesc desc = (MapJoinDesc) conf; + this.conf = desc; + + order = desc.getTagOrder(); + numAliases = desc.getExprs().size(); + posBigTable = (byte) desc.getPosBigTable(); + filterMaps = desc.getFilterMap(); + tagLen = desc.getTagLength(); + noOuterJoin = desc.isNoOuterJoin(); + + vContext.setOperatorType(OperatorType.FILTER); + Map> filterExpressions = desc.getFilters(); + bigTableFilterExpressions = vContext.getVectorExpressions(filterExpressions.get(posBigTable)); + + vContext.setOperatorType(OperatorType.MAPJOIN); + + List keyDesc = desc.getKeys().get(posBigTable); + keyExpressions = vContext.getVectorExpressions(keyDesc); + keyOutputWriters = VectorExpressionWriterFactory.getExpressionWriters(keyDesc); + + // We're only going to evaluate the big table vectorized expressions, + Map> exprs = desc.getExprs(); + 
bigTableValueExpressions = vContext.getVectorExpressions(exprs.get(posBigTable)); + + List outColNames = desc.getOutputColumnNames(); + int outputColumnIndex = 0; + for(byte alias:order) { + for(ExprNodeDesc expr: exprs.get(alias)) { + String columnName = outColNames.get(outputColumnIndex); + vContext.addOutputColumn(columnName, expr.getTypeString()); + ++outputColumnIndex; + } + } + + this.fileKey = vContext.getFileKey(); + } + + @Override + public void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); + + Map> allTypeMaps = Utilities. + getMapRedWork(hconf).getMapWork().getScratchColumnVectorTypes(); + Map typeMap = allTypeMaps.get(fileKey); + + Map> allColumnMaps = Utilities. + getMapRedWork(hconf).getMapWork().getScratchColumnMap(); + + Map columnMap = allColumnMaps.get(fileKey); + + outputBatch = VectorizedRowBatch.buildBatch(typeMap, columnMap); + + keyWrapperBatch =VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions); + + // This key evaluator translates from the vectorized VectorHashKeyWrapper format + // into the row-mode MapJoinKey + keyEvaluator = new MapJoinKeyEvaluator() { + private MapJoinKey key; + + public MapJoinKeyEvaluator init() { + key = new MapJoinKey(new Object[keyExpressions.length]); + return this; + } + + @Override + public MapJoinKey evaluate(VectorHashKeyWrapper kw) throws HiveException { + Object[] keyValues = key.getKey(); + for(int i=0; i> valueExpressions = conf.getExprs(); + List bigTableExpressions = valueExpressions.get(posBigTable); + + VectorExpressionWriterFactory.processVectorExpressions( + bigTableExpressions, + new VectorExpressionWriterFactory.ListOIDClosure() { + @Override + public void assign(VectorExpressionWriter[] writers, List oids) { + valueWriters = writers; + joinValuesObjectInspectors[posBigTable] = oids; + } + }); + + // We're hijacking the big table evaluators an replace them with our own custom ones + // which are going to return values from the input batch vector expressions + List vectorNodeEvaluators = new ArrayList(bigTableExpressions.size()); + + for(int i=0; i(desc) { + int columnIndex;; + int writerIndex; + + public ExprNodeEvaluator initVectorExpr(int columnIndex, int writerIndex) { + this.columnIndex = columnIndex; + this.writerIndex = writerIndex; + return this; + } + + @Override + public ObjectInspector initialize(ObjectInspector rowInspector) throws HiveException { + throw new HiveException("should never reach here"); + } + + @Override + protected Object _evaluate(Object row, int version) throws HiveException { + VectorizedRowBatch inBatch = (VectorizedRowBatch) row; + int rowIndex = inBatch.selectedInUse ? inBatch.selected[batchIndex] : batchIndex; + return valueWriters[writerIndex].writeValue(inBatch.cols[columnIndex], rowIndex); + } + }.initVectorExpr(vectorExpr.getOutputColumn(), i); + vectorNodeEvaluators.add(eval); + } + // Now replace the old evaluators with our own + joinValues[posBigTable] = vectorNodeEvaluators; + + // Filtering is handled in the input batch processing + filterMaps[posBigTable] = null; + + outputVectorAssigners = new HashMap(); + } + + /** + * 'forwards' the (row-mode) record into the (vectorized) output batch + */ + @Override + protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException { + Object[] values = (Object[]) row; + VectorColumnAssign[] vcas = outputVectorAssigners.get(outputOI); + if (null == vcas) { + Map> allColumnMaps = Utilities. 
+ getMapRedWork(hconf).getMapWork().getScratchColumnMap(); + Map columnMap = allColumnMaps.get(fileKey); + vcas = VectorColumnAssignFactory.buildAssigners( + outputBatch, outputOI, columnMap, conf.getOutputColumnNames()); + outputVectorAssigners.put(outputOI, vcas); + } + for (int i=0; i cMap = vContext.getColumnMap(); + for (int i=0; i < colList.size(); ++i) { String columnName = this.conf.getOutputColumnNames().get(i); - // Update column map with output column names - vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn()); + if (!cMap.containsKey(columnName)) { + VectorExpression ve = vExpressions[i]; + // Update column map with output column names + vContext.addToColumnMap(columnName, ve.getOutputColumn()); + } } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java index 8f10644..e57c28c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java @@ -180,6 +180,8 @@ private static final String GENERATED_EXPR_PACKAGE = "org.apache.hadoop.hive.ql.exec.vector.expressions.gen"; + private String fileKey = null; + public VectorizationContext(Map columnMap, int initialOutputCol) { this.columnMap = columnMap; @@ -187,7 +189,18 @@ public VectorizationContext(Map columnMap, this.firstOutputColumnIndex = initialOutputCol; } + public String getFileKey() { + return fileKey; + } + + public void setFileKey(String fileKey) { + this.fileKey = fileKey; + } + private int getInputColumnIndex(String name) { + if (!columnMap.containsKey(name)) { + LOG.error(String.format("The column %s is not in the vectorization context column map.", name)); + } return columnMap.get(name); } @@ -259,6 +272,7 @@ private VectorExpression getVectorExpression(ExprNodeColumnDesc //Important: It will come here only if the column is being used as a boolean expr = new SelectColumnIsTrue(columnNum); break; + case MAPJOIN: case SELECT: case GROUPBY: case REDUCESINK: @@ -987,6 +1001,14 @@ private VectorExpression getCustomUDFExpression(ExprNodeGenericFuncDesc expr) return ve; } + public static boolean isStringFamily(String resultType) { + return resultType.equalsIgnoreCase("string"); + } + + public static boolean isDatetimeFamily(String resultType) { + return resultType.equalsIgnoreCase("timestamp"); + } + // return true if this is any kind of float public static boolean isFloatFamily(String resultType) { return resultType.equalsIgnoreCase("double") @@ -999,7 +1021,23 @@ public static boolean isIntFamily(String resultType) { || resultType.equalsIgnoreCase("smallint") || resultType.equalsIgnoreCase("int") || resultType.equalsIgnoreCase("bigint") - || resultType.equalsIgnoreCase("boolean"); + || resultType.equalsIgnoreCase("boolean") + || resultType.equalsIgnoreCase("long"); + } + + public static String mapJavaTypeToVectorType(String javaType) + throws HiveException { + if (isStringFamily(javaType)) { + return "string"; + } + if (isFloatFamily(javaType)) { + return "double"; + } + if (isIntFamily(javaType) || + isDatetimeFamily(javaType)) { + return "bigint"; + } + throw new HiveException("Unsuported type for vectorization: " + javaType); } /* Return a unary string vector expression. 
This is used for functions like @@ -1889,21 +1927,43 @@ public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc) return map; } - public ColumnVector allocateColumnVector(String type, int defaultSize) { - if (type.equalsIgnoreCase("double")) { + public Map getColumnMap() { + return columnMap; + } + + public static ColumnVector allocateColumnVector(String type, int defaultSize) { + if (isFloatFamily(type)) { return new DoubleColumnVector(defaultSize); - } else if (type.equalsIgnoreCase("string")) { + } else if (isStringFamily(type)) { return new BytesColumnVector(defaultSize); } else { return new LongColumnVector(defaultSize); } } + public void addToColumnMap(String columnName, int outputColumn) throws HiveException { + if (columnMap.containsKey(columnName) && (columnMap.get(columnName) != outputColumn)) { + throw new HiveException(String.format("Column %s is already mapped to %d. Cannot remap to %d.", + columnName, columnMap.get(columnName), outputColumn)); + } + columnMap.put(columnName, outputColumn); + } - public void addToColumnMap(String columnName, int outputColumn) { - if (columnMap != null) { - columnMap.put(columnName, outputColumn); + public Map getMapVectorExpressions( + Map> expressions) throws HiveException { + Map result = new HashMap(); + if (null != expressions) { + for(T key: expressions.keySet()) { + result.put(key, getVectorExpressions(expressions.get(key))); + } } + return result; + } + + public void addOutputColumn(String columnName, String columnType) throws HiveException { + String vectorType = mapJavaTypeToVectorType(columnType); + int columnIndex = ocm.allocateOutputColumn(vectorType); + this.addToColumnMap(columnName, columnIndex); } -} + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java index ff13f89..be7cb9f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java @@ -20,6 +20,10 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -172,5 +176,67 @@ public void write(DataOutput arg0) throws IOException { public void setValueWriters(VectorExpressionWriter[] valueWriters) { this.valueWriters = valueWriters; } -} + public static VectorizedRowBatch buildBatch(Map typeMap, + Map columnMap) throws HiveException { + + Map mapVectorColumn = new HashMap(typeMap.size()); + int maxIndex = 0; + + Iterator> typeMapIt = typeMap.entrySet().iterator(); + while(typeMapIt.hasNext()) { + Entry type = typeMapIt.next(); + ColumnVector cv = VectorizationContext.allocateColumnVector(type.getValue(), + VectorizedRowBatch.DEFAULT_SIZE); + mapVectorColumn.put(type.getKey(), cv); + if (maxIndex < type.getKey()) { + maxIndex = type.getKey(); + } + } + + VectorizedRowBatch batch = new VectorizedRowBatch(maxIndex+1); + for(int i=0; i <= maxIndex; ++i) { + ColumnVector cv = mapVectorColumn.get(i); + if (cv == null) { + // allocate a default type for the unused column. 
+ // there are APIs that expect all cols[i] to be non NULL + cv = VectorizationContext.allocateColumnVector("long", + VectorizedRowBatch.DEFAULT_SIZE); + } + batch.cols[i] = cv; + } + + // Validate that every column in the column map exists + Iterator> columnMapIt = columnMap.entrySet().iterator(); + while(columnMapIt.hasNext()) { + Entry cm = columnMapIt.next(); + if (batch.cols.length <= cm.getValue() || batch.cols[cm.getValue()] == null) { + throw new HiveException(String.format( + "Internal error: The type map has no entry for column %d %s", + cm.getValue(), cm.getKey())); + } + } + + batch.reset(); + + return batch; + } + + /** + * Resets the row batch to default state + * - sets selectedInUse to false + * - sets size to 0 + * - sets endOfFile to false + * - resets each column + */ + public void reset() { + selectedInUse = false; + size = 0; + endOfFile = false; + for (ColumnVector vc : cols) { + if (vc != null) { + vc.reset(); + } + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java index 9e189c9..c5b026a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java @@ -75,7 +75,7 @@ public VectorExpressionWriter init(ExprNodeDesc nodeDesc) throws HiveException { } if (null == objectInspector) { throw new HiveException(String.format( - "Failed to initialize VectorExpressionWriter for expr: %s", + "Failed to initialize VectorExpressionWriter for expr: %s", nodeDesc.getExprString())); } return this; @@ -378,18 +378,22 @@ public Object writeValue(double value) { * A poor man Java closure. Works around the problem of having to return multiple objects * from one function call. */ - public static interface Closure { + public static interface SingleOIDClosure { void assign(VectorExpressionWriter[] writers, ObjectInspector objectInspector); } + public static interface ListOIDClosure { + void assign(VectorExpressionWriter[] writers, List oids); + } + /** * Creates the value writers for a column vector expression list. * Creates an appropriate output object inspector. */ public static void processVectorExpressions( List nodesDesc, - List outputColumnNames, - Closure closure) + List columnNames, + SingleOIDClosure closure) throws HiveException { VectorExpressionWriter[] writers = getExpressionWriters(nodesDesc); List oids = new ArrayList(writers.length); @@ -397,10 +401,26 @@ public static void processVectorExpressions( oids.add(writers[i].getObjectInspector()); } ObjectInspector objectInspector = ObjectInspectorFactory. - getStandardStructObjectInspector(outputColumnNames,oids); + getStandardStructObjectInspector(columnNames,oids); closure.assign(writers, objectInspector); } + /** + * Creates the value writers for a column vector expression list. + * Creates an appropriate output object inspector. 
+ */ + public static void processVectorExpressions( + List nodesDesc, + ListOIDClosure closure) + throws HiveException { + VectorExpressionWriter[] writers = getExpressionWriters(nodesDesc); + List oids = new ArrayList(writers.length); + for(int i=0; i nodeOutput = new HashMap(); ogw.startWalking(topNodes, nodeOutput); mapWork.setScratchColumnVectorTypes(vnp.getScratchColumnVectorTypes()); + mapWork.setScratchColumnMap(vnp.getScratchColumnMap()); return; } } @@ -361,6 +364,17 @@ public VectorizationNodeProcessor(MapRedTask mrTask) { return scratchColumnVectorTypes; } + public Map> getScratchColumnMap() { + Map> scratchColumnMap = + new HashMap>(); + for(String oneFile: vectorizationContexts.keySet()) { + VectorizationContext vc = vectorizationContexts.get(oneFile); + Map cmap = vc.getColumnMap(); + scratchColumnMap.put(oneFile, cmap); + } + return scratchColumnMap; + } + @Override public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException { @@ -384,6 +398,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, } } vContext = getVectorizationContext(tsOp, physicalContext); + vContext.setFileKey(fileKey); vectorizationContexts.put(fileKey, vContext); vContextsByTSOp.put(tsOp, vContext); } @@ -438,6 +453,9 @@ public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException { boolean validateOperator(Operator op) { boolean ret = false; switch (op.getType()) { + case MAPJOIN: + ret = validateMapJoinOperator((MapJoinOperator) op); + break; case GROUPBY: ret = validateGroupByOperator((GroupByOperator) op); break; @@ -462,6 +480,17 @@ boolean validateOperator(Operator op) { return ret; } + private boolean validateMapJoinOperator(MapJoinOperator op) { + MapJoinDesc desc = op.getConf(); + byte posBigTable = (byte) desc.getPosBigTable(); + List filterExprs = desc.getFilters().get(posBigTable); + List keyExprs = desc.getKeys().get(posBigTable); + List valueExprs = desc.getExprs().get(posBigTable); + return validateExprNodeDesc(filterExprs) && + validateExprNodeDesc(keyExprs) && + validateExprNodeDesc(valueExprs); + } + private boolean validateReduceSinkOperator(ReduceSinkOperator op) { List keyDescs = op.getConf().getKeyCols(); List partitionDescs = op.getConf().getPartitionCols(); @@ -575,11 +604,22 @@ private VectorizationContext getVectorizationContext(Operator op, + VectorizationContext vContext) throws HiveException { + + RowResolver rr = physicalContext.getParseContext().getOpParseCtx().get(op).getRowResolver(); + for(ColumnInfo c : rr.getColumnInfos()) { + vContext.addOutputColumn(c.getInternalName(), c.getTypeName()); + } + } + Operator vectorizeOperator(Operator op, VectorizationContext vContext) throws HiveException { Operator vectorOp = null; switch (op.getType()) { + case MAPJOIN: case GROUPBY: case FILTER: case SELECT: diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index a72ec8b..65c4e3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -113,8 +113,10 @@ private boolean useBucketizedHiveInputFormat; private Map> scratchColumnVectorTypes = null; + private Map> scratchColumnMap = null; private boolean vectorMode = false; + public MapWork() { } @@ -492,11 +494,21 @@ public void setScratchColumnVectorTypes( this.scratchColumnVectorTypes = scratchColumnVectorTypes; } + public Map> getScratchColumnMap() { + return scratchColumnMap; + } + + public void 
setScratchColumnMap(Map> scratchColumnMap) { + this.scratchColumnMap = scratchColumnMap; + } + public boolean getVectorMode() { return vectorMode; } + @Override public void setVectorMode(boolean vectorMode) { this.vectorMode = vectorMode; } + } diff --git ql/src/test/queries/clientpositive/vectorized_mapjoin.q ql/src/test/queries/clientpositive/vectorized_mapjoin.q new file mode 100644 index 0000000..b68282b --- /dev/null +++ ql/src/test/queries/clientpositive/vectorized_mapjoin.q @@ -0,0 +1,9 @@ +SET hive.vectorized.execution.enabled=true; + +EXPLAIN SELECT /*+ MAPJOIN(t1) */ COUNT(t1.cint), MAX(t2.cint), MIN(t1.cint), AVG(t1.cint+t2.cint) + FROM alltypesorc t1 + JOIN alltypesorc t2 ON t1.cint = t2.cint; + +SELECT /*+ MAPJOIN(t1) */ COUNT(t1.cint), MAX(t2.cint), MIN(t1.cint), AVG(t1.cint+t2.cint) + FROM alltypesorc t1 + JOIN alltypesorc t2 ON t1.cint = t2.cint; \ No newline at end of file diff --git ql/src/test/results/clientpositive/vectorized_mapjoin.q.out ql/src/test/results/clientpositive/vectorized_mapjoin.q.out new file mode 100644 index 0000000..a8b2ccd --- /dev/null +++ ql/src/test/results/clientpositive/vectorized_mapjoin.q.out @@ -0,0 +1,129 @@ +PREHOOK: query: EXPLAIN SELECT /*+ MAPJOIN(t1) */ COUNT(t1.cint), MAX(t2.cint), MIN(t1.cint), AVG(t1.cint+t2.cint) + FROM alltypesorc t1 + JOIN alltypesorc t2 ON t1.cint = t2.cint +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN SELECT /*+ MAPJOIN(t1) */ COUNT(t1.cint), MAX(t2.cint), MIN(t1.cint), AVG(t1.cint+t2.cint) + FROM alltypesorc t1 + JOIN alltypesorc t2 ON t1.cint = t2.cint +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME alltypesorc) t1) (TOK_TABREF (TOK_TABNAME alltypesorc) t2) (= (. (TOK_TABLE_OR_COL t1) cint) (. (TOK_TABLE_OR_COL t2) cint)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_HINTLIST (TOK_HINT TOK_MAPJOIN (TOK_HINTARGLIST t1))) (TOK_SELEXPR (TOK_FUNCTION COUNT (. (TOK_TABLE_OR_COL t1) cint))) (TOK_SELEXPR (TOK_FUNCTION MAX (. (TOK_TABLE_OR_COL t2) cint))) (TOK_SELEXPR (TOK_FUNCTION MIN (. (TOK_TABLE_OR_COL t1) cint))) (TOK_SELEXPR (TOK_FUNCTION AVG (+ (. (TOK_TABLE_OR_COL t1) cint) (. 
(TOK_TABLE_OR_COL t2) cint))))))) +STAGE DEPENDENCIES: + Stage-3 is a root stage + Stage-1 depends on stages: Stage-3 + Stage-0 is a root stage +STAGE PLANS: + Stage: Stage-3 + Map Reduce Local Work + Alias -> Map Local Tables: + t1 + Fetch Operator + limit: -1 + Alias -> Map Local Operator Tree: + t1 + TableScan + alias: t1 + HashTable Sink Operator + condition expressions: + 0 {cint} + 1 {cint} + handleSkewJoin: false + keys: + 0 [Column[cint]] + 1 [Column[cint]] + Position of Big Table: 1 + Stage: Stage-1 + Map Reduce + Alias -> Map Operator Tree: + t2 + TableScan + alias: t2 + Map Join Operator + condition map: + Inner Join 0 to 1 + condition expressions: + 0 {cint} + 1 {cint} + handleSkewJoin: false + keys: + 0 [Column[cint]] + 1 [Column[cint]] + outputColumnNames: _col2, _col16 + Position of Big Table: 1 + Vectorized execution: true + Select Operator + expressions: + expr: _col2 + type: int + expr: _col16 + type: int + outputColumnNames: _col2, _col16 + Vectorized execution: true + Group By Operator + aggregations: + expr: count(_col2) + expr: max(_col16) + expr: min(_col2) + expr: avg((_col2 + _col16)) + bucketGroup: false + mode: hash + outputColumnNames: _col0, _col1, _col2, _col3 + Vectorized execution: true + Reduce Output Operator + sort order: + tag: -1 + value expressions: + expr: _col0 + type: bigint + expr: _col1 + type: int + expr: _col2 + type: int + expr: _col3 + type: struct + Local Work: + Map Reduce Local Work + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + expr: max(VALUE._col1) + expr: min(VALUE._col2) + expr: avg(VALUE._col3) + bucketGroup: false + mode: mergepartial + outputColumnNames: _col0, _col1, _col2, _col3 + Select Operator + expressions: + expr: _col0 + type: bigint + expr: _col1 + type: int + expr: _col2 + type: int + expr: _col3 + type: double + outputColumnNames: _col0, _col1, _col2, _col3 + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Stage: Stage-0 + Fetch Operator + limit: -1 +PREHOOK: query: SELECT /*+ MAPJOIN(t1) */ COUNT(t1.cint), MAX(t2.cint), MIN(t1.cint), AVG(t1.cint+t2.cint) + FROM alltypesorc t1 + JOIN alltypesorc t2 ON t1.cint = t2.cint +PREHOOK: type: QUERY +PREHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +POSTHOOK: query: SELECT /*+ MAPJOIN(t1) */ COUNT(t1.cint), MAX(t2.cint), MIN(t1.cint), AVG(t1.cint+t2.cint) + FROM alltypesorc t1 + JOIN alltypesorc t2 ON t1.cint = t2.cint +POSTHOOK: type: QUERY +POSTHOOK: Input: default@alltypesorc +#### A masked pattern was here #### +3149925 1073680599 -1073051226 9.381482540406644E8
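
Note (not part of the patch): the central mechanism above is a template-method hook -- CommonJoinOperator now routes output through a protected internalForward() instead of calling forward() directly, and VectorMapJoinOperator overrides it to copy each row-mode join result into a VectorizedRowBatch via per-ObjectInspector VectorColumnAssign instances, forwarding whole batches downstream. Below is a minimal, self-contained Java sketch of that pattern under simplifying assumptions: the class and method names (RowJoinOperator, VectorRowJoinOperator, LongBatch, flushOutput) are illustrative stand-ins, not Hive APIs, and the single long column stands in for the real VectorizedRowBatch/VectorColumnAssign machinery.

import java.util.Arrays;

/**
 * Stand-alone sketch of the "internalForward" buffering pattern introduced by this patch.
 * A row-mode operator forwards rows one at a time; a vectorized subclass overrides the hook
 * to accumulate rows into a fixed-size column batch and forwards the batch when it fills up
 * (and flushes any partial batch when the operator shuts down).
 */
public class VectorForwardSketch {

  /** Row-mode operator: plays the role of CommonJoinOperator/MapJoinOperator. */
  static class RowJoinOperator {
    // Pre-patch behaviour: every produced row is forwarded immediately.
    protected void internalForward(Object[] row) {
      emit(Arrays.toString(row));
    }
    protected void emit(String s) {
      System.out.println("forward: " + s);
    }
    public void produce(Object[] row) {
      internalForward(row);
    }
    public void close() { }
  }

  /** Tiny stand-in for a one-column batch: a long[] column plus a null mask. */
  static class LongBatch {
    static final int DEFAULT_SIZE = 4;        // Hive's VectorizedRowBatch uses 1024; small here so the demo flushes
    final long[] col = new long[DEFAULT_SIZE];
    final boolean[] isNull = new boolean[DEFAULT_SIZE];
    int size = 0;
    void reset() {                            // mirrors ColumnVector.reset()/VectorizedRowBatch.reset() above
      Arrays.fill(isNull, false);
      size = 0;
    }
  }

  /** Vectorized subclass: buffers row-mode output into the batch instead of forwarding rows. */
  static class VectorRowJoinOperator extends RowJoinOperator {
    private final LongBatch outputBatch = new LongBatch();

    @Override
    protected void internalForward(Object[] row) {
      // Assign the row's single value into the current batch slot
      // (VectorColumnAssign does this per output column in the real patch).
      Object v = row[0];
      if (v == null) {
        outputBatch.isNull[outputBatch.size] = true;
      } else {
        outputBatch.col[outputBatch.size] = ((Number) v).longValue();
      }
      if (++outputBatch.size == LongBatch.DEFAULT_SIZE) {
        flushOutput();
      }
    }

    private void flushOutput() {
      if (outputBatch.size > 0) {
        emit("batch of " + outputBatch.size + ": "
            + Arrays.toString(Arrays.copyOf(outputBatch.col, outputBatch.size)));
        outputBatch.reset();
      }
    }

    @Override
    public void close() {
      flushOutput();                          // flush the partial batch on shutdown
    }
  }

  public static void main(String[] args) {
    VectorRowJoinOperator op = new VectorRowJoinOperator();
    for (long i = 1; i <= 6; i++) {
      op.produce(new Object[] { i });         // 6 rows -> one full batch of 4, then a partial batch of 2
    }
    op.close();
  }
}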