diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
index ca0873a..faf2f02 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -80,7 +80,7 @@
   private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
       new HashMap<Operator<? extends OperatorDesc>, MapOpCtx>();
 
-  private transient MapOpCtx current;
+  protected transient MapOpCtx current;
   private transient List<Operator<? extends OperatorDesc>> extraChildrenToClose = null;
 
   private static class MapInputPath {
@@ -121,7 +121,7 @@ public int hashCode() {
     }
   }
 
-  private static class MapOpCtx {
+  protected static class MapOpCtx {
 
     StructObjectInspector tblRawRowObjectInspector; // columns
     StructObjectInspector partObjectInspector; // partition columns
@@ -150,6 +150,10 @@ private boolean hasVC() {
     private Object readRow(Writable value) throws SerDeException {
       return partTblObjectInspectorConverter.convert(deserializer.deserialize(value));
     }
+
+    public StructObjectInspector getRowObjectInspector() {
+      return rowObjectInspector;
+    }
   }
 
   /**
@@ -474,7 +478,6 @@ public void process(Writable value) throws HiveException {
       // The child operators cleanup if input file has changed
       cleanUpInputFileChanged();
     }
-
     Object row;
     try {
       row = current.readRow(value);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
index d1e82a2..c1b5508 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.hive.ql.exec.MapredContext;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -97,7 +98,11 @@ public void configure(JobConf job) {
     execContext.setJc(jc);
     // create map and fetch operators
     MapWork mrwork = Utilities.getMapWork(job);
-    mo = new MapOperator();
+    if (mrwork.getVectorMode()) {
+      mo = new VectorMapOperator();
+    } else {
+      mo = new MapOperator();
+    }
     mo.setConf(mrwork);
     // initialize map operator
     mo.setChildren(job);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
new file mode 100644
index 0000000..311f6d6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.Writable;
+
+public class VectorMapOperator extends MapOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public void process(Writable value) throws HiveException {
+    // A mapper can span multiple files/partitions.
+    // The serializers need to be reset if the input file changed
+    ExecMapperContext context = getExecContext();
+    if (context != null && context.inputFileChanged()) {
+      // The child operators cleanup if input file has changed
+      cleanUpInputFileChanged();
+    }
+
+    // The row has been converted to comply with table schema, irrespective of partition schema.
+    // So, use tblOI (and not partOI) for forwarding
+    try {
+      forward(value, current.getRowObjectInspector());
+    } catch (Exception e) {
+      throw new HiveException("Hive Runtime Error while processing row ", e);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 02c32cb..f1b3cb2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
@@ -58,6 +59,7 @@
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
@@ -108,7 +110,6 @@
 import org.apache.hadoop.hive.ql.udf.UDFSubstr;
 import org.apache.hadoop.hive.ql.udf.UDFTan;
 import org.apache.hadoop.hive.ql.udf.UDFTrim;
-import org.apache.hadoop.hive.ql.udf.UDFUnhex;
 import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
 import org.apache.hadoop.hive.ql.udf.UDFYear;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -561,7 +562,7 @@ private boolean validateDataType(String type) {
     return supportedDataTypes.contains(type.toLowerCase());
  }
 
-  private VectorizationContext getVectorizationContext(Operator<? extends OperatorDesc> op,
+  private VectorizationContext getVectorizationContext(TableScanOperator op,
       PhysicalContext pctx) {
     RowResolver rr = pctx.getParseContext().getOpParseCtx().get(op).getRowResolver();
@@ -572,6 +573,12 @@ private VectorizationContext getVectorizationContext(Operator
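
Note (illustration, not part of the patch): the reason VectorMapOperator.process() can forward its Writable argument as-is, with no per-row deserialization or partition-to-table conversion, is that in vector mode the record reader hands the map-side operator one VectorizedRowBatch per process() call rather than one serialized row. Below is a minimal, self-contained sketch of such a batch. VectorizedRowBatch and LongColumnVector are Hive's actual vectorization classes; the demo class VectorBatchSketch and its data are made up for illustration.

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class VectorBatchSketch {
  public static void main(String[] args) {
    // A batch with one long column and the default capacity (1024 rows).
    VectorizedRowBatch batch = new VectorizedRowBatch(1);
    LongColumnVector col = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    batch.cols[0] = col;

    // Populate three rows; vectorized consumers iterate up to batch.size.
    for (int i = 0; i < 3; i++) {
      col.vector[i] = 10L * i;
    }
    batch.size = 3;

    // In vector mode the record reader emits objects like this batch, and
    // VectorMapOperator.process(value) simply calls
    //   forward(value, current.getRowObjectInspector());
    // so the whole three-row batch moves down the operator tree in one call.
    System.out.println("would forward a batch of " + batch.size + " rows");
  }
}

Design note: because VectorizedRowBatch implements Writable, the existing Operator.forward()/process(Writable) plumbing carries batches unchanged; only the leaf (VectorMapOperator) and the planner switch in ExecMapper.configure() need to know about vector mode.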