diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 211f452..2e89408 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -989,4 +989,12 @@ public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) {
       debugDisplayOneRow(batch, index, prefix);
     }
   }
+
+  public static void systemOutDisplayBatch(VectorizedRowBatch batch, String prefix) {
+    for (int i = 0; i < batch.size; i++) {
+      int index = (batch.selectedInUse ? batch.selected[i] : i);
+      StringBuilder sb = new StringBuilder();
+      System.out.println(debugFormatOneRow(batch, index, prefix, sb).toString());
+    }
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinBigOnlyGenerateResultOperator.java
new file mode 100644
index 0000000..30190f1
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinBigOnlyGenerateResultOperator.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+
+/**
+ * This class has methods for generating vectorized join results for the big table only
+ * variation of inner joins.
+ *
+ * When an inner join does not have any small table columns in the join result, we use this
+ * variation, which we call inner big only. This variation uses a hash multi-set instead of a
+ * hash map since there are no values (just a count).
+ *
+ * Note that if an inner key appears in the small table results area, we use the inner join
+ * projection optimization and are able to use this variation.
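+ *
+ * A hedged sketch of the emission rule this class implements (names here are
+ * illustrative only, not actual API):
+ * <pre>{@code
+ *   // count = number of duplicate small table rows recorded for the key
+ *   for (int d = 0; d < duplicateCount; d++) {   // equal-key big table rows
+ *     for (long c = 0; c < count; c++) {
+ *       emit(bigTableRow(d));                    // no small table columns appended
+ *     }
+ *   }
+ * }</pre>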
+ */
+public abstract class VectorMapJoinBigOnlyGenerateResultOperator
+    extends VectorMapJoinGenerateResultOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(VectorMapJoinBigOnlyGenerateResultOperator.class.getName());
+
+  //---------------------------------------------------------------------------
+  // Inner big-table only join specific members.
+  //
+
+  // An array of hash multi-set results so we can do lookups on the whole batch before output result
+  // generation.
+  protected transient VectorMapJoinHashMultiSetResult hashMultiSetResults[];
+
+  // Pre-allocated member for storing the (physical) batch indexes of matching rows (single- or
+  // multi-small-table-valued) during a process call.
+  protected transient int[] allMatchs;
+
+  /*
+   * Pre-allocated members for storing information on single- and multi-valued-small-table matches.
+   *
+   *   ~ValueCounts
+   *         Number of (empty) small table values.
+   *   ~AllMatchIndices
+   *         (Logical) indices into allMatchs to the first row of a match of a
+   *         possible series of duplicate keys.
+   *   ~DuplicateCounts
+   *         The duplicate count for each matched key.
+   *
+   */
+  protected transient long[] equalKeySeriesValueCounts;
+  protected transient int[] equalKeySeriesAllMatchIndices;
+  protected transient int[] equalKeySeriesDuplicateCounts;
+
+
+  // Pre-allocated member for storing the (physical) batch index of rows that need to be spilled.
+  protected transient int[] spills;
+
+  // Pre-allocated member for storing index into the hashMultiSetResults for each spilled row.
+  protected transient int[] spillHashMultiSetResultIndices;
+
+  /** Kryo ctor. */
+  protected VectorMapJoinBigOnlyGenerateResultOperator() {
+    super();
+  }
+
+  public VectorMapJoinBigOnlyGenerateResultOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinBigOnlyGenerateResultOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
+  }
+
+  /*
+   * Setup our inner big table only join specific members.
+   */
+  protected void commonSetup(VectorizedRowBatch batch) throws HiveException {
+    super.commonSetup(batch);
+
+    // Inner big-table only join specific.
+    VectorMapJoinHashMultiSet baseHashMultiSet = (VectorMapJoinHashMultiSet) vectorMapJoinHashTable;
+
+    hashMultiSetResults = new VectorMapJoinHashMultiSetResult[batch.DEFAULT_SIZE];
+    for (int i = 0; i < hashMultiSetResults.length; i++) {
+      hashMultiSetResults[i] = baseHashMultiSet.createHashMultiSetResult();
+    }
+
+    allMatchs = new int[batch.DEFAULT_SIZE];
+
+    equalKeySeriesValueCounts = new long[batch.DEFAULT_SIZE];
+    equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE];
+    equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE];
+
+    spills = new int[batch.DEFAULT_SIZE];
+    spillHashMultiSetResultIndices = new int[batch.DEFAULT_SIZE];
+  }
+
+  //-----------------------------------------------------------------------------------------------
+
+  /**
+   * Generate the single value match inner big table only join output results for a match.
+   *
+   * @param batch
+   *          The big table batch.
+   * @param allMatchs
+   *          A subset of the rows of the batch that are matches.
+   * @param allMatchesIndex
+   *          The logical index into allMatchs of the first equal key.
+   * @param duplicateCount
+   *          The number of duplicates or equal keys.
+   * @param numSel
+   *          The current count of rows in the rebuilding of the selected array.
+   *
+   * @return
+   *          The new count of selected rows.
+   */
+  protected int generateHashMultiSetResultSingleValue(VectorizedRowBatch batch,
+      int[] allMatchs, int allMatchesIndex, int duplicateCount, int numSel)
+          throws HiveException, IOException {
+
+    // LOG.debug("generateHashMultiSetResultSingleValue enter...");
+
+    // Generate result within big table batch itself.
+
+    // LOG.debug("generateHashMultiSetResultSingleValue with big table...");
+
+    for (int i = 0; i < duplicateCount; i++) {
+
+      int batchIndex = allMatchs[allMatchesIndex + i];
+
+      // Outer key copying is only used when we are using the input BigTable batch as the output.
+      //
+      if (bigTableVectorCopyOuterKeys != null) {
+        // Copy within row.
+        bigTableVectorCopyOuterKeys.copyByReference(batch, batchIndex, batch, batchIndex);
+      }
+
+      // Use the big table row as output.
+      batch.selected[numSel++] = batchIndex;
+    }
+
+    return numSel;
+  }
+
+  /**
+   * Generate results for an N x M cross product.
+   *
+   * @param batch
+   *          The big table batch.
+   * @param allMatchs
+   *          The all match selected array that contains (physical) batch indices.
+   * @param allMatchesIndex
+   *          The index of the match key.
+   * @param duplicateCount
+   *          Number of equal key rows.
+   * @param count
+   *          Value count.
+   */
+  protected void generateHashMultiSetResultMultiValue(VectorizedRowBatch batch,
+      int[] allMatchs, int allMatchesIndex,
+      int duplicateCount, long count) throws HiveException, IOException {
+
+    // LOG.debug("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count);
+
+    // TODO: Look at repeating optimizations...
+
+    for (int i = 0; i < duplicateCount; i++) {
+
+      int batchIndex = allMatchs[allMatchesIndex + i];
+
+      for (long l = 0; l < count; l++) {
+
+        // Copy the BigTable values into the overflow batch. Since the overflow batch may
+        // not get flushed here, we must copy by value.
+        if (bigTableRetainedVectorCopy != null) {
+          bigTableRetainedVectorCopy.copyByValue(batch, batchIndex,
+              overflowBatch, overflowBatch.size);
+        }
+
+        // Outer key copying is only used when we are using the input BigTable batch as the output.
+        //
+        if (bigTableVectorCopyOuterKeys != null) {
+          // Copy within row.
+          bigTableVectorCopyOuterKeys.copyByReference(batch, batchIndex,
+              overflowBatch, overflowBatch.size);
+        }
+
+        overflowBatch.size++;
+        if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) {
+          forwardOverflow();
+        }
+      }
+    }
+  }
+
+  /**
+   * Generate the inner big table only join output results for one vectorized row batch with
+   * a repeated key.
+   *
+   * @param batch
+   *          The big table batch with any matching and any non matching rows both as
+   *          selected in use.
+   * @param hashMultiSetResult
+   *          The hash multi-set results for the batch.
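+   *
+   * Illustrative note (sketch): all selected rows go through the multi-value path,
+   * so each row is copied to the overflow batch count() times and forwarded from
+   * there; the caller is expected to empty the big table batch afterwards.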
+   */
+  protected void generateHashMultiSetResultRepeatedAll(VectorizedRowBatch batch,
+      VectorMapJoinHashMultiSetResult hashMultiSetResult)
+          throws HiveException, IOException {
+
+    long count = hashMultiSetResult.count();
+
+    goSelected(batch);
+
+    generateHashMultiSetResultMultiValue(batch,
+        batch.selected, /* allMatchesIndex */ 0, batch.size, count);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
index 3821cc6..5ad14eb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java
@@ -402,28 +402,319 @@ private void generateHashMapResultLargeMultiValue(VectorizedRowBatch batch,
   protected void generateHashMapResultRepeatedAll(VectorizedRowBatch batch,
       VectorMapJoinHashMapResult hashMapResult) throws IOException, HiveException {
 
-    int[] selected = batch.selected;
+    goSelected(batch);
+
+    int numSel = 0;
+    if (hashMapResult.isSingleRow()) {
+      numSel = generateHashMapResultSingleValue(batch, hashMapResult,
+          batch.selected, 0, batch.size, numSel);
+
+    } else {
+      generateHashMapResultMultiValue(batch, hashMapResult,
+          batch.selected, 0, batch.size);
+    }
+
+    batch.size = numSel;
+  }
+
+  /**
+   * Apply the value expressions to rows in the (original) input selected array.
+   *
+   * @param batch
+   *          The vectorized row batch.
+   * @param inputSelectedInUse
+   *          Whether the (original) input batch is selectedInUse.
+   * @param inputLogicalSize
+   *          The (original) input batch size.
+   * @param inputSelected
+   *          The (original) input batch selected array.
+   */
+  protected void doValueExprOnInputSelected(VectorizedRowBatch batch,
+      boolean inputSelectedInUse, int inputLogicalSize, int[] inputSelected)
+          throws HiveException {
+
+    int saveBatchSize = batch.size;
+    int[] saveSelected = batch.selected;
+    boolean saveSelectedInUse = batch.selectedInUse;
+
+    batch.size = inputLogicalSize;
+    batch.selected = inputSelected;
+    batch.selectedInUse = inputSelectedInUse;
+
+    if (bigTableValueExpressions != null) {
+      for(VectorExpression ve: bigTableValueExpressions) {
+        ve.evaluate(batch);
+      }
+    }
+
+    batch.size = saveBatchSize;
+    batch.selected = saveSelected;
+    batch.selectedInUse = saveSelectedInUse;
+  }
+
+  /**
+   * Apply the value expressions to rows specified by a selected array.
+   *
+   * @param batch
+   *          The vectorized row batch.
+   * @param selected
+   *          The (physical) batch indices to apply the expression to.
+   * @param size
+   *          The size of selected.
+   */
+  protected void doValueExpr(VectorizedRowBatch batch,
+      int[] selected, int size) throws HiveException {
+
+    int saveBatchSize = batch.size;
+    int[] saveSelected = batch.selected;
+    boolean saveSelectedInUse = batch.selectedInUse;
+
+    batch.size = size;
+    batch.selected = selected;
+    batch.selectedInUse = true;
+
+    if (bigTableValueExpressions != null) {
+      for(VectorExpression ve: bigTableValueExpressions) {
+        ve.evaluate(batch);
+      }
+    }
+    batch.size = saveBatchSize;
+    batch.selected = saveSelected;
+    batch.selectedInUse = saveSelectedInUse;
+  }
+
+  protected void goSelected(VectorizedRowBatch batch) {
     if (batch.selectedInUse) {
       // The selected array is already filled in as we want it.
     } else {
+      int[] selected = batch.selected;
       for (int i = 0; i < batch.size; i++) {
         selected[i] = i;
       }
       batch.selectedInUse = true;
     }
+  }
 
-    int numSel = 0;
-    if (hashMapResult.isSingleRow()) {
-      numSel = generateHashMapResultSingleValue(batch, hashMapResult,
-          batch.selected, 0, batch.size, numSel);
+  /**
+   * Remove (subtract) members from the input selected array and produce the results into
+   * a difference array.
+   *
+   * @param inputSelectedInUse
+   *          Whether the (original) input batch is selectedInUse.
+   * @param inputLogicalSize
+   *          The (original) input batch size.
+   * @param inputSelected
+   *          The (original) input batch selected array.
+   * @param remove
+   *          The indices to remove. They must all be present in the input selected array.
+   * @param removeSize
+   *          The size of remove.
+   * @param difference
+   *          The resulting difference -- the input selected array indices not in the
+   *          remove array.
+   * @return
+   *          The resulting size of the difference array.
+   * @throws HiveException
+   */
+  protected int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize,
+      int[] inputSelected,
+      int[] remove, int removeSize, int[] difference) throws HiveException {
+
+    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) {
+    //   throw new HiveException("remove is not in sort order and unique");
+    // }
+
+    int differenceCount = 0;
+
+    // Determine which rows are left.
+    int removeIndex = 0;
+    if (inputSelectedInUse) {
+      for (int i = 0; i < inputLogicalSize; i++) {
+        int candidateIndex = inputSelected[i];
+        if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) {
+          removeIndex++;
+        } else {
+          difference[differenceCount++] = candidateIndex;
+        }
+      }
+    } else {
+      for (int candidateIndex = 0; candidateIndex < inputLogicalSize; candidateIndex++) {
+        if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) {
+          removeIndex++;
+        } else {
+          difference[differenceCount++] = candidateIndex;
+        }
+      }
+    }
+
+    if (removeIndex != removeSize) {
+      throw new HiveException("Not all batch indices removed");
+    }
+
+    // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) {
+    //   throw new HiveException("difference is not in sort order and unique");
+    // }
+
+    return differenceCount;
+  }
 
-    } else {
-      generateHashMapResultMultiValue(batch, hashMapResult,
-          batch.selected, 0, batch.size);
+  /**
+   * Remove (subtract) members from an array and produce the results into
+   * a difference array.
+   *
+   * @param all
+   *          The selected array containing all members.
+   * @param allSize
+   *          The size of all.
+   * @param remove
+   *          The indices to remove. They must all be present in the all array.
+   * @param removeSize
+   *          The size of remove.
+   * @param difference
+   *          The resulting difference -- the all array indices not in the
+   *          remove array.
+   * @return
+   *          The resulting size of the difference array.
+   * @throws HiveException
+   */
+  protected int subtract(int[] all, int allSize,
+      int[] remove, int removeSize, int[] difference) throws HiveException {
+
+    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) {
+    //   throw new HiveException("remove is not in sort order and unique");
+    // }
+
+    int differenceCount = 0;
+
+    // Determine which rows are left.
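+    // Illustrative example (sketch): all = {0, 2, 3, 5}, remove = {2, 5}
+    //   => difference = {0, 3} and the returned differenceCount is 2.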
+    int removeIndex = 0;
+    for (int i = 0; i < allSize; i++) {
+      int candidateIndex = all[i];
+      if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) {
+        removeIndex++;
+      } else {
+        difference[differenceCount++] = candidateIndex;
+      }
     }
 
-    batch.size = numSel;
+    if (removeIndex != removeSize) {
+      throw new HiveException("Not all batch indices removed");
+    }
+
+    return differenceCount;
+  }
+
+  /**
+   * Sort merge two select arrays so the resulting array is ordered by (batch) index.
+   *
+   * @param selected1
+   *          The first sorted selected array.
+   * @param selected1Count
+   *          The size of selected1.
+   * @param selected2
+   *          The second sorted selected array.
+   * @param selected2Count
+   *          The size of selected2.
+   * @param sortMerged
+   *          The resulting sort merge of selected1 and selected2.
+   * @return
+   *          The resulting size of the sortMerged array.
+   * @throws HiveException
+   */
+  protected int sortMerge(int[] selected1, int selected1Count,
+      int[] selected2, int selected2Count, int[] sortMerged) throws HiveException {
+
+    // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) {
+    //   throw new HiveException("selected1 is not in sort order and unique");
+    // }
+
+    // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) {
+    //   throw new HiveException("selected2 is not in sort order and unique");
+    // }
+
+    int sortMergeCount = 0;
+
+    int selected1Index = 0;
+    int selected2Index = 0;
+    for (int i = 0; i < selected1Count + selected2Count; i++) {
+      if (selected1Index < selected1Count && selected2Index < selected2Count) {
+        if (selected1[selected1Index] < selected2[selected2Index]) {
+          sortMerged[sortMergeCount++] = selected1[selected1Index++];
+        } else {
+          sortMerged[sortMergeCount++] = selected2[selected2Index++];
+        }
+      } else if (selected1Index < selected1Count) {
+        sortMerged[sortMergeCount++] = selected1[selected1Index++];
+      } else {
+        sortMerged[sortMergeCount++] = selected2[selected2Index++];
+      }
+    }
+
+    // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) {
+    //   throw new HiveException("sortMerged is not in sort order and unique");
+    // }
+
+    return sortMergeCount;
+  }
+
+  /**
+   * Generate the non matching outer join output results for one vectorized row batch.
+   *
+   * For each non matching row specified by parameter, generate nulls for the small table results.
+   *
+   * @param batch
+   *          The big table batch with any matching and any non matching rows both as
+   *          selected in use.
+   * @param noMatchs
+   *          A subset of the rows of the batch that are non matches.
+   * @param noMatchSize
+   *          Number of non matches in noMatchs.
+   */
+  protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs,
+      int noMatchSize) throws IOException, HiveException {
+
+    // Set null information in the small table results area.
+
+    for (int i = 0; i < noMatchSize; i++) {
+      int batchIndex = noMatchs[i];
+
+      // Mark any small table scratch columns that would normally receive a copy of the
+      // key as null, too.
+      for (int column : bigTableOuterKeyOutputVectorColumns) {
+        ColumnVector colVector = batch.cols[column];
+        colVector.noNulls = false;
+        colVector.isNull[batchIndex] = true;
+      }
+
+      // Small table values are set to null.
+      for (int column : smallTableOutputVectorColumns) {
+        ColumnVector colVector = batch.cols[column];
+        colVector.noNulls = false;
+        colVector.isNull[batchIndex] = true;
+      }
+    }
+  }
+
+  /**
+   * Generate the non-match outer join output results for the whole repeating vectorized
+   * row batch.
+   *
+   * Each row will get nulls for all small table values.
+   *
+   * @param batch
+   *          The big table batch.
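+   *
+   * Illustrative effect (sketch): each small table output column ends up with
+   * noNulls = false, isNull[0] = true and isRepeating = true -- a single repeated
+   * NULL standing in for every row of the batch.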
+   */
+  protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException {
+
+    for (int column : smallTableOutputVectorColumns) {
+      ColumnVector colVector = batch.cols[column];
+      colVector.noNulls = false;
+      colVector.isNull[0] = true;
+      colVector.isRepeating = true;
+    }
+
+    // Mark any small table scratch columns that would normally receive a copy of the key
+    // as null, too.
+    for (int column : bigTableOuterKeyOutputVectorColumns) {
+      ColumnVector colVector = batch.cols[column];
+      colVector.noNulls = false;
+      colVector.isNull[0] = true;
+      colVector.isRepeating = true;
+    }
   }
 
   //-----------------------------------------------------------------------------------------------
@@ -494,7 +785,7 @@ private void spillSerializeRow(VectorizedRowBatch batch, int batchIndex,
     // LOG.debug("spillSerializeRow spilled batchIndex " + batchIndex + ", length " + length);
   }
 
-  protected void spillHashMapBatch(VectorizedRowBatch batch,
+  protected void spillHashTableBatch(VectorizedRowBatch batch,
       VectorMapJoinHashTableResult[] hashTableResults,
       int[] spills, int[] spillHashTableResultIndices, int spillCount)
           throws HiveException, IOException {
@@ -637,6 +928,7 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException
     batch.projectionSize = outputProjection.length;
     batch.projectedColumns = outputProjection;
 
+    // VectorizedBatchUtil.systemOutDisplayBatch(batch, "*DEBUG* forwardBigTableBatch");
     forward(batch, null, true);
 
     // Revert the projected columns back, because batch can be re-used by our parent operators.
@@ -649,6 +941,7 @@ public void forwardBigTableBatch(VectorizedRowBatch batch) throws HiveException
    * Forward the overflow batch and reset the batch.
    */
   protected void forwardOverflow() throws HiveException {
+    // VectorizedBatchUtil.systemOutDisplayBatch(overflowBatch, "*DEBUG* forwardOverflow");
     forward(overflowBatch, null, true);
     overflowBatch.reset();
     maybeCheckInterrupt();
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
index f791d95..a86dca2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java
@@ -46,45 +46,11 @@
  * projection optimization and are able to use this variation.
  */
 public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator
-    extends VectorMapJoinGenerateResultOperator {
+    extends VectorMapJoinBigOnlyGenerateResultOperator {
 
   private static final long serialVersionUID = 1L;
-  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinInnerBigOnlyGenerateResultOperator.class.getName());
-
-  //---------------------------------------------------------------------------
-  // Inner big-table only join specific members.
-  //
-
-  // An array of hash multi-set results so we can do lookups on the whole batch before output result
-  // generation.
-  protected transient VectorMapJoinHashMultiSetResult hashMultiSetResults[];
-
-  // Pre-allocated member for storing the (physical) batch index of matching row (single- or
-  // multi-small-table-valued) indexes during a process call.
-  protected transient int[] allMatchs;
-
-  /*
-   * Pre-allocated members for storing information on single- and multi-valued-small-table matches.
- * - * ~ValueCounts - * Number of (empty) small table values. - * ~AllMatchIndices - * (Logical) indices into allMatchs to the first row of a match of a - * possible series of duplicate keys. - * ~DuplicateCounts - * The duplicate count for each matched key. - * - */ - protected transient long[] equalKeySeriesValueCounts; - protected transient int[] equalKeySeriesAllMatchIndices; - protected transient int[] equalKeySeriesDuplicateCounts; - - - // Pre-allocated member for storing the (physical) batch index of rows that need to be spilled. - protected transient int[] spills; - - // Pre-allocated member for storing index into the hashMultiSetResults for each spilled row. - protected transient int[] spillHashMapResultIndices; + private static final Logger LOG = + LoggerFactory.getLogger(VectorMapJoinInnerBigOnlyGenerateResultOperator.class.getName()); /** Kryo ctor. */ protected VectorMapJoinInnerBigOnlyGenerateResultOperator() { @@ -100,30 +66,6 @@ public VectorMapJoinInnerBigOnlyGenerateResultOperator(CompilationOpContext ctx, super(ctx, conf, vContext, vectorDesc); } - /* - * Setup our inner big table only join specific members. - */ - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - super.commonSetup(batch); - - // Inner big-table only join specific. - VectorMapJoinHashMultiSet baseHashMultiSet = (VectorMapJoinHashMultiSet) vectorMapJoinHashTable; - - hashMultiSetResults = new VectorMapJoinHashMultiSetResult[batch.DEFAULT_SIZE]; - for (int i = 0; i < hashMultiSetResults.length; i++) { - hashMultiSetResults[i] = baseHashMultiSet.createHashMultiSetResult(); - } - - allMatchs = new int[batch.DEFAULT_SIZE]; - - equalKeySeriesValueCounts = new long[batch.DEFAULT_SIZE]; - equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE]; - equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE]; - - spills = new int[batch.DEFAULT_SIZE]; - spillHashMapResultIndices = new int[batch.DEFAULT_SIZE]; - } - //----------------------------------------------------------------------------------------------- /* @@ -156,8 +98,8 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch, // Get rid of spills before we start modifying the batch. if (spillCount > 0) { - spillHashMapBatch(batch, hashTableResults, - spills, spillHashMapResultIndices, spillCount); + spillHashTableBatch(batch, hashTableResults, + spills, spillHashMultiSetResultIndices, spillCount); } /* @@ -186,121 +128,6 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch, batch.selectedInUse = true; } - /** - * Generate the single value match inner big table only join output results for a match. - * - * @param batch - * The big table batch. - * @param allMatchs - * A subset of the rows of the batch that are matches. - * @param allMatchesIndex - * The logical index into allMatchs of the first equal key. - * @param duplicateCount - * The number of duplicates or equal keys. - * @param numSel - * The current count of rows in the rebuilding of the selected array. - * - * @return - * The new count of selected rows. - */ - private int generateHashMultiSetResultSingleValue(VectorizedRowBatch batch, - int[] allMatchs, int allMatchesIndex, int duplicateCount, int numSel) - throws HiveException, IOException { - - // LOG.debug("generateHashMultiSetResultSingleValue enter..."); - - // Generate result within big table batch itself. 
- - // LOG.debug("generateHashMultiSetResultSingleValue with big table..."); - - for (int i = 0; i < duplicateCount; i++) { - - int batchIndex = allMatchs[allMatchesIndex + i]; - - // Use the big table row as output. - batch.selected[numSel++] = batchIndex; - } - - return numSel; - } - - /** - * Generate results for a N x M cross product. - * - * @param batch - * The big table batch. - * @param allMatchs - * The all match selected array that contains (physical) batch indices. - * @param allMatchesIndex - * The index of the match key. - * @param duplicateCount - * Number of equal key rows. - * @param count - * Value count. - */ - private void generateHashMultiSetResultMultiValue(VectorizedRowBatch batch, - int[] allMatchs, int allMatchesIndex, - int duplicateCount, long count) throws HiveException, IOException { - - // LOG.debug("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count); - - // TODO: Look at repeating optimizations... - - for (int i = 0; i < duplicateCount; i++) { - - int batchIndex = allMatchs[allMatchesIndex + i]; - - for (long l = 0; l < count; l++) { - - // Copy the BigTable values into the overflow batch. Since the overflow batch may - // not get flushed here, we must copy by value. - if (bigTableRetainedVectorCopy != null) { - bigTableRetainedVectorCopy.copyByValue(batch, batchIndex, - overflowBatch, overflowBatch.size); - } - - overflowBatch.size++; - if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { - forwardOverflow(); - } - } - } - } - - /** - * Generate the inner big table only join output results for one vectorized row batch with - * a repeated key. - * - * @param batch - * The big table batch with any matching and any non matching rows both as - * selected in use. - * @param hashMultiSetResult - * The hash multi-set results for the batch. - */ - protected int generateHashMultiSetResultRepeatedAll(VectorizedRowBatch batch, - VectorMapJoinHashMultiSetResult hashMultiSetResult) throws HiveException { - - long count = hashMultiSetResult.count(); - - if (batch.selectedInUse) { - // The selected array is already filled in as we want it. - } else { - int[] selected = batch.selected; - for (int i = 0; i < batch.size; i++) { - selected[i] = i; - } - batch.selectedInUse = true; - } - - do { - forwardBigTableBatch(batch); - count--; - } while (count > 0); - - // We forwarded the batch in this method. - return 0; - } - protected void finishInnerBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, VectorMapJoinHashMultiSetResult hashMultiSetResult) throws HiveException, IOException { @@ -314,10 +141,10 @@ protected void finishInnerBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.Joi } } - // Generate special repeated case. - int numSel = generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult); - batch.size = numSel; - batch.selectedInUse = true; + // Output repeated rows into overflow batch. 
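+      // (Each selected row is copied to the overflow batch count() times, so the big
+      // table batch itself is emptied just below instead of being forwarded.)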
+ generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult); + batch.size = 0; + batch.selectedInUse = false; break; case SPILL: diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java index 678fa42..14df1d8 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java @@ -43,7 +43,8 @@ * Specialized class for doing a vectorized map join that is an inner join on a Single-Column Long * and only big table columns appear in the join result so a hash multi-set is used. */ -public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBigOnlyGenerateResultOperator { +public class VectorMapJoinInnerBigOnlyLongOperator + extends VectorMapJoinInnerBigOnlyGenerateResultOperator { private static final long serialVersionUID = 1L; @@ -297,22 +298,23 @@ public void process(Object row, int tag) throws HiveException { // Regardless of our matching result, we keep that information to make multiple use // of it for a possible series of equal keys. haveSaveKey = true; - + /* * Single-Column Long specific save key. */ - + saveKey = currentKey; - + /* * Single-Column Long specific lookup key. */ - + if (useMinMax && (currentKey < min || currentKey > max)) { // Key out of range for whole hash table. saveJoinResult = JoinUtil.JoinResult.NOMATCH; } else { - saveJoinResult = hashMultiSet.contains(currentKey, hashMultiSetResults[hashMultiSetResultCount]); + saveJoinResult = + hashMultiSet.contains(currentKey, hashMultiSetResults[hashMultiSetResultCount]); } } @@ -331,7 +333,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -351,7 +353,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -385,8 +387,8 @@ public void process(Object row, int tag) throws HiveException { " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + " spills " + intArrayToRangesString(spills, spillCount) + - " spillHashMapResultIndices " + intArrayToRangesString(spillHashMapResultIndices, spillCount) + - " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } finishInnerBigOnly(batch, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java index 866aa60..32209f4 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java @@ -315,14 +315,17 @@ public void process(Object row, int tag) throws HiveException { temp = saveKeyOutput; saveKeyOutput = currentKeyOutput; currentKeyOutput = temp; - + /* * Single-Column Long specific lookup key. */ - + byte[] keyBytes = saveKeyOutput.getData(); int keyLength = saveKeyOutput.getLength(); - saveJoinResult = hashMultiSet.contains(keyBytes, 0, keyLength, hashMultiSetResults[hashMultiSetResultCount]); + saveJoinResult = + hashMultiSet.contains( + keyBytes, 0, keyLength, + hashMultiSetResults[hashMultiSetResultCount]); } /* @@ -340,7 +343,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -360,7 +363,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -394,7 +397,7 @@ public void process(Object row, int tag) throws HiveException { " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + " spills " + intArrayToRangesString(spills, spillCount) + - " spillHashMapResultIndices " + intArrayToRangesString(spillHashMapResultIndices, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java index a0c3b9c..4a60a00 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java @@ -285,21 +285,24 @@ public void process(Object row, int tag) throws HiveException { // Regardless of our matching result, we keep that information to make multiple use // of it for a possible series of equal keys. haveSaveKey = true; - + /* * Single-Column String specific save key. */ - + saveKeyBatchIndex = batchIndex; - + /* * Single-Column String specific lookup key. 
*/ - + byte[] keyBytes = vector[batchIndex]; int keyStart = start[batchIndex]; int keyLength = length[batchIndex]; - saveJoinResult = hashMultiSet.contains(keyBytes, keyStart, keyLength, hashMultiSetResults[hashMultiSetResultCount]); + saveJoinResult = + hashMultiSet.contains( + keyBytes, keyStart, keyLength, + hashMultiSetResults[hashMultiSetResultCount]); } /* @@ -317,7 +320,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -337,7 +340,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -371,7 +374,7 @@ public void process(Object row, int tag) throws HiveException { " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + " spills " + intArrayToRangesString(spills, spillCount) + - " spillHashMapResultIndices " + intArrayToRangesString(spillHashMapResultIndices, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java index ea2c04d..7db35fa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java @@ -193,7 +193,7 @@ protected void finishInner(VectorizedRowBatch batch, } if (spillCount > 0) { - spillHashMapBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, + spillHashTableBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, spills, spillHashMapResultIndices, spillCount); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java index f68d4c4..b5c5efd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java @@ -132,7 +132,7 @@ protected void finishLeftSemi(VectorizedRowBatch batch, // Get rid of spills before we start modifying the batch. 
     if (spillCount > 0) {
-      spillHashMapBatch(batch, hashTableResults,
+      spillHashTableBatch(batch, hashTableResults,
           spills, spillHashMapResultIndices, spillCount);
     }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyGenerateResultOperator.java
new file mode 100644
index 0000000..5edd6f6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyGenerateResultOperator.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMap;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSet;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult;
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+
+/**
+ * This class has methods for generating vectorized join results for the big table only
+ * variation of outer joins.
+ *
+ * When an outer join does not have any small table columns in the join result, we use this
+ * variation, which we call outer big only. This variation uses a hash multi-set instead of a
+ * hash map since there are no values (just a count).
+ *
+ * Note that if an outer key appears in the small table results area, we use the outer join
+ * projection optimization and are able to use this variation.
+ */
+public abstract class VectorMapJoinOuterBigOnlyGenerateResultOperator
+    extends VectorMapJoinBigOnlyGenerateResultOperator {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(
+      VectorMapJoinOuterBigOnlyGenerateResultOperator.class.getName());
+
+  //---------------------------------------------------------------------------
+  // Outer big-table only join specific members.
+  //
+
+  // Pre-allocated member for remembering the big table's selected array at the beginning of
+  // the process method before applying any filter. For outer join we need to remember which
+  // rows did not match since they will appear in the outer join result with NULLs for the
+  // small table.
+  protected transient int[] inputSelected;
+
+  // Pre-allocated member for storing any non-spills, non-matches, or merged row indexes during a
+  // process method call.
+  protected transient int[] nonSpills;
+  protected transient int[] noMatchs;
+  protected transient int[] merged;
+
+  /** Kryo ctor. */
+  protected VectorMapJoinOuterBigOnlyGenerateResultOperator() {
+    super();
+  }
+
+  public VectorMapJoinOuterBigOnlyGenerateResultOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinOuterBigOnlyGenerateResultOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
+  }
+
+  /*
+   * Setup our outer join specific members.
+   */
+  protected void commonSetup(VectorizedRowBatch batch) throws HiveException {
+    super.commonSetup(batch);
+
+    inputSelected = new int[batch.DEFAULT_SIZE];
+
+    nonSpills = new int[batch.DEFAULT_SIZE];
+    noMatchs = new int[batch.DEFAULT_SIZE];
+    merged = new int[batch.DEFAULT_SIZE];
+
+  }
+
+  //-----------------------------------------------------------------------------------------------
+
+  /*
+   * Outer big table only join (hash multi-set).
+   */
+
+  /**
+   * Do the per-batch setup for an outer join.
+   */
+  protected void outerPerBatchSetup(VectorizedRowBatch batch) {
+
+    // For join operators that can generate small table results, reset their
+    // (target) scratch columns.
+
+    for (int column : smallTableOutputVectorColumns) {
+      ColumnVector smallTableColumn = batch.cols[column];
+      smallTableColumn.reset();
+    }
+
+    for (int column : bigTableOuterKeyOutputVectorColumns) {
+      ColumnVector bigTableOuterKeyColumn = batch.cols[column];
+      bigTableOuterKeyColumn.reset();
+    }
+  }
+
+  /**
+   * Generate the outer join output results for one vectorized row batch.
+   *
+   * @param batch
+   *          The big table batch with any matching and any non matching rows both as
+   *          selected in use.
+   * @param allMatchCount
+   *          Number of matches in allMatchs.
+   * @param equalKeySeriesCount
+   *          Number of single value matches.
+   * @param atLeastOneNonMatch
+   *          Whether at least one row was a non-match.
+   * @param inputSelectedInUse
+   *          A copy of the batch's selectedInUse flag on input to the process method.
+   * @param inputLogicalSize
+   *          The batch's size on input to the process method.
+   * @param spillCount
+   *          Number of spills in spills.
+   * @param hashMapResultCount
+   *          Number of entries in hashMultiSetResults.
+   */
+  public void finishOuterBigOnly(VectorizedRowBatch batch,
+      int allMatchCount, int equalKeySeriesCount, boolean atLeastOneNonMatch,
+      boolean inputSelectedInUse, int inputLogicalSize,
+      int spillCount, int hashMapResultCount) throws IOException, HiveException {
+
+    // Get rid of spills before we start modifying the batch.
+    if (spillCount > 0) {
+      spillHashTableBatch(batch, (VectorMapJoinHashTableResult[]) hashMultiSetResults,
+          spills, spillHashMultiSetResultIndices, spillCount);
+    }
+
+    int noMatchCount = 0;
+    if (spillCount > 0) {
+
+      // Subtract the spills to get all match and non-match rows.
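+      // Illustrative example (sketch): inputSelected = {0, 1, 3, 4} and spills = {1, 4}
+      //   => nonSpills = {0, 3} with nonSpillCount = 2.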
+      int nonSpillCount = subtractFromInputSelected(
+          inputSelectedInUse, inputLogicalSize, inputSelected,
+          spills, spillCount, nonSpills);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finishOuter spillCount > 0" +
+            " nonSpills " + intArrayToRangesString(nonSpills, nonSpillCount));
+      }
+
+      // Big table value expressions apply to ALL matching and non-matching rows.
+      if (bigTableValueExpressions != null) {
+
+        doValueExpr(batch, nonSpills, nonSpillCount);
+
+      }
+
+      if (atLeastOneNonMatch) {
+        noMatchCount = subtract(nonSpills, nonSpillCount, allMatchs, allMatchCount,
+            noMatchs);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter spillCount > 0" +
+              " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount));
+        }
+
+      }
+    } else {
+
+      // Run value expressions over original (whole) input batch.
+      doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected);
+
+      if (atLeastOneNonMatch) {
+        noMatchCount = subtractFromInputSelected(
+            inputSelectedInUse, inputLogicalSize, inputSelected,
+            allMatchs, allMatchCount, noMatchs);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter spillCount == 0" +
+              " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount));
+        }
+      }
+    }
+
+    // When we generate results into the overflow batch, we may still end up with fewer rows
+    // in the big table batch. So, numSel and the batch's selected array will be rebuilt with
+    // just the big table rows that need to be forwarded, minus any rows processed with the
+    // overflow batch.
+    if (allMatchCount > 0) {
+
+      int numSel = 0;
+      for (int i = 0; i < equalKeySeriesCount; i++) {
+        long count = equalKeySeriesValueCounts[i];
+        int allMatchesIndex = equalKeySeriesAllMatchIndices[i];
+        int duplicateCount = equalKeySeriesDuplicateCounts[i];
+
+        if (count == 1) {
+          numSel = generateHashMultiSetResultSingleValue(
+              batch, allMatchs, allMatchesIndex, duplicateCount, numSel);
+        } else {
+          generateHashMultiSetResultMultiValue(batch,
+              allMatchs, allMatchesIndex,
+              duplicateCount, count);
+        }
+      }
+
+      // The number of single value rows that were generated in the big table batch.
+      batch.size = numSel;
+      batch.selectedInUse = true;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("finishOuter allMatchCount > 0" +
+            " batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+      }
+
+    } else {
+      batch.size = 0;
+    }
+
+    if (noMatchCount > 0) {
+      if (batch.size > 0) {
+
+        generateOuterNulls(batch, noMatchs, noMatchCount);
+
+        // Merge noMatchs and (match) selected.
+        int mergeCount = sortMerge(
+            noMatchs, noMatchCount, batch.selected, batch.size, merged);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter noMatchCount > 0 && batch.size > 0" +
+              " merged " + intArrayToRangesString(merged, mergeCount));
+        }
+
+        System.arraycopy(merged, 0, batch.selected, 0, mergeCount);
+        batch.size = mergeCount;
+        batch.selectedInUse = true;
+      } else {
+
+        // We can use the whole batch for output of no matches.
+
+        generateOuterNullsRepeatedAll(batch);
+
+        System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount);
+        batch.size = noMatchCount;
+        batch.selectedInUse = true;
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("finishOuter noMatchCount > 0 && batch.size == 0" +
+              " batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+        }
+      }
+    }
+  }
+
+  /**
+   * Generate the outer join output results for one vectorized row batch with a repeated key.
+   *
+   * Any filter expressions have already been applied, and the hash multi-set lookup for the
+   * outer join is complete.
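+   *
+   * Hedged sketch of the repeated MATCH case with filtering: filtered-out rows are
+   * emitted with NULL small table columns, surviving rows are emitted count() times,
+   * and (when count == 1) the two row sets are sort-merged back into batch.selected.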
+   *
+   * @param batch
+   *          The big table batch with any matching and any non matching rows both as
+   *          selected in use.
+   * @param joinResult
+   *          The hash multi-set lookup result for the repeated key.
+   * @param hashMultiSetResult
+   *          The repeated hash multi-set result for the batch.
+   * @param someRowsFilteredOut
+   *          Whether some rows of the repeated key batch were knocked out by the filter.
+   * @param inputSelectedInUse
+   *          A copy of the batch's selectedInUse flag on input to the process method.
+   * @param inputLogicalSize
+   *          The batch's size on input to the process method.
+   */
+  public void finishOuterBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult,
+      VectorMapJoinHashMultiSetResult hashMultiSetResult, boolean someRowsFilteredOut,
+      boolean inputSelectedInUse, int inputLogicalSize)
+          throws IOException, HiveException {
+
+    // LOG.debug("finishOuterRepeated batch #" + batchCounter + " " + joinResult.name() + " batch.size " + batch.size + " someRowsFilteredOut " + someRowsFilteredOut);
+
+    switch (joinResult) {
+    case MATCH:
+
+      // Rows we looked up as one repeated key are a match. But filtered out rows
+      // need to be generated as non-matches, too.
+
+      if (someRowsFilteredOut) {
+
+        // For the filtered out rows that didn't (logically) get looked up in the hash table,
+        // we need to generate no match results for those too...
+
+        // Run value expressions over original (whole) input batch.
+        doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected);
+
+        // Now calculate which rows were filtered out (they are logically no matches).
+
+        // Determine which rows are non matches by determining the delta between inputSelected and
+        // (current) batch selected.
+
+        int noMatchCount = subtractFromInputSelected(
+            inputSelectedInUse, inputLogicalSize, inputSelected,
+            batch.selected, batch.size, noMatchs);
+
+        final long count = hashMultiSetResult.count();
+        if (count == 1) {
+
+          // Make all the non-filtered rows selected (selectedInUse), if necessary.
+          goSelected(batch);
+
+          // Use our selected array as the equal key. Modify our batch in-place.
+          generateHashMultiSetResultSingleValue(
+              batch, batch.selected, /* allMatchesIndex */ 0,
+              /* duplicateCount */ batch.size, /* numSel */ 0);
+
+          // Merge noMatchs and (match) selected.
+          int mergeCount = sortMerge(
+              noMatchs, noMatchCount, batch.selected, batch.size, merged);
+
+          generateOuterNulls(batch, noMatchs, noMatchCount);
+
+          System.arraycopy(merged, 0, batch.selected, 0, mergeCount);
+          batch.size = mergeCount;
+          batch.selectedInUse = true;
+        } else {
+
+          // Make all the non-filtered rows selected (selectedInUse), if necessary.
+          goSelected(batch);
+
+          // Use our selected array as the equal key. Send all matched rows multiple times
+          // to the overflow batch.
+          generateHashMultiSetResultMultiValue(
+              batch, batch.selected, /* allMatchesIndex */ 0,
+              /* duplicateCount */ batch.size, count);
+
+          // Only no matches in batch.
+          generateOuterNulls(batch, noMatchs, noMatchCount);
+
+          System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount);
+          batch.size = noMatchCount;
+          batch.selectedInUse = true;
+        }
+      } else {
+
+        // Just run our value expressions over input batch.
+
+        if (bigTableValueExpressions != null) {
+          for(VectorExpression ve: bigTableValueExpressions) {
+            ve.evaluate(batch);
+          }
+        }
+
+        // Output repeated rows into overflow batch.
+ generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult); + + // Everything got forwarded. + batch.size = 0; + batch.selectedInUse = false; + } + break; + + case SPILL: + + // Rows we looked up as one repeated key need to spill. But filtered out rows + // need to be generated as non-matches below. + + spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMultiSetResult); + + // After using selected to generate spills, generate non-matches, if any. + if (someRowsFilteredOut) { + + // Determine which rows are non matches by determining the delta between inputSelected and + // (current) batch selected. + + int noMatchCount = subtractFromInputSelected( + inputSelectedInUse, inputLogicalSize, inputSelected, + batch.selected, batch.size, noMatchs); + + generateOuterNulls(batch, noMatchs, noMatchCount); + + System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); + batch.size = noMatchCount; + batch.selectedInUse = true; + } else { + batch.size = 0; + } + + break; + + case NOMATCH: + + if (someRowsFilteredOut) { + + // When the repeated no match is due to filtering, we need to restore the + // selected information. + + if (inputSelectedInUse) { + System.arraycopy(inputSelected, 0, batch.selected, 0, inputLogicalSize); + } + batch.size = inputLogicalSize; + } + + // Run our value expressions over whole batch. + if (bigTableValueExpressions != null) { + for(VectorExpression ve: bigTableValueExpressions) { + ve.evaluate(batch); + } + } + + generateOuterNullsRepeatedAll(batch); + break; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyLongOperator.java new file mode 100644 index 0000000..175ec73 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyLongOperator.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+// Single-Column Long hash table import.
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMultiSet;
+// Single-Column Long specific imports.
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
+/*
+ * Specialized class for doing a vectorized map join that is an outer join on a Single-Column Long
+ * using a hash multi-set.
+ */
+public class VectorMapJoinOuterBigOnlyLongOperator
+    extends VectorMapJoinOuterBigOnlyGenerateResultOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  //------------------------------------------------------------------------------------------------
+
+  private static final String CLASS_NAME = VectorMapJoinOuterBigOnlyLongOperator.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  protected String getLoggingPrefix() {
+    return super.getLoggingPrefix(CLASS_NAME);
+  }
+
+  //------------------------------------------------------------------------------------------------
+
+  // (none)
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  // The hash multi-set for this specialized class.
+  private transient VectorMapJoinLongHashMultiSet hashMultiSet;
+
+  //---------------------------------------------------------------------------
+  // Single-Column Long specific members.
+  //
+
+  // For integers, we have optional min/max filtering.
+  private transient boolean useMinMax;
+  private transient long min;
+  private transient long max;
+
+  // The column number for this one column join specialization.
+  private transient int singleJoinColumn;
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  /** Kryo ctor. */
+  protected VectorMapJoinOuterBigOnlyLongOperator() {
+    super();
+  }
+
+  public VectorMapJoinOuterBigOnlyLongOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinOuterBigOnlyLongOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
+  }
+
+  //---------------------------------------------------------------------------
+  // Process Single-Column Long Outer Join on a vectorized row batch.
+  //
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      alias = (byte) tag;
+
+      if (needCommonSetup) {
+        // Our one time process method initialization.
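+        // (commonSetup in the base classes pre-allocates the hash multi-set results and
+        // the scratch index arrays once per operator instance.)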
+ commonSetup(batch); + + /* + * Initialize Single-Column Long members for this specialized class. + */ + + singleJoinColumn = bigTableKeyColumnMap[0]; + + needCommonSetup = false; + } + + if (needHashTableSetup) { + // Setup our hash table specialization. It will be the first time the process + // method is called, or after a Hybrid Grace reload. + + /* + * Get our Single-Column Long hash multi-set information for this specialized class. + */ + + hashMultiSet = (VectorMapJoinLongHashMultiSet) vectorMapJoinHashTable; + useMinMax = hashMultiSet.useMinMax(); + if (useMinMax) { + min = hashMultiSet.min(); + max = hashMultiSet.max(); + } + + needHashTableSetup = false; + } + + batchCounter++; + + final int inputLogicalSize = batch.size; + + if (inputLogicalSize == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); + } + return; + } + + // Do the per-batch setup for an outer join. + + outerPerBatchSetup(batch); + + // For outer join, remember our input rows before ON expression filtering or before + // hash table matching so we can generate results for all rows (matching and non matching) + // later. + boolean inputSelectedInUse = batch.selectedInUse; + if (inputSelectedInUse) { + // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) { + // throw new HiveException("batch.selected is not in sort order and unique"); + // } + System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize); + } + + // Filtering for outer join just removes rows available for hash table matching. + boolean someRowsFilteredOut = false; + if (bigTableFilterExpressions.length > 0) { + // Since the input + for (VectorExpression ve : bigTableFilterExpressions) { + ve.evaluate(batch); + } + someRowsFilteredOut = (batch.size != inputLogicalSize); + if (LOG.isDebugEnabled()) { + if (batch.selectedInUse) { + if (inputSelectedInUse) { + LOG.debug(CLASS_NAME + + " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) + + " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } else { + LOG.debug(CLASS_NAME + + " inputLogicalSize " + inputLogicalSize + + " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } + } + } + } + + // Perform any key expressions. Results will go into scratch columns. + if (bigTableKeyExpressions != null) { + for (VectorExpression ve : bigTableKeyExpressions) { + ve.evaluate(batch); + } + } + + /* + * Single-Column Long specific declarations. + */ + + // The one join column for this specialized class. + LongColumnVector joinColVector = (LongColumnVector) batch.cols[singleJoinColumn]; + long[] vector = joinColVector.vector; + + /* + * Single-Column Long check for repeating. + */ + + // Check single column for repeating. + boolean allKeyInputColumnsRepeating = joinColVector.isRepeating; + + if (allKeyInputColumnsRepeating) { + + /* + * Repeating. + */ + + // All key input columns are repeating. Generate key once. Lookup once. + // Since the key is repeated, we must use entry 0 regardless of selectedInUse. + + /* + * Single-Column Long specific repeated lookup. + */ + + JoinUtil.JoinResult joinResult; + if (batch.size == 0) { + // Whole repeated key batch was filtered out. + joinResult = JoinUtil.JoinResult.NOMATCH; + } else if (!joinColVector.noNulls && joinColVector.isNull[0]) { + // Any (repeated) null key column is no match for whole batch. + joinResult = JoinUtil.JoinResult.NOMATCH; + } else { + // Handle *repeated* join key, if found. 
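Aside: the min/max values captured above feed the range guard applied before each lookup below, so out-of-range long keys are rejected without probing the hash multi-set at all. A toy sketch of the idea, using a plain HashMap as a stand-in for VectorMapJoinLongHashMultiSet:

```java
import java.util.HashMap;
import java.util.Map;

// MinMaxGuardSketch.java -- a toy long-key multi-set with the min/max guard.
public class MinMaxGuardSketch {

  private final Map<Long, Integer> counts = new HashMap<>();
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;

  public void add(long key) {
    counts.merge(key, 1, Integer::sum);
    min = Math.min(min, key);
    max = Math.max(max, key);
  }

  /** Returns the duplicate count, or 0 for a no-match. */
  public int contains(long key) {
    // Cheap range test first: a key outside [min, max] cannot be present,
    // so we skip hashing and probing entirely.
    if (key < min || key > max) {
      return 0;
    }
    Integer c = counts.get(key);
    return (c == null) ? 0 : c;
  }

  public static void main(String[] args) {
    MinMaxGuardSketch s = new MinMaxGuardSketch();
    s.add(10); s.add(10); s.add(42);
    System.out.println(s.contains(10));   // 2
    System.out.println(s.contains(7));    // 0, rejected by the range test
    System.out.println(s.contains(1000)); // 0, rejected by the range test
  }
}
```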
+ long key = vector[0]; + // LOG.debug(CLASS_NAME + " repeated key " + key); + if (useMinMax && (key < min || key > max)) { + // Out of range for whole batch. + joinResult = JoinUtil.JoinResult.NOMATCH; + } else { + joinResult = hashMultiSet.contains(key, hashMultiSetResults[0]); + } + } + + /* + * Common repeated join result processing. + */ + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); + } + finishOuterBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0], someRowsFilteredOut, + inputSelectedInUse, inputLogicalSize); + } else { + + /* + * NOT Repeating. + */ + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated"); + } + + int selected[] = batch.selected; + boolean selectedInUse = batch.selectedInUse; + + int hashMultiSetResultCount = 0; + int allMatchCount = 0; + int equalKeySeriesCount = 0; + int spillCount = 0; + + boolean atLeastOneNonMatch = someRowsFilteredOut; + + /* + * Single-Column Long specific variables. + */ + + long saveKey = 0; + + // We optimize performance by only looking up the first key in a series of equal keys. + boolean haveSaveKey = false; + JoinUtil.JoinResult saveJoinResult = JoinUtil.JoinResult.NOMATCH; + + // Logical loop over the rows in the batch since the batch may have selected in use. + for (int logical = 0; logical < batch.size; logical++) { + int batchIndex = (selectedInUse ? selected[logical] : logical); + + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch"); + + /* + * Single-Column Long outer null detection. + */ + + boolean isNull = !joinColVector.noNulls && joinColVector.isNull[batchIndex]; + + if (isNull) { + + // Have that the NULL does not interfere with the current equal key series, if there + // is one. We do not set saveJoinResult. + // + // Let a current MATCH equal key series keep going, or + // Let a current SPILL equal key series keep going, or + // Let a current NOMATCH keep not matching. + + atLeastOneNonMatch = true; + + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL"); + } else { + + /* + * Single-Column Long outer get key. + */ + + long currentKey = vector[batchIndex]; + + /* + * Equal key series checking. + */ + + if (!haveSaveKey || currentKey != saveKey) { + // New key. + + if (haveSaveKey) { + // Move on with our counts. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + // Regardless of our matching result, we keep that information to make multiple use + // of it for a possible series of equal keys. + haveSaveKey = true; + + /* + * Single-Column Long specific save key. + */ + + saveKey = currentKey; + + /* + * Single-Column Long specific lookup key. + */ + + if (useMinMax && (currentKey < min || currentKey > max)) { + // Key out of range for whole hash table. + saveJoinResult = JoinUtil.JoinResult.NOMATCH; + } else { + saveJoinResult = + hashMultiSet.contains(currentKey, hashMultiSetResults[hashMultiSetResultCount]); + } + + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " New Key " + currentKey + " " + saveJoinResult.name()); + + /* + * Common outer join result processing. 
+ */ + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesValueCounts[equalKeySeriesCount] = hashMultiSetResults[hashMultiSetResultCount].count(); + equalKeySeriesAllMatchIndices[equalKeySeriesCount] = allMatchCount; + equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + atLeastOneNonMatch = true; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); + break; + } + } else { + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); + + // Series of equal keys. + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); + break; + } + } + // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { + // throw new HiveException("allMatchs is not in sort order and unique"); + // } + } + } + + if (haveSaveKey) { + // Update our counts for the last key. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + + " allMatchs " + intArrayToRangesString(allMatchs,allMatchCount) + + " equalKeySeriesValueCounts " + longArrayToRangesString(equalKeySeriesValueCounts, equalKeySeriesCount) + + " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + + " atLeastOneNonMatch " + atLeastOneNonMatch + + " inputSelectedInUse " + inputSelectedInUse + + " inputLogicalSize " + inputLogicalSize + + " spills " + intArrayToRangesString(spills, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + } + + // We will generate results for all matching and non-matching rows. + finishOuterBigOnly(batch, + allMatchCount, equalKeySeriesCount, atLeastOneNonMatch, + inputSelectedInUse, inputLogicalSize, + spillCount, hashMultiSetResultCount); + } + + if (batch.size > 0) { + // Forward any remaining selected rows. 
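Aside: the equal-key-series bookkeeping above probes the multi-set once per run of equal keys and reuses the resulting duplicate count for every row in the run. A self-contained sketch of that flow; a TreeMap stands in for the hash multi-set, and all names are illustrative:

```java
import java.util.Map;
import java.util.TreeMap;

// EqualKeySeriesSketch.java -- one multi-set probe per run of equal keys.
public class EqualKeySeriesSketch {

  public static void main(String[] args) {
    long[] keys = {5, 5, 5, 8, 9, 9};           // big-table join keys, runs of duplicates
    Map<Long, Integer> multiSet = new TreeMap<>();
    multiSet.put(5L, 2);                         // small table holds key 5 twice
    multiSet.put(9L, 1);

    boolean haveSaveKey = false;
    long saveKey = 0;
    int saveCount = 0;                           // reused for the whole equal-key series

    for (int i = 0; i < keys.length; i++) {
      long k = keys[i];
      if (!haveSaveKey || k != saveKey) {
        // New series: probe the multi-set exactly once for the whole run.
        haveSaveKey = true;
        saveKey = k;
        saveCount = multiSet.getOrDefault(k, 0);
      }
      // Each matching big-table row is emitted saveCount times (count, no values).
      System.out.println("row " + i + " key " + k + " -> " +
          (saveCount == 0 ? "no match" : saveCount + " result row(s)"));
    }
  }
}
```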
+        forwardBigTableBatch(batch);
+      }
+
+    } catch (IOException e) {
+      throw new HiveException(e);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyMultiKeyOperator.java
new file mode 100644
index 0000000..3d75956
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyMultiKeyOperator.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+// Multi-Key hash multi-set import.
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+// Multi-Key specific imports.
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+import com.google.common.base.Preconditions;
+
+/*
+ * Specialized class for doing a vectorized map join that is an outer join on Multi-Key
+ * using a hash multi-set.
+ */
+public class VectorMapJoinOuterBigOnlyMultiKeyOperator
+    extends VectorMapJoinOuterBigOnlyGenerateResultOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  //------------------------------------------------------------------------------------------------
+
+  private static final String CLASS_NAME = VectorMapJoinOuterBigOnlyMultiKeyOperator.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  protected String getLoggingPrefix() {
+    return super.getLoggingPrefix(CLASS_NAME);
+  }
+
+  //------------------------------------------------------------------------------------------------
+
+  // (none)
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  // The hash multi-set for this specialized class.
+  private transient VectorMapJoinBytesHashMultiSet hashMultiSet;
+
+  //---------------------------------------------------------------------------
+  // Multi-Key specific members.
+  //
+
+  // Object that can take a set of columns of a row in a vectorized row batch and serialize it.
+  private transient VectorSerializeRow keyVectorSerializeWrite;
+
+  // The BinarySortable serialization of the current key.
+  private transient Output currentKeyOutput;
+
+  // The BinarySortable serialization of the saved key for a possible series of equal keys.
+  private transient Output saveKeyOutput;
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  /** Kryo ctor. */
+  protected VectorMapJoinOuterBigOnlyMultiKeyOperator() {
+    super();
+  }
+
+  public VectorMapJoinOuterBigOnlyMultiKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinOuterBigOnlyMultiKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
+  }
+
+  //---------------------------------------------------------------------------
+  // Process Multi-Key Outer Join on a vectorized row batch.
+  //
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      alias = (byte) tag;
+
+      if (needCommonSetup) {
+        // Our one time process method initialization.
+        commonSetup(batch);
+
+        /*
+         * Initialize Multi-Key members for this specialized class.
+         */
+
+        keyVectorSerializeWrite = new VectorSerializeRow(
+            new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
+        keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap);
+
+        currentKeyOutput = new Output();
+        saveKeyOutput = new Output();
+
+        needCommonSetup = false;
+      }
+
+      if (needHashTableSetup) {
+        // Setup our hash table specialization.  It will be the first time the process
+        // method is called, or after a Hybrid Grace reload.
+
+        /*
+         * Get our Multi-Key hash multi-set information for this specialized class.
+         */
+
+        hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable;
+
+        needHashTableSetup = false;
+      }
+
+      batchCounter++;
+
+      final int inputLogicalSize = batch.size;
+
+      if (inputLogicalSize == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+        }
+        return;
+      }
+
+      // Do the per-batch setup for an outer join.
+
+      outerPerBatchSetup(batch);
+
+      // For outer join, remember our input rows before ON expression filtering or before
+      // hash table matching so we can generate results for all rows (matching and non-matching)
+      // later.
+      boolean inputSelectedInUse = batch.selectedInUse;
+      if (inputSelectedInUse) {
+        // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) {
+        //   throw new HiveException("batch.selected is not in sort order and unique");
+        // }
+        System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize);
+      }
+
+      // Filtering for outer join just removes rows available for hash table matching.
+ boolean someRowsFilteredOut = false; + if (bigTableFilterExpressions.length > 0) { + // Since the input + for (VectorExpression ve : bigTableFilterExpressions) { + ve.evaluate(batch); + } + someRowsFilteredOut = (batch.size != inputLogicalSize); + if (LOG.isDebugEnabled()) { + if (batch.selectedInUse) { + if (inputSelectedInUse) { + LOG.debug(CLASS_NAME + + " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) + + " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } else { + LOG.debug(CLASS_NAME + + " inputLogicalSize " + inputLogicalSize + + " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } + } + } + } + + // Perform any key expressions. Results will go into scratch columns. + if (bigTableKeyExpressions != null) { + for (VectorExpression ve : bigTableKeyExpressions) { + ve.evaluate(batch); + } + } + + /* + * Multi-Key specific declarations. + */ + + // None. + + /* + * Multi-Key Long check for repeating. + */ + + // If all BigTable input columns to key expressions are isRepeating, then + // calculate key once; lookup once. + // Also determine if any nulls are present since for a join that means no match. + boolean allKeyInputColumnsRepeating; + boolean someKeyInputColumnIsNull = false; // Only valid if allKeyInputColumnsRepeating is true. + if (bigTableKeyColumnMap.length == 0) { + allKeyInputColumnsRepeating = false; + } else { + allKeyInputColumnsRepeating = true; + for (int i = 0; i < bigTableKeyColumnMap.length; i++) { + ColumnVector colVector = batch.cols[bigTableKeyColumnMap[i]]; + if (!colVector.isRepeating) { + allKeyInputColumnsRepeating = false; + break; + } + if (!colVector.noNulls && colVector.isNull[0]) { + someKeyInputColumnIsNull = true; + } + } + } + + if (allKeyInputColumnsRepeating) { + + /* + * Repeating. + */ + + // All key input columns are repeating. Generate key once. Lookup once. + // Since the key is repeated, we must use entry 0 regardless of selectedInUse. + + /* + * Multi-Key specific repeated lookup. + */ + + JoinUtil.JoinResult joinResult; + if (batch.size == 0) { + // Whole repeated key batch was filtered out. + joinResult = JoinUtil.JoinResult.NOMATCH; + } else if (someKeyInputColumnIsNull) { + // Any (repeated) null key column is no match for whole batch. + joinResult = JoinUtil.JoinResult.NOMATCH; + } else { + + // All key input columns are repeating. Generate key once. Lookup once. + keyVectorSerializeWrite.setOutput(currentKeyOutput); + keyVectorSerializeWrite.serializeWrite(batch, 0); + byte[] keyBytes = currentKeyOutput.getData(); + int keyLength = currentKeyOutput.getLength(); + joinResult = hashMultiSet.contains(keyBytes, 0, keyLength, hashMultiSetResults[0]); + } + + /* + * Common repeated join result processing. + */ + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); + } + finishOuterBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0], someRowsFilteredOut, + inputSelectedInUse, inputLogicalSize); + } else { + + /* + * NOT Repeating. + */ + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated"); + } + + int selected[] = batch.selected; + boolean selectedInUse = batch.selectedInUse; + + int hashMultiSetResultCount = 0; + int allMatchCount = 0; + int equalKeySeriesCount = 0; + int spillCount = 0; + + boolean atLeastOneNonMatch = someRowsFilteredOut; + + /* + * Multi-Key specific variables. 
+ */ + + Output temp; + + // We optimize performance by only looking up the first key in a series of equal keys. + boolean haveSaveKey = false; + JoinUtil.JoinResult saveJoinResult = JoinUtil.JoinResult.NOMATCH; + + // Logical loop over the rows in the batch since the batch may have selected in use. + for (int logical = 0; logical < batch.size; logical++) { + int batchIndex = (selectedInUse ? selected[logical] : logical); + + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch"); + + /* + * Multi-Key outer null detection. + */ + + // Generate binary sortable key for current row in vectorized row batch. + keyVectorSerializeWrite.setOutput(currentKeyOutput); + keyVectorSerializeWrite.serializeWrite(batch, batchIndex); + if (keyVectorSerializeWrite.getHasAnyNulls()) { + + // Have that the NULL does not interfere with the current equal key series, if there + // is one. We do not set saveJoinResult. + // + // Let a current MATCH equal key series keep going, or + // Let a current SPILL equal key series keep going, or + // Let a current NOMATCH keep not matching. + + atLeastOneNonMatch = true; + + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL"); + } else { + + /* + * Multi-Key outer get key. + */ + + // Generated earlier to get possible null(s). + + /* + * Equal key series checking. + */ + + if (!haveSaveKey || !saveKeyOutput.arraysEquals(currentKeyOutput)) { + + // New key. + + if (haveSaveKey) { + // Move on with our counts. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + // Regardless of our matching result, we keep that information to make multiple use + // of it for a possible series of equal keys. + haveSaveKey = true; + + /* + * Multi-Key specific save key. + */ + + temp = saveKeyOutput; + saveKeyOutput = currentKeyOutput; + currentKeyOutput = temp; + + /* + * Multi-Key specific lookup key. + */ + + byte[] keyBytes = saveKeyOutput.getData(); + int keyLength = saveKeyOutput.getLength(); + saveJoinResult = + hashMultiSet.contains( + keyBytes, 0, keyLength, + hashMultiSetResults[hashMultiSetResultCount]); + + /* + * Common outer join result processing. + */ + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesValueCounts[equalKeySeriesCount] = hashMultiSetResults[hashMultiSetResultCount].count(); + equalKeySeriesAllMatchIndices[equalKeySeriesCount] = allMatchCount; + equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + atLeastOneNonMatch = true; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); + break; + } + } else { + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); + + // Series of equal keys. 
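Aside: the temp/saveKeyOutput/currentKeyOutput rotation above avoids copying key bytes between rows: the buffer just written becomes the saved key, and the old saved buffer is reused for the next row. A standalone sketch of the same double-buffer swap, with ByteBuffer standing in for ByteStream.Output and a trivial encoder standing in for BinarySortableSerializeWrite:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// KeyBufferSwapSketch.java -- two reusable key buffers, swapped by reference.
public class KeyBufferSwapSketch {

  // Stand-in for BinarySortableSerializeWrite: equal keys -> equal bytes.
  static void serializeKey(ByteBuffer out, long a, long b) {
    out.clear();
    out.putLong(a).putLong(b);
    out.flip();
  }

  public static void main(String[] args) {
    long[][] rows = {{1, 1}, {1, 1}, {2, 7}};

    ByteBuffer currentKey = ByteBuffer.allocate(16);
    ByteBuffer saveKey = ByteBuffer.allocate(16);
    boolean haveSaveKey = false;

    for (long[] row : rows) {
      serializeKey(currentKey, row[0], row[1]);
      if (!haveSaveKey || !saveKey.equals(currentKey)) {
        haveSaveKey = true;
        // Swap the two buffers: the bytes just written become the saved key,
        // and the old saved buffer is reused for the next row. No copying.
        ByteBuffer temp = saveKey;
        saveKey = currentKey;
        currentKey = temp;
        System.out.println("new key series at " + Arrays.toString(row));
      } else {
        System.out.println("key continues at  " + Arrays.toString(row));
      }
    }
  }
}
```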
+ + switch (saveJoinResult) { + case MATCH: + equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); + break; + } + } + // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { + // throw new HiveException("allMatchs is not in sort order and unique"); + // } + } + } + + if (haveSaveKey) { + // Update our counts for the last key. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + + " allMatchs " + intArrayToRangesString(allMatchs,allMatchCount) + + " equalKeySeriesValueCounts " + longArrayToRangesString(equalKeySeriesValueCounts, equalKeySeriesCount) + + " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + + " atLeastOneNonMatch " + atLeastOneNonMatch + + " inputSelectedInUse " + inputSelectedInUse + + " inputLogicalSize " + inputLogicalSize + + " spills " + intArrayToRangesString(spills, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + } + + // We will generate results for all matching and non-matching rows. + finishOuterBigOnly(batch, + allMatchCount, equalKeySeriesCount, atLeastOneNonMatch, + inputSelectedInUse, inputLogicalSize, + spillCount, hashMultiSetResultCount); + } + + if (batch.size > 0) { + // Forward any remaining selected rows. + forwardBigTableBatch(batch); + } + + } catch (IOException e) { + throw new HiveException(e); + } catch (Exception e) { + throw new HiveException(e); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyStringOperator.java new file mode 100644 index 0000000..925c1f4 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyStringOperator.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+// Single-Column String hash multi-set import.
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+// Single-Column String specific imports.
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+
+/*
+ * Specialized class for doing a vectorized map join that is an outer join on a Single-Column String
+ * using a hash multi-set.
+ */
+public class VectorMapJoinOuterBigOnlyStringOperator
+    extends VectorMapJoinOuterBigOnlyGenerateResultOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  //------------------------------------------------------------------------------------------------
+
+  private static final String CLASS_NAME = VectorMapJoinOuterBigOnlyStringOperator.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  protected String getLoggingPrefix() {
+    return super.getLoggingPrefix(CLASS_NAME);
+  }
+
+  //------------------------------------------------------------------------------------------------
+
+  // (none)
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  // The hash multi-set for this specialized class.
+  private transient VectorMapJoinBytesHashMultiSet hashMultiSet;
+
+  //---------------------------------------------------------------------------
+  // Single-Column String specific members.
+  //
+
+  // The column number for this one column join specialization.
+  private transient int singleJoinColumn;
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  /** Kryo ctor. */
+  protected VectorMapJoinOuterBigOnlyStringOperator() {
+    super();
+  }
+
+  public VectorMapJoinOuterBigOnlyStringOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinOuterBigOnlyStringOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
+  }
+
+  //---------------------------------------------------------------------------
+  // Process Single-Column String Outer Join on a vectorized row batch.
+  //
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      alias = (byte) tag;
+
+      if (needCommonSetup) {
+        // Our one time process method initialization.
+        commonSetup(batch);
+
+        /*
+         * Initialize Single-Column String members for this specialized class.
+ */ + + singleJoinColumn = bigTableKeyColumnMap[0]; + + needCommonSetup = false; + } + + if (needHashTableSetup) { + // Setup our hash table specialization. It will be the first time the process + // method is called, or after a Hybrid Grace reload. + + /* + * Get our Single-Column String hash multi-set information for this specialized class. + */ + + hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable; + + needHashTableSetup = false; + } + + batchCounter++; + + final int inputLogicalSize = batch.size; + + if (inputLogicalSize == 0) { + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); + } + return; + } + + // Do the per-batch setup for an outer join. + + outerPerBatchSetup(batch); + + // For outer join, remember our input rows before ON expression filtering or before + // hash table matching so we can generate results for all rows (matching and non matching) + // later. + boolean inputSelectedInUse = batch.selectedInUse; + if (inputSelectedInUse) { + // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) { + // throw new HiveException("batch.selected is not in sort order and unique"); + // } + System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize); + } + + // Filtering for outer join just removes rows available for hash table matching. + boolean someRowsFilteredOut = false; + if (bigTableFilterExpressions.length > 0) { + // Since the input + for (VectorExpression ve : bigTableFilterExpressions) { + ve.evaluate(batch); + } + someRowsFilteredOut = (batch.size != inputLogicalSize); + if (LOG.isDebugEnabled()) { + if (batch.selectedInUse) { + if (inputSelectedInUse) { + LOG.debug(CLASS_NAME + + " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) + + " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } else { + LOG.debug(CLASS_NAME + + " inputLogicalSize " + inputLogicalSize + + " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); + } + } + } + } + + // Perform any key expressions. Results will go into scratch columns. + if (bigTableKeyExpressions != null) { + for (VectorExpression ve : bigTableKeyExpressions) { + ve.evaluate(batch); + } + } + + /* + * Single-Column String specific declarations. + */ + + // The one join column for this specialized class. + BytesColumnVector joinColVector = (BytesColumnVector) batch.cols[singleJoinColumn]; + byte[][] vector = joinColVector.vector; + int[] start = joinColVector.start; + int[] length = joinColVector.length; + + /* + * Single-Column String check for repeating. + */ + + // Check single column for repeating. + boolean allKeyInputColumnsRepeating = joinColVector.isRepeating; + + if (allKeyInputColumnsRepeating) { + + /* + * Repeating. + */ + + // All key input columns are repeating. Generate key once. Lookup once. + // Since the key is repeated, we must use entry 0 regardless of selectedInUse. + + /* + * Single-Column String specific repeated lookup. + */ + + JoinUtil.JoinResult joinResult; + if (batch.size == 0) { + // Whole repeated key batch was filtered out. + joinResult = JoinUtil.JoinResult.NOMATCH; + } else if (!joinColVector.noNulls && joinColVector.isNull[0]) { + // Any (repeated) null key column is no match for whole batch. + joinResult = JoinUtil.JoinResult.NOMATCH; + } else { + // Handle *repeated* join key, if found. 
+        byte[] keyBytes = vector[0];
+        int keyStart = start[0];
+        int keyLength = length[0];
+        joinResult = hashMultiSet.contains(keyBytes, keyStart, keyLength, hashMultiSetResults[0]);
+      }
+
+      /*
+       * Common repeated join result processing.
+       */
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
+      }
+      finishOuterBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0], someRowsFilteredOut,
+          inputSelectedInUse, inputLogicalSize);
+    } else {
+
+      /*
+       * NOT Repeating.
+       */
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated");
+      }
+
+      int selected[] = batch.selected;
+      boolean selectedInUse = batch.selectedInUse;
+
+      int hashMultiSetResultCount = 0;
+      int allMatchCount = 0;
+      int equalKeySeriesCount = 0;
+      int spillCount = 0;
+
+      boolean atLeastOneNonMatch = someRowsFilteredOut;
+
+      /*
+       * Single-Column String specific variables.
+       */
+
+      int saveKeyBatchIndex = -1;
+
+      // We optimize performance by only looking up the first key in a series of equal keys.
+      boolean haveSaveKey = false;
+      JoinUtil.JoinResult saveJoinResult = JoinUtil.JoinResult.NOMATCH;
+
+      // Logical loop over the rows in the batch since the batch may have selected in use.
+      for (int logical = 0; logical < batch.size; logical++) {
+        int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+        // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch");
+
+        /*
+         * Single-Column String outer null detection.
+         */
+
+        boolean isNull = !joinColVector.noNulls && joinColVector.isNull[batchIndex];
+
+        if (isNull) {
+
+          // Have that the NULL does not interfere with the current equal key series, if there
+          // is one.  We do not set saveJoinResult.
+          //
+          // Let a current MATCH equal key series keep going, or
+          // Let a current SPILL equal key series keep going, or
+          // Let a current NOMATCH keep not matching.
+
+          atLeastOneNonMatch = true;
+
+          // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL");
+        } else {
+
+          /*
+           * Single-Column String outer get key.
+           */
+
+          // Implicit -- use batchIndex.
+
+          /*
+           * Equal key series checking.
+           */
+
+          if (!haveSaveKey ||
+              !StringExpr.equal(vector[saveKeyBatchIndex], start[saveKeyBatchIndex], length[saveKeyBatchIndex],
+                  vector[batchIndex], start[batchIndex], length[batchIndex])) {
+            // New key.
+
+            if (haveSaveKey) {
+              // Move on with our counts.
+              switch (saveJoinResult) {
+              case MATCH:
+                hashMultiSetResultCount++;
+                equalKeySeriesCount++;
+                break;
+              case SPILL:
+                hashMultiSetResultCount++;
+                break;
+              case NOMATCH:
+                break;
+              }
+            }
+
+            // Regardless of our matching result, we keep that information to make multiple use
+            // of it for a possible series of equal keys.
+            haveSaveKey = true;
+
+            /*
+             * Single-Column String specific save key.
+             */
+
+            saveKeyBatchIndex = batchIndex;
+
+            /*
+             * Single-Column String specific lookup key.
+             */
+
+            byte[] keyBytes = vector[batchIndex];
+            int keyStart = start[batchIndex];
+            int keyLength = length[batchIndex];
+            saveJoinResult =
+                hashMultiSet.contains(
+                    keyBytes, keyStart, keyLength,
+                    hashMultiSetResults[hashMultiSetResultCount]);
+
+            /*
+             * Common outer join result processing.
+ */ + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesValueCounts[equalKeySeriesCount] = hashMultiSetResults[hashMultiSetResultCount].count(); + equalKeySeriesAllMatchIndices[equalKeySeriesCount] = allMatchCount; + equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + atLeastOneNonMatch = true; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); + break; + } + } else { + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); + + // Series of equal keys. + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); + break; + } + } + // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { + // throw new HiveException("allMatchs is not in sort order and unique"); + // } + } + } + + if (haveSaveKey) { + // Update our counts for the last key. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + + " allMatchs " + intArrayToRangesString(allMatchs,allMatchCount) + + " equalKeySeriesValueCounts " + longArrayToRangesString(equalKeySeriesValueCounts, equalKeySeriesCount) + + " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + + " atLeastOneNonMatch " + atLeastOneNonMatch + + " inputSelectedInUse " + inputSelectedInUse + + " inputLogicalSize " + inputLogicalSize + + " spills " + intArrayToRangesString(spills, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + } + + // We will generate results for all matching and non-matching rows. + finishOuterBigOnly(batch, + allMatchCount, equalKeySeriesCount, atLeastOneNonMatch, + inputSelectedInUse, inputLogicalSize, + spillCount, hashMultiSetResultCount); + } + + if (batch.size > 0) { + // Forward any remaining selected rows. 
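Aside: the String variant above never materializes a key of its own; it compares (bytes, start, length) ranges in place and remembers only saveKeyBatchIndex, the row where the current series began. A minimal sketch of that technique; rangesEqual is a stand-in for StringExpr.equal, and the layout mimics BytesColumnVector's per-row slices:

```java
// ByteRangeEqualSketch.java -- compare keys in place via (bytes, start, length),
// remembering only the row index of the series start, not a copy of the key.
public class ByteRangeEqualSketch {

  static boolean rangesEqual(byte[] a, int aStart, int aLen,
                             byte[] b, int bStart, int bLen) {
    if (aLen != bLen) {
      return false;
    }
    for (int i = 0; i < aLen; i++) {
      if (a[aStart + i] != b[bStart + i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // One backing array, BytesColumnVector-style: per-row (start, length) slices.
    byte[] bytes = "aaaaabbb".getBytes();
    int[] start = {0, 2, 5};
    int[] length = {2, 2, 3};   // rows: "aa", "aa", "bbb"

    int saveRow = -1;           // index of the first row of the current series
    for (int row = 0; row < start.length; row++) {
      if (saveRow < 0 || !rangesEqual(bytes, start[saveRow], length[saveRow],
                                      bytes, start[row], length[row])) {
        saveRow = row;          // new series; the key bytes stay where they are
        System.out.println("row " + row + ": new key series");
      } else {
        System.out.println("row " + row + ": key continues");
      }
    }
  }
}
```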
+ forwardBigTableBatch(batch); + } + + } catch (IOException e) { + throw new HiveException(e); + } catch (Exception e) { + throw new HiveException(e); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java index 2e5c568..c73ea8a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java @@ -186,226 +186,6 @@ protected void outerPerBatchSetup(VectorizedRowBatch batch) { } /** - * Apply the value expression to rows in the (original) input selected array. - * - * @param batch - * The vectorized row batch. - * @param inputSelectedInUse - * Whether the (original) input batch is selectedInUse. - * @param inputLogicalSize - * The (original) input batch size. - */ - private void doValueExprOnInputSelected(VectorizedRowBatch batch, - boolean inputSelectedInUse, int inputLogicalSize) throws HiveException { - - int saveBatchSize = batch.size; - int[] saveSelected = batch.selected; - boolean saveSelectedInUse = batch.selectedInUse; - - batch.size = inputLogicalSize; - batch.selected = inputSelected; - batch.selectedInUse = inputSelectedInUse; - - if (bigTableValueExpressions != null) { - for(VectorExpression ve: bigTableValueExpressions) { - ve.evaluate(batch); - } - } - - batch.size = saveBatchSize; - batch.selected = saveSelected; - batch.selectedInUse = saveSelectedInUse; - } - - /** - * Apply the value expression to rows specified by a selected array. - * - * @param batch - * The vectorized row batch. - * @param selected - * The (physical) batch indices to apply the expression to. - * @param size - * The size of selected. - */ - private void doValueExpr(VectorizedRowBatch batch, - int[] selected, int size) throws HiveException { - - int saveBatchSize = batch.size; - int[] saveSelected = batch.selected; - boolean saveSelectedInUse = batch.selectedInUse; - - batch.size = size; - batch.selected = selected; - batch.selectedInUse = true; - - if (bigTableValueExpressions != null) { - for(VectorExpression ve: bigTableValueExpressions) { - ve.evaluate(batch); - } - } - - batch.size = saveBatchSize; - batch.selected = saveSelected; - batch.selectedInUse = saveSelectedInUse; - } - - /** - * Remove (subtract) members from the input selected array and produce the results into - * a difference array. - * - * @param inputSelectedInUse - * Whether the (original) input batch is selectedInUse. - * @param inputLogicalSize - * The (original) input batch size. - * @param remove - * The indices to remove. They must all be present in input selected array. - * @param removeSize - * The size of remove. - * @param difference - * The resulting difference -- the input selected array indices not in the - * remove array. - * @return - * The resulting size of the difference array. - * @throws HiveException - */ - private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize, - int[] remove, int removeSize, int[] difference) throws HiveException { - - // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { - // throw new HiveException("remove is not in sort order and unique"); - // } - - int differenceCount = 0; - - // Determine which rows are left. 
- int removeIndex = 0; - if (inputSelectedInUse) { - for (int i = 0; i < inputLogicalSize; i++) { - int candidateIndex = inputSelected[i]; - if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { - removeIndex++; - } else { - difference[differenceCount++] = candidateIndex; - } - } - } else { - for (int candidateIndex = 0; candidateIndex < inputLogicalSize; candidateIndex++) { - if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { - removeIndex++; - } else { - difference[differenceCount++] = candidateIndex; - } - } - } - - if (removeIndex != removeSize) { - throw new HiveException("Not all batch indices removed"); - } - - // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) { - // throw new HiveException("difference is not in sort order and unique"); - // } - - return differenceCount; - } - - /** - * Remove (subtract) members from an array and produce the results into - * a difference array. - - * @param all - * The selected array containing all members. - * @param allSize - * The size of all. - * @param remove - * The indices to remove. They must all be present in input selected array. - * @param removeSize - * The size of remove. - * @param difference - * The resulting difference -- the all array indices not in the - * remove array. - * @return - * The resulting size of the difference array. - * @throws HiveException - */ - private int subtract(int[] all, int allSize, - int[] remove, int removeSize, int[] difference) throws HiveException { - - // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { - // throw new HiveException("remove is not in sort order and unique"); - // } - - int differenceCount = 0; - - // Determine which rows are left. - int removeIndex = 0; - for (int i = 0; i < allSize; i++) { - int candidateIndex = all[i]; - if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { - removeIndex++; - } else { - difference[differenceCount++] = candidateIndex; - } - } - - if (removeIndex != removeSize) { - throw new HiveException("Not all batch indices removed"); - } - - return differenceCount; - } - - /** - * Sort merge two select arrays so the resulting array is ordered by (batch) index. - * - * @param selected1 - * @param selected1Count - * @param selected2 - * @param selected2Count - * @param sortMerged - * The resulting sort merge of selected1 and selected2. - * @return - * The resulting size of the sortMerged array. 
- * @throws HiveException - */ - private int sortMerge(int[] selected1, int selected1Count, - int[] selected2, int selected2Count, int[] sortMerged) throws HiveException { - - // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) { - // throw new HiveException("selected1 is not in sort order and unique"); - // } - - // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) { - // throw new HiveException("selected1 is not in sort order and unique"); - // } - - - int sortMergeCount = 0; - - int selected1Index = 0; - int selected2Index = 0; - for (int i = 0; i < selected1Count + selected2Count; i++) { - if (selected1Index < selected1Count && selected2Index < selected2Count) { - if (selected1[selected1Index] < selected2[selected2Index]) { - sortMerged[sortMergeCount++] = selected1[selected1Index++]; - } else { - sortMerged[sortMergeCount++] = selected2[selected2Index++]; - } - } else if (selected1Index < selected1Count) { - sortMerged[sortMergeCount++] = selected1[selected1Index++]; - } else { - sortMerged[sortMergeCount++] = selected2[selected2Index++]; - } - } - - // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) { - // throw new HiveException("sortMerged is not in sort order and unique"); - // } - - return sortMergeCount; - } - - /** * Generate the outer join output results for one vectorized row batch. * * @param batch @@ -433,7 +213,7 @@ public void finishOuter(VectorizedRowBatch batch, // Get rid of spills before we start modifying the batch. if (spillCount > 0) { - spillHashMapBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, + spillHashTableBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, spills, spillHashMapResultIndices, spillCount); } @@ -442,7 +222,8 @@ public void finishOuter(VectorizedRowBatch batch, // Subtract the spills to get all match and non-match rows. int nonSpillCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, spills, spillCount, nonSpills); + inputSelectedInUse, inputLogicalSize, inputSelected, + spills, spillCount, nonSpills); if (LOG.isDebugEnabled()) { LOG.debug("finishOuter spillCount > 0" + @@ -469,11 +250,12 @@ public void finishOuter(VectorizedRowBatch batch, } else { // Run value expressions over original (whole) input batch. - doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize); + doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected); if (atLeastOneNonMatch) { noMatchCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, allMatchs, allMatchCount, noMatchs); + inputSelectedInUse, inputLogicalSize, inputSelected, + allMatchs, allMatchCount, noMatchs); if (LOG.isDebugEnabled()) { LOG.debug("finishOuter spillCount == 0" + @@ -553,44 +335,6 @@ public void finishOuter(VectorizedRowBatch batch, } } - /** - * Generate the non matching outer join output results for one vectorized row batch. - * - * For each non matching row specified by parameter, generate nulls for the small table results. - * - * @param batch - * The big table batch with any matching and any non matching rows both as - * selected in use. - * @param noMatchs - * A subset of the rows of the batch that are non matches. - * @param noMatchSize - * Number of non matches in noMatchs. - */ - protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, - int noMatchSize) throws IOException, HiveException { - - // Set null information in the small table results area. 
- - for (int i = 0; i < noMatchSize; i++) { - int batchIndex = noMatchs[i]; - - // Mark any scratch small table scratch columns that would normally receive a copy of the - // key as null, too. - for (int column : bigTableOuterKeyOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } - - // Small table values are set to null. - for (int column : smallTableOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } - } - } - /** * Generate the outer join output results for one vectorized row batch with a repeated key. * @@ -601,8 +345,8 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, * selected in use. * @param joinResult * The hash map lookup result for the repeated key. - * @param hashMapResults - * The array of all hash map results for the batch. + * @param hashMapResult + * The repeated hash map result for the batch. * @param someRowsFilteredOut * Whether some rows of the repeated key batch were knocked out by the filter. * @param inputSelectedInUse @@ -633,7 +377,7 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo // we need to generate no match results for those too... // Run value expressions over original (whole) input batch. - doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize); + doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected); // Now calculate which rows were filtered out (they are logically no matches). @@ -641,7 +385,8 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo // (current) batch selected. int noMatchCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs); + inputSelectedInUse, inputLogicalSize, inputSelected, + batch.selected, batch.size, noMatchs); generateOuterNulls(batch, noMatchs, noMatchCount); @@ -685,7 +430,8 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo // (current) batch selected. int noMatchCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs); + inputSelectedInUse, inputLogicalSize, inputSelected, + batch.selected, batch.size, noMatchs); System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); batch.size = noMatchCount; @@ -722,32 +468,4 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo break; } } - - /** - * Generate the non-match outer join output results for the whole repeating vectorized - * row batch. - * - * Each row will get nulls for all small table values. - * - * @param batch - * The big table batch. - */ - protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException { - - for (int column : smallTableOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[0] = true; - colVector.isRepeating = true; - } - - // Mark any scratch small table scratch columns that would normally receive a copy of the key - // as null, too. 
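Aside: the call sites above all lean on subtractFromInputSelected computing a sorted set difference: the rows remembered in inputSelected minus the rows that survived filtering or matched (batch.selected), yielding the outer no-matches. A runnable distillation of that single-pass difference loop, mirroring the removed subtract helper:

```java
import java.util.Arrays;

// SelectedSubtractSketch.java -- difference of two sorted, unique index arrays.
public class SelectedSubtractSketch {

  /** Returns the count written into difference: all indices not in remove. */
  static int subtract(int[] all, int allSize, int[] remove, int removeSize,
      int[] difference) {
    int removeIndex = 0;
    int differenceCount = 0;
    for (int i = 0; i < allSize; i++) {
      int candidate = all[i];
      if (removeIndex < removeSize && candidate == remove[removeIndex]) {
        removeIndex++;          // present in remove; drop it
      } else {
        difference[differenceCount++] = candidate;
      }
    }
    return differenceCount;
  }

  public static void main(String[] args) {
    int[] inputSelected = {0, 2, 3, 5, 8};   // rows before filtering/matching
    int[] matched = {2, 5};                  // rows that found a hash table match
    int[] noMatchs = new int[inputSelected.length];
    int n = subtract(inputSelected, inputSelected.length,
        matched, matched.length, noMatchs);
    // The non-matches get NULL small-table columns in the outer join result.
    System.out.println(Arrays.toString(Arrays.copyOf(noMatchs, n)));  // [0, 3, 8]
  }
}
```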
- for (int column : bigTableOuterKeyOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[0] = true; - colVector.isRepeating = true; - } - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 9bb104d..0d090a0 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -73,6 +73,9 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterBigOnlyLongOperator; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterBigOnlyMultiKeyOperator; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterBigOnlyStringOperator; import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator; import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -3251,6 +3254,13 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { isInnerBigOnly = true; } + boolean isOuterBigOnly = false; + if ((joinType == JoinDesc.LEFT_OUTER_JOIN || + joinType == JoinDesc.RIGHT_OUTER_JOIN) && + isBigTableOnlyResults(desc)) { + isOuterBigOnly = true; + } + // By default, we can always use the multi-key class. hashTableKeyType = HashTableKeyType.MULTI_KEY; @@ -3303,8 +3313,13 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { break; case JoinDesc.LEFT_OUTER_JOIN: case JoinDesc.RIGHT_OUTER_JOIN: - vectorMapJoinVariation = VectorMapJoinVariation.OUTER; - hashTableKind = HashTableKind.HASH_MAP; + if (!isOuterBigOnly) { + vectorMapJoinVariation = VectorMapJoinVariation.OUTER; + hashTableKind = HashTableKind.HASH_MAP; + } else { + vectorMapJoinVariation = VectorMapJoinVariation.OUTER_BIG_ONLY; + hashTableKind = HashTableKind.HASH_MULTISET; + } break; case JoinDesc.LEFT_SEMI_JOIN: vectorMapJoinVariation = VectorMapJoinVariation.LEFT_SEMI; @@ -3335,6 +3350,9 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { case OUTER: opClass = VectorMapJoinOuterLongOperator.class; break; + case OUTER_BIG_ONLY: + opClass = VectorMapJoinOuterBigOnlyLongOperator.class; + break; default: throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } @@ -3353,6 +3371,9 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { case OUTER: opClass = VectorMapJoinOuterStringOperator.class; break; + case OUTER_BIG_ONLY: + opClass = VectorMapJoinOuterBigOnlyStringOperator.class; + break; default: throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } @@ -3371,6 +3392,9 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { case OUTER: opClass = VectorMapJoinOuterMultiKeyOperator.class; break; + case OUTER_BIG_ONLY: + opClass = VectorMapJoinOuterBigOnlyMultiKeyOperator.class; + break; default: throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index dc4f085..dac5330 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -543,7 +543,9 @@ public String getProjectedOutputColumnNums() {
   @Explain(vectorization = Vectorization.DETAIL, displayName = "bigTableOuterKeyMapping", explainLevels = { Level.DEFAULT, Level.EXTENDED })
   public List<String> getBigTableOuterKey() {
-    if (!isNative || vectorMapJoinDesc.getVectorMapJoinVariation() != VectorMapJoinVariation.OUTER) {
+    if (!isNative ||
+        (vectorMapJoinDesc.getVectorMapJoinVariation() != VectorMapJoinVariation.OUTER &&
+         vectorMapJoinDesc.getVectorMapJoinVariation() != VectorMapJoinVariation.OUTER_BIG_ONLY)) {
       return null;
     }
     return columnMappingToStringList(vectorMapJoinInfo.getBigTableOuterKeyMapping());
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java
index 58032ca..4f404d9 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java
@@ -89,6 +89,7 @@ public PrimitiveTypeInfo getPrimitiveTypeInfo() {
     INNER_BIG_ONLY,
     INNER,
     LEFT_SEMI,
+    OUTER_BIG_ONLY,
     OUTER
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
index 0514e3f..c635066 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
@@ -113,7 +113,9 @@ public static MapJoinDesc createMapJoinDesc(MapJoinTestDescription testDesc) {
     Byte[] order = new Byte[] {(byte) 0, (byte) 1};
     mapJoinDesc.setTagOrder(order);
-    mapJoinDesc.setNoOuterJoin(testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER);
+    mapJoinDesc.setNoOuterJoin(
+        testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER &&
+        testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER_BIG_ONLY);
     Map<Byte, List<ExprNodeDesc>> filterMap = new HashMap<Byte, List<ExprNodeDesc>>();
     filterMap.put((byte) 0, new ArrayList<ExprNodeDesc>());  // None.
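Aside: the planner changes earlier in this patch (Vectorizer) compose two decisions: the join variation (OUTER vs. the new OUTER_BIG_ONLY, chosen when an outer join projects no small-table columns) and the key specialization (Long, String, Multi-Key). A condensed sketch of that decision table; operator names are plain strings here, and chooseVariation is a simplification of the real checks, not the actual Vectorizer logic:

```java
// VariationDispatchSketch.java -- illustrative dispatch, not Hive's Vectorizer.
public class VariationDispatchSketch {

  enum Variation { OUTER, OUTER_BIG_ONLY }
  enum KeyType { LONG, STRING, MULTI_KEY }

  static Variation chooseVariation(boolean isOuterJoin, boolean bigTableOnlyResults) {
    // An outer join that projects no small-table columns only needs duplicate
    // counts, so it can use the multi-set backed big-only variation.
    return (isOuterJoin && bigTableOnlyResults)
        ? Variation.OUTER_BIG_ONLY : Variation.OUTER;
  }

  static String chooseOperator(Variation variation, KeyType keyType) {
    switch (keyType) {
    case LONG:
      return variation == Variation.OUTER_BIG_ONLY
          ? "VectorMapJoinOuterBigOnlyLongOperator" : "VectorMapJoinOuterLongOperator";
    case STRING:
      return variation == Variation.OUTER_BIG_ONLY
          ? "VectorMapJoinOuterBigOnlyStringOperator" : "VectorMapJoinOuterStringOperator";
    default:
      return variation == Variation.OUTER_BIG_ONLY
          ? "VectorMapJoinOuterBigOnlyMultiKeyOperator" : "VectorMapJoinOuterMultiKeyOperator";
    }
  }

  public static void main(String[] args) {
    System.out.println(chooseOperator(chooseVariation(true, true), KeyType.LONG));
    System.out.println(chooseOperator(chooseVariation(true, false), KeyType.STRING));
  }
}
```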
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
index 0514e3f..c635066 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
@@ -113,7 +113,9 @@ public static MapJoinDesc createMapJoinDesc(MapJoinTestDescription testDesc) {
     Byte[] order = new Byte[] {(byte) 0, (byte) 1};
     mapJoinDesc.setTagOrder(order);
-    mapJoinDesc.setNoOuterJoin(testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER);
+    mapJoinDesc.setNoOuterJoin(
+        testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER &&
+        testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER_BIG_ONLY);
 
     Map<Byte, List<ExprNodeDesc>> filterMap = new HashMap<Byte, List<ExprNodeDesc>>();
     filterMap.put((byte) 0, new ArrayList<ExprNodeDesc>());  // None.
@@ -139,6 +141,7 @@ public static MapJoinDesc createMapJoinDesc(MapJoinTestDescription testDesc) {
       joinDescType = JoinDesc.LEFT_SEMI_JOIN;
       break;
     case OUTER:
+    case OUTER_BIG_ONLY:
       joinDescType = JoinDesc.LEFT_OUTER_JOIN;
       break;
     default:
@@ -182,6 +185,9 @@ public static VectorMapJoinDesc createVectorMapJoinDesc(MapJoinTestDescription t
     case OUTER:
       hashTableKind = HashTableKind.HASH_MAP;
       break;
+    case OUTER_BIG_ONLY:
+      hashTableKind = HashTableKind.HASH_MULTISET;
+      break;
     default:
       throw new RuntimeException("unknown operator variation " + testDesc.vectorMapJoinVariation);
     }
@@ -306,6 +312,11 @@ public static VectorMapJoinCommonOperator createNativeVectorMapJoinOperator(
             new VectorMapJoinOuterLongOperator(new CompilationOpContext(), mapJoinDesc,
                 vContext, vectorDesc);
         break;
+      case OUTER_BIG_ONLY:
+        operator =
+            new VectorMapJoinOuterBigOnlyLongOperator(new CompilationOpContext(),
+                mapJoinDesc, vContext, vectorDesc);
+        break;
       default:
         throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation);
       }
@@ -332,6 +343,11 @@ public static VectorMapJoinCommonOperator createNativeVectorMapJoinOperator(
             new VectorMapJoinOuterStringOperator(new CompilationOpContext(), mapJoinDesc,
                 vContext, vectorDesc);
         break;
+      case OUTER_BIG_ONLY:
+        operator =
+            new VectorMapJoinOuterBigOnlyStringOperator(new CompilationOpContext(),
+                mapJoinDesc, vContext, vectorDesc);
+        break;
       default:
         throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation);
       }
@@ -358,6 +374,11 @@ public static VectorMapJoinCommonOperator createNativeVectorMapJoinOperator(
             new VectorMapJoinOuterMultiKeyOperator(new CompilationOpContext(), mapJoinDesc,
                 vContext, vectorDesc);
         break;
+      case OUTER_BIG_ONLY:
+        operator =
+            new VectorMapJoinOuterBigOnlyMultiKeyOperator(new CompilationOpContext(),
+                mapJoinDesc, vContext, vectorDesc);
+        break;
       default:
         throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation);
       }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java
index d763695..24c9c87 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestData.java
@@ -215,6 +215,7 @@ public static void generateVariationData(MapJoinTestData testData,
     switch (testDesc.vectorMapJoinVariation) {
     case INNER_BIG_ONLY:
     case LEFT_SEMI:
+    case OUTER_BIG_ONLY:
       testData.generateRandomSmallTableCounts(testDesc, random);
       break;
     case INNER:
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java
index bde4424..eeac65e 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestDescription.java
@@ -138,6 +138,7 @@ public MapJoinTestDescription (
     switch (vectorMapJoinVariation) {
     case INNER_BIG_ONLY:
     case LEFT_SEMI:
+    case OUTER_BIG_ONLY:
       trimAwaySmallTableValueInfo();
       break;
     case INNER:
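The test-harness hunks above fold OUTER_BIG_ONLY into the same count-only path as INNER_BIG_ONLY and LEFT_SEMI: generateRandomSmallTableCounts produces per-key duplicate counts rather than value rows, and trimAwaySmallTableValueInfo drops the small-table value schema from the description. A hedged sketch of the kind of data that implies — every name here is hypothetical, not the harness's API:

    import java.util.Random;

    // Hypothetical sketch of count-only small-table test data.
    public class SmallTableCountsSketch {

      // One duplicate count per distinct small-table key; no value rows exist
      // for the big-only variations.
      static int[] randomCounts(int distinctKeyCount, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[distinctKeyCount];
        for (int i = 0; i < distinctKeyCount; i++) {
          counts[i] = 1 + random.nextInt(3);  // each key occurs 1..3 times
        }
        return counts;
      }
    }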
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java
index 4c41f9c..7d433cb 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java
@@ -190,7 +190,8 @@ public String getName() {
   public void testLong() throws Exception {
     for (KeyConfig longKeyConfig : longKeyConfigs) {
       for (VectorMapJoinVariation vectorMapJoinVariation : VectorMapJoinVariation.values()) {
-        if (vectorMapJoinVariation == VectorMapJoinVariation.NONE){
+        if (vectorMapJoinVariation == VectorMapJoinVariation.NONE ||
+            vectorMapJoinVariation == VectorMapJoinVariation.OUTER_BIG_ONLY){
           continue;
         }
         doTestLong(longKeyConfig.seed, longKeyConfig.primitiveTypeInfo, vectorMapJoinVariation);
@@ -201,7 +202,7 @@ public void testLong() throws Exception {
   public void doTestLong(long seed, TypeInfo numberTypeInfo,
       VectorMapJoinVariation vectorMapJoinVariation) throws Exception {
 
-    int rowCount = 10000;
+    int rowCount = 100000;
 
     HiveConf hiveConf = new HiveConf();
@@ -245,7 +246,8 @@ public void doTestLong(long seed, TypeInfo numberTypeInfo,
   public void testMultiKey() throws Exception {
     long seed = 87543;
     for (VectorMapJoinVariation vectorMapJoinVariation : VectorMapJoinVariation.values()) {
-      if (vectorMapJoinVariation == VectorMapJoinVariation.NONE){
+      if (vectorMapJoinVariation == VectorMapJoinVariation.NONE ||
+          vectorMapJoinVariation == VectorMapJoinVariation.OUTER_BIG_ONLY){
        continue;
      }
      doTestMultiKey(seed, vectorMapJoinVariation);
@@ -254,7 +256,7 @@ public void testMultiKey() throws Exception {
   public void doTestMultiKey(long seed, VectorMapJoinVariation vectorMapJoinVariation)
       throws Exception {
 
-    int rowCount = 10000;
+    int rowCount = 100000;
 
     HiveConf hiveConf = new HiveConf();
@@ -300,7 +302,8 @@ public void doTestMultiKey(long seed, VectorMapJoinVariat
   public void testString() throws Exception {
     long seed = 87543;
     for (VectorMapJoinVariation vectorMapJoinVariation : VectorMapJoinVariation.values()) {
-      if (vectorMapJoinVariation == VectorMapJoinVariation.NONE){
+      if (vectorMapJoinVariation == VectorMapJoinVariation.NONE ||
+          vectorMapJoinVariation == VectorMapJoinVariation.OUTER_BIG_ONLY){
         continue;
       }
       doTestString(seed, vectorMapJoinVariation);
@@ -309,7 +312,7 @@ public void testString() throws Exception {
   public void doTestString(long seed, VectorMapJoinVariation vectorMapJoinVariation)
       throws Exception {
 
-    int rowCount = 10000;
+    int rowCount = 100000;
 
     HiveConf hiveConf = new HiveConf();
@@ -432,6 +435,7 @@ private RowTestObjectsMultiSet createExpectedTestRowMultiSet(MapJoinTestDescript
       }
       break;
     case INNER_BIG_ONLY:
+    case OUTER_BIG_ONLY:
       {
         // Value count rows.
         final int valueCount = testData.smallTableValueCounts.get(smallTableKeyIndex);
@@ -460,7 +464,8 @@ private RowTestObjectsMultiSet createExpectedTestRowMultiSet(MapJoinTestDescript
 
         // No match.
-        if (testDesc.vectorMapJoinVariation == VectorMapJoinVariation.OUTER) {
+        if (testDesc.vectorMapJoinVariation == VectorMapJoinVariation.OUTER ||
+            testDesc.vectorMapJoinVariation == VectorMapJoinVariation.OUTER_BIG_ONLY) {
 
           // We need to add a non-match row with nulls for small table values.
@@ -483,12 +488,12 @@ private RowTestObjectsMultiSet createExpectedTestRowMultiSet(MapJoinTestDescript
     return expectedTestRowMultiSet;
   }
 
-  private void executeTest(MapJoinTestDescription testDesc, MapJoinTestData testData) throws Exception {
+  private void executeTest(MapJoinTestDescription testDesc, MapJoinTestData testData)
+      throws Exception {
 
     RowTestObjectsMultiSet expectedTestRowMultiSet =
         createExpectedTestRowMultiSet(testDesc, testData);
 
-    // UNDONE: Inner count
     System.out.println("*BENCHMARK* expectedTestRowMultiSet rowCount " +
         expectedTestRowMultiSet.getRowCount() + " totalCount " +
         expectedTestRowMultiSet.getTotalCount());
@@ -507,7 +512,8 @@ private boolean isVectorOutput(MapJoinTestImplementation mapJoinImplementation)
   private void executeTestImplementation(
       MapJoinTestImplementation mapJoinImplementation,
-      MapJoinTestDescription testDesc, MapJoinTestData testData, RowTestObjectsMultiSet expectedTestRowMultiSet)
+      MapJoinTestDescription testDesc, MapJoinTestData testData,
+      RowTestObjectsMultiSet expectedTestRowMultiSet)
           throws Exception {
 
     System.out.println("*BENCHMARK* Starting " + mapJoinImplementation + " test");
@@ -540,10 +546,13 @@ private void executeTestImplementation(
         ((CountCollectorTestOperator) testCollectorOperator).getRowCount());
 
     // Verify the output!
+    VectorMapJoinVariation vectorMapJoinVariation = testDesc.vectorMapJoinVariation;
     if (!expectedTestRowMultiSet.verify(outputTestRowMultiSet)) {
-      System.out.println("*BENCHMARK* verify failed for " + mapJoinImplementation);
+      System.out.println("*BENCHMARK* verify failed for " +
+          vectorMapJoinVariation + " " + mapJoinImplementation);
     } else {
-      System.out.println("*BENCHMARK* verify succeeded for " + mapJoinImplementation);
+      System.out.println("*BENCHMARK* verify succeeded for " +
+          vectorMapJoinVariation + " " + mapJoinImplementation);
     }
   }
 }
\ No newline at end of file
diff --git ql/src/test/queries/clientpositive/vector_left_outer_join.q ql/src/test/queries/clientpositive/vector_left_outer_join.q
index f980fd8..e2441c1 100644
--- ql/src/test/queries/clientpositive/vector_left_outer_join.q
+++ ql/src/test/queries/clientpositive/vector_left_outer_join.q
@@ -6,7 +6,7 @@ set hive.auto.convert.join=true;
 set hive.mapjoin.hybridgrace.hashtable=false;
 set hive.fetch.task.conversion=none;
 
-explain vectorization
+explain vectorization detail
 select count(*) from (select c.ctinyint
 from alltypesorc c
 left outer join alltypesorc cd
diff --git ql/src/test/results/clientpositive/llap/vector_join30.q.out ql/src/test/results/clientpositive/llap/vector_join30.q.out
index 4b2f06f..f69200a 100644
--- ql/src/test/results/clientpositive/llap/vector_join30.q.out
+++ ql/src/test/results/clientpositive/llap/vector_join30.q.out
@@ -487,7 +487,7 @@ STAGE PLANS:
                         0 _col0 (type: string)
                         1 _col0 (type: string)
                       Map Join Vectorization:
-                          className: VectorMapJoinOuterStringOperator
+                          className: VectorMapJoinOuterBigOnlyStringOperator
                           native: true
                           nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       outputColumnNames: _col2, _col3
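The createExpectedTestRowMultiSet changes above define the oracle these tests verify against for OUTER_BIG_ONLY: a matched big-table row is expected once per duplicate small-table key, an unmatched row exactly once, and no small-table columns appear either way. A standalone sketch of that rule, using a plain Map in place of the test's structures (all names hypothetical):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Sketch of the expected-result rule for OUTER_BIG_ONLY; not the test's code.
    public class OuterBigOnlyExpectedRowsSketch {

      static List<Object[]> expectedRows(List<Object[]> bigTableRows,
          Map<Object, Integer> smallTableKeyCounts, int keyColumn) {
        List<Object[]> expected = new ArrayList<>();
        for (Object[] row : bigTableRows) {
          int count = smallTableKeyCounts.getOrDefault(row[keyColumn], 0);
          // Outer semantics: a non-match still contributes one result row.
          int emit = Math.max(count, 1);
          for (int i = 0; i < emit; i++) {
            expected.add(row);
          }
        }
        return expected;
      }
    }

The updated .q.out fixtures that follow show the visible effect of the same rule in plans: the operator class name flips from the OUTER map-based operators to the new big-only multiset-based ones.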
diff --git ql/src/test/results/clientpositive/llap/vector_left_outer_join.q.out ql/src/test/results/clientpositive/llap/vector_left_outer_join.q.out
index 3d78cfe..b6b1ef2 100644
--- ql/src/test/results/clientpositive/llap/vector_left_outer_join.q.out
+++ ql/src/test/results/clientpositive/llap/vector_left_outer_join.q.out
@@ -1,4 +1,4 @@
-PREHOOK: query: explain vectorization
+PREHOOK: query: explain vectorization detail
 select count(*) from (select c.ctinyint
 from alltypesorc c
 left outer join alltypesorc cd
@@ -7,7 +7,7 @@ left outer join alltypesorc hd
   on hd.ctinyint = c.ctinyint
 ) t1
 PREHOOK: type: QUERY
-POSTHOOK: query: explain vectorization
+POSTHOOK: query: explain vectorization detail
 select count(*) from (select c.ctinyint
 from alltypesorc c
 left outer join alltypesorc cd
@@ -38,9 +38,16 @@ STAGE PLANS:
                 TableScan
                   alias: c
                   Statistics: Num rows: 12288 Data size: 73392 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ctinyint:tinyint, 1:csmallint:smallint, 2:cint:int, 3:cbigint:bigint, 4:cfloat:float, 5:cdouble:double, 6:cstring1:string, 7:cstring2:string, 8:ctimestamp1:timestamp, 9:ctimestamp2:timestamp, 10:cboolean1:boolean, 11:cboolean2:boolean, 12:ROW__ID:struct]
                   Select Operator
                     expressions: ctinyint (type: tinyint), cint (type: int)
                     outputColumnNames: _col0, _col1
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 2]
                     Statistics: Num rows: 12288 Data size: 73392 Basic stats: COMPLETE Column stats: COMPLETE
                     Map Join Operator
                       condition map:
@@ -48,6 +55,14 @@ STAGE PLANS:
                       keys:
                         0 _col1 (type: int)
                         1 _col0 (type: int)
+                      Map Join Vectorization:
+                          bigTableKeyColumnNums: [2]
+                          bigTableRetainedColumnNums: [0]
+                          bigTableValueColumnNums: [0]
+                          className: VectorMapJoinOuterBigOnlyLongOperator
+                          native: true
+                          nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
+                          projectedOutputColumnNums: [0]
                       outputColumnNames: _col0
                       input vertices:
                         1 Map 3
@@ -58,16 +73,34 @@ STAGE PLANS:
                         keys:
                           0 _col0 (type: tinyint)
                           1 _col0 (type: tinyint)
+                        Map Join Vectorization:
+                            bigTableKeyColumnNums: [0]
+                            className: VectorMapJoinOuterBigOnlyLongOperator
+                            native: true
+                            nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                         input vertices:
                           1 Map 4
                         Statistics: Num rows: 2338217 Data size: 18705736 Basic stats: COMPLETE Column stats: COMPLETE
                         Group By Operator
                           aggregations: count()
+                          Group By Vectorization:
+                              aggregators: VectorUDAFCountStar(*) -> bigint
+                              className: VectorGroupByOperator
+                              groupByMode: HASH
+                              native: false
+                              vectorProcessingMode: HASH
+                              projectedOutputColumnNums: [0]
                           mode: hash
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                           Reduce Output Operator
                             sort order:
+                            Reduce Sink Vectorization:
+                                className: VectorReduceSinkEmptyKeyOperator
+                                keyColumnNums: []
+                                native: true
+                                nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                                valueColumnNums: [0]
                             Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                             value expressions: _col0 (type: bigint)
             Execution mode: vectorized, llap
@@ -81,19 +114,38 @@ STAGE PLANS:
                 allNative: false
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 12
+                    includeColumns: [0, 2]
+                    dataColumns: ctinyint:tinyint, csmallint:smallint, cint:int, cbigint:bigint, cfloat:float, cdouble:double, cstring1:string, cstring2:string, ctimestamp1:timestamp, ctimestamp2:timestamp, cboolean1:boolean, cboolean2:boolean
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
         Map 3 
             Map Operator Tree:
                 TableScan
                   alias: cd
                   Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ctinyint:tinyint, 1:csmallint:smallint, 2:cint:int, 3:cbigint:bigint, 4:cfloat:float, 5:cdouble:double, 6:cstring1:string, 7:cstring2:string, 8:ctimestamp1:timestamp, 9:ctimestamp2:timestamp, 10:cboolean1:boolean, 11:cboolean2:boolean, 12:ROW__ID:struct]
                   Select Operator
                     expressions: cint (type: int)
                     outputColumnNames: _col0
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [2]
                     Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
                     Reduce Output Operator
                       key expressions: _col0 (type: int)
                       sort order: +
                       Map-reduce partition columns: _col0 (type: int)
+                      Reduce Sink Vectorization:
+                          className: VectorReduceSinkLongOperator
+                          keyColumnNums: [2]
+                          native: true
+                          nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                          valueColumnNums: []
                       Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
             Execution mode: vectorized, llap
             LLAP IO: all inputs
@@ -106,19 +158,38 @@ STAGE PLANS:
                 allNative: true
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 12
+                    includeColumns: [2]
+                    dataColumns: ctinyint:tinyint, csmallint:smallint, cint:int, cbigint:bigint, cfloat:float, cdouble:double, cstring1:string, cstring2:string, ctimestamp1:timestamp, ctimestamp2:timestamp, cboolean1:boolean, cboolean2:boolean
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
         Map 4 
            Map Operator Tree:
                 TableScan
                   alias: hd
                   Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ctinyint:tinyint, 1:csmallint:smallint, 2:cint:int, 3:cbigint:bigint, 4:cfloat:float, 5:cdouble:double, 6:cstring1:string, 7:cstring2:string, 8:ctimestamp1:timestamp, 9:ctimestamp2:timestamp, 10:cboolean1:boolean, 11:cboolean2:boolean, 12:ROW__ID:struct]
                   Select Operator
                     expressions: ctinyint (type: tinyint)
                     outputColumnNames: _col0
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0]
                     Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
                     Reduce Output Operator
                       key expressions: _col0 (type: tinyint)
                       sort order: +
                       Map-reduce partition columns: _col0 (type: tinyint)
+                      Reduce Sink Vectorization:
+                          className: VectorReduceSinkLongOperator
+                          keyColumnNums: [0]
+                          native: true
+                          nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                          valueColumnNums: []
                       Statistics: Num rows: 12288 Data size: 36696 Basic stats: COMPLETE Column stats: COMPLETE
             Execution mode: vectorized, llap
             LLAP IO: all inputs
@@ -131,22 +202,45 @@ STAGE PLANS:
                 allNative: true
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 12
+                    includeColumns: [0]
+                    dataColumns: ctinyint:tinyint, csmallint:smallint, cint:int, cbigint:bigint, cfloat:float, cdouble:double, cstring1:string, cstring2:string, ctimestamp1:timestamp, ctimestamp2:timestamp, cboolean1:boolean, cboolean2:boolean
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
         Reducer 2 
             Execution mode: vectorized, llap
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true
+                reduceColumnNullOrder: 
+                reduceColumnSortOrder: 
                 allNative: false
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 1
+                    dataColumns: VALUE._col0:bigint
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
+                Group By Vectorization:
+                    aggregators: VectorUDAFCountMerge(col 0:bigint) -> bigint
+                    className: VectorGroupByOperator
+                    groupByMode: MERGEPARTIAL
+                    native: false
+                    vectorProcessingMode: GLOBAL
+                    projectedOutputColumnNums: [0]
                 mode: mergepartial
                 outputColumnNames: _col0
                 Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                 File Output Operator
                   compressed: false
+                  File Sink Vectorization:
+                      className: VectorFileSinkOperator
+                      native: false
                   Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                   table:
                       input format: org.apache.hadoop.mapred.SequenceFileInputFormat
diff --git ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out
index 9c51b32..9b0b605 100644
--- ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out
+++ ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out
@@ -14655,7 +14655,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [1]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterStringOperator
+                      className: VectorMapJoinOuterBigOnlyStringOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -19220,7 +19220,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [1]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterStringOperator
+                      className: VectorMapJoinOuterBigOnlyStringOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
diff --git ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out
index c74a588..f3988ab 100644
--- ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out
+++ ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out
@@ -446,7 +446,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -712,7 +712,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -730,7 +730,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
diff --git ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out
index 2e90aae..e59be5e 100644
--- ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out
+++ ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out
@@ -287,7 +287,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
@@ -305,7 +305,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [3]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
diff --git ql/src/test/results/clientpositive/spark/spark_vectorized_dynamic_partition_pruning.q.out ql/src/test/results/clientpositive/spark/spark_vectorized_dynamic_partition_pruning.q.out
index 5e936fc..9413f56 100644
--- ql/src/test/results/clientpositive/spark/spark_vectorized_dynamic_partition_pruning.q.out
+++ ql/src/test/results/clientpositive/spark/spark_vectorized_dynamic_partition_pruning.q.out
@@ -11205,7 +11205,7 @@ STAGE PLANS:
                         1 _col0 (type: string)
                       Map Join Vectorization:
                           bigTableKeyColumnNums: [0]
-                          className: VectorMapJoinOuterStringOperator
+                          className: VectorMapJoinOuterBigOnlyStringOperator
                           native: true
                           nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       input vertices:
@@ -11395,7 +11395,7 @@ STAGE PLANS:
                         1 _col0 (type: string)
                       Map Join Vectorization:
                           bigTableKeyColumnNums: [0]
-                          className: VectorMapJoinOuterStringOperator
+                          className: VectorMapJoinOuterBigOnlyStringOperator
                           native: true
                           nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       input vertices:
diff --git ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
index 9f8dea3..855d6c8 100644
--- ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
+++ ql/src/test/results/clientpositive/spark/vector_left_outer_join.q.out
@@ -1,4 +1,4 @@
-PREHOOK: query: explain vectorization
+PREHOOK: query: explain vectorization detail
 select count(*) from (select c.ctinyint
 from alltypesorc c
 left outer join alltypesorc cd
@@ -7,7 +7,7 @@ left outer join alltypesorc hd
   on hd.ctinyint = c.ctinyint
 ) t1
 PREHOOK: type: QUERY
-POSTHOOK: query: explain vectorization
+POSTHOOK: query: explain vectorization detail
 select count(*) from (select c.ctinyint
 from alltypesorc c
 left outer join alltypesorc cd
@@ -35,11 +35,21 @@ STAGE PLANS:
                 TableScan
                   alias: cd
                   Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ctinyint:tinyint, 1:csmallint:smallint, 2:cint:int, 3:cbigint:bigint, 4:cfloat:float, 5:cdouble:double, 6:cstring1:string, 7:cstring2:string, 8:ctimestamp1:timestamp, 9:ctimestamp2:timestamp, 10:cboolean1:boolean, 11:cboolean2:boolean, 12:ROW__ID:struct]
                   Select Operator
                     expressions: cint (type: int)
                     outputColumnNames: _col0
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [2]
                     Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
+                      Spark Hash Table Sink Vectorization:
+                          className: VectorSparkHashTableSinkOperator
+                          native: true
                       keys:
                         0 _col1 (type: int)
                         1 _col0 (type: int)
@@ -53,6 +63,12 @@ STAGE PLANS:
                 allNative: true
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 12
+                    includeColumns: [2]
+                    dataColumns: ctinyint:tinyint, csmallint:smallint, cint:int, cbigint:bigint, cfloat:float, cdouble:double, cstring1:string, cstring2:string, ctimestamp1:timestamp, ctimestamp2:timestamp, cboolean1:boolean, cboolean2:boolean
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
             Local Work:
               Map Reduce Local Work
         Map 4 
@@ -60,11 +76,21 @@ STAGE PLANS:
             Map Operator Tree:
                 TableScan
                   alias: hd
                   Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ctinyint:tinyint, 1:csmallint:smallint, 2:cint:int, 3:cbigint:bigint, 4:cfloat:float, 5:cdouble:double, 6:cstring1:string, 7:cstring2:string, 8:ctimestamp1:timestamp, 9:ctimestamp2:timestamp, 10:cboolean1:boolean, 11:cboolean2:boolean, 12:ROW__ID:struct]
                   Select Operator
                     expressions: ctinyint (type: tinyint)
                     outputColumnNames: _col0
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0]
                     Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
                     Spark HashTable Sink Operator
+                      Spark Hash Table Sink Vectorization:
+                          className: VectorSparkHashTableSinkOperator
+                          native: true
                       keys:
                         0 _col0 (type: tinyint)
                         1 _col0 (type: tinyint)
@@ -78,6 +104,12 @@ STAGE PLANS:
                 allNative: true
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 12
+                    includeColumns: [0]
+                    dataColumns: ctinyint:tinyint, csmallint:smallint, cint:int, cbigint:bigint, cfloat:float, cdouble:double, cstring1:string, cstring2:string, ctimestamp1:timestamp, ctimestamp2:timestamp, cboolean1:boolean, cboolean2:boolean
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
             Local Work:
               Map Reduce Local Work
 
@@ -92,9 +124,16 @@ STAGE PLANS:
                 TableScan
                   alias: c
                   Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
+                  TableScan Vectorization:
+                      native: true
+                      vectorizationSchemaColumns: [0:ctinyint:tinyint, 1:csmallint:smallint, 2:cint:int, 3:cbigint:bigint, 4:cfloat:float, 5:cdouble:double, 6:cstring1:string, 7:cstring2:string, 8:ctimestamp1:timestamp, 9:ctimestamp2:timestamp, 10:cboolean1:boolean, 11:cboolean2:boolean, 12:ROW__ID:struct]
                   Select Operator
                     expressions: ctinyint (type: tinyint), cint (type: int)
                     outputColumnNames: _col0, _col1
+                    Select Vectorization:
+                        className: VectorSelectOperator
+                        native: true
+                        projectedOutputColumnNums: [0, 2]
                     Statistics: Num rows: 12288 Data size: 2907994 Basic stats: COMPLETE Column stats: NONE
                     Map Join Operator
                       condition map:
@@ -102,6 +141,14 @@ STAGE PLANS:
                       keys:
                         0 _col1 (type: int)
                         1 _col0 (type: int)
+                      Map Join Vectorization:
+                          bigTableKeyColumnNums: [2]
+                          bigTableRetainedColumnNums: [0]
+                          bigTableValueColumnNums: [0]
+                          className: VectorMapJoinOuterBigOnlyLongOperator
+                          native: true
+                          nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
+                          projectedOutputColumnNums: [0]
                       outputColumnNames: _col0
                       input vertices:
                         1 Map 3
@@ -112,16 +159,34 @@ STAGE PLANS:
                         keys:
                           0 _col0 (type: tinyint)
                           1 _col0 (type: tinyint)
+                        Map Join Vectorization:
+                            bigTableKeyColumnNums: [0]
+                            className: VectorMapJoinOuterBigOnlyLongOperator
+                            native: true
+                            nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                         input vertices:
                           1 Map 4
                         Statistics: Num rows: 14867 Data size: 3518672 Basic stats: COMPLETE Column stats: NONE
                         Group By Operator
                           aggregations: count()
+                          Group By Vectorization:
+                              aggregators: VectorUDAFCountStar(*) -> bigint
+                              className: VectorGroupByOperator
+                              groupByMode: HASH
+                              native: false
+                              vectorProcessingMode: HASH
+                              projectedOutputColumnNums: [0]
                           mode: hash
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                           Reduce Output Operator
                             sort order:
+                            Reduce Sink Vectorization:
+                                className: VectorReduceSinkEmptyKeyOperator
+                                keyColumnNums: []
+                                native: true
+                                nativeConditionsMet: hive.vectorized.execution.reducesink.new.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, No PTF TopN IS true, No DISTINCT columns IS true, BinarySortableSerDe for keys IS true, LazyBinarySerDe for values IS true
+                                valueColumnNums: [0]
                             Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                             value expressions: _col0 (type: bigint)
             Execution mode: vectorized
@@ -134,6 +199,12 @@ STAGE PLANS:
                 allNative: false
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 12
+                    includeColumns: [0, 2]
+                    dataColumns: ctinyint:tinyint, csmallint:smallint, cint:int, cbigint:bigint, cfloat:float, cdouble:double, cstring1:string, cstring2:string, ctimestamp1:timestamp, ctimestamp2:timestamp, cboolean1:boolean, cboolean2:boolean
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
             Local Work:
               Map Reduce Local Work
         Reducer 2 
@@ -141,17 +212,34 @@ STAGE PLANS:
             Execution mode: vectorized
             Reduce Vectorization:
                 enabled: true
                 enableConditionsMet: hive.vectorized.execution.reduce.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true
+                reduceColumnNullOrder: 
+                reduceColumnSortOrder: 
                 allNative: false
                 usesVectorUDFAdaptor: false
                 vectorized: true
+                rowBatchContext:
+                    dataColumnCount: 1
+                    dataColumns: VALUE._col0:bigint
+                    partitionColumnCount: 0
+                    scratchColumnTypeNames: []
            Reduce Operator Tree:
              Group By Operator
                aggregations: count(VALUE._col0)
+                Group By Vectorization:
+                    aggregators: VectorUDAFCountMerge(col 0:bigint) -> bigint
+                    className: VectorGroupByOperator
+                    groupByMode: MERGEPARTIAL
+                    native: false
+                    vectorProcessingMode: GLOBAL
+                    projectedOutputColumnNums: [0]
                mode: mergepartial
                outputColumnNames: _col0
                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                File Output Operator
                  compressed: false
+                  File Sink Vectorization:
+                      className: VectorFileSinkOperator
+                      native: false
                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                  table:
                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
diff --git ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out
index ecac4da..d396f07 100644
--- ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out
+++ ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out
@@ -490,7 +490,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -799,7 +799,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -817,7 +817,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
diff --git ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out
index 92ad63e..92a3b79 100644
--- ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out
+++ ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out
@@ -373,7 +373,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
@@ -391,7 +391,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [3]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
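All of the plan updates above reduce to a single operator swap, so the runtime flow the swapped operator implements is worth restating: probe the multiset once per probe key, remember the counts, then generate output rows. A condensed, hypothetical rendering follows; the real operator described at the start of this patch works on whole VectorizedRowBatch column vectors, equal-key series, and spill handling, none of which are modeled here.

    import java.util.Map;

    // Hypothetical per-batch flow for OUTER_BIG_ONLY; not the operator's real code.
    public class OuterBigOnlyProbeSketch {

      // Returns how many output rows a batch of probe keys produces.
      static long processBatch(Object[] batchKeys, Map<Object, Long> buildSideCounts) {
        long outputRows = 0;
        for (Object key : batchKeys) {
          long count = buildSideCounts.getOrDefault(key, 0L);  // one lookup per key
          // count >= 1: repeat the big-table row once per duplicate small-table key;
          // count == 0: outer non-match, the row is still emitted exactly once.
          outputRows += Math.max(count, 1L);
        }
        return outputRows;
      }
    }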