diff --git common/src/java/org/apache/hadoop/hive/conf/HiveConf.java common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 5700fb9..a7a4868 100644 --- common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -560,6 +560,8 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal HIVE_IN_TEZ_TEST("hive.in.tez.test", false, "internal use only, true when in testing tez", true), + HIVE_MAPJOIN_TESTING_NO_HASH_TABLE_LOAD("hive.mapjoin.testing.no.hash.table.load", false, "internal use only, true when in testing map join", + true), LOCALMODEAUTO("hive.exec.mode.local.auto", false, "Let Hive determine whether to run in local mode automatically"), diff --git itests/hive-jmh/pom.xml itests/hive-jmh/pom.xml index af8eb19..0ff584c 100644 --- itests/hive-jmh/pom.xml +++ itests/hive-jmh/pom.xml @@ -56,7 +56,13 @@ <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> <version>${project.version}</version> + <classifier>tests</classifier> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> diff --git itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/AbstractMapJoin.java itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/AbstractMapJoin.java new file mode 100644 index 0000000..171a4f6 --- /dev/null +++ itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/AbstractMapJoin.java @@ -0,0 +1,155 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hive.benchmark.vectorization.mapjoin; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.testrow.TestCountCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestCountVectorCollectorOperator; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinConfig; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinData; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinDescription; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinConfig.MapJoinImplementation; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +// UNDONE: For now, just run once cold. +@BenchmarkMode(Mode.SingleShotTime) +@Fork(1) +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public abstract class AbstractMapJoin { + protected VectorMapJoinVariation vectorMapJoinVariation; + protected MapJoinImplementation mapJoinImplementation; + protected TestMapJoinDescription testDesc; + protected TestMapJoinData testData; + + protected MapJoinOperator operator; + + @Benchmark + // @Warmup(iterations = 0, time = 1, timeUnit = TimeUnit.MILLISECONDS) + @Measurement(iterations = 1, time = 1, timeUnit = TimeUnit.MILLISECONDS) + public void bench() throws Exception { + executeBenchmarkImplementation(mapJoinImplementation, testDesc, testData, operator); + } + + protected void setupMapJoin(HiveConf hiveConf, long seed, int rowCount, + VectorMapJoinVariation vectorMapJoinVariation, MapJoinImplementation mapJoinImplementation, + String[] bigTableColumnNames, TypeInfo[] bigTableTypeInfos, int[] bigTableKeyColumnNums, + String[] smallTableValueColumnNames, TypeInfo[] smallTableValueTypeInfos, + int[] bigTableRetainColumnNums, + int[] smallTableRetainKeyColumnNums, int[] smallTableRetainValueColumnNums) throws Exception { + + this.vectorMapJoinVariation = vectorMapJoinVariation; + this.mapJoinImplementation = mapJoinImplementation; + testDesc = new TestMapJoinDescription( + hiveConf, vectorMapJoinVariation, + bigTableColumnNames, bigTableTypeInfos, + bigTableKeyColumnNums, + smallTableValueColumnNames, smallTableValueTypeInfos, + bigTableRetainColumnNums, + smallTableRetainKeyColumnNums, smallTableRetainValueColumnNums); + + // Prepare data. 
Good for ANY implementation variation. + Random random = new Random(seed); + + testData = new TestMapJoinData(rowCount, testDesc, random); + + // UNDONE: Add small table columns... + doAddSmallTableColumns(testDesc, testData); + + operator = setupBenchmarkImplementation( + mapJoinImplementation, testDesc, testData); + } + + public static void doAddSmallTableColumns(TestMapJoinDescription testDesc, TestMapJoinData testData) { + for (int batchNum = 0; batchNum < testData.bigTableBatches.length; batchNum++) { + VectorizedRowBatch bigTableBatch = testData.bigTableBatches[batchNum]; + ColumnVector[] newCols = new ColumnVector[bigTableBatch.cols.length + testDesc.smallTableValueTypeInfos.length]; + System.arraycopy(bigTableBatch.cols, 0, newCols, 0, bigTableBatch.cols.length); + + for (int s = 0; s < testDesc.smallTableValueTypeInfos.length; s++) { + newCols[bigTableBatch.cols.length + s] = + VectorizedBatchUtil.createColumnVector(testDesc.smallTableValueTypeInfos[s]); + } + bigTableBatch.cols = newCols; + bigTableBatch.numCols = newCols.length; + } + } + + private static boolean isVectorOutput(MapJoinImplementation mapJoinImplementation) { + return + (mapJoinImplementation != MapJoinImplementation.ROW_MODE_HASH_MAP && + mapJoinImplementation != MapJoinImplementation.ROW_MODE_OPTIMIZED); + } + + protected static MapJoinOperator setupBenchmarkImplementation( + MapJoinImplementation mapJoinImplementation, TestMapJoinDescription testDesc, + TestMapJoinData testData) + throws Exception { + + // UNDONE: Parameterize for implementation variation? + MapJoinDesc mapJoinDesc = TestMapJoinConfig.createMapJoinDesc(testDesc); + + final boolean isVectorOutput = isVectorOutput(mapJoinImplementation); + + // This collector is just a counter. + Operator testCollectorOperator = + (!isVectorOutput ? new TestCountCollectorOperator() : + new TestCountVectorCollectorOperator()); + + MapJoinOperator operator = + TestMapJoinConfig.createMapJoinImplementation( + mapJoinImplementation, testDesc, testCollectorOperator, testData, mapJoinDesc); + return operator; + } + + private static void executeBenchmarkImplementation( + MapJoinImplementation mapJoinImplementation, TestMapJoinDescription testDesc, + TestMapJoinData testData, MapJoinOperator operator) + throws Exception { + + final boolean isVectorOutput = isVectorOutput(mapJoinImplementation); + + if (!isVectorOutput) { + TestMapJoinData.driveBigTableData(testDesc, testData, operator); + } else { + TestMapJoinData.driveVectorBigTableData(testDesc, testData, operator); + } + } +} diff --git itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/MapJoinOneLongKeyBenchBase.java itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/MapJoinOneLongKeyBenchBase.java new file mode 100644 index 0000000..9768fef --- /dev/null +++ itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/MapJoinOneLongKeyBenchBase.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.benchmark.vectorization.mapjoin; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinConfig.MapJoinImplementation; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; + +public abstract class MapJoinOneLongKeyBenchBase extends AbstractMapJoin { + + public void doSetup(VectorMapJoinVariation vectorMapJoinVariation, + MapJoinImplementation mapJoinImplementation) throws Exception { + + HiveConf hiveConf = new HiveConf(); + + long seed = 2543; + + int rowCount = 1000000; + + String[] bigTableColumnNames = new String[] {"number1"}; + TypeInfo[] bigTableTypeInfos = + new TypeInfo[] { + TypeInfoFactory.longTypeInfo}; + int[] bigTableKeyColumnNums = new int[] {0}; + + String[] smallTableValueColumnNames = new String[] {"sv1", "sv2"}; + TypeInfo[] smallTableValueTypeInfos = + new TypeInfo[] {TypeInfoFactory.dateTypeInfo, TypeInfoFactory.stringTypeInfo}; + + int[] bigTableRetainColumnNums = new int[] {0}; + + int[] smallTableRetainKeyColumnNums = new int[] {}; + int[] smallTableRetainValueColumnNums = new int[] {0, 1}; + + setupMapJoin(hiveConf, seed, rowCount, + vectorMapJoinVariation, mapJoinImplementation, + bigTableColumnNames, bigTableTypeInfos, bigTableKeyColumnNums, + smallTableValueColumnNames, smallTableValueTypeInfos, + bigTableRetainColumnNums, + smallTableRetainKeyColumnNums, smallTableRetainValueColumnNums); + } +} \ No newline at end of file diff --git itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/VectorizedMapJoinBench.java itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/VectorizedMapJoinBench.java new file mode 100644 index 0000000..1254232 --- /dev/null +++ itests/hive-jmh/src/main/java/org/apache/hive/benchmark/vectorization/mapjoin/VectorizedMapJoinBench.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.benchmark.vectorization.mapjoin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; +import org.apache.hadoop.hive.ql.exec.testrow.TestCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestCountCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestCountVectorCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestRow; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowCollectorOperatorBase; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowMultiSet; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowVectorCollectorOperator; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorBatchDebug; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping; +import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorRandomRowSource; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType.GenerateCategory; +import org.apache.hadoop.hive.ql.exec.vector.expressions.ColAndCol; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinConfig; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinConfig.MapJoinImplementation; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinData; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinDescription; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastMultiKeyHashMap; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import 
org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hive.benchmark.vectorization.VectorizedArithmeticBench; +import org.apache.hive.common.util.HashCodeUtil; +import org.apache.hive.common.util.ReflectionUtil; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.annotations.Setup; + +/* + * Build with "mvn clean install -DskipTests -Pdist,itests" at main hive directory. 
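 + * (Editorial addition, not in the original comment: the -Pdist,itests profiles matter here because this module now depends on the hive-exec tests jar, which must be built and installed first. A single variation can also be selected with a JMH include regex, e.g. ".*MapJoinOneLongKeyInnerNativeVectorFast.*", instead of the full class name below.)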
+ * + * From itests/hive-jmh directory, run: + * java -jar target/benchmarks.jar org.apache.hive.benchmark.vectorization.mapjoin.VectorizedMapJoinBench + */ +@State(Scope.Benchmark) +public class VectorizedMapJoinBench extends AbstractMapJoin { + + public static class MapJoinOneLongKeyInnerRowModeHashMapBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER, MapJoinImplementation.ROW_MODE_HASH_MAP); + } + } + + public static class MapJoinOneLongKeyInnerRowModeOptimized_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER, MapJoinImplementation.ROW_MODE_OPTIMIZED); + } + } + + public static class MapJoinOneLongKeyInnerVectorPassThrough_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER, MapJoinImplementation.VECTOR_PASS_THROUGH); + } + } + + public static class MapJoinOneLongKeyInnerNativeVectorOptimizedBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER, MapJoinImplementation.NATIVE_VECTOR_OPTIMIZED); + } + } + + public static class MapJoinOneLongKeyInnerNativeVectorFastBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER, MapJoinImplementation.NATIVE_VECTOR_FAST); + } + } + + //----------------------------------------------------------------------------------------------- + + public static class MapJoinOneLongKeyInnerBigOnlyRowModeHashMapBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinImplementation.ROW_MODE_HASH_MAP); + } + } + + public static class MapJoinOneLongKeyInnerBigOnlyRowModeOptimized_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinImplementation.ROW_MODE_OPTIMIZED); + } + } + + public static class MapJoinOneLongKeyInnerBigOnlyVectorPassThrough_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinImplementation.VECTOR_PASS_THROUGH); + } + } + + public static class MapJoinOneLongKeyInnerBigOnlyNativeVectorOptimizedBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinImplementation.NATIVE_VECTOR_OPTIMIZED); + } + } + + public static class MapJoinOneLongKeyInnerBigOnlyNativeVectorFastBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.INNER_BIG_ONLY, MapJoinImplementation.NATIVE_VECTOR_FAST); + } + } + + //----------------------------------------------------------------------------------------------- + + public static class MapJoinOneLongKeyLeftSemiRowModeHashMapBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.LEFT_SEMI, MapJoinImplementation.ROW_MODE_HASH_MAP); + } + } + + public static class MapJoinOneLongKeyLeftSemiRowModeOptimized_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.LEFT_SEMI, MapJoinImplementation.ROW_MODE_OPTIMIZED); + } + } + + public 
static class MapJoinOneLongKeyLeftSemiVectorPassThrough_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.LEFT_SEMI, MapJoinImplementation.VECTOR_PASS_THROUGH); + } + } + + public static class MapJoinOneLongKeyLeftSemiNativeVectorOptimizedBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.LEFT_SEMI, MapJoinImplementation.NATIVE_VECTOR_OPTIMIZED); + } + } + + public static class MapJoinOneLongKeyLeftSemiNativeVectorFastBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.LEFT_SEMI, MapJoinImplementation.NATIVE_VECTOR_FAST); + } + } + + //----------------------------------------------------------------------------------------------- + + public static class MapJoinOneLongKeyOuterRowModeHashMapBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.OUTER, MapJoinImplementation.ROW_MODE_HASH_MAP); + } + } + + public static class MapJoinOneLongKeyOuterRowModeOptimized_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.OUTER, MapJoinImplementation.ROW_MODE_OPTIMIZED); + } + } + + public static class MapJoinOneLongKeyOuterVectorPassThrough_Bench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.OUTER, MapJoinImplementation.VECTOR_PASS_THROUGH); + } + } + + public static class MapJoinOneLongKeyOuterNativeVectorOptimizedBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.OUTER, MapJoinImplementation.NATIVE_VECTOR_OPTIMIZED); + } + } + + public static class MapJoinOneLongKeyOuterNativeVectorFastBench extends MapJoinOneLongKeyBenchBase { + + @Setup + public void setup() throws Exception { + doSetup(VectorMapJoinVariation.OUTER, MapJoinImplementation.NATIVE_VECTOR_FAST); + } + } + + //----------------------------------------------------------------------------------------------- + + public static void main(String[] args) throws RunnerException { + Options opt = new OptionsBuilder().include(".*" + VectorizedMapJoinBench.class.getSimpleName() + + ".*").build(); + new Runner(opt).run(); + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index 384e664..a1e0bab 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@ -70,6 +70,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + import com.esotericsoftware.kryo.KryoException; /** @@ -100,6 +102,8 @@ protected HybridHashTableContainer firstSmallTable; // The first small table; // Only this table has spilled big table rows + protected transient boolean isTestingNoHashTableLoad; + /** Kryo ctor. 
*/ protected MapJoinOperator() { super(); @@ -165,6 +169,12 @@ protected void initializeOp(Configuration hconf) throws HiveException { generateMapMetaData(); + isTestingNoHashTableLoad = HiveConf.getBoolVar(hconf, + HiveConf.ConfVars.HIVE_MAPJOIN_TESTING_NO_HASH_TABLE_LOAD); + if (isTestingNoHashTableLoad) { + return; + } + final ExecMapperContext mapContext = getExecContext(); final MapredContext mrContext = MapredContext.get(); @@ -239,6 +249,14 @@ protected void completeInitializationOp(Object[] os) throws HiveException { } } + @VisibleForTesting + public void setTestMapJoinTableContainer(int posSmallTable, + MapJoinTableContainer testMapJoinTableContainer, + MapJoinTableContainerSerDe mapJoinTableContainerSerDe) { + mapJoinTables[posSmallTable] = testMapJoinTableContainer; + mapJoinTableSerdes[posSmallTable] = mapJoinTableContainerSerDe; + } + @Override protected List getValueObjectInspectors( byte alias, List[] aliasToObjectInspectors) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java index 3cf6561..3519e1d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java @@ -76,7 +76,7 @@ public void init(ExecMapperContext context, MapredContext mrContext, Configurati if (desc.getVectorMode() && HiveConf.getBoolVar( hconf, HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED)) { VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc(); - useFastContainer = vectorDesc != null && vectorDesc.hashTableImplementationType() == + useFastContainer = vectorDesc != null && vectorDesc.getHashTableImplementationType() == VectorMapJoinDesc.HashTableImplementationType.FAST; } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorBatchDebug.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorBatchDebug.java new file mode 100644 index 0000000..155c9b8 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorBatchDebug.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector; + +import java.sql.Timestamp; + +import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VectorBatchDebug { + private static final Logger LOG = LoggerFactory.getLogger(VectorBatchDebug.class); + + public static String displayBytes(byte[] bytes, int start, int length) { + StringBuilder sb = new StringBuilder(); + for (int i = start; i < start + length; i++) { + char ch = (char) bytes[i]; + if (ch < ' ' || ch > '~') { + sb.append(String.format("\\%03d", bytes[i] & 0xff)); + } else { + sb.append(ch); + } + } + return sb.toString(); + } + + public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) { + StringBuilder sb = new StringBuilder(); + sb.append(prefix + " row " + index + " "); + for (int p = 0; p < batch.projectionSize; p++) { + int column = batch.projectedColumns[p]; + if (p == column) { + sb.append("(col " + p + ") "); + } else { + sb.append("(proj col " + p + " col " + column + ") "); + } + ColumnVector colVector = batch.cols[column]; + if (colVector == null) { + sb.append("(null ColumnVector)"); + } else { + boolean isRepeating = colVector.isRepeating; + if (isRepeating) { + sb.append("(repeating)"); + } + index = (isRepeating ? 0 : index); + if (colVector.noNulls || !colVector.isNull[index]) { + if (colVector instanceof LongColumnVector) { + sb.append(((LongColumnVector) colVector).vector[index]); + } else if (colVector instanceof DoubleColumnVector) { + sb.append(((DoubleColumnVector) colVector).vector[index]); + } else if (colVector instanceof BytesColumnVector) { + BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector; + byte[] bytes = bytesColumnVector.vector[index]; + int start = bytesColumnVector.start[index]; + int length = bytesColumnVector.length[index]; + if (bytes == null) { + sb.append("(Unexpected null bytes with start " + start + " length " + length + ")"); + } else { + sb.append("bytes: '" + displayBytes(bytes, start, length) + "'"); + } + } else if (colVector instanceof DecimalColumnVector) { + sb.append(((DecimalColumnVector) colVector).vector[index].toString()); + } else if (colVector instanceof TimestampColumnVector) { + Timestamp timestamp = new Timestamp(0); + ((TimestampColumnVector) colVector).timestampUpdate(timestamp, index); + sb.append(timestamp.toString()); + } else if (colVector instanceof IntervalDayTimeColumnVector) { + HiveIntervalDayTime intervalDayTime = ((IntervalDayTimeColumnVector) colVector).asScratchIntervalDayTime(index); + sb.append(intervalDayTime.toString()); + } else { + sb.append("Unknown"); + } + } else { + sb.append("NULL"); + } + } + sb.append(" "); + } + System.err.println(sb.toString()); + // LOG.info(sb.toString()); + } + + public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) { + for (int i = 0; i < batch.size; i++) { + int index = (batch.selectedInUse ? 
batch.selected[i] : i); + debugDisplayOneRow(batch, index, prefix); + } + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java index 7b8e7ea..25d8c8d 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinCommonOperator.java @@ -33,6 +33,8 @@ import org.apache.hadoop.hive.ql.exec.HashTableLoader; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnMapping; import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; @@ -65,6 +67,8 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import com.google.common.annotations.VisibleForTesting; + import com.google.common.base.Preconditions; /** @@ -362,9 +366,9 @@ protected void setupVOutContext(List outputColumnNames) { @Override protected HashTableLoader getHashTableLoader(Configuration hconf) { VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) conf.getVectorDesc(); - HashTableImplementationType hashTableImplementationType = vectorDesc.hashTableImplementationType(); + HashTableImplementationType hashTableImplementationType = vectorDesc.getHashTableImplementationType(); HashTableLoader hashTableLoader; - switch (vectorDesc.hashTableImplementationType()) { + switch (vectorDesc.getHashTableImplementationType()) { case OPTIMIZED: // Use the Tez hash table loader. 
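 // (Added note: this generic loader fills mapJoinTables with the small-table container; for the OPTIMIZED implementation type, setUpHashTable() in completeInitializationOp() below then wraps that container in the vector map join hash table variation.)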
hashTableLoader = HashTableLoaderFactory.getLoader(hconf); @@ -442,9 +446,32 @@ protected void completeInitializationOp(Object[] os) throws HiveException { // setup mapJoinTables and serdes super.completeInitializationOp(os); + if (isTestingNoHashTableLoad) { + return; + } + + setUpHashTable(); + } + + @VisibleForTesting + @Override + public void setTestMapJoinTableContainer(int posSmallTable, + MapJoinTableContainer testMapJoinTableContainer, + MapJoinTableContainerSerDe mapJoinTableContainerSerDe) { + + mapJoinTables[posSingleVectorMapJoinSmallTable] = testMapJoinTableContainer; + + setUpHashTable(); + } + + private void setUpHashTable() { + VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) conf.getVectorDesc(); - HashTableImplementationType hashTableImplementationType = vectorDesc.hashTableImplementationType(); - switch (vectorDesc.hashTableImplementationType()) { + HashTableImplementationType hashTableImplementationType = vectorDesc.getHashTableImplementationType(); + switch (vectorDesc.getHashTableImplementationType()) { case OPTIMIZED: { // Create our vector map join optimized hash table variation *above* the diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java index 2fe4b93..90b65c3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/fast/VectorMapJoinFastTableContainer.java @@ -95,10 +95,10 @@ private VectorMapJoinFastHashTable createHashTable(int newThreshold) { boolean isOuterJoin = !desc.isNoOuterJoin(); VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc(); - HashTableImplementationType hashTableImplementationType = vectorDesc.hashTableImplementationType(); - HashTableKind hashTableKind = vectorDesc.hashTableKind(); - HashTableKeyType hashTableKeyType = vectorDesc.hashTableKeyType(); - boolean minMaxEnabled = vectorDesc.minMaxEnabled(); + HashTableImplementationType hashTableImplementationType = vectorDesc.getHashTableImplementationType(); + HashTableKind hashTableKind = vectorDesc.getHashTableKind(); + HashTableKeyType hashTableKeyType = vectorDesc.getHashTableKeyType(); + boolean minMaxEnabled = vectorDesc.getMinMaxEnabled(); int writeBufferSize = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLEWBSIZE); diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedCreateHashTable.java ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedCreateHashTable.java index 111a6d2..5013798 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedCreateHashTable.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/optimized/VectorMapJoinOptimizedCreateHashTable.java @@ -41,9 +41,9 @@ public static VectorMapJoinOptimizedHashTable createHashTable(MapJoinDesc desc, boolean isOuterJoin = !desc.isNoOuterJoin(); VectorMapJoinDesc vectorDesc = (VectorMapJoinDesc) desc.getVectorDesc(); - HashTableKind hashTableKind = vectorDesc.hashTableKind(); - HashTableKeyType hashTableKeyType
= vectorDesc.getHashTableKeyType(); + boolean minMaxEnabled = vectorDesc.getMinMaxEnabled(); VectorMapJoinOptimizedHashTable hashTable = null; diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java index 933e47d..bde3b0a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java @@ -140,7 +140,7 @@ import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; -import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.OperatorVariation; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo; @@ -2472,7 +2472,7 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { HashTableImplementationType hashTableImplementationType = HashTableImplementationType.NONE; HashTableKind hashTableKind = HashTableKind.NONE; HashTableKeyType hashTableKeyType = HashTableKeyType.NONE; - OperatorVariation operatorVariation = OperatorVariation.NONE; + VectorMapJoinVariation vectorMapJoinVariation = VectorMapJoinVariation.NONE; if (vectorDesc.getIsFastHashTableEnabled()) { hashTableImplementationType = HashTableImplementationType.FAST; @@ -2530,20 +2530,20 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { switch (joinType) { case JoinDesc.INNER_JOIN: if (!isInnerBigOnly) { - operatorVariation = OperatorVariation.INNER; + vectorMapJoinVariation = VectorMapJoinVariation.INNER; hashTableKind = HashTableKind.HASH_MAP; } else { - operatorVariation = OperatorVariation.INNER_BIG_ONLY; + vectorMapJoinVariation = VectorMapJoinVariation.INNER_BIG_ONLY; hashTableKind = HashTableKind.HASH_MULTISET; } break; case JoinDesc.LEFT_OUTER_JOIN: case JoinDesc.RIGHT_OUTER_JOIN: - operatorVariation = OperatorVariation.OUTER; + vectorMapJoinVariation = VectorMapJoinVariation.OUTER; hashTableKind = HashTableKind.HASH_MAP; break; case JoinDesc.LEFT_SEMI_JOIN: - operatorVariation = OperatorVariation.LEFT_SEMI; + vectorMapJoinVariation = VectorMapJoinVariation.LEFT_SEMI; hashTableKind = HashTableKind.HASH_SET; break; default: @@ -2558,7 +2558,7 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { case SHORT: case INT: case LONG: - switch (operatorVariation) { + switch (vectorMapJoinVariation) { case INNER: opClass = VectorMapJoinInnerLongOperator.class; break; @@ -2572,11 +2572,11 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { opClass = VectorMapJoinOuterLongOperator.class; break; default: - throw new HiveException("Unknown operator variation " + operatorVariation); + throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } break; case STRING: - switch (operatorVariation) { + switch (vectorMapJoinVariation) { case INNER: opClass = VectorMapJoinInnerStringOperator.class; break; @@ -2590,11 +2590,11 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { opClass = VectorMapJoinOuterStringOperator.class; break; default: - throw new HiveException("Unknown operator variation " + operatorVariation); + throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } break; case 
MULTI_KEY: - switch (operatorVariation) { + switch (vectorMapJoinVariation) { case INNER: opClass = VectorMapJoinInnerMultiKeyOperator.class; break; @@ -2608,7 +2608,7 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { opClass = VectorMapJoinOuterMultiKeyOperator.class; break; default: - throw new HiveException("Unknown operator variation " + operatorVariation); + throw new HiveException("Unknown operator variation " + vectorMapJoinVariation); } break; default: @@ -2621,7 +2621,7 @@ private boolean isBigTableOnlyResults(MapJoinDesc desc) { vectorDesc.setHashTableImplementationType(hashTableImplementationType); vectorDesc.setHashTableKind(hashTableKind); vectorDesc.setHashTableKeyType(hashTableKeyType); - vectorDesc.setOperatorVariation(operatorVariation); + vectorDesc.setVectorMapJoinVariation(vectorMapJoinVariation); vectorDesc.setMinMaxEnabled(minMaxEnabled); vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java index 0d8e459..e1b4ae6 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java @@ -35,7 +35,7 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; import org.apache.hadoop.hive.ql.plan.Explain.Vectorization; import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; -import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.OperatorVariation; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; /** * Map Join operator Descriptor implementation. @@ -404,7 +404,7 @@ public void setDynamicPartitionHashJoin(boolean isDistributedHashJoin) { public MapJoinOperatorExplainVectorization(MapJoinDesc mapJoinDesc, VectorDesc vectorDesc) { // VectorMapJoinOperator is not native vectorized. 
- super(vectorDesc, ((VectorMapJoinDesc) vectorDesc).hashTableImplementationType() != HashTableImplementationType.NONE); + super(vectorDesc, ((VectorMapJoinDesc) vectorDesc).getHashTableImplementationType() != HashTableImplementationType.NONE); this.mapJoinDesc = mapJoinDesc; vectorMapJoinDesc = (VectorMapJoinDesc) vectorDesc; vectorMapJoinInfo = vectorMapJoinDesc.getVectorMapJoinInfo(); @@ -539,7 +539,7 @@ public String getProjectedOutputColumns() { @Explain(vectorization = Vectorization.DETAIL, displayName = "bigTableOuterKeyMapping", explainLevels = { Level.DEFAULT, Level.EXTENDED }) public List getBigTableOuterKey() { - if (!isNative || vectorMapJoinDesc.operatorVariation() != OperatorVariation.OUTER) { + if (!isNative || vectorMapJoinDesc.getVectorMapJoinVariation() != VectorMapJoinVariation.OUTER) { return null; } return columnMappingToStringList(vectorMapJoinInfo.getBigTableOuterKeyMapping()); diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java index 60400de..99a4958 100644 --- ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java +++ ql/src/java/org/apache/hadoop/hive/ql/plan/VectorMapJoinDesc.java @@ -83,7 +83,7 @@ public PrimitiveTypeInfo getPrimitiveTypeInfo() { } } - public static enum OperatorVariation { + public static enum VectorMapJoinVariation { NONE, INNER_BIG_ONLY, INNER, @@ -94,7 +94,7 @@ public PrimitiveTypeInfo getPrimitiveTypeInfo() { private HashTableImplementationType hashTableImplementationType; private HashTableKind hashTableKind; private HashTableKeyType hashTableKeyType; - private OperatorVariation operatorVariation; + private VectorMapJoinVariation vectorMapJoinVariation; private boolean minMaxEnabled; private VectorMapJoinInfo vectorMapJoinInfo; @@ -103,7 +103,7 @@ public VectorMapJoinDesc() { hashTableImplementationType = HashTableImplementationType.NONE; hashTableKind = HashTableKind.NONE; hashTableKeyType = HashTableKeyType.NONE; - operatorVariation = OperatorVariation.NONE; + vectorMapJoinVariation = VectorMapJoinVariation.NONE; minMaxEnabled = false; vectorMapJoinInfo = null; } @@ -114,7 +114,7 @@ public VectorMapJoinDesc clone() { clone.hashTableImplementationType = this.hashTableImplementationType; clone.hashTableKind = this.hashTableKind; clone.hashTableKeyType = this.hashTableKeyType; - clone.operatorVariation = this.operatorVariation; + clone.vectorMapJoinVariation = this.vectorMapJoinVariation; clone.minMaxEnabled = this.minMaxEnabled; if (vectorMapJoinInfo != null) { throw new RuntimeException("Cloning VectorMapJoinInfo not supported"); @@ -122,7 +122,7 @@ public VectorMapJoinDesc clone() { return clone; } - public HashTableImplementationType hashTableImplementationType() { + public HashTableImplementationType getHashTableImplementationType() { return hashTableImplementationType; } @@ -130,7 +130,7 @@ public void setHashTableImplementationType(HashTableImplementationType hashTable this.hashTableImplementationType = hashTableImplementationType; } - public HashTableKind hashTableKind() { + public HashTableKind getHashTableKind() { return hashTableKind; } @@ -138,7 +138,7 @@ public void setHashTableKind(HashTableKind hashTableKind) { this.hashTableKind = hashTableKind; } - public HashTableKeyType hashTableKeyType() { + public HashTableKeyType getHashTableKeyType() { return hashTableKeyType; } @@ -146,15 +146,15 @@ public void setHashTableKeyType(HashTableKeyType hashTableKeyType) { this.hashTableKeyType = hashTableKeyType; } - public 
OperatorVariation operatorVariation() { - return operatorVariation; + public VectorMapJoinVariation getVectorMapJoinVariation() { + return vectorMapJoinVariation; } - public void setOperatorVariation(OperatorVariation operatorVariation) { - this.operatorVariation = operatorVariation; + public void setVectorMapJoinVariation(VectorMapJoinVariation vectorMapJoinVariation) { + this.vectorMapJoinVariation = vectorMapJoinVariation; } - public boolean minMaxEnabled() { + public boolean getMinMaxEnabled() { return minMaxEnabled; } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCollectorOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCollectorOperator.java new file mode 100644 index 0000000..fb94dcf --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCollectorOperator.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.testrow; + +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; + +public class TestCollectorOperator extends Operator { + + private static final long serialVersionUID = 1L; + + public TestCollectorOperator() { + super(); + } + + @Override + public void process(Object row, int tag) throws HiveException { + // Do nothing. + } + + @Override + public String getName() { + return TestCollectorOperator.class.getSimpleName(); + } + + @Override + public OperatorType getType() { + return null; + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCountCollectorOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCountCollectorOperator.java new file mode 100644 index 0000000..f6f5fe4 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCountCollectorOperator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.testrow; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public class TestCountCollectorOperator extends TestCollectorOperator { + + private static final long serialVersionUID = 1L; + + protected int rowCount; + + public TestCountCollectorOperator() { + super(); + rowCount = 0; + } + + public int getRowCount() { + return rowCount; + } + + @Override + public void process(Object row, int tag) throws HiveException { + // Do nothing but count. + rowCount++; + } + + @Override + public String getName() { + return TestCountCollectorOperator.class.getSimpleName(); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCountVectorCollectorOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCountVectorCollectorOperator.java new file mode 100644 index 0000000..f9821ca --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestCountVectorCollectorOperator.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.testrow; + +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public class TestCountVectorCollectorOperator extends TestCountCollectorOperator { + + private static final long serialVersionUID = 1L; + + public TestCountVectorCollectorOperator() { + super(); + } + + public int getRowCount() { + return rowCount; + } + + @Override + public void process(Object row, int tag) throws HiveException { + // Do nothing but count the batch size. + VectorizedRowBatch batch = (VectorizedRowBatch) row; + rowCount += batch.size; + } + + @Override + public String getName() { + return TestCountVectorCollectorOperator.class.getSimpleName(); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestDescription.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestDescription.java new file mode 100644 index 0000000..5b53d41 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestDescription.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.testrow; + +import org.apache.hadoop.hive.conf.HiveConf; + +public abstract class TestDescription { + + public final HiveConf hiveConf; + + public TestDescription( + HiveConf hiveConf) { + this.hiveConf = hiveConf; + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRow.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRow.java new file mode 100644 index 0000000..f1b179c --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRow.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.testrow; + +import java.util.Arrays; + +public final class TestRow implements Comparable<TestRow> { + + private final Object[] row; + + // Not included in equals. + private int index; + + public TestRow(Object[] row) { + this.row = row; + index = -1; // Not used value. + } + + public Object[] getRow() { + return row; + } + + public void setIndex(int index) { + this.index = index; + } + + public int getIndex() { + return index; + } + + @Override + public int hashCode() { + int hashCode = Arrays.hashCode(row); + return hashCode; + } + + @Override + public Object clone() { + return new TestRow(row); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof TestRow)) { + return false; + } + final TestRow other = (TestRow) obj; + return Arrays.equals(this.row, other.row); + } + + @Override + public String toString() { + return Arrays.toString(row); + } + + @Override + public int compareTo(TestRow obj) { + final TestRow other = (TestRow) obj; + int thisLength = this.row.length; + int otherLength = other.row.length; + if (thisLength != otherLength) { + return (thisLength < otherLength ? -1 : 1); + } + for (int i = 0; i < thisLength; i++) { + Object thisObject = this.row[i]; + Object otherObject = other.row[i]; + if (thisObject == null || otherObject == null) { + if (thisObject == null && otherObject == null) { + continue; + } + // Does this make sense? + return (thisObject == null ?
-1 : 1); + } + int compareTo = ((Comparable) thisObject).compareTo((Comparable) otherObject); + if (compareTo != 0) { + return compareTo; + } + } + return 0; + } + } diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowCollectorOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowCollectorOperator.java new file mode 100644 index 0000000..89d5f8d --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowCollectorOperator.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.testrow; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; + +public abstract class TestRowCollectorOperator extends TestRowCollectorOperatorBase { + + private static final long serialVersionUID = 1L; + + private final ObjectInspector[] outputObjectInspectors; + + public TestRowCollectorOperator(ObjectInspector[] outputObjectInspectors) { + super(); + this.outputObjectInspectors = outputObjectInspectors; + } + + @Override + public void process(Object row, int tag) throws HiveException { + rowCount++; + Object[] rowObjectArray = (Object[]) row; + Object[] resultObjectArray = new Object[rowObjectArray.length]; + for (int c = 0; c < rowObjectArray.length; c++) { + resultObjectArray[c] = ((PrimitiveObjectInspector) outputObjectInspectors[c]).copyObject(rowObjectArray[c]); + } + nextTestRow(new TestRow(resultObjectArray)); + } + + @Override + public String getName() { + return TestRowCollectorOperator.class.getSimpleName(); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowCollectorOperatorBase.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowCollectorOperatorBase.java new file mode 100644 index 0000000..c91f50f --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowCollectorOperatorBase.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.testrow;
+
+public abstract class TestRowCollectorOperatorBase extends TestCountCollectorOperator {
+
+  private static final long serialVersionUID = 1L;
+
+  public TestRowCollectorOperatorBase() {
+    super();
+  }
+
+  public abstract void nextTestRow(TestRow testRow);
+}
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowMap.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowMap.java
new file mode 100644
index 0000000..b18573b
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowMap.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.testrow;
+
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+public class TestRowMap {
+  private SortedMap<TestRow, Object> sortedMap;
+
+  public TestRowMap() {
+    sortedMap = new TreeMap<TestRow, Object>();
+  }
+
+  public Object find(TestRow testRow) {
+    return sortedMap.get(testRow);
+  }
+
+  public void put(TestRow testRow, Object object) {
+    sortedMap.put(testRow, object);
+  }
+
+  @Override
+  public int hashCode() {
+    return sortedMap.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof TestRowMap)) {
+      return false;
+    }
+    final TestRowMap other = (TestRowMap) obj;
+    final int thisSize = this.sortedMap.size();
+    final int otherSize = other.sortedMap.size();
+    // Compare sizes first so the paired iteration below cannot run past the
+    // end of the smaller map.
+    if (thisSize != otherSize) {
+      return false;
+    }
+    Iterator<Entry<TestRow, Object>> thisIterator = this.sortedMap.entrySet().iterator();
+    Iterator<Entry<TestRow, Object>> otherIterator = other.sortedMap.entrySet().iterator();
+    for (int i = 0; i < thisSize; i++) {
+      Entry<TestRow, Object> thisEntry = thisIterator.next();
+      Entry<TestRow, Object> otherEntry = otherIterator.next();
+      if (!thisEntry.getKey().equals(otherEntry.getKey())) {
+        return false;
+      }
+      // Check object.
+      if (!thisEntry.getValue().equals(otherEntry.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return sortedMap.toString();
+  }
+}
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowMultiSet.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowMultiSet.java
new file mode 100644
index 0000000..b7f9237
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowMultiSet.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.testrow;
+
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Map.Entry;
+
+public class TestRowMultiSet {
+  private SortedMap<TestRow, Integer> sortedMap;
+  private int rowCount;
+  private int totalCount;
+
+  public TestRowMultiSet() {
+    sortedMap = new TreeMap<TestRow, Integer>();
+    rowCount = 0;
+    totalCount = 0;
+  }
+
+  public int getRowCount() {
+    return rowCount;
+  }
+
+  public int getTotalCount() {
+    return totalCount;
+  }
+
+  public void add(TestRow testRow) {
+    if (sortedMap.containsKey(testRow)) {
+      // Store the incremented count back; autoboxed Integers are immutable.
+      sortedMap.put(testRow, sortedMap.get(testRow) + 1);
+    } else {
+      sortedMap.put(testRow, 1);
+      rowCount++;
+    }
+    totalCount++;
+  }
+
+  public boolean verify(TestRowMultiSet other) {
+
+    final int thisSize = this.sortedMap.size();
+    final int otherSize = other.sortedMap.size();
+    if (thisSize != otherSize) {
+      System.out.println("*VERIFY* count " + thisSize + " doesn't match otherSize " + otherSize);
+      return false;
+    }
+    Iterator<Entry<TestRow, Integer>> thisIterator = this.sortedMap.entrySet().iterator();
+    Iterator<Entry<TestRow, Integer>> otherIterator = other.sortedMap.entrySet().iterator();
+    for (int i = 0; i < thisSize; i++) {
+      Entry<TestRow, Integer> thisEntry = thisIterator.next();
+      Entry<TestRow, Integer> otherEntry = otherIterator.next();
+      if (!thisEntry.getKey().equals(otherEntry.getKey())) {
+        System.out.println("*VERIFY* thisEntry.getKey() " + thisEntry.getKey() + " doesn't match otherEntry.getKey() " + otherEntry.getKey());
+        return false;
+      }
+      // Check multi-set count.
+      if (!thisEntry.getValue().equals(otherEntry.getValue())) {
+        System.out.println("*VERIFY* key " + thisEntry.getKey() + " count " + thisEntry.getValue() + " doesn't match " + otherEntry.getValue());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return sortedMap.toString();
+  }
+}
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowVectorCollectorOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowVectorCollectorOperator.java
new file mode 100644
index 0000000..184957f
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/testrow/TestRowVectorCollectorOperator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.testrow; + +import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; + +public abstract class TestRowVectorCollectorOperator extends TestRowCollectorOperatorBase { + + private final ObjectInspector[] outputObjectInspectors; + private final VectorExtractRow vectorExtractRow; + + public TestRowVectorCollectorOperator(TypeInfo[] outputTypeInfos, + ObjectInspector[] outputObjectInspectors) throws HiveException { + super(); + this.outputObjectInspectors = outputObjectInspectors; + vectorExtractRow = new VectorExtractRow(); + vectorExtractRow.init(outputTypeInfos); + } + + @Override + public void process(Object row, int tag) throws HiveException { + VectorizedRowBatch batch = (VectorizedRowBatch) row; + rowCount += batch.size; + boolean selectedInUse = batch.selectedInUse; + int[] selected = batch.selected; + for (int logical = 0; logical < batch.size; logical++) { + int batchIndex = (selectedInUse ? selected[logical] : logical); + Object[] rowObjects = new Object[outputObjectInspectors.length]; + vectorExtractRow.extractRow(batch, batchIndex, rowObjects); + for (int c = 0; c < rowObjects.length; c++) { + rowObjects[c] = ((PrimitiveObjectInspector) outputObjectInspectors[c]).copyObject(rowObjects[c]); + } + nextTestRow(new TestRow(rowObjects)); + } + } + + @Override + public String getName() { + return TestRowVectorCollectorOperator.class.getSimpleName(); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/TestVectorBatchGenerate.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/TestVectorBatchGenerate.java new file mode 100644 index 0000000..6533a69 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/TestVectorBatchGenerate.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.batchgen; + +import org.apache.hadoop.hive.ql.exec.vector.VectorBatchDebug; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType.GenerateCategory; +import org.junit.Test; + +import java.util.Random; + +public class TestVectorBatchGenerate { + + @Test + public void testTryIt() throws Exception { + GenerateType[] generateTypes = + new GenerateType[] {new GenerateType(GenerateCategory.INT), new GenerateType(GenerateCategory.BYTE)}; + VectorBatchGenerator generator = new VectorBatchGenerator(generateTypes); + + VectorizedRowBatch batch = generator.createBatch(); + + Random random = new Random(); + generator.generateBatch(batch, random, VectorizedRowBatch.DEFAULT_SIZE); + VectorBatchDebug.debugDisplayBatch(batch, "testTryIt"); + } + + @Test + public void testTryIt2() throws Exception { + GenerateType[] generateTypes = + new GenerateType[] {new GenerateType(GenerateCategory.BOOLEAN), new GenerateType(GenerateCategory.LONG), new GenerateType(GenerateCategory.DOUBLE)}; + VectorBatchGenerator generator = new VectorBatchGenerator(generateTypes); + + VectorizedRowBatch batch = generator.createBatch(); + + Random random = new Random(); + generator.generateBatch(batch, random, VectorizedRowBatch.DEFAULT_SIZE); + VectorBatchDebug.debugDisplayBatch(batch, "testTryIt2"); + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/VectorBatchGenerator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/VectorBatchGenerator.java new file mode 100644 index 0000000..5a4a514 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/VectorBatchGenerator.java @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.exec.vector.batchgen; + +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType.GenerateCategory; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; + +import com.google.common.base.Preconditions; + +public class VectorBatchGenerator { + + public static class GenerateType { + + // UNDONE: Missing date/time interval data types + public enum GenerateCategory { + BOOLEAN("boolean", true), + BYTE("tinyint", true), + SHORT("smallint", true), + INT("int", true), + LONG("bigint", true), + FLOAT("float", true), + DOUBLE("double", true), + STRING("string", true), + DATE("date", true), + TIMESTAMP("timestamp", true), + BINARY("binary", true), + DECIMAL("decimal", true), + VARCHAR("varchar", true), + CHAR("char", true), + LIST("array", false), + MAP("map", false), + STRUCT("struct", false), + UNION("uniontype", false); + + GenerateCategory(String name, boolean isPrimitive) { + this.name = name; + this.isPrimitive = isPrimitive; + } + + final boolean isPrimitive; + final String name; + + public boolean isPrimitive() { + return isPrimitive; + } + + public String getName() { + return name; + } + + public static GenerateCategory generateCategoryFromPrimitiveCategory(PrimitiveCategory primitiveCategory) { + switch (primitiveCategory) { + case BOOLEAN: + return GenerateCategory.BOOLEAN; + case BYTE: + return GenerateCategory.BYTE; + case SHORT: + return GenerateCategory.SHORT; + case INT: + return GenerateCategory.INT; + case LONG: + return GenerateCategory.LONG; + case FLOAT: + return GenerateCategory.FLOAT; + case DOUBLE: + return GenerateCategory.DOUBLE; + case STRING: + return GenerateCategory.STRING; + case DATE: + return GenerateCategory.DATE; + case TIMESTAMP: + return GenerateCategory.TIMESTAMP; + case BINARY: + return GenerateCategory.BINARY; + case DECIMAL: + return GenerateCategory.DECIMAL; + case VARCHAR: + return GenerateCategory.VARCHAR; + case CHAR: + return GenerateCategory.CHAR; + default: + return null; + } + } + } + + private GenerateCategory category; + + public GenerateType(GenerateCategory category) { + this.category = category; + } + + public GenerateCategory getCategory() { + return category; + } + + /* + * BOOLEAN .. LONG: Min and max. + */ + private long integerMin; + private long integerMax; + + /* + * FLOAT: Min and max. + */ + private float floatMin; + private float floatMax; + + /* + * DOUBLE: Min and max. + */ + private double doubleMin; + private double doubleMax; + + /* + * STRING: + * Range, values, empty strings. + */ + + /* + * CHAR: strategic blanks, string length beyond max + */ + + /* + * VARCHAR: string length beyond max + */ + } + + private VectorColumnGroupGenerator[] columnGroups; + private boolean[] isGenerateSeries; + + public VectorBatchGenerator(GenerateType[] generateTypes) { + final int size = generateTypes.length; + columnGroups = new VectorColumnGroupGenerator[size]; + for (int i = 0; i < size; i++) { + columnGroups[i] = new VectorColumnGroupGenerator(i, generateTypes[i]); + } + isGenerateSeries = new boolean[size]; + // UNDONE: For now, all... 
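+    // Columns flagged in isGenerateSeries may repeat a generated value down a
+    // run of consecutive rows (see generateBatch below), so batches contain
+    // equal-value series that exercise repeated-key paths in the consumers.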
+    Arrays.fill(isGenerateSeries, true);
+  }
+
+  public VectorBatchGenerator(VectorColumnGroupGenerator[] columnGroups) {
+    this.columnGroups = columnGroups;
+  }
+
+  public void assignColumnVectors(VectorizedRowBatch batch, int columnNum,
+      VectorColumnGroupGenerator columnGroup) {
+    // UNDONE: Multiple types...
+    GenerateType[] generateTypes = columnGroup.generateTypes();
+    GenerateType generateType = generateTypes[0];
+    ColumnVector colVector;
+    switch (generateType.getCategory()) {
+    case BOOLEAN:
+    case BYTE:
+    case SHORT:
+    case INT:
+    case LONG:
+      colVector = new LongColumnVector();
+      break;
+
+    case FLOAT:
+    case DOUBLE:
+      colVector = new DoubleColumnVector();
+      break;
+
+    case STRING:
+      colVector = new BytesColumnVector();
+      break;
+
+    // UNDONE
+    case DATE:
+    case TIMESTAMP:
+    case BINARY:
+    case DECIMAL:
+    case VARCHAR:
+    case CHAR:
+    case LIST:
+    case MAP:
+    case STRUCT:
+    case UNION:
+    default:
+      throw new RuntimeException("Unsupported category " + generateType.getCategory());
+    }
+    colVector.init();
+    batch.cols[columnNum] = colVector;
+  }
+
+  public VectorizedRowBatch createBatch() {
+    final int size = columnGroups.length;
+    VectorizedRowBatch batch = new VectorizedRowBatch(size);
+    for (int i = 0; i < size; i++) {
+      assignColumnVectors(batch, i, columnGroups[i]);
+    }
+    return batch;
+  }
+
+  public void generateBatch(VectorizedRowBatch batch, Random random,
+      int size) {
+
+    // Clear value arrays.
+    for (int c = 0; c < columnGroups.length; c++) {
+      columnGroups[c].clearColumnValueArrays();
+    }
+
+    // Generate row values.
+    int i = 0;
+    while (true) {
+      for (int c = 0; c < columnGroups.length; c++) {
+        columnGroups[c].generateRowValues(i, random);
+      }
+      if (i + 1 >= size) {
+        break;
+      }
+
+      // Null out some row column entries.
+      // UNDONE
+
+      // Consider generating a column group equal value series?
+      if (i < size - 1) {
+        for (int c = 0; c < columnGroups.length; c++) {
+          if (isGenerateSeries[c]) {
+            int seriesCount = getSeriesCount(random);
+            if (seriesCount == 1) {
+              continue;
+            }
+            seriesCount = Math.min(seriesCount, size - i);
+            Preconditions.checkState(seriesCount > 1);
+
+            // Fill values down for equal value series.
+            VectorColumnGroupGenerator columnGroup = columnGroups[c];
+            columnGroup.fillDownRowValues(i, seriesCount, random);
+
+            // For all the other column groups, generate new values down.
+            for (int other = 0; other < columnGroups.length; other++) {
+              if (other != c) {
+                VectorColumnGroupGenerator otherColumnGroup = columnGroups[other];
+                otherColumnGroup.generateDownRowValues(i, seriesCount, random);
+
+                // Also, null down.
+                // UNDONE
+              }
+            }
+
+            // Fill down null flags.
+            // UNDONE
+
+            i += (seriesCount - 1);
+            break;
+          }
+        }
+      }
+      // Recheck.
+      i++;
+      if (i >= size) {
+        break;
+      }
+    }
+
+    // Optionally, do some filtering of rows...
+    // UNDONE
+
+    // From the value arrays and our isRepeated, selected, isNull arrays, generate the batch!
+    for (int c = 0; c < columnGroups.length; c++) {
+      VectorColumnGroupGenerator columnGroup = columnGroups[c];
+
+      // UNDONE: Provide isRepeated, selected, isNull
+      columnGroup.populateBatch(batch, size, false);
+    }
+
+    batch.size = size;
+  }
+
+  private int getSeriesCount(Random random) {
+    // UNDONE: For now...
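+    // Half the time return a singleton; otherwise a random run length of 1..10.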
+ if (random.nextBoolean()) { + return 1; + } else { + return 1 + random.nextInt(10); + } + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/VectorColumnGroupGenerator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/VectorColumnGroupGenerator.java new file mode 100644 index 0000000..91f7757 --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/batchgen/VectorColumnGroupGenerator.java @@ -0,0 +1,492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.batchgen; + +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.Random; + +import org.apache.hadoop.hive.common.type.RandomTypeUtil; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType.GenerateCategory; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.Text; + +public class VectorColumnGroupGenerator { + + private GenerateType[] generateTypes; + private int[] columnNums; + private Object[] arrays; + + public VectorColumnGroupGenerator(int columnNum, GenerateType generateType) { + columnNums = new int[] {columnNum}; + generateTypes = new GenerateType[] {generateType}; + allocateArrays(VectorizedRowBatch.DEFAULT_SIZE); + } + + public VectorColumnGroupGenerator(int startColumnNum, GenerateType[] generateTypes) { + columnNums = new int[generateTypes.length]; + for (int i = 0; i < generateTypes.length; i++) { + columnNums[i] = startColumnNum + i; + } + this.generateTypes = generateTypes; + allocateArrays(VectorizedRowBatch.DEFAULT_SIZE); + } + + public GenerateType[] generateTypes() { + return generateTypes; + } + + private void allocateArrays(int size) { + arrays = new Object[generateTypes.length]; + for (int i = 0; i < generateTypes.length; i++) { + GenerateType generateType = generateTypes[i]; + GenerateCategory category = generateType.getCategory(); + Object array = null; + switch (category) { + case BOOLEAN: + array = new boolean[size]; + break; + case BYTE: + array = new byte[size]; + break; + case SHORT: + array = new short[size]; + break; + case INT: + array = new int[size]; + break; + case LONG: + array = new long[size]; + break; + case FLOAT: + array = new float[size]; + break; + case DOUBLE: + array = 
new double[size]; + break; + case STRING: + array = new String[size]; + break; + case TIMESTAMP: + array = new Timestamp[size]; + break; + + // UNDONE + case DATE: + case BINARY: + case DECIMAL: + case VARCHAR: + case CHAR: + + case LIST: + case MAP: + case STRUCT: + case UNION: + default: + } + arrays[i] = array; + } + } + + public void clearColumnValueArrays() { + for (int i = 0; i < generateTypes.length; i++) { + GenerateType generateType = generateTypes[i]; + GenerateCategory category = generateType.getCategory(); + Object array = arrays[i]; + switch (category) { + case BOOLEAN: + Arrays.fill(((boolean[]) array), false); + break; + case BYTE: + Arrays.fill(((byte[]) array), (byte) 0); + break; + case SHORT: + Arrays.fill(((short[]) array), (short) 0); + break; + case INT: + Arrays.fill(((int[]) array), 0); + break; + case LONG: + Arrays.fill(((long[]) array), 0); + break; + case FLOAT: + Arrays.fill(((float[]) array), 0); + break; + case DOUBLE: + Arrays.fill(((double[]) array), 0); + break; + case STRING: + Arrays.fill(((String[]) array), null); + break; + case TIMESTAMP: + Arrays.fill(((Timestamp[]) array), null); + break; + + // UNDONE + case DATE: + case BINARY: + case DECIMAL: + case VARCHAR: + case CHAR: + + case LIST: + case MAP: + case STRUCT: + case UNION: + default: + } + } + } + + public void generateRowValues(int rowIndex, Random random) { + for (int i = 0; i < generateTypes.length; i++) { + generateRowColumnValue(rowIndex, i, random); + } + } + + private void generateRowColumnValue(int rowIndex, int columnIndex, Random random) { + GenerateType generateType = generateTypes[columnIndex]; + GenerateCategory category = generateType.getCategory(); + Object array = arrays[columnIndex]; + switch (category) { + case BOOLEAN: + { + boolean value = random.nextBoolean(); + ((boolean[]) array)[rowIndex] = value; + } + break; + case BYTE: + { + byte value = + (byte) + (random.nextBoolean() ? + -random.nextInt(-((int) Byte.MIN_VALUE) + 1) : + random.nextInt((int) Byte.MAX_VALUE + 1)); + ((byte[]) array)[rowIndex] = value; + } + break; + case SHORT: + { + short value = + (short) + (random.nextBoolean() ? 
+ -random.nextInt(-((int) Short.MIN_VALUE) + 1) : + random.nextInt((int) Short.MAX_VALUE + 1)); + ((short[]) array)[rowIndex] = value; + } + break; + case INT: + { + int value = random.nextInt(); + ((int[]) array)[rowIndex] = value; + } + break; + case LONG: + { + long value = random.nextLong(); + ((long[]) array)[rowIndex] = value; + } + break; + case FLOAT: + { + float value = random.nextLong(); + ((float[]) array)[rowIndex] = value; + } + break; + case DOUBLE: + { + double value = random.nextLong(); + ((double[]) array)[rowIndex] = value; + } + break; + + case STRING: + { + String value = RandomTypeUtil.getRandString(random); + ((String[]) array)[rowIndex] = value; + } + break; + + case TIMESTAMP: + { + Timestamp value = RandomTypeUtil.getRandTimestamp(random); + ((Timestamp[]) array)[rowIndex] = value; + } + break; + + // UNDONE + case DATE: + case BINARY: + case DECIMAL: + case VARCHAR: + case CHAR: + + case LIST: + case MAP: + case STRUCT: + case UNION: + default: + } + } + + public void fillDownRowValues(int rowIndex, int seriesCount, Random random) { + for (int i = 0; i < generateTypes.length; i++) { + fillDownRowColumnValue(rowIndex, i, seriesCount, random); + } + } + + private void fillDownRowColumnValue(int rowIndex, int columnIndex, int seriesCount, Random random) { + GenerateType generateType = generateTypes[columnIndex]; + GenerateCategory category = generateType.getCategory(); + Object array = arrays[columnIndex]; + switch (category) { + case BOOLEAN: + { + boolean[] booleanArray = ((boolean[]) array); + boolean value = booleanArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + booleanArray[rowIndex + i] = value; + } + } + break; + case BYTE: + { + byte[] byteArray = ((byte[]) array); + byte value = byteArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + byteArray[rowIndex + i] = value; + } + } + break; + case SHORT: + { + short[] shortArray = ((short[]) array); + short value = shortArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + shortArray[rowIndex + i] = value; + } + } + break; + case INT: + { + int[] intArray = ((int[]) array); + int value = intArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + intArray[rowIndex + i] = value; + } + } + break; + case LONG: + { + long[] longArray = ((long[]) array); + long value = longArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + longArray[rowIndex + i] = value; + } + } + break; + case FLOAT: + { + float[] floatArray = ((float[]) array); + float value = floatArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + floatArray[rowIndex + i] = value; + } + } + break; + case DOUBLE: + { + double[] doubleArray = ((double[]) array); + double value = doubleArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + doubleArray[rowIndex + i] = value; + } + } + break; + case STRING: + { + String[] stringArray = ((String[]) array); + String value = stringArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + stringArray[rowIndex + i] = value; + } + } + break; + case TIMESTAMP: + { + Timestamp[] timestampArray = ((Timestamp[]) array); + Timestamp value = timestampArray[rowIndex]; + for (int i = 1; i < seriesCount; i++) { + timestampArray[rowIndex + i] = value; + } + } + break; + + // UNDONE + case DATE: + + case BINARY: + case DECIMAL: + case VARCHAR: + case CHAR: + + case LIST: + case MAP: + case STRUCT: + case UNION: + default: + } + } + + public void generateDownRowValues(int rowIndex, int seriesCount, Random random) { + for (int i = 0; i < generateTypes.length; i++) { + for (int g 
= 1; g < seriesCount; g++) {
+        generateRowColumnValue(rowIndex + g, i, random);
+      }
+    }
+  }
+
+  public void populateBatch(VectorizedRowBatch batch, int size, boolean isRepeated) {
+
+    // UNDONE: Haven't finished isRepeated
+    assert !isRepeated;
+
+    // populateBatchColumn fills rows 0..size-1 of one column, so each column
+    // in the group only needs to be populated once.
+    for (int g = 0; g < generateTypes.length; g++) {
+      populateBatchColumn(batch, g, size);
+    }
+  }
+
+  private void populateBatchColumn(VectorizedRowBatch batch, int logicalColumnIndex, int size) {
+    int columnNum = columnNums[logicalColumnIndex];
+    ColumnVector colVector = batch.cols[columnNum];
+
+    GenerateType generateType = generateTypes[logicalColumnIndex];
+    GenerateCategory category = generateType.getCategory();
+    Object array = arrays[logicalColumnIndex];
+    switch (category) {
+    case BOOLEAN:
+      {
+        boolean[] booleanArray = ((boolean[]) array);
+        long[] vector = ((LongColumnVector) colVector).vector;
+        for (int i = 0; i < size; i++) {
+          vector[i] = (booleanArray[i] ? 1 : 0);
+        }
+      }
+      break;
+    case BYTE:
+      {
+        byte[] byteArray = ((byte[]) array);
+        long[] vector = ((LongColumnVector) colVector).vector;
+        for (int i = 0; i < size; i++) {
+          vector[i] = byteArray[i];
+        }
+      }
+      break;
+    case SHORT:
+      {
+        short[] shortArray = ((short[]) array);
+        long[] vector = ((LongColumnVector) colVector).vector;
+        for (int i = 0; i < size; i++) {
+          vector[i] = shortArray[i];
+        }
+      }
+      break;
+    case INT:
+      {
+        int[] intArray = ((int[]) array);
+        long[] vector = ((LongColumnVector) colVector).vector;
+        for (int i = 0; i < size; i++) {
+          vector[i] = intArray[i];
+        }
+      }
+      break;
+    case LONG:
+      {
+        long[] longArray = ((long[]) array);
+        long[] vector = ((LongColumnVector) colVector).vector;
+        for (int i = 0; i < size; i++) {
+          vector[i] = longArray[i];
+        }
+      }
+      break;
+    case FLOAT:
+      {
+        float[] floatArray = ((float[]) array);
+        double[] vector = ((DoubleColumnVector) colVector).vector;
+        for (int i = 0; i < size; i++) {
+          vector[i] = floatArray[i];
+        }
+      }
+      break;
+    case DOUBLE:
+      {
+        double[] doubleArray = ((double[]) array);
+        double[] vector = ((DoubleColumnVector) colVector).vector;
+        for (int i = 0; i < size; i++) {
+          vector[i] = doubleArray[i];
+        }
+      }
+      break;
+    case STRING:
+      {
+        String[] stringArray = ((String[]) array);
+        BytesColumnVector bytesColVec = ((BytesColumnVector) colVector);
+        for (int i = 0; i < size; i++) {
+          byte[] bytes = stringArray[i].getBytes();
+          bytesColVec.setVal(i, bytes);
+        }
+      }
+      break;
+    case TIMESTAMP:
+      {
+        Timestamp[] timestampArray = ((Timestamp[]) array);
+        TimestampColumnVector timestampColVec = ((TimestampColumnVector) colVector);
+        for (int i = 0; i < size; i++) {
+          Timestamp timestamp = timestampArray[i];
+          timestampColVec.set(i, timestamp);
+        }
+      }
+      break;
+
+    // UNDONE
+
+    case DATE:
+
+    case BINARY:
+    case DECIMAL:
+    case VARCHAR:
+    case CHAR:
+
+    case LIST:
+    case MAP:
+    case STRUCT:
+    case UNION:
+    default:
+    }
+  }
+}
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinConfig.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinConfig.java
new file mode 100644
index 0000000..569437e
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinConfig.java
@@ -0,0 +1,659 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; +import org.apache.hadoop.hive.ql.exec.testrow.TestCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestCountCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestDescription; +import org.apache.hadoop.hive.ql.exec.testrow.TestRow; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowCollectorOperatorBase; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping; +import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import 
org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite;
+import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hive.common.util.ReflectionUtil;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
+public class TestMapJoinConfig {
+
+  public static enum MapJoinImplementation {
+    ROW_MODE_HASH_MAP,
+    ROW_MODE_OPTIMIZED,
+    VECTOR_PASS_THROUGH,
+    NATIVE_VECTOR_OPTIMIZED,
+    NATIVE_VECTOR_FAST
+  }
+
+  public static MapJoinDesc createMapJoinDesc(TestMapJoinDescription testDesc) {
+
+    MapJoinDesc mapJoinDesc = new MapJoinDesc();
+    mapJoinDesc.setPosBigTable(0);
+    List<ExprNodeDesc> keyExpr = new ArrayList<ExprNodeDesc>();
+    for (int i = 0; i < testDesc.bigTableKeyColumnNums.length; i++) {
+      keyExpr.add(new ExprNodeColumnDesc(testDesc.bigTableKeyTypeInfos[i], testDesc.bigTableKeyColumnNames[i], "B", false));
+    }
+
+    Map<Byte, List<ExprNodeDesc>> keyMap = new HashMap<Byte, List<ExprNodeDesc>>();
+    keyMap.put((byte)0, keyExpr);
+
+    List<ExprNodeDesc> smallTableExpr = new ArrayList<ExprNodeDesc>();
+    for (int i = 0; i < testDesc.smallTableValueColumnNames.length; i++) {
+      smallTableExpr.add(new ExprNodeColumnDesc(testDesc.smallTableValueTypeInfos[i], testDesc.smallTableValueColumnNames[i], "S", false));
+    }
+    keyMap.put((byte)1, smallTableExpr);
+
+    mapJoinDesc.setKeys(keyMap);
+    mapJoinDesc.setExprs(keyMap);
+
+    Byte[] order = new Byte[] {(byte) 0, (byte) 1};
+    mapJoinDesc.setTagOrder(order);
+    mapJoinDesc.setNoOuterJoin(testDesc.vectorMapJoinVariation != VectorMapJoinVariation.OUTER);
+
+    Map<Byte, List<ExprNodeDesc>> filterMap = new HashMap<Byte, List<ExprNodeDesc>>();
+    filterMap.put((byte) 0, new ArrayList<ExprNodeDesc>());  // None.
+    mapJoinDesc.setFilters(filterMap);
+
+    List<Integer> bigTableRetainColumnNumsList = intArrayToList(testDesc.bigTableRetainColumnNums);
+
+    // For now, just small table values...
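+    // Retain lists name the input columns that survive into the join output:
+    // the chosen big-table columns under tag 0 and the small-table value
+    // columns under tag 1.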
+    List<Integer> smallTableRetainColumnNumsList = intArrayToList(testDesc.smallTableRetainValueColumnNums);
+
+    Map<Byte, List<Integer>> retainListMap = new HashMap<Byte, List<Integer>>();
+    retainListMap.put((byte) 0, bigTableRetainColumnNumsList);
+    retainListMap.put((byte) 1, smallTableRetainColumnNumsList);
+    mapJoinDesc.setRetainList(retainListMap);
+
+    int joinDescType;
+    switch (testDesc.vectorMapJoinVariation) {
+    case INNER:
+    case INNER_BIG_ONLY:
+      joinDescType = JoinDesc.INNER_JOIN;
+      break;
+    case LEFT_SEMI:
+      joinDescType = JoinDesc.LEFT_SEMI_JOIN;
+      break;
+    case OUTER:
+      joinDescType = JoinDesc.LEFT_OUTER_JOIN;
+      break;
+    default:
+      throw new RuntimeException("unknown operator variation " + testDesc.vectorMapJoinVariation);
+    }
+    JoinCondDesc[] conds = new JoinCondDesc[1];
+    conds[0] = new JoinCondDesc(0, 1, joinDescType);
+    mapJoinDesc.setConds(conds);
+
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(testDesc.hiveConf, PlanUtils
+        .getFieldSchemasFromColumnList(keyExpr, ""));
+    mapJoinDesc.setKeyTblDesc(keyTableDesc);
+
+    TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(
+        PlanUtils.getFieldSchemasFromColumnList(smallTableExpr, ""));
+    ArrayList<TableDesc> valueTableDescsList = new ArrayList<TableDesc>();
+    valueTableDescsList.add(null);
+    valueTableDescsList.add(valueTableDesc);
+    mapJoinDesc.setValueTblDescs(valueTableDescsList);
+    mapJoinDesc.setValueFilteredTblDescs(valueTableDescsList);
+
+    mapJoinDesc.setOutputColumnNames(Arrays.asList(testDesc.outputColumnNames));
+
+    return mapJoinDesc;
+  }
+
+  public static VectorMapJoinDesc createVectorMapJoinDesc(TestMapJoinDescription testDesc) {
+    VectorMapJoinDesc vectorDesc = new VectorMapJoinDesc();
+    vectorDesc.setHashTableImplementationType(HashTableImplementationType.FAST);
+    HashTableKind hashTableKind;
+    switch (testDesc.vectorMapJoinVariation) {
+    case INNER:
+      hashTableKind = HashTableKind.HASH_MAP;
+      break;
+    case INNER_BIG_ONLY:
+      hashTableKind = HashTableKind.HASH_MULTISET;
+      break;
+    case LEFT_SEMI:
+      hashTableKind = HashTableKind.HASH_SET;
+      break;
+    case OUTER:
+      hashTableKind = HashTableKind.HASH_MAP;
+      break;
+    default:
+      throw new RuntimeException("unknown operator variation " + testDesc.vectorMapJoinVariation);
+    }
+    vectorDesc.setHashTableKind(hashTableKind);
+    HashTableKeyType hashTableKeyType = HashTableKeyType.MULTI_KEY;  // Assume.
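+    // A lone integer-family key column selects the specialized LONG hash
+    // table and a lone string key selects STRING; any other key shape stays
+    // MULTI_KEY.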
+ if (testDesc.bigTableKeyTypeInfos.length == 1) { + switch (((PrimitiveTypeInfo) testDesc.bigTableKeyTypeInfos[0]).getPrimitiveCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + hashTableKeyType = HashTableKeyType.LONG; + break; + case STRING: + hashTableKeyType = HashTableKeyType.STRING; + break; + default: + // Stay with MULTI_KEY + } + } + vectorDesc.setHashTableKeyType(hashTableKeyType); + vectorDesc.setVectorMapJoinVariation(testDesc.vectorMapJoinVariation); + vectorDesc.setMinMaxEnabled(false); + + VectorMapJoinInfo vectorMapJoinInfo = new VectorMapJoinInfo(); + + vectorMapJoinInfo.setBigTableKeyColumnMap(testDesc.bigTableKeyColumnNums); + vectorMapJoinInfo.setBigTableKeyColumnNames(testDesc.bigTableKeyColumnNames); + vectorMapJoinInfo.setBigTableKeyTypeInfos(testDesc.bigTableKeyTypeInfos); + vectorMapJoinInfo.setBigTableKeyExpressions(null); + + vectorMapJoinInfo.setBigTableValueColumnMap(new int[0]); + vectorMapJoinInfo.setBigTableValueColumnNames(new String[0]); + vectorMapJoinInfo.setBigTableValueTypeInfos(new TypeInfo[0]); + vectorMapJoinInfo.setBigTableValueExpressions(null); + + VectorColumnSourceMapping projectionMapping = + new VectorColumnSourceMapping("Projection Mapping"); + + + VectorColumnOutputMapping bigTableRetainedMapping = + new VectorColumnOutputMapping("Big Table Retained Mapping"); + for (int i = 0; i < testDesc.bigTableTypeInfos.length; i++) { + bigTableRetainedMapping.add(i, i, testDesc.bigTableTypeInfos[i]); + projectionMapping.add(i, i, testDesc.bigTableKeyTypeInfos[i]); + } + + VectorColumnOutputMapping bigTableOuterKeyMapping = + new VectorColumnOutputMapping("Big Table Outer Key Mapping"); + + // The order of the fields in the LazyBinary small table value must be used, so + // we use the source ordering flavor for the mapping. + VectorColumnSourceMapping smallTableMapping = + new VectorColumnSourceMapping("Small Table Mapping"); + int outputColumn = testDesc.bigTableTypeInfos.length; + for (int i = 0; i < testDesc.smallTableValueTypeInfos.length; i++) { + smallTableMapping.add(i, outputColumn, testDesc.smallTableValueTypeInfos[i]); + projectionMapping.add(outputColumn, outputColumn, testDesc.smallTableValueTypeInfos[i]); + outputColumn++; + } + + // Convert dynamic arrays and maps to simple arrays. + + bigTableRetainedMapping.finalize(); + + bigTableOuterKeyMapping.finalize(); + + smallTableMapping.finalize(); + + vectorMapJoinInfo.setBigTableRetainedMapping(bigTableRetainedMapping); + vectorMapJoinInfo.setBigTableOuterKeyMapping(bigTableOuterKeyMapping); + vectorMapJoinInfo.setSmallTableMapping(smallTableMapping); + + projectionMapping.finalize(); + + // Verify we added an entry for each output. 
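+    // The projection's source columns must form the contiguous sequence
+    // 0..n-1, i.e. no output slot was skipped.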
+ assert projectionMapping.isSourceSequenceGood(); + + vectorMapJoinInfo.setProjectionMapping(projectionMapping); + + assert projectionMapping.getCount() == testDesc.outputColumnNames.length; + + vectorDesc.setVectorMapJoinInfo(vectorMapJoinInfo); + + return vectorDesc; + } + + public static VectorMapJoinCommonOperator createNativeVectorMapJoinOperator( + VectorMapJoinVariation VectorMapJoinVariation, MapJoinDesc mapJoinDesc, + VectorMapJoinDesc vectorDesc, VectorizationContext vContext) + throws HiveException { + VectorMapJoinCommonOperator operator; + switch (vectorDesc.getHashTableKeyType()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + switch (VectorMapJoinVariation) { + case INNER: + operator = + new VectorMapJoinInnerLongOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case INNER_BIG_ONLY: + operator = + new VectorMapJoinInnerBigOnlyLongOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case LEFT_SEMI: + operator = + new VectorMapJoinLeftSemiLongOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case OUTER: + operator = + new VectorMapJoinOuterLongOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + default: + throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation); + } + break; + case STRING: + switch (VectorMapJoinVariation) { + case INNER: + operator = + new VectorMapJoinInnerStringOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case INNER_BIG_ONLY: + operator = + new VectorMapJoinInnerBigOnlyStringOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case LEFT_SEMI: + operator = + new VectorMapJoinLeftSemiStringOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case OUTER: + operator = + new VectorMapJoinOuterStringOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + default: + throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation); + } + break; + case MULTI_KEY: + switch (VectorMapJoinVariation) { + case INNER: + operator = + new VectorMapJoinInnerMultiKeyOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case INNER_BIG_ONLY: + operator = + new VectorMapJoinInnerBigOnlyMultiKeyOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case LEFT_SEMI: + operator = + new VectorMapJoinLeftSemiMultiKeyOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + case OUTER: + operator = + new VectorMapJoinOuterMultiKeyOperator(new CompilationOpContext(), + vContext, mapJoinDesc); + break; + default: + throw new RuntimeException("unknown operator variation " + VectorMapJoinVariation); + } + break; + default: + throw new RuntimeException("Unknown hash table key type " + vectorDesc.getHashTableKeyType()); + } + return operator; + } + + public static VectorizationContext createVectorizationContext(TestMapJoinDescription testDesc) + throws HiveException { + VectorizationContext vContext = + new VectorizationContext("test", testDesc.bigTableColumnNamesList); + + // Create scratch columns to hold small table results. 
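+    // Native vector map join writes small-table results into scratch columns
+    // appended after the big-table columns, matching the projection built in
+    // createVectorMapJoinDesc.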
+    for (int i = 0; i < testDesc.smallTableValueTypeInfos.length; i++) {
+      vContext.allocateScratchColumn(testDesc.smallTableValueTypeInfos[i]);
+    }
+    return vContext;
+  }
+
+  private static boolean hasFilter(MapJoinDesc mapJoinDesc, int alias) {
+    int[][] filterMaps = mapJoinDesc.getFilterMap();
+    return filterMaps != null && filterMaps[alias] != null;
+  }
+
+  public static MapJoinTableContainerSerDe createMapJoinTableContainerSerDe(MapJoinDesc mapJoinDesc)
+      throws SerDeException {
+
+    final Byte smallTablePos = 1;
+
+    // UNDONE: Why do we need to specify BinarySortableSerDe explicitly here???
+    TableDesc keyTableDesc = mapJoinDesc.getKeyTblDesc();
+    AbstractSerDe keySerializer = (AbstractSerDe) ReflectionUtil.newInstance(
+        BinarySortableSerDe.class, null);
+    SerDeUtils.initializeSerDe(keySerializer, null, keyTableDesc.getProperties(), null);
+    MapJoinObjectSerDeContext keyContext = new MapJoinObjectSerDeContext(keySerializer, false);
+
+    TableDesc valueTableDesc;
+    if (mapJoinDesc.getNoOuterJoin()) {
+      valueTableDesc = mapJoinDesc.getValueTblDescs().get(smallTablePos);
+    } else {
+      valueTableDesc = mapJoinDesc.getValueFilteredTblDescs().get(smallTablePos);
+    }
+    AbstractSerDe valueSerDe = (AbstractSerDe) ReflectionUtil.newInstance(
+        valueTableDesc.getDeserializerClass(), null);
+    SerDeUtils.initializeSerDe(valueSerDe, null, valueTableDesc.getProperties(), null);
+    MapJoinObjectSerDeContext valueContext =
+        new MapJoinObjectSerDeContext(valueSerDe, hasFilter(mapJoinDesc, smallTablePos));
+    MapJoinTableContainerSerDe mapJoinTableContainerSerDe =
+        new MapJoinTableContainerSerDe(keyContext, valueContext);
+    return mapJoinTableContainerSerDe;
+  }
+
+  public static void connectOperators(
+      TestMapJoinDescription testDesc,
+      Operator operator,
+      Operator testCollectorOperator) throws HiveException {
+    Operator[] parents = new Operator[] {operator};
+    testCollectorOperator.setParentOperators(Arrays.asList(parents));
+    Operator[] childOperators = new Operator[] {testCollectorOperator};
+    operator.setChildOperators(Arrays.asList(childOperators));
+    HiveConf.setBoolVar(testDesc.hiveConf,
+        HiveConf.ConfVars.HIVE_MAPJOIN_TESTING_NO_HASH_TABLE_LOAD, true);
+    operator.initialize(testDesc.hiveConf, testDesc.inputObjectInspectors);
+  }
+
+  private static List<Integer> intArrayToList(int[] intArray) {
+    List<Integer> intList = new ArrayList<Integer>(intArray.length);
+    for (int i = 0; i < intArray.length; i++) {
+      intList.add(intArray[i]);
+    }
+    return intList;
+  }
+
+  private static void loadTableContainerData(TestMapJoinDescription testDesc, TestMapJoinData testData,
+      MapJoinTableContainer mapJoinTableContainer)
+          throws IOException, SerDeException, HiveException {
+
+    LazyBinarySerializeWrite valueSerializeWrite = null;
+    Output valueOutput = null;
+    if (testData.smallTableValues != null) {
+      valueSerializeWrite = new LazyBinarySerializeWrite(testDesc.smallTableValueTypeInfos.length);
+      valueOutput = new Output();
+    }
+    BytesWritable valueBytesWritable = new BytesWritable();
+
+    BytesWritable keyBytesWritable = new BytesWritable();
+    BinarySortableSerializeWrite keySerializeWrite =
+        new BinarySortableSerializeWrite(testDesc.bigTableKeyTypeInfos.length);
+    Output keyOutput = new Output();
+    int round = 0;
+    boolean atLeastOneValueAdded = false;
+    while (true) {
+      for (Entry<TestRow, Integer> testRowEntry : testData.smallTableKeyHashMap.entrySet()) {
+        final int smallTableKeyIndex = testRowEntry.getValue();
+        final int valueCount = testData.smallTableValueCounts.get(smallTableKeyIndex);
+        boolean addEntry = round + 1 <= valueCount;
+
+        if (addEntry) {
+          atLeastOneValueAdded = true;
+
+          TestRow valueRow = null;
+          if (testData.smallTableValues != null) {
+            ArrayList<TestRow> valueList = testData.smallTableValues.get(smallTableKeyIndex);
+            valueRow = valueList.get(round);
+          }
+
+          Object[] smallTableKey = testRowEntry.getKey().getRow();
+          keyOutput.reset();
+          keySerializeWrite.set(keyOutput);
+
+          for (int index = 0; index < testDesc.bigTableKeyTypeInfos.length; index++) {
+
+            Writable keyWritable = (Writable) smallTableKey[index];
+
+            VerifyFastRow.serializeWrite(
+                keySerializeWrite, (PrimitiveTypeInfo) testDesc.bigTableKeyTypeInfos[index], keyWritable);
+          }
+
+          keyBytesWritable.set(keyOutput.getData(), 0, keyOutput.getLength());
+
+          if (valueRow == null) {
+            // Empty value.
+            mapJoinTableContainer.putRow(keyBytesWritable, valueBytesWritable);
+          } else {
+            Object[] smallTableValue = valueRow.getRow();
+            valueOutput.reset();
+            valueSerializeWrite.set(valueOutput);
+            for (int index = 0; index < testDesc.smallTableValueTypeInfos.length; index++) {
+
+              Writable valueWritable = (Writable) smallTableValue[index];
+
+              VerifyFastRow.serializeWrite(
+                  valueSerializeWrite, (PrimitiveTypeInfo) testDesc.smallTableValueTypeInfos[index], valueWritable);
+            }
+            valueBytesWritable.set(valueOutput.getData(), 0, valueOutput.getLength());
+            mapJoinTableContainer.putRow(keyBytesWritable, valueBytesWritable);
+          }
+        }
+      }
+      if (testData.smallTableValues == null || !atLeastOneValueAdded) {
+        break;
+      }
+      round++;
+      atLeastOneValueAdded = false;
+    }
+    mapJoinTableContainer.seal();
+  }
+
+  public static MapJoinOperator createMapJoin(TestMapJoinDescription testDesc,
+      Operator collectorOperator, TestMapJoinData testData,
+      MapJoinDesc mapJoinDesc, boolean isVectorMapJoin, boolean isOriginalMapJoin)
+          throws SerDeException, IOException, HiveException {
+
+    final Byte bigTablePos = 0;
+
+    MapJoinTableContainerSerDe mapJoinTableContainerSerDe =
+        TestMapJoinConfig.createMapJoinTableContainerSerDe(mapJoinDesc);
+
+    MapJoinObjectSerDeContext valCtx = mapJoinTableContainerSerDe.getValueContext();
+
+    MapJoinTableContainer mapJoinTableContainer =
+        (isOriginalMapJoin ?
+            new HashMapWrapper(
+                testDesc.hiveConf, -1) :
+            new MapJoinBytesTableContainer(
+                testDesc.hiveConf, valCtx, testData.smallTableKeyHashMap.size(), 0));
+
+    mapJoinTableContainer.setSerde(
+        mapJoinTableContainerSerDe.getKeyContext(),
+        mapJoinTableContainerSerDe.getValueContext());
+
+    loadTableContainerData(testDesc, testData, mapJoinTableContainer);
+
+    MapJoinOperator operator;
+    if (!isVectorMapJoin) {
+      operator = new MapJoinOperator(new CompilationOpContext());
+      operator.setConf(mapJoinDesc);
+    } else {
+      VectorizationContext vContext = new VectorizationContext("test", testDesc.bigTableColumnNamesList);
+      // Create scratch columns to hold small table results.
+      for (int i = 0; i < testDesc.smallTableValueTypeInfos.length; i++) {
+        vContext.allocateScratchColumn(testDesc.smallTableValueTypeInfos[i]);
+      }
+
+      // This is what the Vectorizer class does.
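+      // An outer join with a filter on the big table cannot use the
+      // vectorized pass-through operator, so the Vectorizer falls back to
+      // VectorMapJoinOuterFilteredOperator; mirror that decision here.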
+      List<ExprNodeDesc> bigTableFilters = mapJoinDesc.getFilters().get(bigTablePos);
+      boolean isOuterAndFiltered = (!mapJoinDesc.isNoOuterJoin() && bigTableFilters.size() > 0);
+      if (!isOuterAndFiltered) {
+        operator = new VectorMapJoinOperator(new CompilationOpContext(), vContext, mapJoinDesc);
+      } else {
+        operator = new VectorMapJoinOuterFilteredOperator(new CompilationOpContext(), vContext, mapJoinDesc);
+      }
+    }
+
+    TestMapJoinConfig.connectOperators(testDesc, operator, collectorOperator);
+
+    operator.setTestMapJoinTableContainer(1, mapJoinTableContainer, mapJoinTableContainerSerDe);
+
+    return operator;
+  }
+
+  public static MapJoinOperator createNativeVectorMapJoin(TestMapJoinDescription testDesc,
+      Operator collectorOperator, TestMapJoinData testData,
+      MapJoinDesc mapJoinDesc, HashTableImplementationType hashTableImplementationType)
+          throws SerDeException, IOException, HiveException {
+
+    VectorMapJoinDesc vectorDesc = TestMapJoinConfig.createVectorMapJoinDesc(testDesc);
+    mapJoinDesc.setVectorDesc(vectorDesc);
+
+    vectorDesc.setHashTableImplementationType(hashTableImplementationType);
+
+    VectorMapJoinInfo vectorMapJoinInfo = vectorDesc.getVectorMapJoinInfo();
+
+    MapJoinTableContainer mapJoinTableContainer;
+    switch (vectorDesc.getHashTableImplementationType()) {
+    case OPTIMIZED:
+      mapJoinTableContainer =
+          new MapJoinBytesTableContainer(
+              testDesc.hiveConf, null, testData.smallTableKeyHashMap.size(), 0);
+
+      MapJoinTableContainerSerDe mapJoinTableContainerSerDe =
+          TestMapJoinConfig.createMapJoinTableContainerSerDe(mapJoinDesc);
+
+      mapJoinTableContainer.setSerde(
+          mapJoinTableContainerSerDe.getKeyContext(),
+          mapJoinTableContainerSerDe.getValueContext());
+      break;
+    case FAST:
+      mapJoinTableContainer =
+          new VectorMapJoinFastTableContainer(
+              mapJoinDesc, testDesc.hiveConf, testData.smallTableKeyHashMap.size());
+      break;
+    default:
+      throw new RuntimeException("Unexpected hash table implementation type " + vectorDesc.getHashTableImplementationType());
+    }
+
+    loadTableContainerData(testDesc, testData, mapJoinTableContainer);
+
+    VectorizationContext vContext = TestMapJoinConfig.createVectorizationContext(testDesc);
+
+    VectorMapJoinCommonOperator operator =
+        TestMapJoinConfig.createNativeVectorMapJoinOperator(
+            testDesc.vectorMapJoinVariation,
+            mapJoinDesc,
+            vectorDesc,
+            vContext);
+
+    TestMapJoinConfig.connectOperators(testDesc, operator, collectorOperator);
+
+    operator.setTestMapJoinTableContainer(1, mapJoinTableContainer, null);
+
+    return operator;
+  }
+
+  public static MapJoinOperator createMapJoinImplementation(MapJoinImplementation mapJoinImplementation,
+      TestMapJoinDescription testDesc,
+      Operator testCollectorOperator, TestMapJoinData testData,
+      MapJoinDesc mapJoinDesc) throws SerDeException, IOException, HiveException {
+
+    MapJoinOperator operator;
+    switch (mapJoinImplementation) {
+    case ROW_MODE_HASH_MAP:
+
+      // MapJoinOperator
+      operator = TestMapJoinConfig.createMapJoin(
+          testDesc, testCollectorOperator, testData, mapJoinDesc, /* isVectorMapJoin */ false,
+          /* isOriginalMapJoin */ true);
+      break;
+
+    case ROW_MODE_OPTIMIZED:
+
+      // MapJoinOperator
+      operator = TestMapJoinConfig.createMapJoin(
+          testDesc, testCollectorOperator, testData, mapJoinDesc, /* isVectorMapJoin */ false,
+          /* isOriginalMapJoin */ false);
+      break;
+
+    case VECTOR_PASS_THROUGH:
+
+      // VectorMapJoinOperator
+      operator = TestMapJoinConfig.createMapJoin(
+          testDesc, testCollectorOperator, testData, mapJoinDesc, /* isVectorMapJoin */ true,
+          /* n/a */ false);
+      break;
+
+    case NATIVE_VECTOR_OPTIMIZED:
+      operator = TestMapJoinConfig.createNativeVectorMapJoin(
+          testDesc, testCollectorOperator, testData, mapJoinDesc, HashTableImplementationType.OPTIMIZED);
+      break;
+
+    case NATIVE_VECTOR_FAST:
+      operator = TestMapJoinConfig.createNativeVectorMapJoin(
+          testDesc, testCollectorOperator, testData, mapJoinDesc, HashTableImplementationType.FAST);
+      break;
+    default:
+      throw new RuntimeException("Unexpected MapJoin Operator Implementation " + mapJoinImplementation);
+    }
+    return operator;
+  }
+}
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinData.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinData.java
new file mode 100644
index 0000000..6c144f7
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinData.java
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.testrow.TestRow;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow;
+import org.apache.hadoop.hive.ql.exec.vector.VectorRandomRowSource;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator;
+import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType;
+import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType.GenerateCategory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+public class TestMapJoinData {
+
+  final int bigTableBatchesLength;
+  final int bigTableBatchesSize;
+  public final VectorizedRowBatch[] bigTableBatches;
+  TestRow[] bigTableTestKeys;
+  HashMap<TestRow, Integer> smallTableKeyHashMap;
+  ArrayList<Integer> smallTableValueCounts;
+  ArrayList<ArrayList<TestRow>> smallTableValues;
+
+  public TestMapJoinData(int rowCount, TestMapJoinDescription testDesc,
+      Random random) throws HiveException {
+
+    GenerateType[] generateTypes = generateTypesFromTypeInfos(testDesc.bigTableTypeInfos);
+    VectorBatchGenerator generator = new VectorBatchGenerator(generateTypes);
+
+    final int maxBatchSize = VectorizedRowBatch.DEFAULT_SIZE;
+
+    bigTableBatchesLength = (rowCount + maxBatchSize - 1) / maxBatchSize;
+    bigTableBatchesSize = rowCount;
+
+    int sizeCountDown = rowCount;
+    bigTableBatches = createBigTableBatches(generator, bigTableBatchesLength);
+    for (int i = 0; i < bigTableBatchesLength; i++) {
+      final int size = Math.min(sizeCountDown, maxBatchSize);
+      generator.generateBatch(bigTableBatches[i], random, size);
+      sizeCountDown -= size;
+    }
+
+    VectorExtractRow vectorExtractRow = new VectorExtractRow();
+    // NOTE: getTestKeys extracts the leading batch columns as the key, so the tests arrange
+    // for bigTableKeyColumnNums to be the leading big-table columns.
+    vectorExtractRow.init(testDesc.bigTableKeyTypeInfos);
+
+    bigTableTestKeys =
+        getTestKeys(bigTableBatches, vectorExtractRow, testDesc.bigTableKeyTypeInfos.length,
+            testDesc.bigTableObjectInspectors);
+
+    smallTableKeyHashMap = new HashMap<TestRow, Integer>();
+    final int keyProbes = Math.max(1, bigTableBatchesSize / 2);
+    for (int i = 0; i < keyProbes; i++) {
+      int index = random.nextInt(bigTableBatchesSize);
+      TestRow bigTableTestKey = bigTableTestKeys[index];
+      smallTableKeyHashMap.put((TestRow) bigTableTestKey.clone(), -1);
+    }
+
+    // UNDONE: For now, don't add more small keys...
+    /*
+    // Add more small table keys that are not in Big Table batches.
+    final int smallTableAdditionalLength = 1 + random.nextInt(4);
+    final int smallTableAdditionalSize = smallTableAdditionalLength * maxBatchSize;
+    VectorizedRowBatch[] smallTableAdditionalBatches = createBigTableBatches(generator, smallTableAdditionalLength);
+    for (int i = 0; i < smallTableAdditionalLength; i++) {
+      generator.generateBatch(smallTableAdditionalBatches[i], random, maxBatchSize);
+    }
+    TestRow[] additionalTestKeys = getTestKeys(smallTableAdditionalBatches, vectorExtractRow,
+        testDesc.bigTableKeyTypeInfos.length, testDesc.bigTableObjectInspectors);
+    final int smallTableAdditionKeyProbes = smallTableAdditionalSize / 2;
+    for (int i = 0; i < smallTableAdditionKeyProbes; i++) {
+      int index = random.nextInt(smallTableAdditionalSize);
+      TestRow additionalTestKey = additionalTestKeys[index];
+      smallTableKeyHashMap.put((TestRow) additionalTestKey.clone(), -1);
+    }
+    */
+
+    // Number the test rows with collection order.
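+    // Each distinct small-table key is assigned its insertion index here; that index is what
+    // smallTableValueCounts and smallTableValues are addressed by in the generate methods below.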
+    int addCount = 0;
+    for (Entry<TestRow, Integer> testRowEntry : smallTableKeyHashMap.entrySet()) {
+      testRowEntry.setValue(addCount++);
+    }
+
+    generateVariationData(this, testDesc, random);
+  }
+
+  public static void driveBigTableData(TestMapJoinDescription testDesc, TestMapJoinData testData,
+      MapJoinOperator operator) throws HiveException {
+    VectorExtractRow vectorExtractRow = new VectorExtractRow();
+    // Row-mode operators consume whole big-table rows, so extract every big-table column,
+    // not just the key columns.
+    vectorExtractRow.init(testDesc.bigTableTypeInfos);
+
+    final int columnCount = testDesc.bigTableTypeInfos.length;
+    Object[] row = new Object[columnCount];
+
+    int bigTableRowCount = 0;
+    for (int i = 0; i < testData.bigTableBatches.length; i++) {
+      VectorizedRowBatch bigTableBatch = testData.bigTableBatches[i];
+
+      // Extract rows and call process per row
+      for (int r = 0; r < bigTableBatch.size; r++) {
+        vectorExtractRow.extractRow(bigTableBatch, r, row);
+        bigTableRowCount++;
+        operator.process(row, 0);
+      }
+    }
+    operator.closeOp(false);
+  }
+
+  public static void driveVectorBigTableData(TestMapJoinDescription testDesc, TestMapJoinData testData,
+      MapJoinOperator operator) throws HiveException {
+    int bigTableRowCount = 0;
+    for (int i = 0; i < testData.bigTableBatches.length; i++) {
+      VectorizedRowBatch bigTableBatch = testData.bigTableBatches[i];
+
+      // The small-table output columns were already appended to each batch by
+      // doAddSmallTableColumns, so the batch can be fed to the operator as-is.
+      bigTableRowCount += bigTableBatch.size;
+
+      // UNDONE: For now, save information to preserve the batch...
+      final int saveSize = bigTableBatch.size;
+      operator.process(bigTableBatch, 0);
+      bigTableBatch.size = saveSize;
+      bigTableBatch.selectedInUse = false;
+    }
+    operator.closeOp(false);
+  }
+
+  public static void generateVariationData(TestMapJoinData testData,
+      TestMapJoinDescription testDesc, Random random) {
+    switch (testDesc.vectorMapJoinVariation) {
+    case INNER_BIG_ONLY:
+    case LEFT_SEMI:
+      testData.generateRandomSmallTableCounts(testDesc, random);
+      break;
+    case INNER:
+    case OUTER:
+      testData.generateRandomSmallTableCounts(testDesc, random);
+      testData.generateRandomSmallTableValues(testDesc, random);
+      break;
+    default:
+      throw new RuntimeException("Unknown operator variation " + testDesc.vectorMapJoinVariation);
+    }
+  }
+
+  private static TestRow generateRandomSmallTableValueRow(TestMapJoinDescription testDesc, Random random) {
+    final int columnCount = testDesc.smallTableValueTypeInfos.length;
+    Object[] smallTableValueRow = VectorRandomRowSource.randomWritablePrimitiveRow(columnCount, random,
+        testDesc.smallTableValuePrimitiveTypeInfos);
+    for (int c = 0; c < smallTableValueRow.length; c++) {
+      smallTableValueRow[c] = ((PrimitiveObjectInspector) testDesc.smallTableObjectInspectors[c]).copyObject(smallTableValueRow[c]);
+    }
+    return new TestRow(smallTableValueRow);
+  }
+
+  private void generateRandomSmallTableCounts(TestMapJoinDescription testDesc, Random random) {
+    smallTableValueCounts = new ArrayList<Integer>();
+    for (Entry<TestRow, Integer> testKeyEntry : smallTableKeyHashMap.entrySet()) {
+      final int valueCount = 1 + random.nextInt(19);
+      smallTableValueCounts.add(valueCount);
+    }
+  }
+
+  private void generateRandomSmallTableValues(TestMapJoinDescription testDesc, Random random) {
+    smallTableValues = new ArrayList<ArrayList<TestRow>>();
+    for (Entry<TestRow, Integer> testKeyEntry : smallTableKeyHashMap.entrySet()) {
+      ArrayList<TestRow> valueList = new ArrayList<TestRow>();
+      smallTableValues.add(valueList);
+      final int valueCount = smallTableValueCounts.get(testKeyEntry.getValue());
+      for (int v = 0; v < valueCount; v++) {
+        valueList.add(generateRandomSmallTableValueRow(testDesc, random));
+      }
+    }
+  }
+
+  public VectorizedRowBatch[] createBigTableBatches(VectorBatchGenerator generator, int bigTableBatchesLength) {
+
+    VectorizedRowBatch[] batches = new VectorizedRowBatch[bigTableBatchesLength];
+    // UNDONE: Only up to rowCount
+    for (int i = 0; i < bigTableBatchesLength; i++) {
+      batches[i] = generator.createBatch();
+    }
+    return batches;
+  }
+
+  private static TestRow[] getTestKeys(VectorizedRowBatch[] batches, VectorExtractRow vectorExtractRow,
+      int columnCount, ObjectInspector[] objectInspectors) {
+    TestRow[] testKeys = new TestRow[batches.length * VectorizedRowBatch.DEFAULT_SIZE];
+    int index = 0;
+    for (int b = 0; b < batches.length; b++) {
+      VectorizedRowBatch batch = batches[b];
+      for (int i = 0; i < batch.size; i++) {
+        Object[] rowObjects = new Object[columnCount];
+        vectorExtractRow.extractRow(batch, i, rowObjects);
+        for (int c = 0; c < rowObjects.length; c++) {
+          rowObjects[c] = ((PrimitiveObjectInspector) objectInspectors[c]).copyObject(rowObjects[c]);
+        }
+        testKeys[index++] = new TestRow(rowObjects);
+      }
+    }
+    return testKeys;
+  }
+
+  private static GenerateType[] generateTypesFromTypeInfos(TypeInfo[] typeInfos) {
+    final int size = typeInfos.length;
+    GenerateType[] generateTypes = new GenerateType[size];
+    for (int i = 0; i < size; i++) {
+      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfos[i];
+      GenerateCategory category =
+          GenerateCategory.generateCategoryFromPrimitiveCategory(primitiveTypeInfo.getPrimitiveCategory());
+      generateTypes[i] = new GenerateType(category);
+    }
+    return generateTypes;
+  }
+
+  public static void doAddSmallTableColumns(TestMapJoinDescription testDesc, TestMapJoinData testData) {
+    for (int batchNum = 0; batchNum < testData.bigTableBatches.length; batchNum++) {
+      VectorizedRowBatch bigTableBatch = testData.bigTableBatches[batchNum];
+      ColumnVector[] newCols = new ColumnVector[bigTableBatch.cols.length + testDesc.smallTableValueTypeInfos.length];
+      System.arraycopy(bigTableBatch.cols, 0, newCols, 0, bigTableBatch.cols.length);
+
+      for (int s = 0; s < testDesc.smallTableValueTypeInfos.length; s++) {
+        newCols[bigTableBatch.cols.length + s] =
+            VectorizedBatchUtil.createColumnVector(testDesc.smallTableValueTypeInfos[s]);
+      }
+      bigTableBatch.cols = newCols;
+      bigTableBatch.numCols = newCols.length;
+    }
+  }
+}
\ No newline at end of file
diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinDescription.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinDescription.java
new file mode 100644
index 0000000..8d4278b
--- /dev/null
+++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinDescription.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector.mapjoin;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.testrow.TestDescription;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+
+public class TestMapJoinDescription extends TestDescription {
+
+  final VectorMapJoinVariation vectorMapJoinVariation;
+
+  // Adjustable.
+  String[] bigTableColumnNames;
+  TypeInfo[] bigTableTypeInfos;
+  int[] bigTableKeyColumnNums;
+  String[] smallTableValueColumnNames;
+  public TypeInfo[] smallTableValueTypeInfos;
+  public int[] bigTableRetainColumnNums;
+  int[] smallTableRetainKeyColumnNums;
+  int[] smallTableRetainValueColumnNums;
+
+  // Derived.
+  List<String> bigTableColumnNamesList;
+  String[] bigTableKeyColumnNames;
+  TypeInfo[] bigTableKeyTypeInfos;
+  List<String> smallTableValueColumnNamesList;
+  ObjectInspector[] bigTableObjectInspectors;
+  List<ObjectInspector> bigTableObjectInspectorsList;
+  StandardStructObjectInspector bigTableStandardObjectInspector;
+  PrimitiveTypeInfo[] smallTableValuePrimitiveTypeInfos;
+  ObjectInspector[] smallTableObjectInspectors;
+  PrimitiveCategory[] smallTablePrimitiveCategories;
+  List<ObjectInspector> smallTableObjectInspectorsList;
+  StandardStructObjectInspector smallTableStandardObjectInspector;
+  ObjectInspector[] inputObjectInspectors;
+  String[] outputColumnNames;
+  TypeInfo[] outputTypeInfos;
+  ObjectInspector[] outputObjectInspectors;
+
+  public TestMapJoinDescription(
+      HiveConf hiveConf,
+      VectorMapJoinVariation vectorMapJoinVariation,
+      String[] bigTableColumnNames, TypeInfo[] bigTableTypeInfos,
+      int[] bigTableKeyColumnNums,
+      String[] smallTableValueColumnNames, TypeInfo[] smallTableValueTypeInfos,
+      int[] bigTableRetainColumnNums,
+      int[] smallTableRetainKeyColumnNums, int[] smallTableRetainValueColumnNums) {
+
+    super(hiveConf);
+    this.vectorMapJoinVariation = vectorMapJoinVariation;
+
+    this.bigTableColumnNames = bigTableColumnNames;
+    this.bigTableTypeInfos = bigTableTypeInfos;
+    this.bigTableKeyColumnNums = bigTableKeyColumnNums;
+    this.smallTableValueColumnNames = smallTableValueColumnNames;
+    this.smallTableValueTypeInfos = smallTableValueTypeInfos;
+    this.bigTableRetainColumnNums = bigTableRetainColumnNums;
+    this.smallTableRetainKeyColumnNums = smallTableRetainKeyColumnNums;
+    this.smallTableRetainValueColumnNums = smallTableRetainValueColumnNums;
+
+    switch (vectorMapJoinVariation) {
+    case INNER_BIG_ONLY:
+    case LEFT_SEMI:
+      trimAwaySmallTableValueInfo();
+      break;
+    case INNER:
+    case OUTER:
+      break;
+    default:
+      throw new RuntimeException("Unknown operator variation " + vectorMapJoinVariation);
+    }
+
+    computeDerived();
+  }
+
+  public void computeDerived() {
+    bigTableColumnNamesList = Arrays.asList(bigTableColumnNames);
+
+    bigTableKeyColumnNames = new String[bigTableKeyColumnNums.length];
+    bigTableKeyTypeInfos = new TypeInfo[bigTableKeyColumnNums.length];
+    for (int i = 0; i < bigTableKeyColumnNums.length; i++) {
+      bigTableKeyColumnNames[i] = bigTableColumnNames[bigTableKeyColumnNums[i]];
+      bigTableKeyTypeInfos[i] = bigTableTypeInfos[bigTableKeyColumnNums[i]];
+    }
+
+    smallTableValueColumnNamesList = Arrays.asList(smallTableValueColumnNames);
+
+    bigTableObjectInspectors = new ObjectInspector[bigTableTypeInfos.length];
+    for (int i = 0; i < bigTableTypeInfos.length; i++) {
+      bigTableObjectInspectors[i] =
+          PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((PrimitiveTypeInfo) bigTableTypeInfos[i]);
+    }
+    bigTableObjectInspectorsList = Arrays.asList(bigTableObjectInspectors);
+
+    smallTableObjectInspectors = new ObjectInspector[smallTableValueTypeInfos.length];
+    smallTablePrimitiveCategories = new PrimitiveCategory[smallTableValueTypeInfos.length];
+    smallTableValuePrimitiveTypeInfos = new PrimitiveTypeInfo[smallTableValueTypeInfos.length];
+    for (int i = 0; i < smallTableValueTypeInfos.length; i++) {
+      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) smallTableValueTypeInfos[i];
+      smallTableObjectInspectors[i] =
+          PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveTypeInfo);
+      smallTablePrimitiveCategories[i] = primitiveTypeInfo.getPrimitiveCategory();
+      smallTableValuePrimitiveTypeInfos[i] = primitiveTypeInfo;
+    }
+    smallTableObjectInspectorsList = Arrays.asList(smallTableObjectInspectors);
+
+    bigTableStandardObjectInspector =
+        ObjectInspectorFactory.getStandardStructObjectInspector(
+            bigTableColumnNamesList, Arrays.asList(bigTableObjectInspectors));
+    smallTableStandardObjectInspector =
+        ObjectInspectorFactory.getStandardStructObjectInspector(
+            smallTableValueColumnNamesList, Arrays.asList(smallTableObjectInspectors));
+
+    inputObjectInspectors =
+        new ObjectInspector[] { bigTableStandardObjectInspector, smallTableStandardObjectInspector };
+
+    int outputLength =
+        bigTableRetainColumnNums.length +
+        smallTableRetainKeyColumnNums.length +
+        smallTableRetainValueColumnNums.length;
+    outputColumnNames = createOutputColumnNames(outputLength);
+
+    outputTypeInfos = new TypeInfo[outputLength];
+    int outputIndex = 0;
+    for (int i = 0; i < bigTableRetainColumnNums.length; i++) {
+      outputTypeInfos[outputIndex++] = bigTableTypeInfos[bigTableRetainColumnNums[i]];
+    }
+    // for (int i = 0; i < smallTableRetainKeyColumnNums.length; i++) {
+    //   outputTypeInfos[outputIndex++] = smallTableTypeInfos[smallTableRetainKeyColumnNums[i]];
+    // }
+    for (int i = 0; i < smallTableRetainValueColumnNums.length; i++) {
+      outputTypeInfos[outputIndex++] = smallTableValueTypeInfos[smallTableRetainValueColumnNums[i]];
+    }
+
+    outputObjectInspectors = new ObjectInspector[outputLength];
+    for (int i = 0; i < outputLength; i++) {
+      PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) outputTypeInfos[i];
+      outputObjectInspectors[i] =
+          PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(primitiveTypeInfo);
+    }
+  }
+
+  public void trimAwaySmallTableValueInfo() {
+    smallTableValueColumnNames = new String[] {};
+    smallTableValueTypeInfos = new TypeInfo[] {};
+    smallTableRetainKeyColumnNums = new int[] {};
+    smallTableRetainValueColumnNums = new int[] {};
+  }
+
+  private String[] createOutputColumnNames(int outputColumnCount) {
+    String[] outputColumnNames = new String[outputColumnCount];
+    int counter = 1;
+    for (int i = 0; i < outputColumnCount; i++) {
+      outputColumnNames[i] = "out" + counter++;
+    }
+    return
outputColumnNames; + } +} \ No newline at end of file diff --git ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java new file mode 100644 index 0000000..e9ea54f --- /dev/null +++ ql/src/test/org/apache/hadoop/hive/ql/exec/vector/mapjoin/TestMapJoinOperator.java @@ -0,0 +1,520 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.vector.mapjoin; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CompilationOpContext; +import org.apache.hadoop.hive.ql.exec.MapJoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinBytesTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectSerDeContext; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer; +import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe; +import org.apache.hadoop.hive.ql.exec.testrow.TestCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestCountCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestCountVectorCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestRow; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowCollectorOperator; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowCollectorOperatorBase; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowMultiSet; +import org.apache.hadoop.hive.ql.exec.testrow.TestRowVectorCollectorOperator; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorBatchDebug; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping; +import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping; +import org.apache.hadoop.hive.ql.exec.vector.VectorExtractRow; +import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator; +import org.apache.hadoop.hive.ql.exec.vector.VectorRandomRowSource; +import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import 
org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType; +import org.apache.hadoop.hive.ql.exec.vector.batchgen.VectorBatchGenerator.GenerateType.GenerateCategory; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.TestMapJoinConfig.MapJoinImplementation; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastMultiKeyHashMap; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VectorMapJoinFastTableContainer; +import org.apache.hadoop.hive.ql.exec.vector.mapjoin.fast.VerifyFastRow; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.JoinCondDesc; +import org.apache.hadoop.hive.ql.plan.JoinDesc; +import org.apache.hadoop.hive.ql.plan.MapJoinDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.PlanUtils; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation; +import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo; +import org.apache.hadoop.hive.ql.plan.api.OperatorType; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; +import org.apache.hadoop.hive.serde2.AbstractSerDe; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.binarysortable.fast.BinarySortableSerializeWrite; +import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hive.common.util.HashCodeUtil; +import org.apache.hive.common.util.ReflectionUtil; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Random; +import java.util.SortedMap; +import java.util.TreeMap; + +import junit.framework.Assert; + +public class TestMapJoinOperator { + + /* + * This test collector operator is for 
MapJoin row-mode.
+   */
+  private class TestMultiSetCollectorOperator extends TestRowCollectorOperator {
+
+    private final TestRowMultiSet testRowMultiSet;
+
+    public TestMultiSetCollectorOperator(
+        ObjectInspector[] outputObjectInspectors,
+        TestRowMultiSet testRowMultiSet) {
+      super(outputObjectInspectors);
+      this.testRowMultiSet = testRowMultiSet;
+    }
+
+    public TestRowMultiSet getTestRowMultiSet() {
+      return testRowMultiSet;
+    }
+
+    public void nextTestRow(TestRow testRow) {
+      testRowMultiSet.add(testRow);
+    }
+
+    @Override
+    public String getName() {
+      return TestMultiSetCollectorOperator.class.getSimpleName();
+    }
+  }
+
+  private class TestMultiSetVectorCollectorOperator extends TestRowVectorCollectorOperator {
+
+    private final TestRowMultiSet testRowMultiSet;
+
+    public TestRowMultiSet getTestRowMultiSet() {
+      return testRowMultiSet;
+    }
+
+    public TestMultiSetVectorCollectorOperator(TypeInfo[] outputTypeInfos,
+        ObjectInspector[] outputObjectInspectors, TestRowMultiSet testRowMultiSet)
+            throws HiveException {
+      super(outputTypeInfos, outputObjectInspectors);
+      this.testRowMultiSet = testRowMultiSet;
+    }
+
+    public void nextTestRow(TestRow testRow) {
+      testRowMultiSet.add(testRow);
+    }
+
+    @Override
+    public String getName() {
+      return TestMultiSetVectorCollectorOperator.class.getSimpleName();
+    }
+  }
+
+  private static class KeyConfig {
+    long seed;
+    PrimitiveTypeInfo primitiveTypeInfo;
+    KeyConfig(long seed, PrimitiveTypeInfo primitiveTypeInfo) {
+      this.seed = seed;
+      this.primitiveTypeInfo = primitiveTypeInfo;
+    }
+  }
+  private static KeyConfig[] longKeyConfigs = new KeyConfig[] {
+    new KeyConfig(234882L, TypeInfoFactory.longTypeInfo),
+    new KeyConfig(4600L, TypeInfoFactory.intTypeInfo),
+    new KeyConfig(98743L, TypeInfoFactory.shortTypeInfo)};
+
+  @Test
+  public void testLong() throws Exception {
+    for (KeyConfig longKeyConfig : longKeyConfigs) {
+      for (VectorMapJoinVariation vectorMapJoinVariation : VectorMapJoinVariation.values()) {
+        if (vectorMapJoinVariation == VectorMapJoinVariation.NONE) {
+          continue;
+        }
+        doTestLong(longKeyConfig.seed, longKeyConfig.primitiveTypeInfo, vectorMapJoinVariation);
+      }
+    }
+  }
+
+  public void doTestLong(long seed, TypeInfo numberTypeInfo,
+      VectorMapJoinVariation vectorMapJoinVariation) throws Exception {
+
+    int rowCount = 10000;
+
+    HiveConf hiveConf = new HiveConf();
+
+    String[] bigTableColumnNames = new String[] {"number1"};
+    // Use the key type under test (long, int, or short) passed in from testLong.
+    TypeInfo[] bigTableTypeInfos =
+        new TypeInfo[] {
+            numberTypeInfo};
+    int[] bigTableKeyColumnNums = new int[] {0};
+
+    String[] smallTableValueColumnNames = new String[] {"sv1", "sv2"};
+    TypeInfo[] smallTableValueTypeInfos =
+        new TypeInfo[] {TypeInfoFactory.dateTypeInfo, TypeInfoFactory.stringTypeInfo};
+
+    int[] bigTableRetainColumnNums = new int[] {0};
+
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    int[] smallTableRetainValueColumnNums = new int[] {0, 1};
+
+    //----------------------------------------------------------------------------------------------
+
+    TestMapJoinDescription testDesc = new TestMapJoinDescription(
+        hiveConf, vectorMapJoinVariation,
+        bigTableColumnNames, bigTableTypeInfos,
+        bigTableKeyColumnNums,
+        smallTableValueColumnNames, smallTableValueTypeInfos,
+        bigTableRetainColumnNums,
+        smallTableRetainKeyColumnNums, smallTableRetainValueColumnNums);
+
+    // Prepare data.  Good for ANY implementation variation.
+    Random random = new Random(seed);
+    TestMapJoinData testData = new TestMapJoinData(rowCount, testDesc, random);
+
+    // UNDONE: Add small table columns...
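+    // The vectorized paths write small-table values into extra columns of the big-table
+    // batch, so those output columns are appended to every generated batch up front.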
+    TestMapJoinData.doAddSmallTableColumns(testDesc, testData);
+
+    executeTest(testDesc, testData);
+  }
+
+  @Test
+  public void testMultiKey() throws Exception {
+
+    Random random = new Random(4459);
+
+    final int batchCount = 1;
+
+    HiveConf hiveConf = new HiveConf();
+
+    String[] bigTableColumnNames = new String[] {"b1", "b2", "b3"};
+    TypeInfo[] bigTableTypeInfos =
+        new TypeInfo[] {
+            TypeInfoFactory.booleanTypeInfo,
+            TypeInfoFactory.longTypeInfo,
+            TypeInfoFactory.doubleTypeInfo};
+    int[] bigTableKeyColumnNums = new int[] {0, 1, 2};
+
+    String[] smallTableValueColumnNames = new String[] {"sv1"};
+    TypeInfo[] smallTableValueTypeInfos =
+        new TypeInfo[] {TypeInfoFactory.stringTypeInfo};
+
+    int[] bigTableRetainColumnNums = new int[] {0, 1, 2};
+
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    int[] smallTableRetainValueColumnNums = new int[] {0};
+
+    /*
+    for (VectorMapJoinVariation VectorMapJoinVariation : VectorMapJoinVariation.values()) {
+
+      // UNDONE: Test not ready yet?
+      if (VectorMapJoinVariation != VectorMapJoinVariation.OUTER) {
+        continue;
+      }
+      TestMapJoinDescription testMapJoinDescription = new TestMapJoinDescription(
+          hiveConf, VectorMapJoinVariation,
+          bigTableColumnNames, bigTableTypeInfos,
+          bigTableKeyColumnNums,
+          smallTableValueColumnNames, smallTableValueTypeInfos,
+          bigTableRetainColumnNums,
+          smallTableRetainKeyColumnNums, smallTableRetainValueColumnNums);
+      executeTest(batchCount, testMapJoinDescription, random);
+    }
+    */
+  }
+
+  @Test
+  public void testString() throws Exception {
+
+    Random random = new Random(82303);
+
+    final int batchCount = 10;
+
+    HiveConf hiveConf = new HiveConf();
+
+    String[] bigTableColumnNames = new String[] {"b1"};
+    TypeInfo[] bigTableTypeInfos =
+        new TypeInfo[] {
+            TypeInfoFactory.stringTypeInfo};
+    int[] bigTableKeyColumnNums = new int[] {0};
+
+    String[] smallTableValueColumnNames = new String[] {"sv1", "sv2"};
+    TypeInfo[] smallTableValueTypeInfos =
+        new TypeInfo[] {TypeInfoFactory.dateTypeInfo, TypeInfoFactory.timestampTypeInfo};
+
+    int[] bigTableRetainColumnNums = new int[] {0};
+
+    int[] smallTableRetainKeyColumnNums = new int[] {};
+    int[] smallTableRetainValueColumnNums = new int[] {0, 1};
+
+    /*
+    for (VectorMapJoinVariation VectorMapJoinVariation : VectorMapJoinVariation.values()) {
+      if (VectorMapJoinVariation != VectorMapJoinVariation.OUTER) {
+        continue;
+      }
+      TestMapJoinDescription testMapJoinDescription = new TestMapJoinDescription(
+          hiveConf, VectorMapJoinVariation,
+          bigTableColumnNames, bigTableTypeInfos,
+          bigTableKeyColumnNums,
+          smallTableValueColumnNames, smallTableValueTypeInfos,
+          bigTableRetainColumnNums,
+          smallTableRetainKeyColumnNums, smallTableRetainValueColumnNums);
+      executeTest(batchCount, testMapJoinDescription, random);
+    }
+    */
+
+  }
+
+  private void addBigTableRetained(TestMapJoinDescription testDesc, Object[] bigTableRowObjects,
+      Object[] outputObjects) {
+    final int bigTableRetainColumnNumsLength = testDesc.bigTableRetainColumnNums.length;
+    for (int o = 0; o < bigTableRetainColumnNumsLength; o++) {
+      outputObjects[o] = bigTableRowObjects[testDesc.bigTableRetainColumnNums[o]];
+    }
+  }
+
+  private void addToOutput(TestMapJoinDescription testDesc, TestRowMultiSet expectedTestRowMultiSet,
+      Object[] outputObjects) {
+    for (int c = 0; c < outputObjects.length; c++) {
+      PrimitiveObjectInspector primitiveObjInsp = ((PrimitiveObjectInspector) testDesc.outputObjectInspectors[c]);
+      Object outputObject = outputObjects[c];
+      outputObjects[c] = primitiveObjInsp.copyObject(outputObject);
+    }
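+    // Each value was deep-copied above; writables extracted from a batch may be reused
+    // scratch objects, so the multi-set must own private copies of every output row.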
+    expectedTestRowMultiSet.add(new TestRow(outputObjects));
+  }
+
+  /*
+   * Simulate the join by driving the test big table data by our test small table HashMap and
+   * create the expected output as a multi-set of TestRow (i.e. TestRow and occurrence count).
+   */
+  private TestRowMultiSet createExpectedTestRowMultiSet(TestMapJoinDescription testDesc,
+      TestMapJoinData testData) throws HiveException {
+
+    TestRowMultiSet expectedTestRowMultiSet = new TestRowMultiSet();
+
+    VectorExtractRow vectorExtractRow = new VectorExtractRow();
+    // Extract the full big-table row: bigTableRowObjects below is sized to all big-table
+    // columns, so the extractor must be initialized with all of them, not just the keys.
+    vectorExtractRow.init(testDesc.bigTableTypeInfos);
+
+    final int bigTableColumnCount = testDesc.bigTableTypeInfos.length;
+    Object[] bigTableRowObjects = new Object[bigTableColumnCount];
+
+    final int bigTableKeyColumnCount = testDesc.bigTableKeyTypeInfos.length;
+    Object[] bigTableKeyObjects = new Object[bigTableKeyColumnCount];
+
+    for (int i = 0; i < testData.bigTableBatches.length; i++) {
+      VectorizedRowBatch bigTableBatch = testData.bigTableBatches[i];
+
+      for (int r = 0; r < bigTableBatch.size; r++) {
+        vectorExtractRow.extractRow(bigTableBatch, r, bigTableRowObjects);
+
+        // Form key object array
+        for (int k = 0; k < bigTableKeyColumnCount; k++) {
+          int keyColumnNum = testDesc.bigTableKeyColumnNums[k];
+          bigTableKeyObjects[k] = bigTableRowObjects[keyColumnNum];
+          bigTableKeyObjects[k] = ((PrimitiveObjectInspector) testDesc.bigTableObjectInspectors[keyColumnNum]).copyObject(bigTableKeyObjects[k]);
+        }
+        TestRow testKey = new TestRow(bigTableKeyObjects);
+
+        if (testData.smallTableKeyHashMap.containsKey(testKey)) {
+
+          int smallTableKeyIndex = testData.smallTableKeyHashMap.get(testKey);
+
+          switch (testDesc.vectorMapJoinVariation) {
+          case INNER:
+          case OUTER:
+            {
+              // One row per value.
+              ArrayList<TestRow> valueList = testData.smallTableValues.get(smallTableKeyIndex);
+              final int valueCount = valueList.size();
+              for (int v = 0; v < valueCount; v++) {
+                Object[] outputObjects = new Object[testDesc.outputColumnNames.length];
+
+                addBigTableRetained(testDesc, bigTableRowObjects, outputObjects);
+
+                Object[] valueRow = valueList.get(v).getRow();
+                final int bigTableRetainColumnNumsLength = testDesc.bigTableRetainColumnNums.length;
+                final int smallTableRetainValueColumnNumsLength = testDesc.smallTableRetainValueColumnNums.length;
+                for (int o = 0; o < smallTableRetainValueColumnNumsLength; o++) {
+                  outputObjects[bigTableRetainColumnNumsLength + o] = valueRow[testDesc.smallTableRetainValueColumnNums[o]];
+                }
+
+                addToOutput(testDesc, expectedTestRowMultiSet, outputObjects);
+              }
+            }
+            break;
+          case INNER_BIG_ONLY:
+            {
+              // Value count rows.
+              final int valueCount = testData.smallTableValueCounts.get(smallTableKeyIndex);
+              for (int v = 0; v < valueCount; v++) {
+                Object[] outputObjects = new Object[testDesc.outputColumnNames.length];
+
+                addBigTableRetained(testDesc, bigTableRowObjects, outputObjects);
+                addToOutput(testDesc, expectedTestRowMultiSet, outputObjects);
+              }
+            }
+            break;
+          case LEFT_SEMI:
+            {
+              // One row (existence).
+              Object[] outputObjects = new Object[testDesc.outputColumnNames.length];
+
+              addBigTableRetained(testDesc, bigTableRowObjects, outputObjects);
+              addToOutput(testDesc, expectedTestRowMultiSet, outputObjects);
+            }
+            break;
+          default:
+            throw new RuntimeException("Unknown operator variation " + testDesc.vectorMapJoinVariation);
+          }
+
+        } else {
+
+          // No match.
+
+          if (testDesc.vectorMapJoinVariation == VectorMapJoinVariation.OUTER) {
+
+            // We need to add a non-match row with nulls for small table values.
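+            // (For INNER, INNER_BIG_ONLY, and LEFT_SEMI, an unmatched big-table row
+            // contributes nothing to the expected output.)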
+
+            Object[] outputObjects = new Object[testDesc.outputColumnNames.length];
+
+            addBigTableRetained(testDesc, bigTableRowObjects, outputObjects);
+
+            final int bigTableRetainColumnNumsLength = testDesc.bigTableRetainColumnNums.length;
+            final int smallTableRetainValueColumnNumsLength = testDesc.smallTableRetainValueColumnNums.length;
+            for (int o = 0; o < smallTableRetainValueColumnNumsLength; o++) {
+              outputObjects[bigTableRetainColumnNumsLength + o] = null;
+            }
+
+            addToOutput(testDesc, expectedTestRowMultiSet, outputObjects);
+          }
+        }
+      }
+    }
+
+    return expectedTestRowMultiSet;
+  }
+
+  private void executeTest(TestMapJoinDescription testDesc, TestMapJoinData testData) throws Exception {
+
+    TestRowMultiSet expectedTestRowMultiSet =
+        createExpectedTestRowMultiSet(testDesc, testData);
+
+    // UNDONE: Inner count
+    System.out.println("*BENCHMARK* expectedTestRowMultiSet rowCount " + expectedTestRowMultiSet.getRowCount() +
+        " totalCount " + expectedTestRowMultiSet.getTotalCount());
+
+    // Execute all implementation variations.
+    for (MapJoinImplementation mapJoinImplementation : MapJoinImplementation.values()) {
+      executeTestImplementation(mapJoinImplementation, testDesc, testData,
+          expectedTestRowMultiSet);
+    }
+  }
+
+  private boolean isVectorOutput(MapJoinImplementation mapJoinImplementation) {
+    return
+        (mapJoinImplementation != MapJoinImplementation.ROW_MODE_HASH_MAP &&
+         mapJoinImplementation != MapJoinImplementation.ROW_MODE_OPTIMIZED);
+  }
+
+  private void executeTestImplementation(
+      MapJoinImplementation mapJoinImplementation,
+      TestMapJoinDescription testDesc, TestMapJoinData testData, TestRowMultiSet expectedTestRowMultiSet)
+          throws Exception {
+
+    System.out.println("*BENCHMARK* Starting " + mapJoinImplementation + " test");
+
+    // UNDONE: Parameterize for implementation variation?
+    MapJoinDesc mapJoinDesc = TestMapJoinConfig.createMapJoinDesc(testDesc);
+
+    final boolean isVectorOutput = isVectorOutput(mapJoinImplementation);
+
+    TestRowMultiSet outputTestRowMultiSet = new TestRowMultiSet();
+
+    Operator testCollectorOperator =
+        (!isVectorOutput ?
+            new TestMultiSetCollectorOperator(
+                testDesc.outputObjectInspectors, outputTestRowMultiSet) :
+            new TestMultiSetVectorCollectorOperator(
+                testDesc.outputTypeInfos, testDesc.outputObjectInspectors, outputTestRowMultiSet));
+
+    MapJoinOperator operator =
+        TestMapJoinConfig.createMapJoinImplementation(
+            mapJoinImplementation, testDesc, testCollectorOperator, testData, mapJoinDesc);
+
+    if (!isVectorOutput) {
+      TestMapJoinData.driveBigTableData(testDesc, testData, operator);
+    } else {
+      TestMapJoinData.driveVectorBigTableData(testDesc, testData, operator);
+    }
+
+    System.out.println("*BENCHMARK* executeTestImplementation row count " +
+        ((TestCountCollectorOperator) testCollectorOperator).getRowCount());
+
+    // Verify the output!  A mismatch must fail the test, not just log it.
+    final boolean verified = expectedTestRowMultiSet.verify(outputTestRowMultiSet);
+    if (!verified) {
+      System.out.println("*BENCHMARK* verify failed for " + mapJoinImplementation);
+    } else {
+      System.out.println("*BENCHMARK* verify succeeded for " + mapJoinImplementation);
+    }
+    assertTrue("output multi-set does not match expected for " + mapJoinImplementation, verified);
+  }
+}
\ No newline at end of file
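Note: TestRowMultiSet is referenced throughout this patch but its source is not part of these hunks. The verification logic above assumes multi-set semantics: add() accumulates an occurrence count per distinct row (which requires TestRow to implement value-based equals() and hashCode()), getRowCount()/getTotalCount() report distinct versus total rows, and verify() passes only when both sides hold the same rows with the same multiplicities. A minimal sketch of that assumed contract, with a hypothetical class name, not the class this patch uses:

  import java.util.HashMap;

  // Sketch of the multi-set contract assumed by executeTest/createExpectedTestRowMultiSet.
  // Row objects must implement value-based equals()/hashCode() for counting to work.
  public class RowMultiSetSketch<R> {
    private final HashMap<R, Integer> counts = new HashMap<R, Integer>();
    private int totalCount;

    public void add(R row) {
      Integer old = counts.get(row);               // equal rows share one entry
      counts.put(row, old == null ? 1 : old + 1);  // and accumulate a count
      totalCount++;
    }

    public int getRowCount() { return counts.size(); }  // distinct rows
    public int getTotalCount() { return totalCount; }   // rows including duplicates

    public boolean verify(RowMultiSetSketch<R> other) {
      // Same rows with the same occurrence counts on both sides.
      return counts.equals(other.counts);
    }
  }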