diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
index 1087622..e1d4543 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java
@@ -49,6 +49,9 @@
   private byte[] buffer;   // optional buffer to use when actually copying in data
   private int nextFree;    // next free position in buffer
 
+  // Reusable text object
+  private final Text textObject = new Text();
+
   // Estimate that there will be 16 bytes per entry
   static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE;
 
@@ -208,8 +211,9 @@ public Writable getWritableObject(int index) {
     }
     Writable result = null;
     if (!isNull[index] && vector[index] != null) {
-      result = new Text();
-      ((Text) result).append(vector[index], start[index], length[index]);
+      textObject.clear();
+      textObject.append(vector[index], start[index], length[index]);
+      result = textObject;
     } else {
       result = NullWritable.get();
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
index cd827ab..eb52961 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
@@ -505,6 +505,9 @@ public void setChildren(Configuration hconf) throws HiveException {
       case TABLESCAN:
         vectorOp = op.clone();
         break;
+      case REDUCESINK:
+        vectorOp = new VectorReduceSinkOperator(vectorizationContext, op.getConf());
+        break;
       default:
         throw new HiveException("Operator: " + op.getName() + ", " +
             "not vectorized");
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
new file mode 100644
index 0000000..f61fcb6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.TerminalOperator;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.Serializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class VectorReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
+    implements Serializable {
+
+  private static final Log LOG = LogFactory.getLog(
+      VectorReduceSinkOperator.class.getName());
+
+  private static final long serialVersionUID = 1L;
+
+  private final VectorizationContext vContext;
+
+  /**
+   * The evaluators for the key columns. Key columns decide the sort order on
+   * the reducer side. Key columns are passed to the reducer in the "key".
+   */
+  protected transient VectorExpression[] keyEval;
+  /**
+   * The evaluators for the value columns. Value columns are passed to reducer
+   * in the "value".
+   */
+  protected transient VectorExpression[] valueEval;
+
+  /**
+   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
+   * Hive language). Partition columns decide the reducer that the current row
+   * goes to. Partition columns are not passed to reducer.
+   */
+  protected transient VectorExpression[] partitionEval;
+
+  private int numDistributionKeys;
+
+  private List<List<Integer>> distinctColIndices;
+
+  private int numDistinctExprs;
+
+  transient HiveKey keyWritable = new HiveKey();
+  transient Writable value;
+
+  transient Object[] cachedValues;
+  transient Object[][] cachedKeys;
+  transient Random random;
+
+  transient Serializer keySerializer;
+  transient boolean keyIsText;
+  transient Serializer valueSerializer;
+  transient int tag;
+  transient byte[] tagByte = new byte[1];
+
+  transient ObjectInspector keyObjectInspector;
+  transient ObjectInspector valueObjectInspector;
+  transient ObjectInspector[] partitionObjectInspectors;
+  transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
+
+
+  @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    try {
+      vContext.setOperatorType(OperatorType.REDUCESINK);
+      keyEval = vContext.getVectorExpressions(conf.getKeyCols());
+      valueEval = vContext.getVectorExpressions(conf.getValueCols());
+      partitionEval = vContext.getVectorExpressions(conf.getPartitionCols());
+
+      numDistributionKeys = conf.getNumDistributionKeys();
+      distinctColIndices = conf.getDistinctColumnIndices();
+      numDistinctExprs = distinctColIndices.size();
+
+      TableDesc keyTableDesc = conf.getKeySerializeInfo();
+      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
+          .newInstance();
+      keySerializer.initialize(null, keyTableDesc.getProperties());
+      keyIsText = keySerializer.getSerializedClass().equals(Text.class);
+
+      keyObjectInspector = vContext.createObjectInspector(keyEval,
+          conf.getOutputKeyColumnNames());
+
+      partitionObjectInspectors = new ObjectInspector[partitionEval.length];
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionObjectInspectors[i] = vContext.createObjectInspector(partitionEval[i]);
+      }
+
+      String colNames = "";
+      for (String colName : conf.getOutputKeyColumnNames()) {
+        colNames = String.format("%s %s", colNames, colName);
+      }
+
+      LOG.info(String.format("keyObjectInspector [%s]%s => %s",
+          keyObjectInspector.getClass(),
+          keyObjectInspector,
+          colNames));
+
+      conf.getOutputKeyColumnNames();
+      conf.getOutputValueColumnNames();
+
+      //keyObjectInspector = ObjectInspectorFactory.
+
+      TableDesc valueTableDesc = conf.getValueSerializeInfo();
+      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
+          .newInstance();
+      valueSerializer.initialize(null, valueTableDesc.getProperties());
+
+      valueObjectInspector = vContext.createObjectInspector(valueEval,
+          conf.getOutputValueColumnNames());
+
+      colNames = "";
+      for (String colName : conf.getOutputValueColumnNames()) {
+        colNames = String.format("%s %s", colNames, colName);
+      }
+
+      LOG.info(String.format("valueObjectInspector [%s]%s => %s",
+          valueObjectInspector.getClass(),
+          valueObjectInspector,
+          colNames));
+
+      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
+      int keyLen = numDistinctExprs > 0 ?
+          numDistributionKeys + 1 :
+          numDistributionKeys;
+      cachedKeys = new Object[numKeys][keyLen];
+      cachedValues = new Object[valueEval.length];
+
+    } catch(Exception e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Override
+  public void processOp(Object row, int tag) throws HiveException {
+    VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+
+    LOG.info(String.format("sinking %d rows, %d values, %d keys, %d parts",
+        vrg.size,
+        valueEval.length,
+        keyEval.length,
+        partitionEval.length));
+
+    try {
+
+      for (int i = 0; i < partitionEval.length; i++) {
+        partitionEval[i].evaluate(vrg);
+      }
+
+      // run the vector evaluations
+      for (int i = 0; i < valueEval.length; i++) {
+        valueEval[i].evaluate(vrg);
+      }
+      // Evaluate the keys
+      for (int i = 0; i < keyEval.length; i++) {
+        keyEval[i].evaluate(vrg);
+      }
+
+      Object[] distributionKeys = new Object[numDistributionKeys];
+
+      // Emit a (k,v) pair for each row in the batch
+      //
+      for (int j = 0 ; j < vrg.size; ++j) {
+        int rowIndex = j;
+        if (vrg.selectedInUse) {
+          rowIndex = vrg.selected[j];
+        }
+        for (int i = 0; i < valueEval.length; i++) {
+          int batchColumn = valueEval[i].getOutputColumn();
+          ColumnVector vectorColumn = vrg.cols[batchColumn];
+          cachedValues[i] = vectorColumn.getWritableObject(rowIndex);
+        }
+        // Serialize the value
+        value = valueSerializer.serialize(cachedValues, valueObjectInspector);
+
+        for (int i = 0; i < keyEval.length; i++) {
+          int batchColumn = keyEval[i].getOutputColumn();
+          ColumnVector vectorColumn = vrg.cols[batchColumn];
+          distributionKeys[i] = vectorColumn.getWritableObject(rowIndex);
+        }
+        // no distinct key
+        System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
+        // Serialize the keys and append the tag
+        for (int i = 0; i < cachedKeys.length; i++) {
+          if (keyIsText) {
+            Text key = (Text) keySerializer.serialize(cachedKeys[i],
+                keyObjectInspector);
+            if (tag == -1) {
+              keyWritable.set(key.getBytes(), 0, key.getLength());
+            } else {
+              int keyLength = key.getLength();
+              keyWritable.setSize(keyLength + 1);
+              System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+              keyWritable.get()[keyLength] = tagByte[0];
+            }
+          } else {
+            // Must be BytesWritable
+            BytesWritable key = (BytesWritable) keySerializer.serialize(
+                cachedKeys[i], keyObjectInspector);
+            if (tag == -1) {
+              keyWritable.set(key.getBytes(), 0, key.getLength());
+            } else {
+              int keyLength = key.getLength();
+              keyWritable.setSize(keyLength + 1);
+              System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+              keyWritable.get()[keyLength] = tagByte[0];
+            }
+          }
+          // Evaluate the HashCode
+          int keyHashCode = 0;
+          if (partitionEval.length == 0) {
+            // If no partition cols, just distribute the data uniformly to provide
+            // better
+            // load balance. If the requirement is to have a single reducer, we
+            // should set
+            // the number of reducers to 1.
+            // Use a constant seed to make the code deterministic.
+            if (random == null) {
+              random = new Random(12345);
+            }
+            keyHashCode = random.nextInt();
+          } else {
+            for (int p = 0; p < partitionEval.length; p++) {
+              keyHashCode = keyHashCode
+                  * 31
+                  + ObjectInspectorUtils.hashCode(
+                      vrg.cols[partitionEval[p].getOutputColumn()].getWritableObject(rowIndex),
+                      partitionObjectInspectors[p]);
+            }
+          }
+          keyWritable.setHashCode(keyHashCode);
+          if (out != null) {
+            out.collect(keyWritable, value);
+            // Since this is a terminal operator, update counters explicitly -
+            // forward is not called
+            if (counterNameToEnum != null) {
+              ++outputRows;
+              if (outputRows % 1000 == 0) {
+                incrCounter(numOutputRowsCntr, outputRows);
+                outputRows = 0;
+              }
+            }
+          }
+        }
+      }
+    } catch (SerDeException e) {
+      throw new HiveException(e);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  public VectorReduceSinkOperator (
+      VectorizationContext context,
+      OperatorDesc conf) {
+    this.vContext = context;
+    this.conf = (ReduceSinkDesc) conf;
+  }
+
+  /**
+   * @return the name of the operator
+   */
+  @Override
+  public String getName() {
+    return getOperatorName();
+  }
+
+  static public String getOperatorName() {
+    return "RS";
+  }
+
+  @Override
+  public OperatorType getType() {
+    return OperatorType.REDUCESINK;
+  }
+
+  @Override
+  public boolean opAllowedBeforeMapJoin() {
+    return false;
+  }
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
index 061db16..d7baeaf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
@@ -40,9 +40,10 @@
   protected transient VectorExpression[] vExpressions;
 
-  VectorizedRowBatch output;
   private final VectorizationContext vContext;
 
+  private int [] projectedColumns = null;
+
   public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
     this.vContext = ctxt;
     this.conf = (SelectDesc) conf;
@@ -61,19 +62,19 @@ protected void initializeOp(Configuration hconf) throws HiveException {
     vExpressions = new VectorExpression[colList.size()];
     for (int i = 0; i < colList.size(); i++) {
       vExpressions[i] = vContext.getVectorExpression(colList.get(i));
+      String columnName = conf.getOutputColumnNames().get(i);
+      // Update column map with output column names
+      vContext.addToColumnMap(columnName, vExpressions[i].getOutputColumn());
     }
-    output = new VectorizedRowBatch(colList.size(),
-        VectorizedRowBatch.DEFAULT_SIZE);
     initializeChildren(hconf);
+    projectedColumns = new int [vExpressions.length];
+    for (int i = 0; i < projectedColumns.length; i++) {
+      projectedColumns[i] = vExpressions[i].getOutputColumn();
+    }
   }
 
   public void setSelectExpressions(VectorExpression[] exprs) {
     this.vExpressions = exprs;
-    output = new VectorizedRowBatch(exprs.length, VectorizedRowBatch.DEFAULT_SIZE);
-  }
-
-  public VectorizedRowBatch getOutput() {
-    return output;
   }
 
   @Override
@@ -95,15 +96,18 @@ public void processOp(Object row, int tag) throws HiveException {
       }
     }
 
-    //Prepare output, shallow vector copy
-    output.selectedInUse = vrg.selectedInUse;
-    output.selected = vrg.selected;
-    output.size = vrg.size;
+    //Prepare output, set the projections
+    int[] originalProjections = vrg.projectedColumns;
+    int originalProjectionSize = vrg.projectionSize;
+    vrg.projectionSize = vExpressions.length;
     for (int i = 0; i < vExpressions.length; i++) {
-      output.cols[i] = vrg.cols[vExpressions[i].getOutputColumn()];
+      vrg.projectedColumns[i] = vExpressions[i].getOutputColumn();
     }
-    output.numCols = vExpressions.length;
-    forward(output, outputObjInspector);
+    forward(vrg, outputObjInspector);
+
+    // Revert the projected columns back, because vrg will be re-used.
+    vrg.projectionSize = originalProjectionSize;
+    vrg.projectedColumns = originalProjections;
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index 2fe8192..6bb5618 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -88,6 +88,7 @@
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
 
 /**
@@ -192,7 +193,6 @@ public void setOperatorType(OperatorType opType) {
 
   private VectorExpression getVectorExpression(ExprNodeColumnDesc exprDesc) {
-    int columnNum = getInputColumnIndex(exprDesc.getColumn());
     VectorExpression expr = null;
     switch (opType) {
@@ -1074,7 +1074,8 @@ public ObjectInspector createObjectInspector(VectorExpression vectorExpression)
     String columnType = vectorExpression.getOutputType();
     if (columnType.equalsIgnoreCase("long") ||
         columnType.equalsIgnoreCase("bigint") ||
-        columnType.equalsIgnoreCase("int")) {
+        columnType.equalsIgnoreCase("int") ||
+        columnType.equalsIgnoreCase("smallint")) {
       return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
     } else if (columnType.equalsIgnoreCase("double")) {
       return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
@@ -1084,5 +1085,23 @@ public ObjectInspector createObjectInspector(VectorExpression vectorExpression)
       throw new HiveException(String.format("Must implement type %s", columnType));
     }
   }
+
+  public ObjectInspector createObjectInspector(
+      VectorExpression[] vectorExpressions, List<String> columnNames)
+      throws HiveException {
+    List<ObjectInspector> oids = new ArrayList<ObjectInspector>();
+    for (VectorExpression vexpr : vectorExpressions) {
+      ObjectInspector oi = createObjectInspector(vexpr);
+      oids.add(oi);
+    }
+    return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,
+        oids);
+  }
+
+  public void addToColumnMap(String columnName, int outputColumn) {
+    if (columnMap != null) {
+      columnMap.put(columnName, outputColumn);
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
index 676b86a..ff68dfa 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java
@@ -102,7 +102,8 @@ public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspe
 
       byteRow.resetValid(numCols);
 
-      for (int k = 0; k < numCols; k++) {
+      for (int p = 0; p < batch.projectionSize; p++) {
+        int k = batch.projectedColumns[p];
         ObjectInspector foi = fields.get(k).getFieldObjectInspector();
         ColumnVector currentColVector = batch.cols[k];
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
index c0e7c21..a22a2de 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java
@@ -35,6 +35,8 @@
   public ColumnVector[] cols;   // a vector for each column
   public int size;              // number of rows that qualify (i.e. haven't been filtered out)
   public int[] selected;        // array of positions of selected values
+  public int[] projectedColumns;
+  public int projectionSize;
 
   /*
    * If no filtering has been applied yet, selectedInUse is false,
@@ -80,6 +82,13 @@ public VectorizedRowBatch(int numCols, int size) {
     selectedInUse = false;
     this.cols = new ColumnVector[numCols];
     writableRow = new Writable[numCols];
+    projectedColumns = new int[numCols];
+
+    // Initially all columns are projected and in the same order
+    projectionSize = numCols;
+    for (int i = 0; i < numCols; i++) {
+      projectedColumns[i] = i;
+    }
   }
 
   public void initRowIterator(){
@@ -92,12 +101,14 @@ public void initRowIterator(){
     }
     if (selectedInUse) {
       int i = selected[rowIteratorIndex];
-      for (int c = 0; c < numCols; c++) {
+      for (int k = 0; k < projectionSize; k++) {
+        int c = this.projectedColumns[k];
         writableRow[c] = cols[c].getWritableObject(i);
       }
     } else {
       int i = rowIteratorIndex;
-      for (int c = 0; c < numCols; c++) {
+      for (int k = 0; k < projectionSize; k++) {
+        int c = this.projectedColumns[k];
         writableRow[c] = cols[c].getWritableObject(i);
       }
     }
@@ -123,7 +134,8 @@ public String toString() {
       for (int j = 0; j < size; j++) {
         int i = selected[j];
         int colIndex = 0;
-        for (ColumnVector cv : cols) {
+        for (int k = 0; k < projectionSize; k++) {
+          ColumnVector cv = cols[this.projectedColumns[k]];
           if (cv.isRepeating) {
             b.append(cv.getWritableObject(0).toString());
           } else {
@@ -141,7 +153,8 @@
     } else {
       for (int i = 0; i < size; i++) {
         int colIndex = 0;
-        for (ColumnVector cv : cols) {
+        for (int k = 0; k < projectionSize; k++) {
+          ColumnVector cv = cols[this.projectedColumns[k]];
           if (cv.isRepeating) {
             b.append(cv.getWritableObject(0).toString());
           } else {
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
index 33d59e5..0bb10d3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
@@ -55,7 +55,8 @@ public Writable serialize(Object obj, ObjectInspector inspector) {
       } else {
         index = i;
       }
-      for (int k = 0; k < batch.numCols; k++) {
+      for (int p = 0; p < batch.projectionSize; p++) {
+        int k = batch.projectedColumns[p];
         Writable w;
         if (batch.cols[k].isRepeating) {
           w = batch.cols[k].getWritableObject(0);
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java
index 1744df8..36fe30a 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSelectOperator.java
@@ -25,44 +25,66 @@
 import org.apache.hadoop.hive.ql.exec.vector.expressions.gen.LongColAddLongColumn;
 import org.apache.hadoop.hive.ql.exec.vector.util.VectorizedRowGroupGenUtil;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.junit.Test;
 
 public class TestVectorSelectOperator {
 
+  static class ValidatorVectorSelectOperator extends VectorSelectOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    public ValidatorVectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) {
+      super(ctxt, conf);
+    }
+
+    /**
+     * Override forward to do validation
+     */
+    @Override
+    public void forward(Object row, ObjectInspector rowInspector) throws HiveException {
+      VectorizedRowBatch vrg = (VectorizedRowBatch) row;
+
+      int[] projections = vrg.projectedColumns;
+      assertEquals(2, vrg.projectionSize);
+      assertEquals(2, projections[0]);
+      assertEquals(3, projections[1]);
+
+      LongColumnVector out0 = (LongColumnVector) vrg.cols[projections[0]];
+      LongColumnVector out1 = (LongColumnVector) vrg.cols[projections[1]];
+
+      LongColumnVector in0 = (LongColumnVector) vrg.cols[0];
+      LongColumnVector in1 = (LongColumnVector) vrg.cols[1];
+      LongColumnVector in2 = (LongColumnVector) vrg.cols[2];
+      LongColumnVector in3 = (LongColumnVector) vrg.cols[3];
+
+      for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
+        assertEquals(in0.vector[i] + in1.vector[i], out0.vector[i]);
+        assertEquals(in2.vector[i], out0.vector[i]);
+        assertEquals(in3.vector[i], out1.vector[i]);
+      }
+    }
+  }
+
   @Test
   public void testSelectOperator() throws HiveException {
-    VectorSelectOperator vso = new VectorSelectOperator(null, new SelectDesc(false));
+
+    ValidatorVectorSelectOperator vso = new ValidatorVectorSelectOperator(null, new SelectDesc(
+        false));
 
     VectorizedRowBatch vrg = VectorizedRowGroupGenUtil.getVectorizedRowBatch(
         VectorizedRowBatch.DEFAULT_SIZE, 4, 17);
 
-    LongColAddLongColumn lcalcExpr = new LongColAddLongColumn(0,1,2);
+    LongColAddLongColumn lcalcExpr = new LongColAddLongColumn(0, 1, 2);
     IdentityExpression iexpr = new IdentityExpression(3, "long");
-    VectorExpression [] ves = new VectorExpression [] { lcalcExpr, iexpr };
+    VectorExpression[] ves = new VectorExpression[] {lcalcExpr, iexpr};
     vso.setSelectExpressions(ves);
     vso.processOp(vrg, 0);
-
-    VectorizedRowBatch output = vso.getOutput();
-
-    assertEquals(2, output.numCols);
-
-    LongColumnVector out0 = (LongColumnVector) output.cols[0];
-    LongColumnVector out1 = (LongColumnVector) output.cols[1];
-
-    LongColumnVector in0 = (LongColumnVector) vrg.cols[0];
-    LongColumnVector in1 = (LongColumnVector) vrg.cols[1];
-    LongColumnVector in2 = (LongColumnVector) vrg.cols[2];
-    LongColumnVector in3 = (LongColumnVector) vrg.cols[3];
-
-    for (int i = 0; i < VectorizedRowBatch.DEFAULT_SIZE; i ++) {
-      assertEquals(in0.vector[i]+in1.vector[i], out0.vector[i]);
-      assertEquals(in2.vector[i], out0.vector[i]);
-      assertEquals(in3.vector[i], out1.vector[i]);
-    }
   }
 }
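
Reviewer note (not part of the patch): the projectedColumns/projectionSize fields added to VectorizedRowBatch are what allow VectorSelectOperator to forward the incoming batch in place instead of copying column references into a private output batch, and they require every downstream consumer of a forwarded batch to iterate the projection rather than 0..numCols, as VectorizedColumnarSerDe and VectorizedOrcSerde now do. Below is a minimal sketch of that consumption pattern; it uses only the public fields and methods touched by this patch, and the helper method name and the "emit" step are illustrative, not existing Hive APIs.

  // Illustrative sketch only, assuming the Hive vector classes changed above.
  static void consumeProjectedBatch(VectorizedRowBatch batch) {
    for (int j = 0; j < batch.size; j++) {
      // Honor the selection vector, exactly as the serde loops in this patch do.
      int rowIndex = batch.selectedInUse ? batch.selected[j] : j;
      // Walk only the projected columns, in projection order.
      for (int p = 0; p < batch.projectionSize; p++) {
        int k = batch.projectedColumns[p];
        ColumnVector cv = batch.cols[k];
        // Repeating columns store their single value at position 0.
        Writable w = cv.isRepeating
            ? cv.getWritableObject(0)
            : cv.getWritableObject(rowIndex);
        // ... hand "w" to the row-level consumer as logical column p ...
      }
    }
  }

Restoring the original projection after forward(), as VectorSelectOperator does, matters because the same batch object is re-used by the upstream reader for subsequent batches.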