diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index a7a6da3..6bd404c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@ -404,7 +404,7 @@ public void process(Object row, int tag) throws HiveException {
    * @param hybridHtContainer Hybrid hashtable container
    * @param row big table row
    */
-  private void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row) {
+  protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row) throws HiveException {
     HybridHashTableContainer ht = (HybridHashTableContainer) hybridHtContainer;
     int partitionId = ht.getToSpillPartitionId();
     HashPartition hp = ht.getHashPartitions()[partitionId];
@@ -469,8 +469,7 @@ public void closeOp(boolean abort) throws HiveException {
   }
 
   /**
-   * Continue processing each pair of spilled hashtable and big table row container,
-   * by bringing them back to memory and calling process() again.
+   * Continue processing each pair of spilled hashtable and big table row container
    * @param partition hash partition to process
    * @param hybridHtContainer Hybrid hashtable container
    * @throws HiveException
@@ -481,13 +480,7 @@ public void closeOp(boolean abort) throws HiveException {
   private void continueProcess(HashPartition partition, HybridHashTableContainer hybridHtContainer)
       throws HiveException, IOException, ClassNotFoundException, SerDeException {
     reloadHashTable(partition, hybridHtContainer);
-    // Iterate thru the on-disk matchfile, and feed processOp with leftover rows
-    ObjectContainer bigTable = partition.getMatchfileObjContainer();
-    while (bigTable.hasNext()) {
-      Object row = bigTable.next();
-      process(row, tag);
-    }
-    bigTable.clear();
+    reProcessBigTable(partition);
   }
 
   /**
@@ -547,6 +540,20 @@ private void reloadHashTable(HashPartition partition,
   }
 
   /**
+   * Iterate over the big table row container and feed process() with leftover rows
+   * @param partition the hash partition being brought back to memory at the moment
+   * @throws HiveException
+   */
+  protected void reProcessBigTable(HashPartition partition) throws HiveException {
+    ObjectContainer bigTable = partition.getMatchfileObjContainer();
+    while (bigTable.hasNext()) {
+      Object row = bigTable.next();
+      process(row, tag);
+    }
+    bigTable.clear();
+  }
+
+  /**
    * Implements the getName function for the Node Interface.
    *
    * @return the name of the operator
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 0394370..e2b5a66 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec.vector;
 
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -31,8 +32,10 @@
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.persistence.HybridHashTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer.ReusableGetAdaptor;
+import org.apache.hadoop.hive.ql.exec.persistence.ObjectContainer;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -42,6 +45,7 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
  * The vectorized version of the MapJoinOperator.
@@ -68,6 +72,7 @@
   //---------------------------------------------------------------------------
 
   private transient VectorizedRowBatch outputBatch;
+  private transient VectorizedRowBatch scratchBatch;  // holds restored (from disk) big table rows
   private transient VectorExpressionWriter[] valueWriters;
   private transient Map<ObjectInspector, VectorColumnAssign[]> outputVectorAssigners;
 
@@ -81,6 +86,10 @@
 
   private transient VectorizedRowBatchCtx vrbCtx = null;
 
+  private transient int tag;  // big table alias
+  private VectorExpressionWriter[] rowWriters;  // Writer for producing row from input batch
+  protected transient Object[] singleRow;
+
   public VectorMapJoinOperator() {
     super();
   }
@@ -117,6 +126,19 @@ public VectorMapJoinOperator (VectorizationContext vContext, OperatorDesc conf)
 
   @Override
   public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
+    // Code borrowed from VectorReduceSinkOperator.initializeOp
+    VectorExpressionWriterFactory.processVectorInspector(
+        (StructObjectInspector) inputObjInspectors[0],
+        new VectorExpressionWriterFactory.SingleOIDClosure() {
+          @Override
+          public void assign(VectorExpressionWriter[] writers,
+              ObjectInspector objectInspector) {
+            rowWriters = writers;
+            inputObjInspectors[0] = objectInspector;
+          }
+        });
+    singleRow = new Object[rowWriters.length];
+
     Collection<Future<?>> result = super.initializeOp(hconf);
 
     List<ExprNodeDesc> keyDesc = conf.getKeys().get(posBigTable);
@@ -213,6 +235,7 @@ private void flushOutput() throws HiveException {
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
+    super.closeOp(aborted);
     for (MapJoinTableContainer tableContainer : mapJoinTables) {
       if (tableContainer != null) {
         tableContainer.dumpMetrics();
@@ -235,6 +258,12 @@ public void process(Object row, int tag) throws HiveException {
     byte alias = (byte) tag;
     VectorizedRowBatch inBatch = (VectorizedRowBatch) row;
 
+    // Preparation for hybrid grace hash join
+    this.tag = tag;
+    if (scratchBatch == null) {
+      scratchBatch = makeLike(inBatch);
+    }
+
     if (null != bigTableFilterExpressions) {
       for(VectorExpression ve:bigTableFilterExpressions) {
         ve.evaluate(inBatch);
@@ -270,4 +299,100 @@ public void process(Object row, int tag) throws HiveException {
   public VectorizationContext getOuputVectorizationContext() {
     return vOutContext;
   }
+
+  @Override
+  protected void spillBigTableRow(MapJoinTableContainer hybridHtContainer, Object row)
+      throws HiveException {
+    // Extract the actual row from row batch
+    VectorizedRowBatch inBatch = (VectorizedRowBatch) row;
+    Object[] actualRow = getRowObject(inBatch, batchIndex);
+
+    // Find the right container, into which to add the actual row
+    HybridHashTableContainer ht = (HybridHashTableContainer) hybridHtContainer;
+    int partitionId = ht.getToSpillPartitionId();
+    HybridHashTableContainer.HashPartition hp = ht.getHashPartitions()[partitionId];
+    ObjectContainer bigTable = hp.getMatchfileObjContainer();
+    bigTable.add(actualRow);
+  }
+
+  @Override
+  protected void reProcessBigTable(HybridHashTableContainer.HashPartition partition)
+      throws HiveException {
+    ObjectContainer bigTable = partition.getMatchfileObjContainer();
+
+    DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
+    while (bigTable.hasNext()) {
+      Object row = bigTable.next();
+      VectorizedBatchUtil.addProjectedRowToBatchFrom(row,
+          (StructObjectInspector) inputObjInspectors[posBigTable],
+          scratchBatch.size, scratchBatch, dataOutputBuffer);
+      scratchBatch.size++;
+
+      if (scratchBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+        process(scratchBatch, tag); // call process once we have a full batch
+        scratchBatch.reset();
+        dataOutputBuffer.reset();
+      }
+    }
+    // Process the row batch that has less than DEFAULT_SIZE rows
+    if (scratchBatch.size > 0) {
+      process(scratchBatch, tag);
+      scratchBatch.reset();
+      dataOutputBuffer.reset();
+    }
+    bigTable.clear();
+  }
+
+  // Code borrowed from VectorReduceSinkOperator
+  private Object[] getRowObject(VectorizedRowBatch vrb, int rowIndex) throws HiveException {
+    int batchIndex = rowIndex;
+    if (vrb.selectedInUse) {
+      batchIndex = vrb.selected[rowIndex];
+    }
+    for (int i = 0; i < vrb.projectionSize; i++) {
+      ColumnVector vectorColumn = vrb.cols[vrb.projectedColumns[i]];
+      if (vectorColumn != null) {
+        singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+      } else {
+        // Some columns from tables are not used.
+        singleRow[i] = null;
+      }
+    }
+    return singleRow;
+  }
+
+  /**
+   * Make a new (scratch) batch, which is exactly "like" the batch provided, except that it's empty
+   * @param batch the batch to imitate
+   * @return the new batch
+   * @throws HiveException
+   */
+  VectorizedRowBatch makeLike(VectorizedRowBatch batch) throws HiveException {
+    VectorizedRowBatch newBatch = new VectorizedRowBatch(batch.numCols);
+    for (int i = 0; i < batch.numCols; i++) {
+      ColumnVector colVector = batch.cols[i];
+      if (colVector != null) {
+        ColumnVector newColVector;
+        if (colVector instanceof LongColumnVector) {
+          newColVector = new LongColumnVector();
+        } else if (colVector instanceof DoubleColumnVector) {
+          newColVector = new DoubleColumnVector();
+        } else if (colVector instanceof BytesColumnVector) {
+          newColVector = new BytesColumnVector();
+        } else if (colVector instanceof DecimalColumnVector) {
+          DecimalColumnVector decColVector = (DecimalColumnVector) colVector;
+          newColVector = new DecimalColumnVector(decColVector.precision, decColVector.scale);
+        } else {
+          throw new HiveException("Column vector class " + colVector.getClass().getName() +
+              " is not supported!");
+        }
+        newBatch.cols[i] = newColVector;
+        newBatch.cols[i].init();
+      }
+    }
+    newBatch.projectedColumns = Arrays.copyOf(batch.projectedColumns, batch.projectedColumns.length);
+    newBatch.projectionSize = batch.projectionSize;
+    newBatch.reset();
+    return newBatch;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index e304cf8..b56855c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -235,11 +235,28 @@ public static void addRowToBatchFrom(Object row, StructObjectInspector oi,
     final int off = colOffset;
     // Iterate thru the cols and load the batch
     for (int i = 0; i < fieldRefs.size(); i++) {
-      setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, off);
+      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, off);
     }
   }
 
   /**
+   * Add only the projected column of a regular row to the specified vectorized row batch
+   * @param row the regular row
+   * @param oi object inspector for the row
+   * @param rowIndex the offset to add in the batch
+   * @param batch vectorized row batch
+   * @param buffer data output buffer
+   * @throws HiveException
+   */
+  public static void addProjectedRowToBatchFrom(Object row, StructObjectInspector oi,
+      int rowIndex, VectorizedRowBatch batch, DataOutputBuffer buffer) throws HiveException {
+    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      int projectedOutputCol = batch.projectedColumns[i];
+      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, projectedOutputCol, 0);
+    }
+  }
+  /**
    * Iterates thru all the columns in a given row and populates the batch
    * from a given offset
    *
@@ -268,21 +285,21 @@ public static void acidAddRowToBatch(Object row,
         // The value will have already been set before we're called, so don't overwrite it
         continue;
       }
-      setVector(row, oi, fieldRefs, batch, buffer, rowIndex, i, 0);
+      setVector(row, oi, fieldRefs.get(i), batch, buffer, rowIndex, i, 0);
     }
   }
 
   private static void setVector(Object row, StructObjectInspector oi,
-      List<? extends StructField> fieldRefs,
+      StructField field,
       VectorizedRowBatch batch, DataOutputBuffer buffer,
       int rowIndex, int colIndex, int offset) throws HiveException {
-    Object fieldData = oi.getStructFieldData(row, fieldRefs.get(colIndex));
-    ObjectInspector foi = fieldRefs.get(colIndex).getFieldObjectInspector();
+    Object fieldData = oi.getStructFieldData(row, field);
+    ObjectInspector foi = field.getFieldObjectInspector();
 
     // Vectorization only supports PRIMITIVE data types. Assert the same
     assert (foi.getCategory() == Category.PRIMITIVE);