diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index af11196..036f080 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -28,6 +28,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
@@ -39,11 +40,12 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
 /**
  * The vectorized version of the MapJoinOperator.
  */
-public class VectorMapJoinOperator extends MapJoinOperator {
+public class VectorMapJoinOperator extends MapJoinOperator implements VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
       VectorMapJoinOperator.class.getName());
@@ -77,7 +79,10 @@
   //
   private transient int batchIndex;
   private transient VectorHashKeyWrapper[] keyValues;
-
+
+  private transient VectorizationContext vOutContext = null;
+  private transient VectorizedRowBatchCtx vrbCtx = null;
+
   public VectorMapJoinOperator() {
     super();
   }
@@ -113,36 +118,28 @@ public VectorMapJoinOperator (VectorizationContext vContext, OperatorDesc conf)
     bigTableValueExpressions = vContext.getVectorExpressions(exprs.get(posBigTable));
 
     List<String> outColNames = desc.getOutputColumnNames();
-    int outputColumnIndex = 0;
-
-    Map<String, Integer> cMap = vContext.getColumnMap();
-    for(byte alias:order) {
-      for(ExprNodeDesc expr: exprs.get(alias)) {
-        String columnName = outColNames.get(outputColumnIndex);
-        if (!cMap.containsKey(columnName)) {
-          vContext.addOutputColumn(columnName, expr.getTypeString());
-        }
-        ++outputColumnIndex;
-      }
+
+    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
+
+    int outColIndex = 0;
+    for(String outCol: outColNames) {
+      mapOutCols.put(outCol, outColIndex++);
     }
-
-    this.fileKey = vContext.getFileKey();
+
+    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    vOutContext.setFileKey(vContext.getFileKey() + "/MAP_JOIN_" + desc.getBigTableAlias());
+    this.fileKey = vOutContext.getFileKey();
   }
 
   @Override
   public void initializeOp(Configuration hconf) throws HiveException {
     super.initializeOp(hconf);
+
-    Map<String, Map<Integer, String>> allTypeMaps = Utilities.
-        getMapRedWork(hconf).getMapWork().getScratchColumnVectorTypes();
-    Map<Integer, String> typeMap = allTypeMaps.get(fileKey);
-
-    Map<String, Map<String, Integer>> allColumnMaps = Utilities.
-        getMapRedWork(hconf).getMapWork().getScratchColumnMap();
-
-    Map<String, Integer> columnMap = allColumnMaps.get(fileKey);
-
-    outputBatch = VectorizedRowBatch.buildBatch(typeMap, columnMap);
+    vrbCtx = new VectorizedRowBatchCtx();
+    vrbCtx.init(hconf, this.fileKey, (StructObjectInspector) this.outputObjInspector);
+
+    outputBatch = vrbCtx.createVectorizedRowBatch();
 
     keyWrapperBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
 
@@ -298,4 +295,9 @@ public void processOp(Object row, int tag) throws HiveException {
     batchIndex = -1;
     keyValues = null;
   }
+
+  @Override
+  public VectorizationContext getOuputVectorizationContext() {
+    return vOutContext;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContextRegion.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContextRegion.java
new file mode 100644
index 0000000..a403725
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContextRegion.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+/**
+ * VectorizationContextRegion is an optional interface implemented by vectorized operators
+ * that change the vectorization context (region boundary operators).
+ */
+public interface VectorizationContextRegion {
+
+  VectorizationContext getOuputVectorizationContext();
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index be7cb9f..0b504de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -177,51 +177,6 @@ public void setValueWriters(VectorExpressionWriter[] valueWriters) {
     this.valueWriters = valueWriters;
   }
 
-  public static VectorizedRowBatch buildBatch(Map<Integer, String> typeMap,
-      Map<String, Integer> columnMap) throws HiveException {
-
-    Map<Integer, ColumnVector> mapVectorColumn = new HashMap<Integer, ColumnVector>(typeMap.size());
-    int maxIndex = 0;
-
-    Iterator<Entry<Integer, String>> typeMapIt = typeMap.entrySet().iterator();
-    while(typeMapIt.hasNext()) {
-      Entry<Integer, String> type = typeMapIt.next();
-      ColumnVector cv = VectorizationContext.allocateColumnVector(type.getValue(),
-          VectorizedRowBatch.DEFAULT_SIZE);
-      mapVectorColumn.put(type.getKey(), cv);
-      if (maxIndex < type.getKey()) {
-        maxIndex = type.getKey();
-      }
-    }
-
-    VectorizedRowBatch batch = new VectorizedRowBatch(maxIndex+1);
-    for(int i=0; i <= maxIndex; ++i) {
-      ColumnVector cv = mapVectorColumn.get(i);
-      if (cv == null) {
-        // allocate a default type for the unused column.
-        // there are APIs that expect all cols[i] to be non NULL
-        cv = VectorizationContext.allocateColumnVector("long",
-            VectorizedRowBatch.DEFAULT_SIZE);
-      }
-      batch.cols[i] = cv;
-    }
-
-    // Validate that every column in the column map exists
-    Iterator<Entry<String, Integer>> columnMapIt = columnMap.entrySet().iterator();
-    while(columnMapIt.hasNext()) {
-      Entry<String, Integer> cm = columnMapIt.next();
-      if (batch.cols.length <= cm.getValue() || batch.cols[cm.getValue()] == null) {
-        throw new HiveException(String.format(
-            "Internal error: The type map has no entry for column %d %s",
-            cm.getValue(), cm.getKey()));
-      }
-    }
-
-    batch.reset();
-
-    return batch;
-  }
-
   /**
    * Resets the row batch to default state
    *  - sets selectedInUse to false
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
index 289757c..f513188 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
@@ -96,6 +96,25 @@ public VectorizedRowBatchCtx(StructObjectInspector rawRowOI, StructObjectInspect
 
   public VectorizedRowBatchCtx() {
   }
+
+  /**
+   * Initializes the VectorizedRowBatch context based on an arbitrary object inspector.
+   * Used by non-tablescan operators when they change the vectorization context.
+   * @param hiveConf
+   * @param fileKey
+   *          The key on which to retrieve the scratch column vector types from the map work
+   * @param rowOI
+   *          Object inspector that shapes the column types
+   */
+  public void init(Configuration hiveConf, String fileKey,
+      StructObjectInspector rowOI) {
+    columnTypeMap = Utilities
+        .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
+        .get(fileKey);
+    this.rowOI = rowOI;
+    this.rawRowOI = rowOI;
+  }
+
   /**
    * Initializes VectorizedRowBatch context based on the
@@ -251,6 +270,7 @@ public VectorizedRowBatch createVectorizedRowBatch() throws HiveException
     }
     result.numCols = fieldRefs.size();
     this.addScratchColumnsToBatch(result);
+    result.reset();
     return result;
   }
 
@@ -351,4 +371,5 @@ private ColumnVector allocateColumnVector(String type, int defaultSize) {
       return new LongColumnVector(defaultSize);
     }
   }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 151c648..d6c79f7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -23,9 +23,11 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Stack;
 
@@ -48,6 +50,7 @@
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -342,8 +345,17 @@ private void vectorizeMRTask(MapRedTask mrTask) throws SemanticException {
     topNodes.addAll(mapWork.getAliasToWork().values());
     HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
     ogw.startWalking(topNodes, nodeOutput);
-    mapWork.setScratchColumnVectorTypes(vnp.getScratchColumnVectorTypes());
-    mapWork.setScratchColumnMap(vnp.getScratchColumnMap());
+
+    Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes();
+    mapWork.setScratchColumnVectorTypes(columnVectorTypes);
+    Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap();
+    mapWork.setScratchColumnMap(columnMap);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString()));
+      LOG.debug(String.format("columnMap: %s", columnMap.toString()));
+    }
+
     return;
   }
 }
@@ -411,33 +423,42 @@ public VectorizationNodeProcessor(MapRedTask mrTask) {
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
 
-      Node firstOp = stack.firstElement();
-      TableScanOperator tsOp = null;
-
-      tsOp = (TableScanOperator) firstOp;
-
-      VectorizationContext vContext = vContextsByTSOp.get(tsOp);
-      if (vContext == null) {
-        String fileKey = "";
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+
+      VectorizationContext vContext = null;
+
+      if (op instanceof TableScanOperator) {
+        vContext = getVectorizationContext(op, physicalContext);
         for (String onefile : mWork.getPathToAliases().keySet()) {
           List<String> aliases = mWork.getPathToAliases().get(onefile);
           for (String alias : aliases) {
-            Operator<? extends OperatorDesc> op = mWork.getAliasToWork().get(alias);
-            if (op == tsOp) {
-              fileKey = onefile;
-              if (vContext == null) {
-                vContext = getVectorizationContext(tsOp, physicalContext);
-              }
-              vContext.setFileKey(fileKey);
-              vectorizationContexts.put(fileKey, vContext);
+            Operator<? extends OperatorDesc> opRoot = mWork.getAliasToWork().get(alias);
+            if (op == opRoot) {
+              // The same vectorization context is copied multiple times into
+              // the MapWork scratch column map.
+              // Each partition gets a copy.
+              //
+              vContext.setFileKey(onefile);
+              vectorizationContexts.put(onefile, vContext);
               break;
             }
           }
         }
-        vContextsByTSOp.put(tsOp, vContext);
+        vContextsByTSOp.put(op, vContext);
+      } else {
+        assert stack.size() > 1;
+        // Walk down the stack of operators until we find one willing to give us a context.
+        // At the bottom will be the TS operator, guaranteed to have a context.
+        int i = stack.size() - 2;
+        while (vContext == null) {
+          Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
+          vContext = vContextsByTSOp.get(opParent);
+          --i;
+        }
       }
-
-      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+
+      assert vContext != null;
+
       if (op.getType().equals(OperatorType.REDUCESINK) &&
           op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
         // No need to vectorize
@@ -453,6 +474,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
           if (vectorOp != op) {
             opsDone.add(vectorOp);
           }
+          if (vectorOp instanceof VectorizationContextRegion) {
+            VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+            VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+            vContextsByTSOp.put(op, vOutContext);
+            vectorizationContexts.put(vOutContext.getFileKey(), vOutContext);
+          }
         }
       } catch (HiveException e) {
         throw new SemanticException(e);
@@ -678,7 +705,7 @@ private boolean validateDataType(String type) {
     return supportedDataTypes.contains(type.toLowerCase());
   }
 
-  private VectorizationContext getVectorizationContext(TableScanOperator op,
+  private VectorizationContext getVectorizationContext(Operator<? extends OperatorDesc> op,
       PhysicalContext pctx) {
     RowResolver rr = pctx.getParseContext().getOpParseCtx().get(op).getRowResolver();
diff --git ql/src/test/queries/clientpositive/vectorized_context_regression_5817.q ql/src/test/queries/clientpositive/vectorized_context_regression_5817.q
new file mode 100644
index 0000000..2baa44e
--- /dev/null
+++ ql/src/test/queries/clientpositive/vectorized_context_regression_5817.q
@@ -0,0 +1,36 @@
+create table store_5817 (s_store_sk int, s_city string) stored as orc;
+insert overwrite table store_5817
+  select cint, cstring1
+  from alltypesorc
+  where cint not in (-3728, -563, 762, 6981, 253665376, 528534767, 626923679);
+
+create table store_sales_5817 (ss_store_sk int, ss_hdemo_sk int, ss_net_profit double) stored as orc;
+insert overwrite table store_sales_5817
+  select cint, cint, cdouble
+  from alltypesorc
+  where cint not in (-3728, -563, 762, 6981, 253665376, 528534767, 626923679);
+
+create table household_demographics_5817 (hd_demo_sk int) stored as orc;
+insert overwrite table household_demographics_5817
+  select cint
+  from alltypesorc
+  where cint not in (-3728, -563, 762, 6981, 253665376, 528534767, 626923679);
+
+set hive.auto.convert.join=true;
+set hive.vectorized.execution.enabled = true;
+explain
+select s.s_city, ss_net_profit
+  from store_sales_5817 ss
+  JOIN store_5817 s ON ss.ss_store_sk = s.s_store_sk
+  JOIN household_demographics_5817 hd ON ss.ss_hdemo_sk = hd.hd_demo_sk;
+select s.s_city, ss_net_profit
+  from store_sales_5817 ss
+  JOIN store_5817 s ON ss.ss_store_sk = s.s_store_sk
+  JOIN household_demographics_5817 hd ON ss.ss_hdemo_sk = hd.hd_demo_sk;
+
+set hive.auto.convert.join=false;
+set hive.vectorized.execution.enabled = false;
+
+drop table store_5817;
+drop table store_sales_5817;
+drop table household_demographics_5817;
\ No newline at end of file
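
Note (illustration, not part of the patch): the region pattern introduced above can be reused by any vectorized operator that reshapes its output row. The sketch below assumes only APIs already visible in this patch (the VectorizationContext(Map, int) constructor, setFileKey/getFileKey, and the VectorizationContextRegion interface); the class name ExampleRegionOperator and the "/EXAMPLE_REGION" fileKey suffix are purely hypothetical.

package org.apache.hadoop.hive.ql.exec.vector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: an operator that starts a new vectorization region
// builds an output VectorizationContext over its own output column names and
// publishes it through VectorizationContextRegion, so the Vectorizer walk can
// hand it to downstream operators (see the process() changes above).
public class ExampleRegionOperator implements VectorizationContextRegion {

  private final VectorizationContext vOutContext;

  public ExampleRegionOperator(VectorizationContext vContext, List<String> outColNames) {
    // Assign a fresh column index to every output column name, mirroring what
    // VectorMapJoinOperator now does in its constructor.
    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
    int outColIndex = 0;
    for (String outCol : outColNames) {
      mapOutCols.put(outCol, outColIndex++);
    }
    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
    // A distinct fileKey lets VectorizedRowBatchCtx.init() locate the scratch
    // column types registered for this region in the MapWork.
    vOutContext.setFileKey(vContext.getFileKey() + "/EXAMPLE_REGION");
  }

  @Override
  public VectorizationContext getOuputVectorizationContext() {
    return vOutContext;
  }
}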