diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinBigOnlyGenerateResultOperator.java new file mode 100644 index 0000000..2326bae --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinBigOnlyGenerateResultOperator.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSet; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; + +/** + * This class has methods for generating vectorized join results for the big table only + * variation of inner joins. + * + * When an inner join does not have any small table columns in the join result, we use this + * variation, which we call inner big only. This variation uses a hash multi-set instead of a + * hash map since there are no values (just a count). + * + * Note that if an inner key appears in the small table results area, we use the inner join + * projection optimization and are able to use this variation. + */ +public abstract class VectorMapJoinBigOnlyGenerateResultOperator +        extends VectorMapJoinGenerateResultOperator { + +  private static final long serialVersionUID = 1L; +  private static final Logger LOG = +      LoggerFactory.getLogger(VectorMapJoinBigOnlyGenerateResultOperator.class.getName()); + +  //--------------------------------------------------------------------------- +  // Inner big-table only join specific members. +  // + +  // An array of hash multi-set results so we can do lookups on the whole batch before output result +  // generation. +  protected transient VectorMapJoinHashMultiSetResult hashMultiSetResults[]; + +  // Pre-allocated member for storing the (physical) batch indexes of matching rows (single- or +  // multi-small-table-valued) during a process call.
+  protected transient int[] allMatchs; + +  /* +   * Pre-allocated members for storing information on single- and multi-valued-small-table matches. +   * +   * ~ValueCounts +   *    Number of (empty) small table values. +   * ~AllMatchIndices +   *    (Logical) indices into allMatchs to the first row of a match of a +   *    possible series of duplicate keys. +   * ~DuplicateCounts +   *    The duplicate count for each matched key. +   * +   */ +  protected transient long[] equalKeySeriesValueCounts; +  protected transient int[] equalKeySeriesAllMatchIndices; +  protected transient int[] equalKeySeriesDuplicateCounts; + + +  // Pre-allocated member for storing the (physical) batch index of rows that need to be spilled. +  protected transient int[] spills; + +  // Pre-allocated member for storing the index into the hashMultiSetResults for each spilled row. +  protected transient int[] spillHashMultiSetResultIndices; + +  /** Kryo ctor. */ +  protected VectorMapJoinBigOnlyGenerateResultOperator() { +    super(); +  } + +  public VectorMapJoinBigOnlyGenerateResultOperator(CompilationOpContext ctx) { +    super(ctx); +  } + +  public VectorMapJoinBigOnlyGenerateResultOperator(CompilationOpContext ctx, OperatorDesc conf, +      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException { +    super(ctx, conf, vContext, vectorDesc); +  } + +  /* +   * Setup our inner big table only join specific members. +   */ +  protected void commonSetup(VectorizedRowBatch batch) throws HiveException { +    super.commonSetup(batch); + +    // Inner big-table only join specific. +    VectorMapJoinHashMultiSet baseHashMultiSet = (VectorMapJoinHashMultiSet) vectorMapJoinHashTable; + +    hashMultiSetResults = new VectorMapJoinHashMultiSetResult[batch.DEFAULT_SIZE]; +    for (int i = 0; i < hashMultiSetResults.length; i++) { +      hashMultiSetResults[i] = baseHashMultiSet.createHashMultiSetResult(); +    } + +    allMatchs = new int[batch.DEFAULT_SIZE]; + +    equalKeySeriesValueCounts = new long[batch.DEFAULT_SIZE]; +    equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE]; +    equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE]; + +    spills = new int[batch.DEFAULT_SIZE]; +    spillHashMultiSetResultIndices = new int[batch.DEFAULT_SIZE]; +  } + +  //----------------------------------------------------------------------------------------------- + +  /** +   * Generate the single value match inner big table only join output results for a match. +   * +   * @param batch +   *          The big table batch. +   * @param allMatchs +   *          A subset of the rows of the batch that are matches. +   * @param allMatchesIndex +   *          The logical index into allMatchs of the first equal key. +   * @param duplicateCount +   *          The number of duplicates or equal keys. +   * @param numSel +   *          The current count of rows in the rebuilding of the selected array. +   * +   * @return +   *          The new count of selected rows. +   */ +  protected int generateHashMultiSetResultSingleValue(VectorizedRowBatch batch, +      int[] allMatchs, int allMatchesIndex, int duplicateCount, int numSel) +          throws HiveException, IOException { + +    // LOG.debug("generateHashMultiSetResultSingleValue enter..."); + +    // Generate result within big table batch itself. + +    // LOG.debug("generateHashMultiSetResultSingleValue with big table..."); + +    for (int i = 0; i < duplicateCount; i++) { + +      int batchIndex = allMatchs[allMatchesIndex + i]; + +      // Use the big table row as output. +      batch.selected[numSel++] = batchIndex; +    } + +    return numSel; +  } + +  /** +   * Generate results for an N x M cross product. +   * +   * @param batch +   *          The big table batch.
+ * @param allMatchs + * The all match selected array that contains (physical) batch indices. + * @param allMatchesIndex + * The index of the match key. + * @param duplicateCount + * Number of equal key rows. + * @param count + * Value count. + */ + protected void generateHashMultiSetResultMultiValue(VectorizedRowBatch batch, + int[] allMatchs, int allMatchesIndex, + int duplicateCount, long count) throws HiveException, IOException { + + // LOG.debug("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count); + + // TODO: Look at repeating optimizations... + + for (int i = 0; i < duplicateCount; i++) { + + int batchIndex = allMatchs[allMatchesIndex + i]; + + for (long l = 0; l < count; l++) { + + // Copy the BigTable values into the overflow batch. Since the overflow batch may + // not get flushed here, we must copy by value. + if (bigTableRetainedVectorCopy != null) { + bigTableRetainedVectorCopy.copyByValue(batch, batchIndex, + overflowBatch, overflowBatch.size); + } + + overflowBatch.size++; + if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { + forwardOverflow(); + } + } + } + } + + /** + * Generate the inner big table only join output results for one vectorized row batch with + * a repeated key. + * + * @param batch + * The big table batch with any matching and any non matching rows both as + * selected in use. + * @param hashMultiSetResult + * The hash multi-set results for the batch. + */ + protected int generateHashMultiSetResultRepeatedAll(VectorizedRowBatch batch, + VectorMapJoinHashMultiSetResult hashMultiSetResult) throws HiveException { + + long count = hashMultiSetResult.count(); + + if (batch.selectedInUse) { + // The selected array is already filled in as we want it. + } else { + int[] selected = batch.selected; + for (int i = 0; i < batch.size; i++) { + selected[i] = i; + } + batch.selectedInUse = true; + } + + do { + forwardBigTableBatch(batch); + count--; + } while (count > 0); + + // We forwarded the batch in this method. + return 0; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 3821cc6..e72ff94 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -426,6 +426,294 @@ protected void generateHashMapResultRepeatedAll(VectorizedRowBatch batch, batch.size = numSel; } + /** + * Apply the value expression to rows in the (original) input selected array. + * + * @param batch + * The vectorized row batch. + * @param inputSelectedInUse + * Whether the (original) input batch is selectedInUse. + * @param inputLogicalSize + * The (original) input batch size. 
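To make the two generate methods above concrete: a multi-set count of 1 keeps the matching big-table row in the batch via the selected array, while a count c > 1 emits the row c times through the overflow batch. The following standalone sketch (plain Java over hypothetical keys and counts, not Hive types) shows just that expansion rule.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class BigOnlyExpansionSketch {
  // keys: big-table join keys in batch order; counts: small-table multi-set counts.
  static List<Integer> expand(int[] keys, Map<Integer, Long> counts) {
    List<Integer> out = new ArrayList<>();
    for (int batchIndex = 0; batchIndex < keys.length; batchIndex++) {
      long c = counts.getOrDefault(keys[batchIndex], 0L);
      for (long i = 0; i < c; i++) {
        out.add(batchIndex); // the big-table row is emitted c times, values unchanged
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // Key 10 occurs twice in the small table; key 20 not at all.
    System.out.println(expand(new int[] {10, 20, 10}, Map.of(10, 2L))); // [0, 0, 2, 2]
  }
}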
+   * @param inputSelected +   *          The (original) input selected array. +   */ +  protected void doValueExprOnInputSelected(VectorizedRowBatch batch, +      boolean inputSelectedInUse, int inputLogicalSize, int[] inputSelected) +          throws HiveException { + +    int saveBatchSize = batch.size; +    int[] saveSelected = batch.selected; +    boolean saveSelectedInUse = batch.selectedInUse; + +    batch.size = inputLogicalSize; +    batch.selected = inputSelected; +    batch.selectedInUse = inputSelectedInUse; + +    if (bigTableValueExpressions != null) { +      for (VectorExpression ve : bigTableValueExpressions) { +        ve.evaluate(batch); +      } +    } + +    batch.size = saveBatchSize; +    batch.selected = saveSelected; +    batch.selectedInUse = saveSelectedInUse; +  } + +  /** +   * Apply the value expression to rows specified by a selected array. +   * +   * @param batch +   *          The vectorized row batch. +   * @param selected +   *          The (physical) batch indices to apply the expression to. +   * @param size +   *          The size of selected. +   */ +  protected void doValueExpr(VectorizedRowBatch batch, +      int[] selected, int size) throws HiveException { + +    int saveBatchSize = batch.size; +    int[] saveSelected = batch.selected; +    boolean saveSelectedInUse = batch.selectedInUse; + +    batch.size = size; +    batch.selected = selected; +    batch.selectedInUse = true; + +    if (bigTableValueExpressions != null) { +      for (VectorExpression ve : bigTableValueExpressions) { +        ve.evaluate(batch); +      } +    } + +    batch.size = saveBatchSize; +    batch.selected = saveSelected; +    batch.selectedInUse = saveSelectedInUse; +  } + +  /** +   * Remove (subtract) members from the input selected array and produce the results into +   * a difference array. +   * +   * @param inputSelectedInUse +   *          Whether the (original) input batch is selectedInUse. +   * @param inputLogicalSize +   *          The (original) input batch size. +   * @param inputSelected +   *          The (original) input selected array. +   * @param remove +   *          The indices to remove.  They must all be present in the input selected array. +   * @param removeSize +   *          The size of remove. +   * @param difference +   *          The resulting difference -- the input selected array indices not in the +   *          remove array. +   * @return +   *          The resulting size of the difference array. +   * @throws HiveException +   */ +  protected int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize, +      int[] inputSelected, +      int[] remove, int removeSize, int[] difference) throws HiveException { + +    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { +    //   throw new HiveException("remove is not in sort order and unique"); +    // } + +   int differenceCount = 0; + +   // Determine which rows are left. +   int removeIndex = 0; +   if (inputSelectedInUse) { +     for (int i = 0; i < inputLogicalSize; i++) { +       int candidateIndex = inputSelected[i]; +       if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { +         removeIndex++; +       } else { +         difference[differenceCount++] = candidateIndex; +       } +     } +   } else { +     for (int candidateIndex = 0; candidateIndex < inputLogicalSize; candidateIndex++) { +       if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { +         removeIndex++; +       } else { +         difference[differenceCount++] = candidateIndex; +       } +     } +   } + +   if (removeIndex != removeSize) { +     throw new HiveException("Not all batch indices removed"); +   } + +    // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) { +    //   throw new HiveException("difference is not in sort order and unique"); +    // } + +   return differenceCount; +  } + +  /** +   * Remove (subtract) members from an array and produce the results into +   * a difference array. +   * +   * @param all +   *          The selected array containing all members. +   * @param allSize +   *          The size of all.
+   * @param remove +   *          The indices to remove.  They must all be present in the all array. +   * @param removeSize +   *          The size of remove. +   * @param difference +   *          The resulting difference -- the all array indices not in the +   *          remove array. +   * @return +   *          The resulting size of the difference array. +   * @throws HiveException +   */ +  protected int subtract(int[] all, int allSize, +      int[] remove, int removeSize, int[] difference) throws HiveException { + +    // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { +    //   throw new HiveException("remove is not in sort order and unique"); +    // } + +    int differenceCount = 0; + +    // Determine which rows are left. +    int removeIndex = 0; +    for (int i = 0; i < allSize; i++) { +      int candidateIndex = all[i]; +      if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { +        removeIndex++; +      } else { +        difference[differenceCount++] = candidateIndex; +      } +    } + +    if (removeIndex != removeSize) { +      throw new HiveException("Not all batch indices removed"); +    } + +    return differenceCount; +  } + +  /** +   * Sort merge two select arrays so the resulting array is ordered by (batch) index. +   * +   * @param selected1 +   *          The first sorted selected array. +   * @param selected1Count +   *          The size of selected1. +   * @param selected2 +   *          The second sorted selected array. +   * @param selected2Count +   *          The size of selected2. +   * @param sortMerged +   *          The resulting sort merge of selected1 and selected2. +   * @return +   *          The resulting size of the sortMerged array. +   * @throws HiveException +   */ +  protected int sortMerge(int[] selected1, int selected1Count, +      int[] selected2, int selected2Count, int[] sortMerged) throws HiveException { + +    // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) { +    //   throw new HiveException("selected1 is not in sort order and unique"); +    // } + +    // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) { +    //   throw new HiveException("selected2 is not in sort order and unique"); +    // } + + +    int sortMergeCount = 0; + +    int selected1Index = 0; +    int selected2Index = 0; +    for (int i = 0; i < selected1Count + selected2Count; i++) { +      if (selected1Index < selected1Count && selected2Index < selected2Count) { +        if (selected1[selected1Index] < selected2[selected2Index]) { +          sortMerged[sortMergeCount++] = selected1[selected1Index++]; +        } else { +          sortMerged[sortMergeCount++] = selected2[selected2Index++]; +        } +      } else if (selected1Index < selected1Count) { +        sortMerged[sortMergeCount++] = selected1[selected1Index++]; +      } else { +        sortMerged[sortMergeCount++] = selected2[selected2Index++]; +      } +    } + +    // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) { +    //   throw new HiveException("sortMerged is not in sort order and unique"); +    // } + +    return sortMergeCount; +  } + +  /** +   * Generate the non matching outer join output results for one vectorized row batch. +   * +   * For each non matching row specified by parameter, generate nulls for the small table results. +   * +   * @param batch +   *          The big table batch with any matching and any non matching rows both as +   *          selected in use. +   * @param noMatchs +   *          A subset of the rows of the batch that are non matches. +   * @param noMatchSize +   *          Number of non matches in noMatchs. +   */ +  protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, +      int noMatchSize) throws IOException, HiveException { + +    // Set null information in the small table results area. + +    for (int i = 0; i < noMatchSize; i++) { +      int batchIndex = noMatchs[i]; + +      // Mark any small table scratch columns that would normally receive a copy of the +      // key as null, too.
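Since subtract() and sortMerge() above carry most of the selected-array bookkeeping for the outer variation, a self-contained illustration may help. This is a toy sketch, not Hive code, and it assumes exactly what the commented-out verify calls assert: every array is monotonically increasing and duplicate-free.

import java.util.Arrays;

public class SelectedArrayOps {
  // Same contract as subtract() above: drop the sorted remove set from the sorted all set.
  static int subtract(int[] all, int allSize, int[] remove, int removeSize, int[] diff) {
    int n = 0, r = 0;
    for (int i = 0; i < allSize; i++) {
      if (r < removeSize && all[i] == remove[r]) {
        r++;                // skip a removed member
      } else {
        diff[n++] = all[i]; // keep everything else, order preserved
      }
    }
    return n;
  }

  // Same contract as sortMerge() above: merge two disjoint sorted index arrays.
  static int sortMerge(int[] a, int aSize, int[] b, int bSize, int[] out) {
    int n = 0, i = 0, j = 0;
    while (i < aSize && j < bSize) {
      out[n++] = (a[i] < b[j]) ? a[i++] : b[j++];
    }
    while (i < aSize) { out[n++] = a[i++]; }
    while (j < bSize) { out[n++] = b[j++]; }
    return n;
  }

  public static void main(String[] args) {
    int[] selected = {0, 2, 3, 5, 7};
    int[] matchs = {2, 5};
    int[] noMatchs = new int[5];
    int noMatchCount = subtract(selected, 5, matchs, 2, noMatchs); // noMatchs = [0, 3, 7]
    int[] merged = new int[5];
    int mergeCount = sortMerge(noMatchs, noMatchCount, matchs, 2, merged);
    System.out.println(Arrays.toString(Arrays.copyOf(merged, mergeCount))); // [0, 2, 3, 5, 7]
  }
}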
+      for (int column : bigTableOuterKeyOutputVectorColumns) { +        ColumnVector colVector = batch.cols[column]; +        colVector.noNulls = false; +        colVector.isNull[batchIndex] = true; +      } + +      // Small table values are set to null. +      for (int column : smallTableOutputVectorColumns) { +        ColumnVector colVector = batch.cols[column]; +        colVector.noNulls = false; +        colVector.isNull[batchIndex] = true; +      } +    } +  } + +  /** +   * Generate the non-match outer join output results for the whole repeating vectorized +   * row batch. +   * +   * Each row will get nulls for all small table values. +   * +   * @param batch +   *          The big table batch. +   */ +  protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException { + +    for (int column : smallTableOutputVectorColumns) { +      ColumnVector colVector = batch.cols[column]; +      colVector.noNulls = false; +      colVector.isNull[0] = true; +      colVector.isRepeating = true; +    } + +    // Mark any small table scratch columns that would normally receive a copy of the key +    // as null, too. +    for (int column : bigTableOuterKeyOutputVectorColumns) { +      ColumnVector colVector = batch.cols[column]; +      colVector.noNulls = false; +      colVector.isNull[0] = true; +      colVector.isRepeating = true; +    } +  } +  //----------------------------------------------------------------------------------------------- /* @@ -494,7 +782,7 @@ private void spillSerializeRow(VectorizedRowBatch batch, int batchIndex, // LOG.debug("spillSerializeRow spilled batchIndex " + batchIndex + ", length " + length); } -  protected void spillHashMapBatch(VectorizedRowBatch batch, +  protected void spillHashTableBatch(VectorizedRowBatch batch, VectorMapJoinHashTableResult[] hashTableResults, int[] spills, int[] spillHashTableResultIndices, int spillCount) throws HiveException, IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java index f791d95..4a9f73b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyGenerateResultOperator.java @@ -46,45 +46,11 @@ * projection optimization and are able to use this variation. */ public abstract class VectorMapJoinInnerBigOnlyGenerateResultOperator -        extends VectorMapJoinGenerateResultOperator { +        extends VectorMapJoinBigOnlyGenerateResultOperator { private static final long serialVersionUID = 1L; -  private static final Logger LOG = LoggerFactory.getLogger(VectorMapJoinInnerBigOnlyGenerateResultOperator.class.getName()); - -  //--------------------------------------------------------------------------- -  // Inner big-table only join specific members. -  // - -  // An array of hash multi-set results so we can do lookups on the whole batch before output result -  // generation. -  protected transient VectorMapJoinHashMultiSetResult hashMultiSetResults[]; - -  // Pre-allocated member for storing the (physical) batch index of matching row (single- or -  // multi-small-table-valued) indexes during a process call. -  protected transient int[] allMatchs; - -  /* -   * Pre-allocated members for storing information on single- and multi-valued-small-table matches. -   * -   * ~ValueCounts -   *    Number of (empty) small table values. -   * ~AllMatchIndices -   *    (Logical) indices into allMatchs to the first row of a match of a -   *    possible series of duplicate keys.
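The null-marking pattern that generateOuterNulls and generateOuterNullsRepeatedAll rely on can be shown in isolation. The sketch below uses a simplified stand-in for the real Hive ColumnVector (the FakeColumnVector type here is hypothetical): clearing noNulls and setting isNull at each non-matching batch index is what makes the small-table output columns read as NULL.

public class OuterNullSketch {
  static class FakeColumnVector {
    boolean noNulls = true;              // true means isNull may be ignored
    boolean[] isNull = new boolean[8];
  }

  static void markNoMatches(FakeColumnVector col, int[] noMatchs, int noMatchSize) {
    col.noNulls = false;                 // the column may now contain NULLs
    for (int i = 0; i < noMatchSize; i++) {
      col.isNull[noMatchs[i]] = true;    // NULL for each non-matching row
    }
  }

  public static void main(String[] args) {
    FakeColumnVector smallTableCol = new FakeColumnVector();
    markNoMatches(smallTableCol, new int[] {1, 4}, 2);
    System.out.println(smallTableCol.isNull[1] + " " + smallTableCol.isNull[4]); // true true
  }
}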
- * ~DuplicateCounts - * The duplicate count for each matched key. - * - */ - protected transient long[] equalKeySeriesValueCounts; - protected transient int[] equalKeySeriesAllMatchIndices; - protected transient int[] equalKeySeriesDuplicateCounts; - - - // Pre-allocated member for storing the (physical) batch index of rows that need to be spilled. - protected transient int[] spills; - - // Pre-allocated member for storing index into the hashMultiSetResults for each spilled row. - protected transient int[] spillHashMapResultIndices; + private static final Logger LOG = + LoggerFactory.getLogger(VectorMapJoinInnerBigOnlyGenerateResultOperator.class.getName()); /** Kryo ctor. */ protected VectorMapJoinInnerBigOnlyGenerateResultOperator() { @@ -100,30 +66,6 @@ public VectorMapJoinInnerBigOnlyGenerateResultOperator(CompilationOpContext ctx, super(ctx, conf, vContext, vectorDesc); } - /* - * Setup our inner big table only join specific members. - */ - protected void commonSetup(VectorizedRowBatch batch) throws HiveException { - super.commonSetup(batch); - - // Inner big-table only join specific. - VectorMapJoinHashMultiSet baseHashMultiSet = (VectorMapJoinHashMultiSet) vectorMapJoinHashTable; - - hashMultiSetResults = new VectorMapJoinHashMultiSetResult[batch.DEFAULT_SIZE]; - for (int i = 0; i < hashMultiSetResults.length; i++) { - hashMultiSetResults[i] = baseHashMultiSet.createHashMultiSetResult(); - } - - allMatchs = new int[batch.DEFAULT_SIZE]; - - equalKeySeriesValueCounts = new long[batch.DEFAULT_SIZE]; - equalKeySeriesAllMatchIndices = new int[batch.DEFAULT_SIZE]; - equalKeySeriesDuplicateCounts = new int[batch.DEFAULT_SIZE]; - - spills = new int[batch.DEFAULT_SIZE]; - spillHashMapResultIndices = new int[batch.DEFAULT_SIZE]; - } - //----------------------------------------------------------------------------------------------- /* @@ -156,8 +98,8 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch, // Get rid of spills before we start modifying the batch. if (spillCount > 0) { - spillHashMapBatch(batch, hashTableResults, - spills, spillHashMapResultIndices, spillCount); + spillHashTableBatch(batch, hashTableResults, + spills, spillHashMultiSetResultIndices, spillCount); } /* @@ -186,121 +128,6 @@ protected void finishInnerBigOnly(VectorizedRowBatch batch, batch.selectedInUse = true; } - /** - * Generate the single value match inner big table only join output results for a match. - * - * @param batch - * The big table batch. - * @param allMatchs - * A subset of the rows of the batch that are matches. - * @param allMatchesIndex - * The logical index into allMatchs of the first equal key. - * @param duplicateCount - * The number of duplicates or equal keys. - * @param numSel - * The current count of rows in the rebuilding of the selected array. - * - * @return - * The new count of selected rows. - */ - private int generateHashMultiSetResultSingleValue(VectorizedRowBatch batch, - int[] allMatchs, int allMatchesIndex, int duplicateCount, int numSel) - throws HiveException, IOException { - - // LOG.debug("generateHashMultiSetResultSingleValue enter..."); - - // Generate result within big table batch itself. - - // LOG.debug("generateHashMultiSetResultSingleValue with big table..."); - - for (int i = 0; i < duplicateCount; i++) { - - int batchIndex = allMatchs[allMatchesIndex + i]; - - // Use the big table row as output. - batch.selected[numSel++] = batchIndex; - } - - return numSel; - } - - /** - * Generate results for a N x M cross product. 
- * - * @param batch - * The big table batch. - * @param allMatchs - * The all match selected array that contains (physical) batch indices. - * @param allMatchesIndex - * The index of the match key. - * @param duplicateCount - * Number of equal key rows. - * @param count - * Value count. - */ - private void generateHashMultiSetResultMultiValue(VectorizedRowBatch batch, - int[] allMatchs, int allMatchesIndex, - int duplicateCount, long count) throws HiveException, IOException { - - // LOG.debug("generateHashMultiSetResultMultiValue allMatchesIndex " + allMatchesIndex + " duplicateCount " + duplicateCount + " count " + count); - - // TODO: Look at repeating optimizations... - - for (int i = 0; i < duplicateCount; i++) { - - int batchIndex = allMatchs[allMatchesIndex + i]; - - for (long l = 0; l < count; l++) { - - // Copy the BigTable values into the overflow batch. Since the overflow batch may - // not get flushed here, we must copy by value. - if (bigTableRetainedVectorCopy != null) { - bigTableRetainedVectorCopy.copyByValue(batch, batchIndex, - overflowBatch, overflowBatch.size); - } - - overflowBatch.size++; - if (overflowBatch.size == overflowBatch.DEFAULT_SIZE) { - forwardOverflow(); - } - } - } - } - - /** - * Generate the inner big table only join output results for one vectorized row batch with - * a repeated key. - * - * @param batch - * The big table batch with any matching and any non matching rows both as - * selected in use. - * @param hashMultiSetResult - * The hash multi-set results for the batch. - */ - protected int generateHashMultiSetResultRepeatedAll(VectorizedRowBatch batch, - VectorMapJoinHashMultiSetResult hashMultiSetResult) throws HiveException { - - long count = hashMultiSetResult.count(); - - if (batch.selectedInUse) { - // The selected array is already filled in as we want it. - } else { - int[] selected = batch.selected; - for (int i = 0; i < batch.size; i++) { - selected[i] = i; - } - batch.selectedInUse = true; - } - - do { - forwardBigTableBatch(batch); - count--; - } while (count > 0); - - // We forwarded the batch in this method. - return 0; - } - protected void finishInnerBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, VectorMapJoinHashMultiSetResult hashMultiSetResult) throws HiveException, IOException { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java index 678fa42..f80f0d4 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyLongOperator.java @@ -43,7 +43,8 @@ * Specialized class for doing a vectorized map join that is an inner join on a Single-Column Long * and only big table columns appear in the join result so a hash multi-set is used. */ -public class VectorMapJoinInnerBigOnlyLongOperator extends VectorMapJoinInnerBigOnlyGenerateResultOperator { +public class VectorMapJoinInnerBigOnlyLongOperator + extends VectorMapJoinInnerBigOnlyGenerateResultOperator { private static final long serialVersionUID = 1L; @@ -312,7 +313,8 @@ public void process(Object row, int tag) throws HiveException { // Key out of range for whole hash table. 
saveJoinResult = JoinUtil.JoinResult.NOMATCH; } else { - saveJoinResult = hashMultiSet.contains(currentKey, hashMultiSetResults[hashMultiSetResultCount]); + saveJoinResult = + hashMultiSet.contains(currentKey, hashMultiSetResults[hashMultiSetResultCount]); } } @@ -331,7 +333,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -351,7 +353,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -385,8 +387,8 @@ public void process(Object row, int tag) throws HiveException { " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + " spills " + intArrayToRangesString(spills, spillCount) + - " spillHashMapResultIndices " + intArrayToRangesString(spillHashMapResultIndices, spillCount) + - " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } finishInnerBigOnly(batch, diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java index 866aa60..940a49d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyMultiKeyOperator.java @@ -322,7 +322,10 @@ public void process(Object row, int tag) throws HiveException { byte[] keyBytes = saveKeyOutput.getData(); int keyLength = saveKeyOutput.getLength(); - saveJoinResult = hashMultiSet.contains(keyBytes, 0, keyLength, hashMultiSetResults[hashMultiSetResultCount]); + saveJoinResult = + hashMultiSet.contains( + keyBytes, 0, keyLength, + hashMultiSetResults[hashMultiSetResultCount]); } /* @@ -340,7 +343,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -360,7 +363,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -394,7 +397,7 @@ public void process(Object row, int tag) throws HiveException { " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + " spills " + intArrayToRangesString(spills, spillCount) + - " spillHashMapResultIndices " + 
intArrayToRangesString(spillHashMapResultIndices, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java index a0c3b9c..d5492d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerBigOnlyStringOperator.java @@ -299,7 +299,10 @@ public void process(Object row, int tag) throws HiveException { byte[] keyBytes = vector[batchIndex]; int keyStart = start[batchIndex]; int keyLength = length[batchIndex]; - saveJoinResult = hashMultiSet.contains(keyBytes, keyStart, keyLength, hashMultiSetResults[hashMultiSetResultCount]); + saveJoinResult = + hashMultiSet.contains( + keyBytes, keyStart, keyLength, + hashMultiSetResults[hashMultiSetResultCount]); } /* @@ -317,7 +320,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -337,7 +340,7 @@ public void process(Object row, int tag) throws HiveException { case SPILL: spills[spillCount] = batchIndex; - spillHashMapResultIndices[spillCount] = hashMultiSetResultCount; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; spillCount++; break; @@ -371,7 +374,7 @@ public void process(Object row, int tag) throws HiveException { " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + " spills " + intArrayToRangesString(spills, spillCount) + - " spillHashMapResultIndices " + intArrayToRangesString(spillHashMapResultIndices, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + " hashMapResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java index ea2c04d..7db35fa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinInnerGenerateResultOperator.java @@ -193,7 +193,7 @@ protected void finishInner(VectorizedRowBatch batch, } if (spillCount > 0) { - spillHashMapBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, + spillHashTableBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, spills, spillHashMapResultIndices, spillCount); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java index f68d4c4..b5c5efd 100644 --- 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinLeftSemiGenerateResultOperator.java @@ -132,7 +132,7 @@ protected void finishLeftSemi(VectorizedRowBatch batch, // Get rid of spills before we start modifying the batch. if (spillCount > 0) { -      spillHashMapBatch(batch, hashTableResults, +      spillHashTableBatch(batch, hashTableResults, spills, spillHashMapResultIndices, spillCount); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyGenerateResultOperator.java new file mode 100644 index 0000000..b07687d --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyGenerateResultOperator.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMap; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMapResult; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSet; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashTableResult; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinHashMultiSetResult; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; + +/** + * This class has methods for generating vectorized join results for the big table only + * variation of outer joins. + * + * When an outer join does not have any small table columns in the join result, we use this + * variation, which we call outer big only. This variation uses a hash multi-set instead of a + * hash map since there are no values (just a count). + * + * Note that if an outer key appears in the small table results area, we use the outer join + * projection optimization and are able to use this variation.
+ */ +public abstract class VectorMapJoinOuterBigOnlyGenerateResultOperator +        extends VectorMapJoinBigOnlyGenerateResultOperator { + +  private static final long serialVersionUID = 1L; +  private static final Logger LOG = LoggerFactory.getLogger( +      VectorMapJoinOuterBigOnlyGenerateResultOperator.class.getName()); + +  //--------------------------------------------------------------------------- +  // Outer big-table only join specific members. +  // + +  // Pre-allocated member for remembering the big table's selected array at the beginning of +  // the process method before applying any filter.  For outer join we need to remember which +  // rows did not match since they will appear in the outer join result with NULLs for the +  // small table. +  protected transient int[] inputSelected; + +  // Pre-allocated members for storing any non-spill, non-match, or merged row indexes during a +  // process method call. +  protected transient int[] nonSpills; +  protected transient int[] noMatchs; +  protected transient int[] merged; + +  /** Kryo ctor. */ +  protected VectorMapJoinOuterBigOnlyGenerateResultOperator() { +    super(); +  } + +  public VectorMapJoinOuterBigOnlyGenerateResultOperator(CompilationOpContext ctx) { +    super(ctx); +  } + +  public VectorMapJoinOuterBigOnlyGenerateResultOperator(CompilationOpContext ctx, OperatorDesc conf, +      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException { +    super(ctx, conf, vContext, vectorDesc); +  } + +  /* +   * Setup our outer join specific members. +   */ +  protected void commonSetup(VectorizedRowBatch batch) throws HiveException { +    super.commonSetup(batch); + +    inputSelected = new int[batch.DEFAULT_SIZE]; + +    nonSpills = new int[batch.DEFAULT_SIZE]; +    noMatchs = new int[batch.DEFAULT_SIZE]; +    merged = new int[batch.DEFAULT_SIZE]; + +  } + +  //----------------------------------------------------------------------------------------------- + +  /* +   * Outer big table only join (hash multi-set). +   */ + +  /** +   * Do the per-batch setup for an outer join. +   */ +  protected void outerPerBatchSetup(VectorizedRowBatch batch) { + +    // For join operators that can generate small table results, reset their +    // (target) scratch columns. + +    for (int column : smallTableOutputVectorColumns) { +      ColumnVector smallTableColumn = batch.cols[column]; +      smallTableColumn.reset(); +    } + +    for (int column : bigTableOuterKeyOutputVectorColumns) { +      ColumnVector bigTableOuterKeyColumn = batch.cols[column]; +      bigTableOuterKeyColumn.reset(); +    } +  } + +  /** +   * Generate the outer join output results for one vectorized row batch. +   * +   * @param batch +   *          The big table batch with any matching and any non matching rows both as +   *          selected in use. +   * @param allMatchCount +   *          Number of matches in allMatchs. +   * @param equalKeySeriesCount +   *          Number of equal key series in the match arrays (one per distinct matched key). +   * @param atLeastOneNonMatch +   *          Whether at least one row was a non-match. +   * @param inputSelectedInUse +   *          A copy of the batch's selectedInUse flag on input to the process method. +   * @param inputLogicalSize +   *          The batch's size on input to the process method. +   * @param spillCount +   *          Number of spills in spills. +   * @param hashMultiSetResultCount +   *          Number of entries in hashMultiSetResults. +   */ +  public void finishOuterBigOnly(VectorizedRowBatch batch, +      int allMatchCount, int equalKeySeriesCount, boolean atLeastOneNonMatch, +      boolean inputSelectedInUse, int inputLogicalSize, +      int spillCount, int hashMultiSetResultCount) throws IOException, HiveException { + +    // Get rid of spills before we start modifying the batch.
+    if (spillCount > 0) { +      spillHashTableBatch(batch, (VectorMapJoinHashTableResult[]) hashMultiSetResults, +          spills, spillHashMultiSetResultIndices, spillCount); +    } + +    int noMatchCount = 0; +    if (spillCount > 0) { + +      // Subtract the spills to get all match and non-match rows. +      int nonSpillCount = subtractFromInputSelected( +              inputSelectedInUse, inputLogicalSize, inputSelected, +              spills, spillCount, nonSpills); + +      if (LOG.isDebugEnabled()) { +        LOG.debug("finishOuterBigOnly spillCount > 0" + +            " nonSpills " + intArrayToRangesString(nonSpills, nonSpillCount)); +      } + +      // Big table value expressions apply to ALL matching and non-matching rows. +      if (bigTableValueExpressions != null) { + +        doValueExpr(batch, nonSpills, nonSpillCount); + +      } + +      if (atLeastOneNonMatch) { +        noMatchCount = subtract(nonSpills, nonSpillCount, allMatchs, allMatchCount, +                noMatchs); + +        if (LOG.isDebugEnabled()) { +          LOG.debug("finishOuterBigOnly spillCount > 0" + +              " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount)); +        } + +      } +    } else { + +      // Run value expressions over original (whole) input batch. +      doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected); + +      if (atLeastOneNonMatch) { +        noMatchCount = subtractFromInputSelected( +                inputSelectedInUse, inputLogicalSize, inputSelected, +                allMatchs, allMatchCount, noMatchs); + +        if (LOG.isDebugEnabled()) { +          LOG.debug("finishOuterBigOnly spillCount == 0" + +              " noMatchs " + intArrayToRangesString(noMatchs, noMatchCount)); +        } +      } +    } + +    // When we generate results into the overflow batch, we may still end up with fewer rows +    // in the big table batch.  So, numSel and the batch's selected array will be rebuilt with +    // just the big table rows that need to be forwarded, minus any rows processed with the +    // overflow batch. +    if (allMatchCount > 0) { + +      int numSel = 0; +      for (int i = 0; i < equalKeySeriesCount; i++) { +        long count = equalKeySeriesValueCounts[i]; +        int allMatchesIndex = equalKeySeriesAllMatchIndices[i]; +        int duplicateCount = equalKeySeriesDuplicateCounts[i]; + +        if (count == 1) { +          numSel = generateHashMultiSetResultSingleValue( +              batch, allMatchs, allMatchesIndex, duplicateCount, numSel); +        } else { +          generateHashMultiSetResultMultiValue(batch, +              allMatchs, allMatchesIndex, +              duplicateCount, count); +        } +      } + +      // The number of single value rows that were generated in the big table batch. +      batch.size = numSel; +      batch.selectedInUse = true; + +      if (LOG.isDebugEnabled()) { +        LOG.debug("finishOuterBigOnly allMatchCount > 0" + +            " batch.selected " + intArrayToRangesString(batch.selected, batch.size)); +      } + +    } else { +      batch.size = 0; +    } + +    if (noMatchCount > 0) { +      if (batch.size > 0) { + +        generateOuterNulls(batch, noMatchs, noMatchCount); + +        // Merge noMatchs and (match) selected. +        int mergeCount = sortMerge( +                noMatchs, noMatchCount, batch.selected, batch.size, merged); + +        if (LOG.isDebugEnabled()) { +          LOG.debug("finishOuterBigOnly noMatchCount > 0 && batch.size > 0" + +              " merged " + intArrayToRangesString(merged, mergeCount)); +        } + +        System.arraycopy(merged, 0, batch.selected, 0, mergeCount); +        batch.size = mergeCount; +        batch.selectedInUse = true; +      } else { + +        // We can use the whole batch for output of no matches.
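As a concrete trace of the selected-array rebuild above, this toy walk-through (plain Java over hypothetical match data, not Hive code) mirrors the steps of finishOuterBigOnly: count == 1 matches stay selected in the big-table batch, count > 1 matches are expanded toward the overflow batch, and the NULL-extended non-matches are sort-merged back in.

import java.util.Arrays;

public class FinishOuterBigOnlyWalkthrough {
  public static void main(String[] args) {
    int[] allMatchs = {1, 3};   // matching batch indexes (sorted)
    long[] counts = {1L, 2L};   // multi-set count per match
    int[] noMatchs = {0, 2, 4}; // non-matching batch indexes (sorted)

    // Pass 1: count == 1 rows are re-selected in the big-table batch itself.
    int[] selected = new int[8];
    int numSel = 0;
    for (int i = 0; i < allMatchs.length; i++) {
      if (counts[i] == 1) {
        selected[numSel++] = allMatchs[i];
      } else {
        // A count > 1 row would be copied counts[i] times into the overflow batch.
        System.out.println("overflow: row " + allMatchs[i] + " x " + counts[i]);
      }
    }

    // Pass 2: merge the NULL-extended non-matches back in, keeping index order.
    int[] merged = new int[8];
    int n = 0, a = 0, b = 0;
    while (a < numSel && b < noMatchs.length) {
      merged[n++] = (selected[a] < noMatchs[b]) ? selected[a++] : noMatchs[b++];
    }
    while (a < numSel) { merged[n++] = selected[a++]; }
    while (b < noMatchs.length) { merged[n++] = noMatchs[b++]; }

    System.out.println(Arrays.toString(Arrays.copyOf(merged, n))); // [0, 1, 2, 4]
  }
}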
+ +        generateOuterNullsRepeatedAll(batch); + +        System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); +        batch.size = noMatchCount; +        batch.selectedInUse = true; + +        if (LOG.isDebugEnabled()) { +          LOG.debug("finishOuterBigOnly noMatchCount > 0 && batch.size == 0" + +              " batch.selected " + intArrayToRangesString(batch.selected, batch.size)); +        } +      } +    } +  } + +  /** +   * Generate the outer join output results for one vectorized row batch with a repeated key. +   * +   * Any filter expressions will apply now since hash multi-set lookup for outer join is complete. +   * +   * @param batch +   *          The big table batch with any matching and any non matching rows both as +   *          selected in use. +   * @param joinResult +   *          The hash multi-set lookup result for the repeated key. +   * @param hashMultiSetResult +   *          The repeated hash multi-set result for the batch. +   * @param someRowsFilteredOut +   *          Whether some rows of the repeated key batch were knocked out by the filter. +   * @param inputSelectedInUse +   *          A copy of the batch's selectedInUse flag on input to the process method. +   * @param inputLogicalSize +   *          The batch's size on input to the process method. +   */ +  public void finishOuterBigOnlyRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult joinResult, +      VectorMapJoinHashMultiSetResult hashMultiSetResult, boolean someRowsFilteredOut, +      boolean inputSelectedInUse, int inputLogicalSize) +          throws IOException, HiveException { + +    // LOG.debug("finishOuterBigOnlyRepeated batch #" + batchCounter + " " + joinResult.name() + " batch.size " + batch.size + " someRowsFilteredOut " + someRowsFilteredOut); + +    switch (joinResult) { +    case MATCH: + +      // Rows we looked up as one repeated key are a match.  But filtered out rows +      // need to be generated as non-matches, too. + +      if (someRowsFilteredOut) { + +        // For the filtered out rows that didn't (logically) get looked up in the hash table, +        // we need to generate no match results for those too... + +        // Run value expressions over original (whole) input batch. +        doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected); + +        // Now calculate which rows were filtered out (they are logically no matches). + +        // Determine which rows are non matches by determining the delta between inputSelected and +        // (current) batch selected. + +        int noMatchCount = subtractFromInputSelected( +            inputSelectedInUse, inputLogicalSize, inputSelected, +            batch.selected, batch.size, noMatchs); + +        // The repeated match batch is output by generateHashMultiSetResultRepeatedAll. +        generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult); +        batch.size = 0; +        batch.selectedInUse = false; + +        generateOuterNulls(batch, noMatchs, noMatchCount); + +        System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); +        batch.size = noMatchCount; +        batch.selectedInUse = true; +      } else { + +        // Just run our value expressions over input batch. + +        if (bigTableValueExpressions != null) { +          for (VectorExpression ve : bigTableValueExpressions) { +            ve.evaluate(batch); +          } +        } + +        // The repeated match batch is output by generateHashMultiSetResultRepeatedAll. +        generateHashMultiSetResultRepeatedAll(batch, hashMultiSetResult); +        batch.size = 0; +        batch.selectedInUse = false; +      } +      break; + +    case SPILL: + +      // Rows we looked up as one repeated key need to spill.  But filtered out rows +      // need to be generated as non-matches, too.
+ + spillBatchRepeated(batch, (VectorMapJoinHashTableResult) hashMultiSetResult); + + // After using selected to generate spills, generate non-matches, if any. + if (someRowsFilteredOut) { + + // Determine which rows are non matches by determining the delta between inputSelected and + // (current) batch selected. + + int noMatchCount = subtractFromInputSelected( + inputSelectedInUse, inputLogicalSize, inputSelected, + batch.selected, batch.size, noMatchs); + + System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); + batch.size = noMatchCount; + batch.selectedInUse = true; + + generateOuterNullsRepeatedAll(batch); + } else { + batch.size = 0; + } + + break; + + case NOMATCH: + + if (someRowsFilteredOut) { + + // When the repeated no match is due to filtering, we need to restore the + // selected information. + + if (inputSelectedInUse) { + System.arraycopy(inputSelected, 0, batch.selected, 0, inputLogicalSize); + } + batch.size = inputLogicalSize; + } + + // Run our value expressions over whole batch. + if (bigTableValueExpressions != null) { + for(VectorExpression ve: bigTableValueExpressions) { + ve.evaluate(batch); + } + } + + generateOuterNullsRepeatedAll(batch); + break; + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyLongOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyLongOperator.java new file mode 100644 index 0000000..175ec73 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyLongOperator.java @@ -0,0 +1,465 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +import java.io.IOException; +import java.util.Arrays; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorDesc; +// Single-Column Long hash table import. +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMap; + +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinLongHashMultiSet; +// Single-Column Long specific imports. 
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; + +/* + * Specialized class for doing a vectorized map join that is an outer join on a Single-Column Long + * using a hash multi-set. + */ +public class VectorMapJoinOuterBigOnlyLongOperator +        extends VectorMapJoinOuterBigOnlyGenerateResultOperator { + +  private static final long serialVersionUID = 1L; + +  //------------------------------------------------------------------------------------------------ + +  private static final String CLASS_NAME = VectorMapJoinOuterBigOnlyLongOperator.class.getName(); +  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME); + +  protected String getLoggingPrefix() { +    return super.getLoggingPrefix(CLASS_NAME); +  } + +  //------------------------------------------------------------------------------------------------ + +  // (none) + +  // The above members are initialized by the constructor and must not be +  // transient. +  //--------------------------------------------------------------------------- + +  // The hash multi-set for this specialized class. +  private transient VectorMapJoinLongHashMultiSet hashMultiSet; + +  //--------------------------------------------------------------------------- +  // Single-Column Long specific members. +  // + +  // For integers, we have optional min/max filtering. +  private transient boolean useMinMax; +  private transient long min; +  private transient long max; + +  // The column number for this one column join specialization. +  private transient int singleJoinColumn; + +  //--------------------------------------------------------------------------- +  // Pass-thru constructors. +  // + +  /** Kryo ctor. */ +  protected VectorMapJoinOuterBigOnlyLongOperator() { +    super(); +  } + +  public VectorMapJoinOuterBigOnlyLongOperator(CompilationOpContext ctx) { +    super(ctx); +  } + +  public VectorMapJoinOuterBigOnlyLongOperator(CompilationOpContext ctx, OperatorDesc conf, +      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException { +    super(ctx, conf, vContext, vectorDesc); +  } + +  //--------------------------------------------------------------------------- +  // Process Single-Column Long Outer Join on a vectorized row batch. +  // + +  @Override +  public void process(Object row, int tag) throws HiveException { + +    try { +      VectorizedRowBatch batch = (VectorizedRowBatch) row; + +      alias = (byte) tag; + +      if (needCommonSetup) { +        // Our one time process method initialization. +        commonSetup(batch); + +        /* +         * Initialize Single-Column Long members for this specialized class. +         */ + +        singleJoinColumn = bigTableKeyColumnMap[0]; + +        needCommonSetup = false; +      } + +      if (needHashTableSetup) { +        // Setup our hash table specialization.  It will be the first time the process +        // method is called, or after a Hybrid Grace reload. + +        /* +         * Get our Single-Column Long hash multi-set information for this specialized class. +         */ + +        hashMultiSet = (VectorMapJoinLongHashMultiSet) vectorMapJoinHashTable; +        useMinMax = hashMultiSet.useMinMax(); +        if (useMinMax) { +          min = hashMultiSet.min(); +          max = hashMultiSet.max(); +        } + +        needHashTableSetup = false; +      } + +      batchCounter++; + +      final int inputLogicalSize = batch.size; + +      if (inputLogicalSize == 0) { +        if (LOG.isDebugEnabled()) { +          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty"); +        } +        return; +      } + +      // Do the per-batch setup for an outer join.
+ +      outerPerBatchSetup(batch); + +      // For outer join, remember our input rows before ON expression filtering or before +      // hash table matching so we can generate results for all rows (matching and non matching) +      // later. +      boolean inputSelectedInUse = batch.selectedInUse; +      if (inputSelectedInUse) { +        // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) { +        //   throw new HiveException("batch.selected is not in sort order and unique"); +        // } +        System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize); +      } + +      // Filtering for outer join just removes rows available for hash table matching. +      boolean someRowsFilteredOut = false; +      if (bigTableFilterExpressions.length > 0) { +        // Since the input selected array was saved in inputSelected above, the filter may +        // safely overwrite batch.selected here. +        for (VectorExpression ve : bigTableFilterExpressions) { +          ve.evaluate(batch); +        } +        someRowsFilteredOut = (batch.size != inputLogicalSize); +        if (LOG.isDebugEnabled()) { +          if (batch.selectedInUse) { +            if (inputSelectedInUse) { +              LOG.debug(CLASS_NAME + +                  " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) + +                  " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); +            } else { +              LOG.debug(CLASS_NAME + +                  " inputLogicalSize " + inputLogicalSize + +                  " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size)); +            } +          } +        } +      } + +      // Perform any key expressions.  Results will go into scratch columns. +      if (bigTableKeyExpressions != null) { +        for (VectorExpression ve : bigTableKeyExpressions) { +          ve.evaluate(batch); +        } +      } + +      /* +       * Single-Column Long specific declarations. +       */ + +      // The one join column for this specialized class. +      LongColumnVector joinColVector = (LongColumnVector) batch.cols[singleJoinColumn]; +      long[] vector = joinColVector.vector; + +      /* +       * Single-Column Long check for repeating. +       */ + +      // Check single column for repeating. +      boolean allKeyInputColumnsRepeating = joinColVector.isRepeating; + +      if (allKeyInputColumnsRepeating) { + +        /* +         * Repeating. +         */ + +        // All key input columns are repeating.  Generate key once.  Lookup once. +        // Since the key is repeated, we must use entry 0 regardless of selectedInUse. + +        /* +         * Single-Column Long specific repeated lookup. +         */ + +        JoinUtil.JoinResult joinResult; +        if (batch.size == 0) { +          // Whole repeated key batch was filtered out. +          joinResult = JoinUtil.JoinResult.NOMATCH; +        } else if (!joinColVector.noNulls && joinColVector.isNull[0]) { +          // Any (repeated) null key column is no match for whole batch. +          joinResult = JoinUtil.JoinResult.NOMATCH; +        } else { +          // Handle *repeated* join key, if found. +          long key = vector[0]; +          // LOG.debug(CLASS_NAME + " repeated key " + key); +          if (useMinMax && (key < min || key > max)) { +            // Out of range for whole batch. +            joinResult = JoinUtil.JoinResult.NOMATCH; +          } else { +            joinResult = hashMultiSet.contains(key, hashMultiSetResults[0]); +          } +        } + +        /* +         * Common repeated join result processing. +         */ + +        if (LOG.isDebugEnabled()) { +          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name()); +        } +        finishOuterBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0], someRowsFilteredOut, +            inputSelectedInUse, inputLogicalSize); +      } else { + +        /* +         * NOT Repeating.
+         */
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated");
+        }
+
+        int[] selected = batch.selected;
+        boolean selectedInUse = batch.selectedInUse;
+
+        int hashMultiSetResultCount = 0;
+        int allMatchCount = 0;
+        int equalKeySeriesCount = 0;
+        int spillCount = 0;
+
+        boolean atLeastOneNonMatch = someRowsFilteredOut;
+
+        /*
+         * Single-Column Long specific variables.
+         */
+
+        long saveKey = 0;
+
+        // We optimize performance by only looking up the first key in a series of equal keys.
+        boolean haveSaveKey = false;
+        JoinUtil.JoinResult saveJoinResult = JoinUtil.JoinResult.NOMATCH;
+
+        // Logical loop over the rows in the batch since the batch may have selected in use.
+        for (int logical = 0; logical < batch.size; logical++) {
+          int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+          // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch");
+
+          /*
+           * Single-Column Long outer null detection.
+           */
+
+          boolean isNull = !joinColVector.noNulls && joinColVector.isNull[batchIndex];
+
+          if (isNull) {
+
+            // Make sure the NULL does not interfere with the current equal key series, if there
+            // is one. We do not set saveJoinResult.
+            //
+            // Let a current MATCH equal key series keep going, or
+            // Let a current SPILL equal key series keep going, or
+            // Let a current NOMATCH keep not matching.
+
+            atLeastOneNonMatch = true;
+
+            // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL");
+          } else {
+
+            /*
+             * Single-Column Long outer get key.
+             */
+
+            long currentKey = vector[batchIndex];
+
+            /*
+             * Equal key series checking.
+             */
+
+            if (!haveSaveKey || currentKey != saveKey) {
+              // New key.
+
+              if (haveSaveKey) {
+                // Move on with our counts.
+                switch (saveJoinResult) {
+                case MATCH:
+                  hashMultiSetResultCount++;
+                  equalKeySeriesCount++;
+                  break;
+                case SPILL:
+                  hashMultiSetResultCount++;
+                  break;
+                case NOMATCH:
+                  break;
+                }
+              }
+
+              // Regardless of our matching result, we keep that information to make multiple use
+              // of it for a possible series of equal keys.
+              haveSaveKey = true;
+
+              /*
+               * Single-Column Long specific save key.
+               */
+
+              saveKey = currentKey;
+
+              /*
+               * Single-Column Long specific lookup key.
+               */
+
+              if (useMinMax && (currentKey < min || currentKey > max)) {
+                // Key out of range for whole hash table.
+                saveJoinResult = JoinUtil.JoinResult.NOMATCH;
+              } else {
+                saveJoinResult =
+                    hashMultiSet.contains(currentKey, hashMultiSetResults[hashMultiSetResultCount]);
+              }
+
+              // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " New Key " + currentKey + " " + saveJoinResult.name());
+
+              /*
+               * Common outer join result processing.
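+               *
+               * Editor's note (illustrative): the three join results are booked differently.
+               * MATCH records the multi-set count, where the series starts in allMatchs, and a
+               * duplicate count of 1 (grown as equal keys continue). SPILL remembers the row and
+               * its hash multi-set result index for Hybrid Grace reprocessing. NOMATCH merely
+               * notes that a non-match exists; the row still reaches the outer result through
+               * the inputSelected snapshot.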
+ */ + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesValueCounts[equalKeySeriesCount] = hashMultiSetResults[hashMultiSetResultCount].count(); + equalKeySeriesAllMatchIndices[equalKeySeriesCount] = allMatchCount; + equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + atLeastOneNonMatch = true; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); + break; + } + } else { + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); + + // Series of equal keys. + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); + break; + } + } + // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { + // throw new HiveException("allMatchs is not in sort order and unique"); + // } + } + } + + if (haveSaveKey) { + // Update our counts for the last key. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + + " allMatchs " + intArrayToRangesString(allMatchs,allMatchCount) + + " equalKeySeriesValueCounts " + longArrayToRangesString(equalKeySeriesValueCounts, equalKeySeriesCount) + + " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + + " atLeastOneNonMatch " + atLeastOneNonMatch + + " inputSelectedInUse " + inputSelectedInUse + + " inputLogicalSize " + inputLogicalSize + + " spills " + intArrayToRangesString(spills, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + } + + // We will generate results for all matching and non-matching rows. + finishOuterBigOnly(batch, + allMatchCount, equalKeySeriesCount, atLeastOneNonMatch, + inputSelectedInUse, inputLogicalSize, + spillCount, hashMultiSetResultCount); + } + + if (batch.size > 0) { + // Forward any remaining selected rows. 
+        forwardBigTableBatch(batch);
+      }
+
+    } catch (IOException e) {
+      throw new HiveException(e);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyMultiKeyOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyMultiKeyOperator.java
new file mode 100644
index 0000000..3d75956
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyMultiKeyOperator.java
@@ -0,0 +1,484 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+// Multi-Key hash table import.
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+// Multi-Key specific imports.
+import org.apache.hadoop.hive.ql.exec.vector.VectorSerializeRow;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+
+import com.google.common.base.Preconditions;
+
+/*
+ * Specialized class for doing a vectorized map join that is an outer big-table-only join on
+ * Multi-Key using a hash multi-set.
+ */
+public class VectorMapJoinOuterBigOnlyMultiKeyOperator
+    extends VectorMapJoinOuterBigOnlyGenerateResultOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  //------------------------------------------------------------------------------------------------
+
+  private static final String CLASS_NAME = VectorMapJoinOuterBigOnlyMultiKeyOperator.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  protected String getLoggingPrefix() {
+    return super.getLoggingPrefix(CLASS_NAME);
+  }
+
+  //------------------------------------------------------------------------------------------------
+
+  // (none)
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
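+
+  // Editor's note (illustrative, not part of the original patch): unlike the Long and String
+  // specializations there is no single ColumnVector to compare for a multi-key. Each row's key
+  // columns are serialized with BinarySortableSerializeWrite into an Output buffer, so key
+  // equality and hash lookups become byte operations. For example, a two-column key
+  // (cint = 5, cstring = "a") serializes to a single byte sequence, and two rows have equal
+  // keys exactly when arraysEquals() reports their serialized bytes match.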
+  //---------------------------------------------------------------------------
+
+  // The hash multi-set for this specialized class.
+  private transient VectorMapJoinBytesHashMultiSet hashMultiSet;
+
+  //---------------------------------------------------------------------------
+  // Multi-Key specific members.
+  //
+
+  // Object that can take a set of columns of a row in a vectorized row batch and serialize it.
+  private transient VectorSerializeRow keyVectorSerializeWrite;
+
+  // The BinarySortable serialization of the current key.
+  private transient Output currentKeyOutput;
+
+  // The BinarySortable serialization of the saved key for a possible series of equal keys.
+  private transient Output saveKeyOutput;
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  /** Kryo ctor. */
+  protected VectorMapJoinOuterBigOnlyMultiKeyOperator() {
+    super();
+  }
+
+  public VectorMapJoinOuterBigOnlyMultiKeyOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinOuterBigOnlyMultiKeyOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
+  }
+
+  //---------------------------------------------------------------------------
+  // Process Multi-Key Outer Join on a vectorized row batch.
+  //
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      alias = (byte) tag;
+
+      if (needCommonSetup) {
+        // Our one time process method initialization.
+        commonSetup(batch);
+
+        /*
+         * Initialize Multi-Key members for this specialized class.
+         */
+
+        keyVectorSerializeWrite = new VectorSerializeRow(
+            new BinarySortableSerializeWrite(bigTableKeyColumnMap.length));
+        keyVectorSerializeWrite.init(bigTableKeyTypeInfos, bigTableKeyColumnMap);
+
+        currentKeyOutput = new Output();
+        saveKeyOutput = new Output();
+
+        needCommonSetup = false;
+      }
+
+      if (needHashTableSetup) {
+        // Setup our hash table specialization. It will be the first time the process
+        // method is called, or after a Hybrid Grace reload.
+
+        /*
+         * Get our Multi-Key hash multi-set information for this specialized class.
+         */
+
+        hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable;
+
+        needHashTableSetup = false;
+      }
+
+      batchCounter++;
+
+      final int inputLogicalSize = batch.size;
+
+      if (inputLogicalSize == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+        }
+        return;
+      }
+
+      // Do the per-batch setup for an outer join.
+
+      outerPerBatchSetup(batch);
+
+      // For outer join, remember our input rows before ON expression filtering or before
+      // hash table matching so we can generate results for all rows (matching and non-matching)
+      // later.
+      boolean inputSelectedInUse = batch.selectedInUse;
+      if (inputSelectedInUse) {
+        // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) {
+        //   throw new HiveException("batch.selected is not in sort order and unique");
+        // }
+        System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize);
+      }
+
+      // Filtering for outer join just removes rows available for hash table matching.
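+      // Editor's note (illustrative): filtered-out rows are not dropped from the outer result;
+      // they only become ineligible for hash table matching. E.g. with input rows [0..3] and a
+      // filter keeping rows [1, 3], rows 0 and 2 re-enter later as non-matches via the
+      // inputSelected snapshot taken above.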
+      boolean someRowsFilteredOut = false;
+      if (bigTableFilterExpressions.length > 0) {
+        // Since the input selected array was saved above, the filter expressions may safely
+        // overwrite batch.selected with the smaller post-filter row set.
+        for (VectorExpression ve : bigTableFilterExpressions) {
+          ve.evaluate(batch);
+        }
+        someRowsFilteredOut = (batch.size != inputLogicalSize);
+        if (LOG.isDebugEnabled()) {
+          if (batch.selectedInUse) {
+            if (inputSelectedInUse) {
+              LOG.debug(CLASS_NAME +
+                  " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) +
+                  " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+            } else {
+              LOG.debug(CLASS_NAME +
+                  " inputLogicalSize " + inputLogicalSize +
+                  " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+            }
+          }
+        }
+      }
+
+      // Perform any key expressions. Results will go into scratch columns.
+      if (bigTableKeyExpressions != null) {
+        for (VectorExpression ve : bigTableKeyExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+
+      /*
+       * Multi-Key specific declarations.
+       */
+
+      // None.
+
+      /*
+       * Multi-Key check for repeating.
+       */
+
+      // If all BigTable input columns to key expressions are isRepeating, then
+      // calculate key once; lookup once.
+      // Also determine if any nulls are present since for a join that means no match.
+      boolean allKeyInputColumnsRepeating;
+      boolean someKeyInputColumnIsNull = false; // Only valid if allKeyInputColumnsRepeating is true.
+      if (bigTableKeyColumnMap.length == 0) {
+        allKeyInputColumnsRepeating = false;
+      } else {
+        allKeyInputColumnsRepeating = true;
+        for (int i = 0; i < bigTableKeyColumnMap.length; i++) {
+          ColumnVector colVector = batch.cols[bigTableKeyColumnMap[i]];
+          if (!colVector.isRepeating) {
+            allKeyInputColumnsRepeating = false;
+            break;
+          }
+          if (!colVector.noNulls && colVector.isNull[0]) {
+            someKeyInputColumnIsNull = true;
+          }
+        }
+      }
+
+      if (allKeyInputColumnsRepeating) {
+
+        /*
+         * Repeating.
+         */
+
+        // All key input columns are repeating. Generate key once. Lookup once.
+        // Since the key is repeated, we must use entry 0 regardless of selectedInUse.
+
+        /*
+         * Multi-Key specific repeated lookup.
+         */
+
+        JoinUtil.JoinResult joinResult;
+        if (batch.size == 0) {
+          // Whole repeated key batch was filtered out.
+          joinResult = JoinUtil.JoinResult.NOMATCH;
+        } else if (someKeyInputColumnIsNull) {
+          // Any (repeated) null key column is no match for whole batch.
+          joinResult = JoinUtil.JoinResult.NOMATCH;
+        } else {
+
+          // All key input columns are repeating. Generate key once. Lookup once.
+          keyVectorSerializeWrite.setOutput(currentKeyOutput);
+          keyVectorSerializeWrite.serializeWrite(batch, 0);
+          byte[] keyBytes = currentKeyOutput.getData();
+          int keyLength = currentKeyOutput.getLength();
+          joinResult = hashMultiSet.contains(keyBytes, 0, keyLength, hashMultiSetResults[0]);
+        }
+
+        /*
+         * Common repeated join result processing.
+         */
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
+        }
+        finishOuterBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0], someRowsFilteredOut,
+            inputSelectedInUse, inputLogicalSize);
+      } else {
+
+        /*
+         * NOT Repeating.
+         */
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated");
+        }
+
+        int[] selected = batch.selected;
+        boolean selectedInUse = batch.selectedInUse;
+
+        int hashMultiSetResultCount = 0;
+        int allMatchCount = 0;
+        int equalKeySeriesCount = 0;
+        int spillCount = 0;
+
+        boolean atLeastOneNonMatch = someRowsFilteredOut;
+
+        /*
+         * Multi-Key specific variables.
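+         *
+         * Editor's note (illustrative): currentKeyOutput and saveKeyOutput are ping-ponged
+         * through the temp reference below instead of copying key bytes. When a new key starts
+         * a series the two buffers are swapped, so the freshly serialized key becomes the saved
+         * key and the old saved buffer is recycled for the next row's serialization.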
+         */
+
+        Output temp;
+
+        // We optimize performance by only looking up the first key in a series of equal keys.
+        boolean haveSaveKey = false;
+        JoinUtil.JoinResult saveJoinResult = JoinUtil.JoinResult.NOMATCH;
+
+        // Logical loop over the rows in the batch since the batch may have selected in use.
+        for (int logical = 0; logical < batch.size; logical++) {
+          int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+          // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch");
+
+          /*
+           * Multi-Key outer null detection.
+           */
+
+          // Generate binary sortable key for current row in vectorized row batch.
+          keyVectorSerializeWrite.setOutput(currentKeyOutput);
+          keyVectorSerializeWrite.serializeWrite(batch, batchIndex);
+          if (keyVectorSerializeWrite.getHasAnyNulls()) {
+
+            // Make sure the NULL does not interfere with the current equal key series, if there
+            // is one. We do not set saveJoinResult.
+            //
+            // Let a current MATCH equal key series keep going, or
+            // Let a current SPILL equal key series keep going, or
+            // Let a current NOMATCH keep not matching.
+
+            atLeastOneNonMatch = true;
+
+            // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL");
+          } else {
+
+            /*
+             * Multi-Key outer get key.
+             */
+
+            // Generated earlier to get possible null(s).
+
+            /*
+             * Equal key series checking.
+             */
+
+            if (!haveSaveKey || !saveKeyOutput.arraysEquals(currentKeyOutput)) {
+
+              // New key.
+
+              if (haveSaveKey) {
+                // Move on with our counts.
+                switch (saveJoinResult) {
+                case MATCH:
+                  hashMultiSetResultCount++;
+                  equalKeySeriesCount++;
+                  break;
+                case SPILL:
+                  hashMultiSetResultCount++;
+                  break;
+                case NOMATCH:
+                  break;
+                }
+              }
+
+              // Regardless of our matching result, we keep that information to make multiple use
+              // of it for a possible series of equal keys.
+              haveSaveKey = true;
+
+              /*
+               * Multi-Key specific save key.
+               */
+
+              temp = saveKeyOutput;
+              saveKeyOutput = currentKeyOutput;
+              currentKeyOutput = temp;
+
+              /*
+               * Multi-Key specific lookup key.
+               */
+
+              byte[] keyBytes = saveKeyOutput.getData();
+              int keyLength = saveKeyOutput.getLength();
+              saveJoinResult =
+                  hashMultiSet.contains(
+                      keyBytes, 0, keyLength,
+                      hashMultiSetResults[hashMultiSetResultCount]);
+
+              /*
+               * Common outer join result processing.
+               */
+
+              switch (saveJoinResult) {
+              case MATCH:
+                equalKeySeriesValueCounts[equalKeySeriesCount] = hashMultiSetResults[hashMultiSetResultCount].count();
+                equalKeySeriesAllMatchIndices[equalKeySeriesCount] = allMatchCount;
+                equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1;
+                allMatchs[allMatchCount++] = batchIndex;
+                // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey);
+                break;
+
+              case SPILL:
+                spills[spillCount] = batchIndex;
+                spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount;
+                spillCount++;
+                break;
+
+              case NOMATCH:
+                atLeastOneNonMatch = true;
+                // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey);
+                break;
+              }
+            } else {
+              // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name());
+
+              // Series of equal keys.
+ + switch (saveJoinResult) { + case MATCH: + equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); + break; + } + } + // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { + // throw new HiveException("allMatchs is not in sort order and unique"); + // } + } + } + + if (haveSaveKey) { + // Update our counts for the last key. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + + " allMatchs " + intArrayToRangesString(allMatchs,allMatchCount) + + " equalKeySeriesValueCounts " + longArrayToRangesString(equalKeySeriesValueCounts, equalKeySeriesCount) + + " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + + " atLeastOneNonMatch " + atLeastOneNonMatch + + " inputSelectedInUse " + inputSelectedInUse + + " inputLogicalSize " + inputLogicalSize + + " spills " + intArrayToRangesString(spills, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + } + + // We will generate results for all matching and non-matching rows. + finishOuterBigOnly(batch, + allMatchCount, equalKeySeriesCount, atLeastOneNonMatch, + inputSelectedInUse, inputLogicalSize, + spillCount, hashMultiSetResultCount); + } + + if (batch.size > 0) { + // Forward any remaining selected rows. + forwardBigTableBatch(batch); + } + + } catch (IOException e) { + throw new HiveException(e); + } catch (Exception e) { + throw new HiveException(e); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyStringOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyStringOperator.java new file mode 100644 index 0000000..925c1f4 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterBigOnlyStringOperator.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.CompilationOpContext;
+import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+// Single-Column String hash table import.
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMap;
+
+import org.apache.hadoop.hive.ql.exec.vector.mapjoin.hashtable.VectorMapJoinBytesHashMultiSet;
+// Single-Column String specific imports.
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+
+/*
+ * Specialized class for doing a vectorized map join that is an outer big-table-only join on a
+ * Single-Column String using a hash multi-set.
+ */
+public class VectorMapJoinOuterBigOnlyStringOperator
+    extends VectorMapJoinOuterBigOnlyGenerateResultOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  //------------------------------------------------------------------------------------------------
+
+  private static final String CLASS_NAME = VectorMapJoinOuterBigOnlyStringOperator.class.getName();
+  private static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
+
+  protected String getLoggingPrefix() {
+    return super.getLoggingPrefix(CLASS_NAME);
+  }
+
+  //------------------------------------------------------------------------------------------------
+
+  // (none)
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
+  // The hash multi-set for this specialized class.
+  private transient VectorMapJoinBytesHashMultiSet hashMultiSet;
+
+  //---------------------------------------------------------------------------
+  // Single-Column String specific members.
+  //
+
+  // The column number for this one column join specialization.
+  private transient int singleJoinColumn;
+
+  //---------------------------------------------------------------------------
+  // Pass-thru constructors.
+  //
+
+  /** Kryo ctor. */
+  protected VectorMapJoinOuterBigOnlyStringOperator() {
+    super();
+  }
+
+  public VectorMapJoinOuterBigOnlyStringOperator(CompilationOpContext ctx) {
+    super(ctx);
+  }
+
+  public VectorMapJoinOuterBigOnlyStringOperator(CompilationOpContext ctx, OperatorDesc conf,
+      VectorizationContext vContext, VectorDesc vectorDesc) throws HiveException {
+    super(ctx, conf, vContext, vectorDesc);
+  }
+
+  //---------------------------------------------------------------------------
+  // Process Single-Column String Outer Join on a vectorized row batch.
+  //
+
+  @Override
+  public void process(Object row, int tag) throws HiveException {
+
+    try {
+      VectorizedRowBatch batch = (VectorizedRowBatch) row;
+
+      alias = (byte) tag;
+
+      if (needCommonSetup) {
+        // Our one time process method initialization.
+        commonSetup(batch);
+
+        /*
+         * Initialize Single-Column String members for this specialized class.
+         */
+
+        singleJoinColumn = bigTableKeyColumnMap[0];
+
+        needCommonSetup = false;
+      }
+
+      if (needHashTableSetup) {
+        // Setup our hash table specialization. It will be the first time the process
+        // method is called, or after a Hybrid Grace reload.
+
+        /*
+         * Get our Single-Column String hash multi-set information for this specialized class.
+         */
+
+        hashMultiSet = (VectorMapJoinBytesHashMultiSet) vectorMapJoinHashTable;
+
+        needHashTableSetup = false;
+      }
+
+      batchCounter++;
+
+      final int inputLogicalSize = batch.size;
+
+      if (inputLogicalSize == 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " empty");
+        }
+        return;
+      }
+
+      // Do the per-batch setup for an outer join.
+
+      outerPerBatchSetup(batch);
+
+      // For outer join, remember our input rows before ON expression filtering or before
+      // hash table matching so we can generate results for all rows (matching and non-matching)
+      // later.
+      boolean inputSelectedInUse = batch.selectedInUse;
+      if (inputSelectedInUse) {
+        // if (!verifyMonotonicallyIncreasing(batch.selected, batch.size)) {
+        //   throw new HiveException("batch.selected is not in sort order and unique");
+        // }
+        System.arraycopy(batch.selected, 0, inputSelected, 0, inputLogicalSize);
+      }
+
+      // Filtering for outer join just removes rows available for hash table matching.
+      boolean someRowsFilteredOut = false;
+      if (bigTableFilterExpressions.length > 0) {
+        // Since the input selected array was saved above, the filter expressions may safely
+        // overwrite batch.selected with the smaller post-filter row set.
+        for (VectorExpression ve : bigTableFilterExpressions) {
+          ve.evaluate(batch);
+        }
+        someRowsFilteredOut = (batch.size != inputLogicalSize);
+        if (LOG.isDebugEnabled()) {
+          if (batch.selectedInUse) {
+            if (inputSelectedInUse) {
+              LOG.debug(CLASS_NAME +
+                  " inputSelected " + intArrayToRangesString(inputSelected, inputLogicalSize) +
+                  " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+            } else {
+              LOG.debug(CLASS_NAME +
+                  " inputLogicalSize " + inputLogicalSize +
+                  " filtered batch.selected " + intArrayToRangesString(batch.selected, batch.size));
+            }
+          }
+        }
+      }
+
+      // Perform any key expressions. Results will go into scratch columns.
+      if (bigTableKeyExpressions != null) {
+        for (VectorExpression ve : bigTableKeyExpressions) {
+          ve.evaluate(batch);
+        }
+      }
+
+      /*
+       * Single-Column String specific declarations.
+       */
+
+      // The one join column for this specialized class.
+      BytesColumnVector joinColVector = (BytesColumnVector) batch.cols[singleJoinColumn];
+      byte[][] vector = joinColVector.vector;
+      int[] start = joinColVector.start;
+      int[] length = joinColVector.length;
+
+      /*
+       * Single-Column String check for repeating.
+       */
+
+      // Check single column for repeating.
+      boolean allKeyInputColumnsRepeating = joinColVector.isRepeating;
+
+      if (allKeyInputColumnsRepeating) {
+
+        /*
+         * Repeating.
+         */
+
+        // All key input columns are repeating. Generate key once. Lookup once.
+        // Since the key is repeated, we must use entry 0 regardless of selectedInUse.
+
+        /*
+         * Single-Column String specific repeated lookup.
+         */
+
+        JoinUtil.JoinResult joinResult;
+        if (batch.size == 0) {
+          // Whole repeated key batch was filtered out.
+          joinResult = JoinUtil.JoinResult.NOMATCH;
+        } else if (!joinColVector.noNulls && joinColVector.isNull[0]) {
+          // Any (repeated) null key column is no match for whole batch.
+          joinResult = JoinUtil.JoinResult.NOMATCH;
+        } else {
+          // Handle *repeated* join key, if found.
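+          // Editor's note (illustrative): BytesColumnVector exposes each value as the triple
+          // (vector[i], start[i], length[i]), so the repeated key is the slice at entry 0 and
+          // the multi-set lookup below consumes the triple directly without copying the bytes.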
+          byte[] keyBytes = vector[0];
+          int keyStart = start[0];
+          int keyLength = length[0];
+          joinResult = hashMultiSet.contains(keyBytes, keyStart, keyLength, hashMultiSetResults[0]);
+        }
+
+        /*
+         * Common repeated join result processing.
+         */
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " repeated joinResult " + joinResult.name());
+        }
+        finishOuterBigOnlyRepeated(batch, joinResult, hashMultiSetResults[0], someRowsFilteredOut,
+            inputSelectedInUse, inputLogicalSize);
+      } else {
+
+        /*
+         * NOT Repeating.
+         */
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(CLASS_NAME + " batch #" + batchCounter + " non-repeated");
+        }
+
+        int[] selected = batch.selected;
+        boolean selectedInUse = batch.selectedInUse;
+
+        int hashMultiSetResultCount = 0;
+        int allMatchCount = 0;
+        int equalKeySeriesCount = 0;
+        int spillCount = 0;
+
+        boolean atLeastOneNonMatch = someRowsFilteredOut;
+
+        /*
+         * Single-Column String specific variables.
+         */
+
+        int saveKeyBatchIndex = -1;
+
+        // We optimize performance by only looking up the first key in a series of equal keys.
+        boolean haveSaveKey = false;
+        JoinUtil.JoinResult saveJoinResult = JoinUtil.JoinResult.NOMATCH;
+
+        // Logical loop over the rows in the batch since the batch may have selected in use.
+        for (int logical = 0; logical < batch.size; logical++) {
+          int batchIndex = (selectedInUse ? selected[logical] : logical);
+
+          // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, taskName + ", " + getOperatorId() + " candidate " + CLASS_NAME + " batch");
+
+          /*
+           * Single-Column String outer null detection.
+           */
+
+          boolean isNull = !joinColVector.noNulls && joinColVector.isNull[batchIndex];
+
+          if (isNull) {
+
+            // Make sure the NULL does not interfere with the current equal key series, if there
+            // is one. We do not set saveJoinResult.
+            //
+            // Let a current MATCH equal key series keep going, or
+            // Let a current SPILL equal key series keep going, or
+            // Let a current NOMATCH keep not matching.
+
+            atLeastOneNonMatch = true;
+
+            // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " NULL");
+          } else {
+
+            /*
+             * Single-Column String outer get key.
+             */
+
+            // Implicit -- use batchIndex.
+
+            /*
+             * Equal key series checking.
+             */
+
+            if (!haveSaveKey ||
+                !StringExpr.equal(vector[saveKeyBatchIndex], start[saveKeyBatchIndex], length[saveKeyBatchIndex],
+                    vector[batchIndex], start[batchIndex], length[batchIndex])) {
+              // New key.
+
+              if (haveSaveKey) {
+                // Move on with our counts.
+                switch (saveJoinResult) {
+                case MATCH:
+                  hashMultiSetResultCount++;
+                  equalKeySeriesCount++;
+                  break;
+                case SPILL:
+                  hashMultiSetResultCount++;
+                  break;
+                case NOMATCH:
+                  break;
+                }
+              }
+
+              // Regardless of our matching result, we keep that information to make multiple use
+              // of it for a possible series of equal keys.
+              haveSaveKey = true;
+
+              /*
+               * Single-Column String specific save key.
+               */
+
+              saveKeyBatchIndex = batchIndex;
+
+              /*
+               * Single-Column String specific lookup key.
+               */
+
+              byte[] keyBytes = vector[batchIndex];
+              int keyStart = start[batchIndex];
+              int keyLength = length[batchIndex];
+              saveJoinResult =
+                  hashMultiSet.contains(
+                      keyBytes, keyStart, keyLength,
+                      hashMultiSetResults[hashMultiSetResultCount]);
+
+              /*
+               * Common outer join result processing.
+ */ + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesValueCounts[equalKeySeriesCount] = hashMultiSetResults[hashMultiSetResultCount].count(); + equalKeySeriesAllMatchIndices[equalKeySeriesCount] = allMatchCount; + equalKeySeriesDuplicateCounts[equalKeySeriesCount] = 1; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH isSingleValue " + equalKeySeriesIsSingleValue[equalKeySeriesCount] + " currentKey " + currentKey); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + atLeastOneNonMatch = true; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH" + " currentKey " + currentKey); + break; + } + } else { + // LOG.debug(CLASS_NAME + " logical " + logical + " batchIndex " + batchIndex + " Key Continues " + saveKey + " " + saveJoinResult.name()); + + // Series of equal keys. + + switch (saveJoinResult) { + case MATCH: + equalKeySeriesDuplicateCounts[equalKeySeriesCount]++; + allMatchs[allMatchCount++] = batchIndex; + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " MATCH duplicate"); + break; + + case SPILL: + spills[spillCount] = batchIndex; + spillHashMultiSetResultIndices[spillCount] = hashMultiSetResultCount; + spillCount++; + break; + + case NOMATCH: + // VectorizedBatchUtil.debugDisplayOneRow(batch, batchIndex, CLASS_NAME + " NOMATCH duplicate"); + break; + } + } + // if (!verifyMonotonicallyIncreasing(allMatchs, allMatchCount)) { + // throw new HiveException("allMatchs is not in sort order and unique"); + // } + } + } + + if (haveSaveKey) { + // Update our counts for the last key. + switch (saveJoinResult) { + case MATCH: + hashMultiSetResultCount++; + equalKeySeriesCount++; + break; + case SPILL: + hashMultiSetResultCount++; + break; + case NOMATCH: + break; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug(CLASS_NAME + " batch #" + batchCounter + + " allMatchs " + intArrayToRangesString(allMatchs,allMatchCount) + + " equalKeySeriesValueCounts " + longArrayToRangesString(equalKeySeriesValueCounts, equalKeySeriesCount) + + " equalKeySeriesAllMatchIndices " + intArrayToRangesString(equalKeySeriesAllMatchIndices, equalKeySeriesCount) + + " equalKeySeriesDuplicateCounts " + intArrayToRangesString(equalKeySeriesDuplicateCounts, equalKeySeriesCount) + + " atLeastOneNonMatch " + atLeastOneNonMatch + + " inputSelectedInUse " + inputSelectedInUse + + " inputLogicalSize " + inputLogicalSize + + " spills " + intArrayToRangesString(spills, spillCount) + + " spillHashMultiSetResultIndices " + intArrayToRangesString(spillHashMultiSetResultIndices, spillCount) + + " hashMultiSetResults " + Arrays.toString(Arrays.copyOfRange(hashMultiSetResults, 0, hashMultiSetResultCount))); + } + + // We will generate results for all matching and non-matching rows. + finishOuterBigOnly(batch, + allMatchCount, equalKeySeriesCount, atLeastOneNonMatch, + inputSelectedInUse, inputLogicalSize, + spillCount, hashMultiSetResultCount); + } + + if (batch.size > 0) { + // Forward any remaining selected rows. 
+ forwardBigTableBatch(batch); + } + + } catch (IOException e) { + throw new HiveException(e); + } catch (Exception e) { + throw new HiveException(e); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java index 2e5c568..c73ea8a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinOuterGenerateResultOperator.java @@ -186,226 +186,6 @@ protected void outerPerBatchSetup(VectorizedRowBatch batch) { } /** - * Apply the value expression to rows in the (original) input selected array. - * - * @param batch - * The vectorized row batch. - * @param inputSelectedInUse - * Whether the (original) input batch is selectedInUse. - * @param inputLogicalSize - * The (original) input batch size. - */ - private void doValueExprOnInputSelected(VectorizedRowBatch batch, - boolean inputSelectedInUse, int inputLogicalSize) throws HiveException { - - int saveBatchSize = batch.size; - int[] saveSelected = batch.selected; - boolean saveSelectedInUse = batch.selectedInUse; - - batch.size = inputLogicalSize; - batch.selected = inputSelected; - batch.selectedInUse = inputSelectedInUse; - - if (bigTableValueExpressions != null) { - for(VectorExpression ve: bigTableValueExpressions) { - ve.evaluate(batch); - } - } - - batch.size = saveBatchSize; - batch.selected = saveSelected; - batch.selectedInUse = saveSelectedInUse; - } - - /** - * Apply the value expression to rows specified by a selected array. - * - * @param batch - * The vectorized row batch. - * @param selected - * The (physical) batch indices to apply the expression to. - * @param size - * The size of selected. - */ - private void doValueExpr(VectorizedRowBatch batch, - int[] selected, int size) throws HiveException { - - int saveBatchSize = batch.size; - int[] saveSelected = batch.selected; - boolean saveSelectedInUse = batch.selectedInUse; - - batch.size = size; - batch.selected = selected; - batch.selectedInUse = true; - - if (bigTableValueExpressions != null) { - for(VectorExpression ve: bigTableValueExpressions) { - ve.evaluate(batch); - } - } - - batch.size = saveBatchSize; - batch.selected = saveSelected; - batch.selectedInUse = saveSelectedInUse; - } - - /** - * Remove (subtract) members from the input selected array and produce the results into - * a difference array. - * - * @param inputSelectedInUse - * Whether the (original) input batch is selectedInUse. - * @param inputLogicalSize - * The (original) input batch size. - * @param remove - * The indices to remove. They must all be present in input selected array. - * @param removeSize - * The size of remove. - * @param difference - * The resulting difference -- the input selected array indices not in the - * remove array. - * @return - * The resulting size of the difference array. - * @throws HiveException - */ - private int subtractFromInputSelected(boolean inputSelectedInUse, int inputLogicalSize, - int[] remove, int removeSize, int[] difference) throws HiveException { - - // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { - // throw new HiveException("remove is not in sort order and unique"); - // } - - int differenceCount = 0; - - // Determine which rows are left. 
- int removeIndex = 0; - if (inputSelectedInUse) { - for (int i = 0; i < inputLogicalSize; i++) { - int candidateIndex = inputSelected[i]; - if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { - removeIndex++; - } else { - difference[differenceCount++] = candidateIndex; - } - } - } else { - for (int candidateIndex = 0; candidateIndex < inputLogicalSize; candidateIndex++) { - if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { - removeIndex++; - } else { - difference[differenceCount++] = candidateIndex; - } - } - } - - if (removeIndex != removeSize) { - throw new HiveException("Not all batch indices removed"); - } - - // if (!verifyMonotonicallyIncreasing(difference, differenceCount)) { - // throw new HiveException("difference is not in sort order and unique"); - // } - - return differenceCount; - } - - /** - * Remove (subtract) members from an array and produce the results into - * a difference array. - - * @param all - * The selected array containing all members. - * @param allSize - * The size of all. - * @param remove - * The indices to remove. They must all be present in input selected array. - * @param removeSize - * The size of remove. - * @param difference - * The resulting difference -- the all array indices not in the - * remove array. - * @return - * The resulting size of the difference array. - * @throws HiveException - */ - private int subtract(int[] all, int allSize, - int[] remove, int removeSize, int[] difference) throws HiveException { - - // if (!verifyMonotonicallyIncreasing(remove, removeSize)) { - // throw new HiveException("remove is not in sort order and unique"); - // } - - int differenceCount = 0; - - // Determine which rows are left. - int removeIndex = 0; - for (int i = 0; i < allSize; i++) { - int candidateIndex = all[i]; - if (removeIndex < removeSize && candidateIndex == remove[removeIndex]) { - removeIndex++; - } else { - difference[differenceCount++] = candidateIndex; - } - } - - if (removeIndex != removeSize) { - throw new HiveException("Not all batch indices removed"); - } - - return differenceCount; - } - - /** - * Sort merge two select arrays so the resulting array is ordered by (batch) index. - * - * @param selected1 - * @param selected1Count - * @param selected2 - * @param selected2Count - * @param sortMerged - * The resulting sort merge of selected1 and selected2. - * @return - * The resulting size of the sortMerged array. 
- * @throws HiveException - */ - private int sortMerge(int[] selected1, int selected1Count, - int[] selected2, int selected2Count, int[] sortMerged) throws HiveException { - - // if (!verifyMonotonicallyIncreasing(selected1, selected1Count)) { - // throw new HiveException("selected1 is not in sort order and unique"); - // } - - // if (!verifyMonotonicallyIncreasing(selected2, selected2Count)) { - // throw new HiveException("selected1 is not in sort order and unique"); - // } - - - int sortMergeCount = 0; - - int selected1Index = 0; - int selected2Index = 0; - for (int i = 0; i < selected1Count + selected2Count; i++) { - if (selected1Index < selected1Count && selected2Index < selected2Count) { - if (selected1[selected1Index] < selected2[selected2Index]) { - sortMerged[sortMergeCount++] = selected1[selected1Index++]; - } else { - sortMerged[sortMergeCount++] = selected2[selected2Index++]; - } - } else if (selected1Index < selected1Count) { - sortMerged[sortMergeCount++] = selected1[selected1Index++]; - } else { - sortMerged[sortMergeCount++] = selected2[selected2Index++]; - } - } - - // if (!verifyMonotonicallyIncreasing(sortMerged, sortMergeCount)) { - // throw new HiveException("sortMerged is not in sort order and unique"); - // } - - return sortMergeCount; - } - - /** * Generate the outer join output results for one vectorized row batch. * * @param batch @@ -433,7 +213,7 @@ public void finishOuter(VectorizedRowBatch batch, // Get rid of spills before we start modifying the batch. if (spillCount > 0) { - spillHashMapBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, + spillHashTableBatch(batch, (VectorMapJoinHashTableResult[]) hashMapResults, spills, spillHashMapResultIndices, spillCount); } @@ -442,7 +222,8 @@ public void finishOuter(VectorizedRowBatch batch, // Subtract the spills to get all match and non-match rows. int nonSpillCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, spills, spillCount, nonSpills); + inputSelectedInUse, inputLogicalSize, inputSelected, + spills, spillCount, nonSpills); if (LOG.isDebugEnabled()) { LOG.debug("finishOuter spillCount > 0" + @@ -469,11 +250,12 @@ public void finishOuter(VectorizedRowBatch batch, } else { // Run value expressions over original (whole) input batch. - doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize); + doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected); if (atLeastOneNonMatch) { noMatchCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, allMatchs, allMatchCount, noMatchs); + inputSelectedInUse, inputLogicalSize, inputSelected, + allMatchs, allMatchCount, noMatchs); if (LOG.isDebugEnabled()) { LOG.debug("finishOuter spillCount == 0" + @@ -553,44 +335,6 @@ public void finishOuter(VectorizedRowBatch batch, } } - /** - * Generate the non matching outer join output results for one vectorized row batch. - * - * For each non matching row specified by parameter, generate nulls for the small table results. - * - * @param batch - * The big table batch with any matching and any non matching rows both as - * selected in use. - * @param noMatchs - * A subset of the rows of the batch that are non matches. - * @param noMatchSize - * Number of non matches in noMatchs. - */ - protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, - int noMatchSize) throws IOException, HiveException { - - // Set null information in the small table results area. 
- - for (int i = 0; i < noMatchSize; i++) { - int batchIndex = noMatchs[i]; - - // Mark any scratch small table scratch columns that would normally receive a copy of the - // key as null, too. - for (int column : bigTableOuterKeyOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } - - // Small table values are set to null. - for (int column : smallTableOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[batchIndex] = true; - } - } - } - /** * Generate the outer join output results for one vectorized row batch with a repeated key. * @@ -601,8 +345,8 @@ protected void generateOuterNulls(VectorizedRowBatch batch, int[] noMatchs, * selected in use. * @param joinResult * The hash map lookup result for the repeated key. - * @param hashMapResults - * The array of all hash map results for the batch. + * @param hashMapResult + * The repeated hash map result for the batch. * @param someRowsFilteredOut * Whether some rows of the repeated key batch were knocked out by the filter. * @param inputSelectedInUse @@ -633,7 +377,7 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo // we need to generate no match results for those too... // Run value expressions over original (whole) input batch. - doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize); + doValueExprOnInputSelected(batch, inputSelectedInUse, inputLogicalSize, inputSelected); // Now calculate which rows were filtered out (they are logically no matches). @@ -641,7 +385,8 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo // (current) batch selected. int noMatchCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs); + inputSelectedInUse, inputLogicalSize, inputSelected, + batch.selected, batch.size, noMatchs); generateOuterNulls(batch, noMatchs, noMatchCount); @@ -685,7 +430,8 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo // (current) batch selected. int noMatchCount = subtractFromInputSelected( - inputSelectedInUse, inputLogicalSize, batch.selected, batch.size, noMatchs); + inputSelectedInUse, inputLogicalSize, inputSelected, + batch.selected, batch.size, noMatchs); System.arraycopy(noMatchs, 0, batch.selected, 0, noMatchCount); batch.size = noMatchCount; @@ -722,32 +468,4 @@ public void finishOuterRepeated(VectorizedRowBatch batch, JoinUtil.JoinResult jo break; } } - - /** - * Generate the non-match outer join output results for the whole repeating vectorized - * row batch. - * - * Each row will get nulls for all small table values. - * - * @param batch - * The big table batch. - */ - protected void generateOuterNullsRepeatedAll(VectorizedRowBatch batch) throws HiveException { - - for (int column : smallTableOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[0] = true; - colVector.isRepeating = true; - } - - // Mark any scratch small table scratch columns that would normally receive a copy of the key - // as null, too. 
- for (int column : bigTableOuterKeyOutputVectorColumns) { - ColumnVector colVector = batch.cols[column]; - colVector.noNulls = false; - colVector.isNull[0] = true; - colVector.isRepeating = true; - } - } } diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index a6a3417..de760f5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -73,6 +73,9 @@ import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterLongOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator; import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterBigOnlyLongOperator; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterBigOnlyMultiKeyOperator; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterBigOnlyStringOperator; import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator; import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -3251,6 +3254,13 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { isInnerBigOnly = true; } + boolean isOuterBigOnly = false; + if ((joinType == JoinDesc.LEFT_OUTER_JOIN || + joinType == JoinDesc.RIGHT_OUTER_JOIN) && + isBigTableOnlyResults(desc)) { + isOuterBigOnly = true; + } + // By default, we can always use the multi-key class. hashTableKeyType = HashTableKeyType.MULTI_KEY; @@ -3303,8 +3313,13 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { break; case JoinDesc.LEFT_OUTER_JOIN: case JoinDesc.RIGHT_OUTER_JOIN: - vectorMapJoinVariation = VectorMapJoinVariation.OUTER; - hashTableKind = HashTableKind.HASH_MAP; + if (!isOuterBigOnly) { + vectorMapJoinVariation = VectorMapJoinVariation.OUTER; + hashTableKind = HashTableKind.HASH_MAP; + } else { + vectorMapJoinVariation = VectorMapJoinVariation.OUTER_BIG_ONLY; + hashTableKind = HashTableKind.HASH_MULTISET; + } break; case JoinDesc.LEFT_SEMI_JOIN: vectorMapJoinVariation = VectorMapJoinVariation.LEFT_SEMI; @@ -3335,6 +3350,9 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { case OUTER: opClass = VectorMapJoinOuterLongOperator.class; break; + case OUTER_BIG_ONLY: + opClass = VectorMapJoinOuterBigOnlyLongOperator.class; + break; default: throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } @@ -3353,6 +3371,9 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { case OUTER: opClass = VectorMapJoinOuterStringOperator.class; break; + case OUTER_BIG_ONLY: + opClass = VectorMapJoinOuterBigOnlyStringOperator.class; + break; default: throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } @@ -3371,6 +3392,9 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { case OUTER: opClass = VectorMapJoinOuterMultiKeyOperator.class; break; + case OUTER_BIG_ONLY: + opClass = VectorMapJoinOuterBigOnlyMultiKeyOperator.class; + break; default: throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java index 58032ca..4f404d9 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java
@@ -89,6 +89,7 @@ public PrimitiveTypeInfo getPrimitiveTypeInfo() {
     INNER_BIG_ONLY,
     INNER,
     LEFT_SEMI,
+    OUTER_BIG_ONLY,
     OUTER
   }
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
index 0514e3f..aa5ed09 100644
--- ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/MapJoinTestConfig.java
@@ -306,6 +306,11 @@ public static VectorMapJoinCommonOperator createNativeVectorMapJoinOperator(
           new VectorMapJoinOuterLongOperator(new CompilationOpContext(),
               mapJoinDesc, vContext, vectorDesc);
       break;
+    case OUTER_BIG_ONLY:
+      operator =
+          new VectorMapJoinOuterBigOnlyLongOperator(new CompilationOpContext(),
+              mapJoinDesc, vContext, vectorDesc);
+      break;
     default:
       throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation);
     }
@@ -332,6 +337,11 @@ public static VectorMapJoinCommonOperator createNativeVectorMapJoinOperator(
           new VectorMapJoinOuterStringOperator(new CompilationOpContext(),
               mapJoinDesc, vContext, vectorDesc);
       break;
+    case OUTER_BIG_ONLY:
+      operator =
+          new VectorMapJoinOuterBigOnlyStringOperator(new CompilationOpContext(),
+              mapJoinDesc, vContext, vectorDesc);
+      break;
     default:
       throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation);
     }
@@ -358,6 +368,11 @@ public static VectorMapJoinCommonOperator createNativeVectorMapJoinOperator(
           new VectorMapJoinOuterMultiKeyOperator(new CompilationOpContext(),
               mapJoinDesc, vContext, vectorDesc);
       break;
+    case OUTER_BIG_ONLY:
+      operator =
+          new VectorMapJoinOuterBigOnlyMultiKeyOperator(new CompilationOpContext(),
+              mapJoinDesc, vContext, vectorDesc);
+      break;
     default:
       throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation);
     }
diff --git ql/src/test/results/clientpositive/llap/vector_join30.q.out ql/src/test/results/clientpositive/llap/vector_join30.q.out
index 4b2f06f..f69200a 100644
--- ql/src/test/results/clientpositive/llap/vector_join30.q.out
+++ ql/src/test/results/clientpositive/llap/vector_join30.q.out
@@ -487,7 +487,7 @@ STAGE PLANS:
                     0 _col0 (type: string)
                     1 _col0 (type: string)
                   Map Join Vectorization:
-                      className: VectorMapJoinOuterStringOperator
+                      className: VectorMapJoinOuterBigOnlyStringOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                   outputColumnNames: _col2, _col3
diff --git ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out
index 9c51b32..9b0b605 100644
--- ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out
+++ ql/src/test/results/clientpositive/llap/vector_leftsemi_mapjoin.q.out
@@ -14655,7 +14655,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [1]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterStringOperator
+                      className: VectorMapJoinOuterBigOnlyStringOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -19220,7 +19220,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [1]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterStringOperator
+                      className: VectorMapJoinOuterBigOnlyStringOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
diff --git ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out
index c74a588..f3988ab 100644
--- ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out
+++ ql/src/test/results/clientpositive/llap/vector_outer_join1.q.out
@@ -446,7 +446,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -712,7 +712,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -730,7 +730,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
diff --git ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out
index 2e90aae..e59be5e 100644
--- ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out
+++ ql/src/test/results/clientpositive/llap/vector_outer_join2.q.out
@@ -287,7 +287,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
@@ -305,7 +305,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [3]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine tez IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
diff --git ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out
index ecac4da..0267807 100644
--- ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out
+++ ql/src/test/results/clientpositive/spark/vector_outer_join1.q.out
@@ -89,11 +89,11 @@ POSTHOOK: Lineage: small_alltypesorc4a.ctinyint SIMPLE []
 PREHOOK: query: select * from small_alltypesorc1a
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc1a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc1a
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc1a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -64 -10462 626923679 NULL -64.0 -10462.0 821UdmGbkEf4j NULL 1969-12-31 16:00:02.496 1969-12-31 16:00:00.164 true NULL
 -64 -15920 528534767 NULL -64.0 -15920.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:51.859 1969-12-31 16:00:14.468 true NULL
 -64 -6907 253665376 NULL -64.0 -6907.0 1cGVWH7n1QU NULL NULL 1969-12-31 15:59:53.66 true NULL
@@ -102,11 +102,11 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 PREHOOK: query: select * from small_alltypesorc2a
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc2a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc2a
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc2a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -64 -7196 NULL -1615920595 -64.0 -7196.0 NULL X5rDjl 1969-12-31 16:00:11.912 1969-12-31 15:59:58.174 NULL false
 -64 -7196 NULL -1639157869 -64.0 -7196.0 NULL IJ0Oj7qAiqNGsN7gn 1969-12-31 16:00:01.785 1969-12-31 15:59:58.174 NULL false
 -64 -7196 NULL -527203677 -64.0 -7196.0 NULL JBE4H5RoK412Cs260I72 1969-12-31 15:59:50.184 1969-12-31 15:59:58.174 NULL true
@@ -115,11 +115,11 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 PREHOOK: query: select * from small_alltypesorc3a
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc3a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc3a
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc3a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 NULL NULL -1015272448 -1887561756 NULL NULL jTQ68531mP 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 15:59:45.854 false false
 NULL NULL -850295959 -1887561756 NULL NULL WMIgGA73 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 16:00:00.348 false false
 NULL NULL -886426182 -1887561756 NULL NULL 0i88xYq3gx1nW4vKjp7vBp3 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 16:00:04.472 true false
@@ -128,11 +128,11 @@ NULL NULL -971543377 -1645852809 NULL NULL uN803aW xH7445Rals48VOulSyR5F NULL 19
 PREHOOK: query: select * from small_alltypesorc4a
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc4a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc4a
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc4a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 PREHOOK: query: create table small_alltypesorc_a stored as orc as select * from
 (select * from (select * from small_alltypesorc1a) sq1
  union all
@@ -187,20 +187,20 @@ PREHOOK: query: ANALYZE TABLE small_alltypesorc_a COMPUTE STATISTICS FOR COLUMNS
 PREHOOK: type: ANALYZE_TABLE
 PREHOOK: Input: default@small_alltypesorc_a
 PREHOOK: Output: default@small_alltypesorc_a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: ANALYZE TABLE small_alltypesorc_a COMPUTE STATISTICS FOR COLUMNS
 POSTHOOK: type: ANALYZE_TABLE
 POSTHOOK: Input: default@small_alltypesorc_a
 POSTHOOK: Output: default@small_alltypesorc_a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 PREHOOK: query: select * from small_alltypesorc_a
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc_a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc_a
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc_a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -64 -10462 626923679 NULL -64.0 -10462.0 821UdmGbkEf4j NULL 1969-12-31 16:00:02.496 1969-12-31 16:00:00.164 true NULL
 -64 -15920 528534767 NULL -64.0 -15920.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:51.859 1969-12-31 16:00:14.468 true NULL
 -64 -6907 253665376 NULL -64.0 -6907.0 1cGVWH7n1QU NULL NULL 1969-12-31 15:59:53.66 true NULL
@@ -365,14 +365,14 @@ left outer join small_alltypesorc_a cd
   on cd.cint = c.cint
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc_a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc_a c
 left outer join small_alltypesorc_a cd
   on cd.cint = c.cint
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc_a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -64 -10462 626923679 NULL -64.0 -10462.0 821UdmGbkEf4j NULL 1969-12-31 16:00:02.496 1969-12-31 16:00:00.164 true NULL -64 -10462 626923679 NULL -64.0 -10462.0 821UdmGbkEf4j NULL 1969-12-31 16:00:02.496 1969-12-31 16:00:00.164 true NULL
 -64 -15920 528534767 NULL -64.0 -15920.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:51.859 1969-12-31 16:00:14.468 true NULL -64 -15920 528534767 NULL -64.0 -15920.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:51.859 1969-12-31 16:00:14.468 true NULL
 -64 -15920 528534767 NULL -64.0 -15920.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:51.859 1969-12-31 16:00:14.468 true NULL -64 -8080 528534767 NULL -64.0 -8080.0 cvLH6Eat2yFsyy7p NULL 1969-12-31 15:59:58.044 1969-12-31 15:59:48.655 true NULL
@@ -490,7 +490,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -539,14 +539,14 @@ left outer join small_alltypesorc_a hd
   on hd.ctinyint = c.ctinyint
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc_a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select c.ctinyint from small_alltypesorc_a c
 left outer join small_alltypesorc_a hd
   on hd.ctinyint = c.ctinyint
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc_a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -64
 -64
 -64
@@ -799,7 +799,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -817,7 +817,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [0]
                       bigTableRetainedColumnNums: [0]
                       bigTableValueColumnNums: [0]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [0]
@@ -919,7 +919,7 @@ left outer join small_alltypesorc_a hd
 ) t1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc_a
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select count(*), sum(t1.c_ctinyint) from (select c.ctinyint as c_ctinyint
 from small_alltypesorc_a c
 left outer join small_alltypesorc_a cd
@@ -929,5 +929,5 @@ left outer join small_alltypesorc_a hd
 ) t1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc_a
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 145 -8960
diff --git ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out
index 92ad63e..7e31df8 100644
--- ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out
+++ ql/src/test/results/clientpositive/spark/vector_outer_join2.q.out
@@ -89,11 +89,11 @@ POSTHOOK: Lineage: small_alltypesorc4a_n0.ctinyint SIMPLE [(alltypesorc)alltypes
 PREHOOK: query: select * from small_alltypesorc1a_n0
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc1a_n0
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc1a_n0
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc1a_n0
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 NULL NULL -1015272448 -1887561756 NULL NULL jTQ68531mP 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 15:59:45.854 false false
 NULL NULL -850295959 -1887561756 NULL NULL WMIgGA73 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 16:00:00.348 false false
 NULL NULL -886426182 -1887561756 NULL NULL 0i88xYq3gx1nW4vKjp7vBp3 4hA4KQj2vD3fI6gX82220d NULL 1969-12-31 16:00:04.472 true false
@@ -102,11 +102,11 @@ NULL NULL -971543377 -1645852809 NULL NULL uN803aW xH7445Rals48VOulSyR5F NULL 19
 PREHOOK: query: select * from small_alltypesorc2a_n0
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc2a_n0
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc2a_n0
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc2a_n0
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -64 -7196 NULL -1615920595 -64.0 -7196.0 NULL X5rDjl 1969-12-31 16:00:11.912 1969-12-31 15:59:58.174 NULL false
 -64 -7196 NULL -1639157869 -64.0 -7196.0 NULL IJ0Oj7qAiqNGsN7gn 1969-12-31 16:00:01.785 1969-12-31 15:59:58.174 NULL false
 -64 -7196 NULL -527203677 -64.0 -7196.0 NULL JBE4H5RoK412Cs260I72 1969-12-31 15:59:50.184 1969-12-31 15:59:58.174 NULL true
@@ -115,11 +115,11 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 PREHOOK: query: select * from small_alltypesorc3a_n0
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc3a_n0
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc3a_n0
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc3a_n0
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 NULL -13166 626923679 NULL NULL -13166.0 821UdmGbkEf4j NULL 1969-12-31 15:59:55.089 1969-12-31 16:00:15.69 true NULL
 NULL -14426 626923679 NULL NULL -14426.0 821UdmGbkEf4j NULL 1969-12-31 16:00:11.505 1969-12-31 16:00:13.309 true NULL
 NULL -14847 626923679 NULL NULL -14847.0 821UdmGbkEf4j NULL 1969-12-31 16:00:00.612 1969-12-31 15:59:43.704 true NULL
@@ -128,11 +128,11 @@ NULL -15830 253665376 NULL NULL -15830.0 1cGVWH7n1QU NULL 1969-12-31 16:00:02.58
 PREHOOK: query: select * from small_alltypesorc4a_n0
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc4a_n0
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc4a_n0
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc4a_n0
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -60 -200 NULL NULL -60.0 -200.0 NULL NULL 1969-12-31 16:00:11.996 1969-12-31 15:59:55.451 NULL NULL
 -61 -7196 NULL NULL -61.0 -7196.0 NULL 8Mlns2Tl6E0g 1969-12-31 15:59:44.823 1969-12-31 15:59:58.174 NULL false
 -61 -7196 NULL NULL -61.0 -7196.0 NULL fUJIN 1969-12-31 16:00:11.842 1969-12-31 15:59:58.174 NULL false
@@ -192,20 +192,20 @@ PREHOOK: query: ANALYZE TABLE small_alltypesorc_a_n0 COMPUTE STATISTICS FOR COLU
 PREHOOK: type: ANALYZE_TABLE
 PREHOOK: Input: default@small_alltypesorc_a_n0
 PREHOOK: Output: default@small_alltypesorc_a_n0
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: ANALYZE TABLE small_alltypesorc_a_n0 COMPUTE STATISTICS FOR COLUMNS
 POSTHOOK: type: ANALYZE_TABLE
 POSTHOOK: Input: default@small_alltypesorc_a_n0
 POSTHOOK: Output: default@small_alltypesorc_a_n0
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 PREHOOK: query: select * from small_alltypesorc_a_n0
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc_a_n0
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select * from small_alltypesorc_a_n0
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc_a_n0
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 -60 -200 NULL NULL -60.0 -200.0 NULL NULL 1969-12-31 16:00:11.996 1969-12-31 15:59:55.451 NULL NULL
 -61 -7196 NULL NULL -61.0 -7196.0 NULL 8Mlns2Tl6E0g 1969-12-31 15:59:44.823 1969-12-31 15:59:58.174 NULL false
 -61 -7196 NULL NULL -61.0 -7196.0 NULL fUJIN 1969-12-31 16:00:11.842 1969-12-31 15:59:58.174 NULL false
@@ -373,7 +373,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [2]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
@@ -391,7 +391,7 @@ STAGE PLANS:
                       bigTableKeyColumnNums: [3]
                       bigTableRetainedColumnNums: [3]
                       bigTableValueColumnNums: [3]
-                      className: VectorMapJoinOuterLongOperator
+                      className: VectorMapJoinOuterBigOnlyLongOperator
                       native: true
                       nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, hive.execution.engine spark IN [tez, spark] IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Outer Join has keys IS true, Optimized Table and Supports Key Types IS true
                       projectedOutputColumnNums: [3]
@@ -493,7 +493,7 @@ left outer join small_alltypesorc_a_n0 hd
 ) t1
 PREHOOK: type: QUERY
 PREHOOK: Input: default@small_alltypesorc_a_n0
-PREHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 POSTHOOK: query: select count(*), sum(t1.c_cbigint) from (select c.cbigint as c_cbigint
 from small_alltypesorc_a_n0 c
 left outer join small_alltypesorc_a_n0 cd
@@ -503,5 +503,5 @@ left outer join small_alltypesorc_a_n0 hd
 ) t1
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@small_alltypesorc_a_n0
-POSTHOOK: Output: hdfs://### HDFS PATH ###
+#### A masked pattern was here ####
 34 -26289186744