diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 5382be7..bfebf66 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -30,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprAndExpr;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterExprOrExpr;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.FilterNotExpr;
@@ -241,7 +242,20 @@ public VectorExpression getVectorExpression(ExprNodeDesc exprDesc) throws HiveEx
 
   private VectorExpression getConstantVectorExpression(ExprNodeConstantDesc exprDesc)
       throws HiveException {
-    return null;
+    String type = exprDesc.getTypeString();
+    String colVectorType = this.getOutputColType(type, "constant");
+    int outCol = ocm.allocateOutputColumn(colVectorType);
+    if (type.equalsIgnoreCase("long") || type.equalsIgnoreCase("int") ||
+        type.equalsIgnoreCase("short") || type.equalsIgnoreCase("byte")) {
+      return new ConstantVectorExpression(outCol,
+          ((Number) exprDesc.getValue()).longValue());
+    } else if (type.equalsIgnoreCase("double") || type.equalsIgnoreCase("float")) {
+      return new ConstantVectorExpression(outCol, ((Number) exprDesc.getValue()).doubleValue());
+    } else if (type.equalsIgnoreCase("string")) {
+      return new ConstantVectorExpression(outCol, ((String) exprDesc.getValue()).getBytes());
+    } else {
+      throw new HiveException("Unsupported constant type");
+    }
   }
 
   private VectorExpression getUnaryMinusExpression(List<ExprNodeDesc> childExprList)
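Note: a minimal usage sketch (not part of the patch) of the constant path wired up above. It uses only classes visible in this diff and assumes, as the new unit test below does, that the caller populates the batch's cols[] entries. The literal is folded into a ConstantVectorExpression that fills one scratch column per batch:

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;

    public class ConstantExpressionSketch {
      public static void main(String[] args) {
        VectorizedRowBatch batch = new VectorizedRowBatch(1);
        batch.cols[0] = new LongColumnVector(VectorizedRowBatch.DEFAULT_SIZE);

        // What getConstantVectorExpression() builds for an integer literal.
        ConstantVectorExpression expr = new ConstantVectorExpression(0, 42L);
        expr.evaluate(batch);

        LongColumnVector out = (LongColumnVector) batch.cols[0];
        // Only slot 0 is written; isRepeating marks it as valid for every row.
        System.out.println(out.isRepeating + " " + out.vector[0]); // true 42
      }
    }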
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index b4097ca..c0e7c21 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -35,41 +35,41 @@
   public ColumnVector[] cols; // a vector for each column
   public int size; // number of rows that qualify (i.e. haven't been filtered out)
   public int[] selected; // array of positions of selected values
-  
+
   /*
    * If no filtering has been applied yet, selectedInUse is false,
    * meaning that all rows qualify. If it is true, then the selected[] array
    * records the offsets of qualifying rows.
    */
-  public boolean selectedInUse; 
-  
+  public boolean selectedInUse;
+
   // If this is true, then there is no data in the batch -- we have hit the end of input.
-  public boolean endOfFile; 
-  
-  /* 
-   * This number is carefully chosen to minimize overhead and typically allows 
+  public boolean endOfFile;
+
+  /*
+   * This number is carefully chosen to minimize overhead and typically allows
    * one VectorizedRowBatch to fit in cache.
    */
-  public static final int DEFAULT_SIZE = 1024; 
+  public static final int DEFAULT_SIZE = 1024;
 
-  private Writable[] writableRow;
+  private final Writable[] writableRow;
   private int rowIteratorIndex = 0;
 
-  /** 
+  /**
    * Return a batch with the specified number of columns.
    * This is the standard constructor -- all batches should be the same size
-   * 
+   *
    * @param numCols the number of columns to include in the batch
    */
   public VectorizedRowBatch(int numCols) {
     this(numCols, DEFAULT_SIZE);
   }
-  
+
   /**
    * Return a batch with the specified number of columns and rows.
    * Only call this constructor directly for testing purposes.
    * Batch size should normally always be defaultSize.
-   * 
+   *
    * @param numCols the number of columns to include in the batch
    * @param size the number of rows to include in the batch
    */
@@ -104,13 +104,13 @@ public void initRowIterator(){
     return writableRow;
   }
 
-  /** 
+  /**
   * Return count of qualifying rows.
-   * 
+   *
   * @return number of rows that have not been filtered out
    */
   public long count() {
-    return size; 
+    return size;
   }
 
   @Override
@@ -124,7 +124,11 @@ public String toString() {
         int i = selected[j];
         int colIndex = 0;
         for (ColumnVector cv : cols) {
-          b.append(cv.getWritableObject(i).toString());
+          if (cv.isRepeating) {
+            b.append(cv.getWritableObject(0).toString());
+          } else {
+            b.append(cv.getWritableObject(i).toString());
+          }
           colIndex++;
           if (colIndex < cols.length) {
             b.append('\u0001');
@@ -138,7 +142,11 @@ public String toString() {
       for (int i = 0; i < size; i++) {
         int colIndex = 0;
         for (ColumnVector cv : cols) {
-          b.append(cv.getWritableObject(i).toString());
+          if (cv.isRepeating) {
+            b.append(cv.getWritableObject(0).toString());
+          } else {
+            b.append(cv.getWritableObject(i).toString());
+          }
           colIndex++;
           if (colIndex < cols.length) {
             b.append('\u0001');
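Note: the toString() change above encodes the read-side convention for repeating vectors: when isRepeating is set, only entry 0 of the column is valid, so every per-row read must redirect its index to 0. A small helper sketch of that convention (the class and method names are ours, not Hive's); the VectorizedOrcSerde change below applies the same branch:

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.io.Writable;

    public final class RepeatingReads {
      private RepeatingReads() {}

      // Mirrors the branch added to VectorizedRowBatch.toString() and
      // VectorizedOrcSerde.serialize(): repeating columns store one value.
      public static Writable writableAt(ColumnVector cv, int row) {
        return cv.getWritableObject(cv.isRepeating ? 0 : row);
      }
    }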
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java
new file mode 100644
index 0000000..e0e87f1
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/ConstantVectorExpression.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
+public class ConstantVectorExpression extends VectorExpression {
+
+  private static enum Type {
+    LONG,
+    DOUBLE,
+    BYTES
+  }
+
+  private final Type type;
+  private final int outputColumn;
+  private long longValue = 0;
+  private double doubleValue = 0;
+  private byte[] bytesValue = null;
+  private int bytesValueLength = 0;
+  private final String typeString;
+
+  ConstantVectorExpression(int outputColumn, String typeString) {
+    this.outputColumn = outputColumn;
+    this.typeString = typeString;
+    if ("string".equalsIgnoreCase(typeString)) {
+      this.type = Type.BYTES;
+    } else if ("double".equalsIgnoreCase(typeString)) {
+      this.type = Type.DOUBLE;
+    } else {
+      this.type = Type.LONG;
+    }
+  }
+
+  public ConstantVectorExpression(int outputColumn, long value) {
+    this(outputColumn, "long");
+    this.longValue = value;
+  }
+
+  public ConstantVectorExpression(int outputColumn, double value) {
+    this(outputColumn, "double");
+    this.doubleValue = value;
+  }
+
+  public ConstantVectorExpression(int outputColumn, byte[] value) {
+    this(outputColumn, "string");
+    this.bytesValue = value;
+    this.bytesValueLength = this.bytesValue.length;
+  }
+
+  private void evaluateLong(VectorizedRowBatch vrg) {
+    LongColumnVector cv = (LongColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = true;
+    cv.vector[0] = longValue;
+  }
+
+  private void evaluateDouble(VectorizedRowBatch vrg) {
+    DoubleColumnVector cv = (DoubleColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = true;
+    cv.vector[0] = doubleValue;
+  }
+
+  private void evaluateBytes(VectorizedRowBatch vrg) {
+    BytesColumnVector cv = (BytesColumnVector) vrg.cols[outputColumn];
+    cv.isRepeating = true;
+    cv.noNulls = true;
+    cv.setRef(0, bytesValue, 0, bytesValueLength);
+  }
+
+  @Override
+  public void evaluate(VectorizedRowBatch vrg) {
+    switch (type) {
+    case LONG:
+      evaluateLong(vrg);
+      break;
+    case DOUBLE:
+      evaluateDouble(vrg);
+      break;
+    case BYTES:
+      evaluateBytes(vrg);
+      break;
+    }
+  }
+
+  @Override
+  public int getOutputColumn() {
+    return outputColumn;
+  }
+
+  @Override
+  public String getOutputType() {
+    return typeString;
+  }
+}
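Note: the bytes path above calls setRef() rather than copying the literal into the vector, so the column shares the literal's backing array. A sketch of what a downstream reader sees; it assumes BytesColumnVector exposes start[] and length[] arrays alongside vector[], which is implied by setRef's four-argument signature but not shown in this patch:

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.ConstantVectorExpression;

    public class BytesConstantSketch {
      public static void main(String[] args) {
        VectorizedRowBatch batch = new VectorizedRowBatch(1);
        BytesColumnVector bcv = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
        batch.cols[0] = bcv;

        ConstantVectorExpression expr =
            new ConstantVectorExpression(0, "alpha".getBytes());
        expr.evaluate(batch);

        // setRef() points slot 0 at the literal's array without copying;
        // start/length (assumed fields) describe the referenced slice.
        System.out.println(new String(bcv.vector[0], bcv.start[0], bcv.length[0]));
      }
    }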
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
index d8764c0..425c12d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.io.NullWritable;
@@ -36,7 +37,7 @@
 
 public class CommonOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-    implements InputFormatChecker {
+    implements InputFormatChecker, VectorizedInputFormatInterface {
 
   OrcInputFormat oif = new OrcInputFormat();
   VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
index c8fd293..d7a4ca7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -42,7 +43,7 @@
  * A MapReduce/Hive input format for ORC files.
  */
 public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-    implements InputFormatChecker {
+    implements InputFormatChecker, VectorizedInputFormatInterface {
 
   private static class VectorizedOrcRecordReader
       implements RecordReader<NullWritable, VectorizedRowBatch> {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
index 2fcfc95..33d59e5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
@@ -56,7 +56,12 @@ public Writable serialize(Object obj, ObjectInspector inspector) {
         index = i;
       }
       for (int k = 0; k < batch.numCols; k++) {
-        Writable w = batch.cols[k].getWritableObject(index);
+        Writable w;
+        if (batch.cols[k].isRepeating) {
+          w = batch.cols[k].getWritableObject(0);
+        } else {
+          w = batch.cols[k].getWritableObject(index);
+        }
         ost.setFieldValue(k, w);
       }
       OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java
index 0ef11e3..7b24749 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizationContext.java
@@ -14,6 +14,7 @@
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.LongColModuloLongColumn;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.LongColMultiplyLongColumn;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.LongColSubtractLongColumn;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -30,7 +31,7 @@
 public class TestVectorizationContext {
 
   @Test
-  public void testArithmeticExpressionVectorization() {
+  public void testArithmeticExpressionVectorization() throws HiveException {
     /**
      * Create original expression tree for following
      * (plus (minus (plus col1 col2) col3) (multiply col4 (mod col5 col6)) )
@@ -119,7 +120,7 @@ public void testArithmeticExpressionVectorization() {
   }
 
   @Test
-  public void testStringFilterExpressions() {
+  public void testStringFilterExpressions() throws HiveException {
     ExprNodeColumnDesc col1Expr = new ExprNodeColumnDesc(String.class, "col1", "table", false);
     ExprNodeConstantDesc constDesc = new ExprNodeConstantDesc("Alpha");
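Note: VectorizedInputFormatInterface is referenced above but not defined in this patch. Presumably it is a marker interface that lets the planner detect input formats capable of producing batches; a hedged sketch of that assumed shape:

    package org.apache.hadoop.hive.ql.exec.vector;

    // Assumed shape only -- the real definition is not part of this diff and
    // may declare methods. Implementors emit VectorizedRowBatch values
    // instead of one row at a time.
    public interface VectorizedInputFormatInterface {
    }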
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
new file mode 100644
index 0000000..f2e5399
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/expressions/TestConstantVectorExpression.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.expressions;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.junit.Test;
+
+public class TestConstantVectorExpression {
+
+  @Test
+  public void testConstantExpression() {
+    ConstantVectorExpression longCve = new ConstantVectorExpression(0, 17);
+    ConstantVectorExpression doubleCve = new ConstantVectorExpression(1, 17.34);
+    ConstantVectorExpression bytesCve = new ConstantVectorExpression(2, "alpha".getBytes());
+
+    int size = 20;
+    VectorizedRowBatch vrg = VectorizedRowGroupGenUtil.getVectorizedRowBatch(size, 3, 0);
+
+    LongColumnVector lcv = (LongColumnVector) vrg.cols[0];
+    DoubleColumnVector dcv = new DoubleColumnVector(size);
+    BytesColumnVector bcv = new BytesColumnVector(size);
+    vrg.cols[1] = dcv;
+    vrg.cols[2] = bcv;
+
+    longCve.evaluate(vrg);
+    doubleCve.evaluate(vrg);
+    bytesCve.evaluate(vrg);
+
+    assertTrue(lcv.isRepeating);
+    assertTrue(dcv.isRepeating);
+    assertTrue(bcv.isRepeating);
+    assertEquals(17, lcv.vector[0]);
+    assertTrue(17.34 == dcv.vector[0]);
+    assertTrue(Arrays.equals("alpha".getBytes(), bcv.vector[0]));
+  }
+
+}
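Note on the double assertion above: assertTrue(17.34 == dcv.vector[0]) passes only because the literal is stored and read back with no intervening arithmetic. Inside testConstantExpression(), the conventional JUnit form would make the tolerance explicit:

    // Equivalent check with an explicit delta; 0.0 is safe here because the
    // constant round-trips bit-for-bit through the column vector.
    assertEquals(17.34, dcv.vector[0], 0.0);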