diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index af11196..036f080 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -39,11 +40,12 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
  * The vectorized version of the MapJoinOperator.
  */
-public class VectorMapJoinOperator extends MapJoinOperator {
+public class VectorMapJoinOperator extends MapJoinOperator implements VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
       VectorMapJoinOperator.class.getName());
@@ -77,7 +79,10 @@
   //
   private transient int batchIndex;
   private transient VectorHashKeyWrapper[] keyValues;
-
+
+  private transient VectorizationContext vOutContext = null;
+  private transient VectorizedRowBatchCtx vrbCtx = null;
+
   public VectorMapJoinOperator() {
     super();
   }
@@ -113,36 +118,28 @@ public VectorMapJoinOperator (VectorizationContext vContext, OperatorDesc conf)
     bigTableValueExpressions = vContext.getVectorExpressions(exprs.get(posBigTable));
 
     List<String> outColNames = desc.getOutputColumnNames();
-    int outputColumnIndex = 0;
-
-    Map<String, Integer> cMap = vContext.getColumnMap();
-    for(byte alias:order) {
-      for(ExprNodeDesc expr: exprs.get(alias)) {
-        String columnName = outColNames.get(outputColumnIndex);
-        if (!cMap.containsKey(columnName)) {
-          vContext.addOutputColumn(columnName, expr.getTypeString());
-        }
-        ++outputColumnIndex;
-      }
+
+    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
+
+    int outColIndex = 0;
+    for(String outCol: outColNames) {
+      mapOutCols.put(outCol, outColIndex++);
     }
-
-    this.fileKey = vContext.getFileKey();
+
+    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    vOutContext.setFileKey(vContext.getFileKey() + "/MAP_JOIN_" + desc.getBigTableAlias());
+    this.fileKey = vOutContext.getFileKey();
   }
 
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
+
-    Map<String, Map<Integer, String>> allTypeMaps = Utilities.
-        getMapRedWork(hconf).getMapWork().getScratchColumnVectorTypes();
-    Map<Integer, String> typeMap = allTypeMaps.get(fileKey);
-
-    Map<String, Map<String, Integer>> allColumnMaps = Utilities.
-        getMapRedWork(hconf).getMapWork().getScratchColumnMap();
-
-    Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
-
-    outputBatch = VectorizedRowBatch.buildBatch(typeMap, columnMap);
+    vrbCtx = new VectorizedRowBatchCtx();
+    vrbCtx.init(hconf, this.fileKey, (StructObjectInspector) this.outputObjInspector);
+
+    outputBatch = vrbCtx.createVectorizedRowBatch();
 
     keyWrapperBatch =VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
@@ -298,4 +295,9 @@ public void processOp(Object row, int tag) throws HiveException {
     batchIndex = -1;
     keyValues = null;
   }
+
+  @Override
+  public VectorizationContext getOuputVectorizationContext() {
+    return vOutContext;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContextRegion.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContextRegion.java
new file mode 100644
index 0000000..a403725
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContextRegion.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+/**
+ * VectorizationContextRegion is an optional interface implemented by vectorized
+ * operators that change the vectorization context (region boundary operators).
+ */
+public interface VectorizationContextRegion {
+
+  VectorizationContext getOuputVectorizationContext();
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index be7cb9f..0b504de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -177,51 +177,6 @@ public void setValueWriters(VectorExpressionWriter[] valueWriters) {
     this.valueWriters = valueWriters;
   }
 
-  public static VectorizedRowBatch buildBatch(Map<Integer, String> typeMap,
-      Map<String, Integer> columnMap) throws HiveException {
-
-    Map<Integer, ColumnVector> mapVectorColumn = new HashMap<Integer, ColumnVector>(typeMap.size());
-    int maxIndex = 0;
-
-    Iterator<Entry<Integer, String>> typeMapIt = typeMap.entrySet().iterator();
-    while(typeMapIt.hasNext()) {
-      Entry<Integer, String> type = typeMapIt.next();
-      ColumnVector cv = VectorizationContext.allocateColumnVector(type.getValue(),
-          VectorizedRowBatch.DEFAULT_SIZE);
-      mapVectorColumn.put(type.getKey(), cv);
-      if (maxIndex < type.getKey()) {
-        maxIndex = type.getKey();
-      }
-    }
-
-    VectorizedRowBatch batch = new VectorizedRowBatch(maxIndex+1);
-    for(int i=0; i <= maxIndex; ++i) {
-      ColumnVector cv = mapVectorColumn.get(i);
-      if (cv == null) {
-        // allocate a default type for the unused column.
-        // there are APIs that expect all cols[i] to be non NULL
-        cv = VectorizationContext.allocateColumnVector("long",
-            VectorizedRowBatch.DEFAULT_SIZE);
-      }
-      batch.cols[i] = cv;
-    }
-
-    // Validate that every column in the column map exists
-    Iterator<Entry<String, Integer>> columnMapIt = columnMap.entrySet().iterator();
-    while(columnMapIt.hasNext()) {
-      Entry<String, Integer> cm = columnMapIt.next();
-      if (batch.cols.length <= cm.getValue() || batch.cols[cm.getValue()] == null) {
-        throw new HiveException(String.format(
-            "Internal error: The type map has no entry for column %d %s",
-            cm.getValue(), cm.getKey()));
-      }
-    }
-
-    batch.reset();
-
-    return batch;
-  }
-
   /**
    * Resets the row batch to default state
    *  - sets selectedInUse to false
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 289757c..f513188 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -96,6 +96,25 @@ public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspect
   public VectorizedRowBatchCtx() {
 
   }
+
+  /**
+   * Initializes the VectorizedRowBatch context based on an arbitrary object inspector.
+   * Used by non-tablescan operators when they change the vectorization context.
+   * @param hiveConf
+   * @param fileKey
+   *          The key used to retrieve this operator's scratch column type map
+   * @param rowOI
+   *          Object inspector that shapes the column types
+   */
+  public void init(Configuration hiveConf, String fileKey,
+      StructObjectInspector rowOI) {
+    columnTypeMap = Utilities
+        .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
+        .get(fileKey);
+    this.rowOI = rowOI;
+    this.rawRowOI = rowOI;
+  }
 
   /**
    * Initializes VectorizedRowBatch context based on the
@@ -251,6 +270,7 @@ public VectorizedRowBatch createVectorizedRowBatch() throws HiveException
     }
     result.numCols = fieldRefs.size();
     this.addScratchColumnsToBatch(result);
+    result.reset();
     return result;
   }
 
@@ -351,4 +371,5 @@ private ColumnVector allocateColumnVector(String type, int defaultSize) {
       return new LongColumnVector(defaultSize);
     }
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 151c648..a8a1478 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -411,33 +412,38 @@ public VectorizationNodeProcessor(MapRedTask mrTask) {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
        Object... nodeOutputs) throws SemanticException {
-      Node firstOp = stack.firstElement();
-      TableScanOperator tsOp = null;
-
-      tsOp = (TableScanOperator) firstOp;
-
-      VectorizationContext vContext = vContextsByTSOp.get(tsOp);
-      if (vContext == null) {
-        String fileKey = "";
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+
+      VectorizationContext vContext = null;
+
+      if (op instanceof TableScanOperator) {
+        vContext = getVectorizationContext(op, physicalContext);
         for (String onefile : mWork.getPathToAliases().keySet()) {
           List<String> aliases = mWork.getPathToAliases().get(onefile);
           for (String alias : aliases) {
-            Operator<? extends OperatorDesc> op = mWork.getAliasToWork().get(alias);
-            if (op == tsOp) {
-              fileKey = onefile;
-              if (vContext == null) {
-                vContext = getVectorizationContext(tsOp, physicalContext);
-              }
-              vContext.setFileKey(fileKey);
-              vectorizationContexts.put(fileKey, vContext);
+            Operator<? extends OperatorDesc> opRoot = mWork.getAliasToWork().get(alias);
+            if (op == opRoot) {
+              vContext.setFileKey(onefile);
               break;
             }
           }
         }
-        vContextsByTSOp.put(tsOp, vContext);
+        vContextsByTSOp.put(op, vContext);
+        vectorizationContexts.put(vContext.getFileKey(), vContext);
+      } else {
+        assert stack.size() > 1;
+        // Walk down the stack of operators until we find one willing to give us a context.
+        // At the bottom will be the TS operator, guaranteed to have a context.
+        int i = stack.size() - 2;
+        while (vContext == null) {
+          Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
+          vContext = vContextsByTSOp.get(opParent);
+          --i;
+        }
       }
-
-      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+
+      assert vContext != null;
+
       if (op.getType().equals(OperatorType.REDUCESINK) &&
           op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
         // No need to vectorize
@@ -453,6 +459,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           if (vectorOp != op) {
             opsDone.add(vectorOp);
           }
+          if (vectorOp instanceof VectorizationContextRegion) {
+            VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+            VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+            vContextsByTSOp.put(op, vOutContext);
+            vectorizationContexts.put(vOutContext.getFileKey(), vOutContext);
+          }
         }
       } catch (HiveException e) {
         throw new SemanticException(e);
@@ -678,7 +690,7 @@ private boolean validateDataType(String type) {
     return supportedDataTypes.contains(type.toLowerCase());
   }
 
-  private VectorizationContext getVectorizationContext(TableScanOperator op,
+  private VectorizationContext getVectorizationContext(Operator<? extends OperatorDesc> op,
       PhysicalContext pctx) {
     RowResolver rr = pctx.getParseContext().getOpParseCtx().get(op).getRowResolver();
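
For illustration, here is a minimal, self-contained model of the "vectorization context region" pattern this patch introduces. The classes below (Ctx, Region, Op, MapJoinOp, RegionDemo) are simplified stand-ins invented for this sketch, not Hive's actual API; they only mirror how VectorizationNodeProcessor.process() resolves the nearest enclosing context by walking the operator stack, and how a region-boundary operator such as VectorMapJoinOperator publishes a fresh output context under a derived file key.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for VectorizationContext: just carries a file key.
final class Ctx {
  final String fileKey;
  Ctx(String fileKey) { this.fileKey = fileKey; }
}

// Analogous to VectorizationContextRegion: an operator that starts a new region.
interface Region {
  Ctx outputCtx();
}

class Op {
  final String name;
  Op(String name) { this.name = name; }
}

// Analogous to VectorMapJoinOperator: derives a new file key from its parent's.
class MapJoinOp extends Op implements Region {
  private final Ctx out;
  MapJoinOp(String bigTableAlias, Ctx parentCtx) {
    super("MAPJOIN");
    this.out = new Ctx(parentCtx.fileKey + "/MAP_JOIN_" + bigTableAlias);
  }
  @Override public Ctx outputCtx() { return out; }
}

public class RegionDemo {
  // Plays the role of vContextsByTSOp: the context in effect below each operator.
  private static final Map<Op, Ctx> ctxByOp = new HashMap<>();

  // Mirrors the walk in Vectorizer.process(): scan from the nearest parent
  // toward the table scan at the bottom until some operator yields a context.
  static Ctx resolve(Deque<Op> stack) {
    for (Op op : stack) {            // head of the deque = nearest parent
      Ctx c = ctxByOp.get(op);
      if (c != null) {
        return c;
      }
    }
    throw new IllegalStateException("TS at the bottom should own a context");
  }

  public static void main(String[] args) {
    Deque<Op> stack = new ArrayDeque<>();

    Op ts = new Op("TS");            // table scan: owns the root context
    ctxByOp.put(ts, new Ctx("path/to/bigtable"));
    stack.push(ts);

    // A region-boundary operator registers its *output* context, so operators
    // above it resolve to the new region instead of the table scan's.
    MapJoinOp mj = new MapJoinOp("b", resolve(stack));
    ctxByOp.put(mj, mj.outputCtx());
    stack.push(mj);

    System.out.println(resolve(stack).fileKey);  // path/to/bigtable/MAP_JOIN_b
  }
}

The derived key (parent key + "/MAP_JOIN_" + big-table alias) keeps each region's scratch column types addressable under their own entry in the map work's per-key tables, which is what lets operators downstream of the join build batches without consulting the table scan's column layout.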
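Likewise, a toy model of the batch construction that moves from VectorizedRowBatch.buildBatch() into VectorizedRowBatchCtx: scratch column types are looked up by file key and placed at the absolute column indices recorded in the plan. ScratchColumnsDemo and createBatchColumnTypes are invented names for this sketch; Hive's real createVectorizedRowBatch() allocates ColumnVectors from a StructObjectInspector rather than returning type-name strings.

import java.util.HashMap;
import java.util.Map;

public class ScratchColumnsDemo {
  // Plays the role of MapWork.getScratchColumnVectorTypes(): fileKey -> (index -> type).
  static final Map<String, Map<Integer, String>> scratchTypes = new HashMap<>();

  static String[] createBatchColumnTypes(String fileKey, String[] rowColTypes) {
    Map<Integer, String> scratch =
        scratchTypes.getOrDefault(fileKey, new HashMap<>());
    int numCols = rowColTypes.length + scratch.size();
    String[] cols = new String[numCols];
    System.arraycopy(rowColTypes, 0, cols, 0, rowColTypes.length);
    // Scratch entries carry their absolute column index, so they slot in
    // after the row columns produced by the object inspector.
    scratch.forEach((idx, type) -> cols[idx] = type);
    return cols;
  }

  public static void main(String[] args) {
    Map<Integer, String> s = new HashMap<>();
    s.put(2, "long");                      // one scratch column at index 2
    scratchTypes.put("path/to/bigtable/MAP_JOIN_b", s);

    String[] cols = createBatchColumnTypes("path/to/bigtable/MAP_JOIN_b",
        new String[] {"long", "string"});  // two row columns from the OI
    System.out.println(String.join(", ", cols));  // long, string, long
  }
}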