diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java index 061db16..1f74cf2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java @@ -40,9 +40,11 @@ protected transient VectorExpression[] vExpressions; - VectorizedRowBatch output; private final VectorizationContext vContext; + private final VectorizedRowBatch lastBatch = null; + private int [] projectedColumns = null; + public VectorSelectOperator(VectorizationContext ctxt, OperatorDesc conf) { this.vContext = ctxt; this.conf = (SelectDesc) conf; @@ -62,18 +64,19 @@ protected void initializeOp(Configuration hconf) throws HiveException { for (int i = 0; i < colList.size(); i++) { vExpressions[i] = vContext.getVectorExpression(colList.get(i)); } - output = new VectorizedRowBatch(colList.size(), - VectorizedRowBatch.DEFAULT_SIZE); initializeChildren(hconf); + projectedColumns = new int [vExpressions.length]; + for (int i = 0; i < projectedColumns.length; i++) { + projectedColumns[i] = vExpressions[i].getOutputColumn(); + } } public void setSelectExpressions(VectorExpression[] exprs) { this.vExpressions = exprs; - output = new VectorizedRowBatch(exprs.length, VectorizedRowBatch.DEFAULT_SIZE); } public VectorizedRowBatch getOutput() { - return output; + return lastBatch; } @Override @@ -95,15 +98,18 @@ public void processOp(Object row, int tag) throws HiveException { } } - //Prepare output, shallow vector copy - output.selectedInUse = vrg.selectedInUse; - output.selected = vrg.selected; - output.size = vrg.size; + //Prepare output, set the projections + int[] originalProjections = vrg.projectedColumns; + int originalProjectionSize = vrg.projectionSize; + vrg.projectionSize = vExpressions.length; for (int i = 0; i < vExpressions.length; i++) { - output.cols[i] = vrg.cols[vExpressions[i].getOutputColumn()]; + vrg.projectedColumns[i] = vExpressions[i].getOutputColumn(); } - output.numCols = vExpressions.length; - forward(output, outputObjInspector); + forward(vrg, outputObjInspector); + + // Revert the projected columns back, because vrg will be re-used. + vrg.projectionSize = originalProjectionSize; + vrg.projectedColumns = originalProjections; } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java index 676b86a..ff68dfa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedColumnarSerDe.java @@ -102,7 +102,8 @@ public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspe byteRow.resetValid(numCols); - for (int k = 0; k < numCols; k++) { + for (int p = 0; p < batch.projectionSize; p++) { + int k = batch.projectedColumns[p]; ObjectInspector foi = fields.get(k).getFieldObjectInspector(); ColumnVector currentColVector = batch.cols[k]; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java index c0e7c21..a22a2de 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java @@ -35,6 +35,8 @@ public ColumnVector[] cols; // a vector for each column public int size; // number of rows that qualify (i.e. haven't been filtered out) public int[] selected; // array of positions of selected values + public int[] projectedColumns; + public int projectionSize; /* * If no filtering has been applied yet, selectedInUse is false, @@ -80,6 +82,13 @@ public VectorizedRowBatch(int numCols, int size) { selectedInUse = false; this.cols = new ColumnVector[numCols]; writableRow = new Writable[numCols]; + projectedColumns = new int[numCols]; + + // Initially all columns are projected and in the same order + projectionSize = numCols; + for (int i = 0; i < numCols; i++) { + projectedColumns[i] = i; + } } public void initRowIterator(){ @@ -92,12 +101,14 @@ public void initRowIterator(){ } if (selectedInUse) { int i = selected[rowIteratorIndex]; - for (int c = 0; c < numCols; c++) { + for (int k = 0; k < projectionSize; k++) { + int c = this.projectedColumns[k]; writableRow[c] = cols[c].getWritableObject(i); } } else { int i = rowIteratorIndex; - for (int c = 0; c < numCols; c++) { + for (int k = 0; k < projectionSize; k++) { + int c = this.projectedColumns[k]; writableRow[c] = cols[c].getWritableObject(i); } } @@ -123,7 +134,8 @@ public String toString() { for (int j = 0; j < size; j++) { int i = selected[j]; int colIndex = 0; - for (ColumnVector cv : cols) { + for (int k = 0; k < projectionSize; k++) { + ColumnVector cv = cols[this.projectedColumns[k]]; if (cv.isRepeating) { b.append(cv.getWritableObject(0).toString()); } else { @@ -141,7 +153,8 @@ public String toString() { } else { for (int i = 0; i < size; i++) { int colIndex = 0; - for (ColumnVector cv : cols) { + for (int k = 0; k < projectionSize; k++) { + ColumnVector cv = cols[this.projectedColumns[k]]; if (cv.isRepeating) { b.append(cv.getWritableObject(0).toString()); } else { diff --git ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java index 33d59e5..0bb10d3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java @@ -55,7 +55,8 @@ public Writable serialize(Object obj, ObjectInspector inspector) { } else { index = i; } - for (int k = 0; k < batch.numCols; k++) { + for (int p = 0; p < batch.projectionSize; p++) { + int k = batch.projectedColumns[p]; Writable w; if (batch.cols[k].isRepeating) { w = batch.cols[k].getWritableObject(0);