diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 27b6a877cb..44e2b0c846 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -68,6 +68,9 @@
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +91,9 @@
   private static final Logger LOG = LoggerFactory.getLogger(ConvertJoinMapJoin.class.getName());
   public float hashTableLoadFactor;
   private long maxJoinMemory;
+  private HashMapDataStructureType hashMapDataStructure;
+  private boolean fastHashTableAvailable;
+  private boolean useFastHashTable;
 
   @Override
   /*
@@ -102,6 +108,7 @@
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
     hashTableLoadFactor = context.conf.getFloatVar(ConfVars.HIVEHASHTABLELOADFACTOR);
+    fastHashTableAvailable = context.conf.getBoolVar(ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);
 
     JoinOperator joinOp = (JoinOperator) nd;
 
     // adjust noconditional task size threshold for LLAP
@@ -116,6 +123,9 @@
     LOG.info("maxJoinMemory: {}", maxJoinMemory);
 
+    hashMapDataStructure = HashMapDataStructureType.of(joinOp.getConf());
+
+
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
 
     boolean hiveConvertJoin = context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
         & !context.parseContext.getDisableMapJoin();
@@ -193,6 +203,32 @@
     return null;
   }
 
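+  /**
+   * Hash table layout expected for the small-table side of the join: a single
+   * join key of an integer-family primitive type (boolean through long) can be
+   * stored in a long-keyed hash map, while any other key shape requires the
+   * composite layout.
+   */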
+  private enum HashMapDataStructureType {
+    COMPOSITE_KEYED, LONG_KEYED;
+
+    public static HashMapDataStructureType of(JoinDesc conf) {
+      ExprNodeDesc[][] keys = conf.getJoinKeys();
+      if (keys != null && keys[0].length == 1) {
+        TypeInfo typeInfo = keys[0][0].getTypeInfo();
+        if (typeInfo instanceof PrimitiveTypeInfo) {
+          PrimitiveTypeInfo pti = (PrimitiveTypeInfo) typeInfo;
+          PrimitiveCategory pCat = pti.getPrimitiveCategory();
+          switch (pCat) {
+          case BOOLEAN:
+          case BYTE:
+          case SHORT:
+          case INT:
+          case LONG:
+            return HashMapDataStructureType.LONG_KEYED;
+          default:
+            break;
+          }
+        }
+      }
+      return HashMapDataStructureType.COMPOSITE_KEYED;
+    }
+  }
+
   private boolean selectJoinForLlap(OptimizeTezProcContext context, JoinOperator joinOp,
                                       TezBucketJoinProcCtx tezBucketJoinProcCtx,
                                       LlapClusterStateForCompile llapInfo,
@@ -239,6 +275,11 @@ private boolean selectJoinForLlap(OptimizeTezProcContext context, JoinOperator j
     LOG.info("Cost of Bucket Map Join : numNodes = " + numNodes + " total small table size = "
         + totalSize + " networkCostMJ = " + networkCostMJ);
 
+    if (totalSize <= maxJoinMemory) {
+      // mapjoin is applicable; don't try the algorithms below.
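+      // (totalSize is the summed small-table size, so the plain map join fits
+      // in memory and the DPHJ / bucket map join cost comparison is moot.)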
+      return false;
+    }
+
     if (networkCostDPHJ < networkCostMJ) {
       LOG.info("Dynamically partitioned Hash Join chosen");
       return convertJoinDynamicPartitionedHashJoin(joinOp, context);
@@ -252,7 +293,22 @@ private boolean selectJoinForLlap(OptimizeTezProcContext context, JoinOperator j
   }
 
   public long computeOnlineDataSize(Statistics statistics) {
-    return computeOnlineDataSizeFast3(statistics);
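+    // Baseline is the optimized hash table estimate; when the fast hash table
+    // implementation is available, take the smaller of the two estimates so
+    // the cheaper layout can still qualify the join for conversion.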
+    long estimate = computeOnlineDataSizeOptimized(statistics);
+    if (fastHashTableAvailable) {
+      estimate = Long.min(estimate, computeOnlineDataSizeFast(statistics));
+    }
+    return estimate;
+  }
+
+  public long computeOnlineDataSizeFast(Statistics statistics) {
+    switch (hashMapDataStructure) {
+    case LONG_KEYED:
+      return computeOnlineDataSizeFast2(statistics);
+    case COMPOSITE_KEYED:
+      return computeOnlineDataSizeFast3(statistics);
+    default:
+      throw new RuntimeException("invalid mode");
+    }
   }
 
   public long computeOnlineDataSizeFast2(Statistics statistics) {
@@ -904,6 +960,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
 
     // total size of the inputs
     long totalSize = 0;
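+    // running total of the same inputs under the fast hash table size estimate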
+    long totalSizeFast = 0;
 
     // convert to DPHJ
     boolean convertDPHJ = false;
@@ -969,6 +1026,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         // We are replacing the current big table with a new one, thus
         // we need to count the current one as a map table then.
         totalSize += computeOnlineDataSize(bigInputStat);
+        totalSizeFast += computeOnlineDataSizeFast(bigInputStat);
         // Check if number of distinct keys is greater than given max number of entries
         // for HashMap
         if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, bigTablePosition, context)) {
@@ -978,6 +1036,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         // This is not the first table and we are not using it as big table,
         // in fact, we're adding this table as a map table
         totalSize += inputSize;
+        totalSizeFast += computeOnlineDataSizeFast(currInputStat);
         // Check if number of distinct keys is greater than given max number of entries
         // for HashMap
         if (checkMapJoinThresholds && !checkNumberOfEntriesForHashTable(joinOp, pos, context)) {
@@ -1026,7 +1085,8 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
     // We store the total memory that this MapJoin is going to use,
     // which is calculated as totalSize/buckets, with totalSize
     // equal to sum of small tables size.
-    joinOp.getConf().setInMemoryDataSize(totalSize/buckets);
+    joinOp.getConf().setInMemoryDataSize(totalSize / buckets);
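+    // opt in to the fast implementation only when its per-bucket footprint fits as well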
+    useFastHashTable = (totalSizeFast / buckets <= maxJoinMemory);
 
     return bigTablePosition;
   }
 
@@ -1097,6 +1157,7 @@ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcCo
     if (joinExprs.size() == 0) {
       // In case of cross join, we disable hybrid grace hash join
       mapJoinOp.getConf().setHybridHashJoin(false);
     }
+    mapJoinOp.getConf().setUseFastHashTables(fastHashTableAvailable && useFastHashTable);
 
     Operator<? extends OperatorDesc> parentBigTableOp = mapJoinOp.getParentOperators().get(bigTablePosition);
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
index 48974f8dda..4f7140b363 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
@@ -41,25 +41,56 @@
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
-import org.apache.hadoop.hive.ql.exec.vector.reducesink.*;
-import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFArgDesc;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.DataTypePhysicalVariation;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
-import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TopNKeyOperator;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
+import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.HiveVectorAdaptorUsageMode;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport.Support;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedUDAFs;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.filesink.VectorFileSinkArrowOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyLongOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinInnerBigOnlyMultiKeyOperator;
@@ -74,28 +105,18 @@
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterMultiKeyOperator;
 import org.apache.hadoop.hive.ql.exec.vector.mapjoin.VectorMapJoinOuterStringOperator;
 import org.apache.hadoop.hive.ql.exec.vector.ptf.VectorPTFOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkEmptyKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkLongOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkMultiKeyOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkObjectHashOperator;
+import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkStringOperator;
 import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFAdaptor;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector.Type;
-import org.apache.hadoop.hive.ql.exec.vector.VectorAggregationDesc;
-import org.apache.hadoop.hive.ql.exec.vector.VectorColumnOutputMapping;
-import org.apache.hadoop.hive.ql.exec.vector.VectorColumnSourceMapping;
-import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorMapJoinOuterFilteredOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationOperator;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedUDAFs;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.HiveVectorAdaptorUsageMode;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext.InConstantType;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedSupport.Support;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.IdentityExpression;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.exec.vector.udf.VectorUDFArgDesc;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.NullRowsInputFormat;
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
@@ -104,7 +125,9 @@
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
@@ -124,44 +147,43 @@
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
-import org.apache.hadoop.hive.ql.plan.SelectDesc;
-import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
-import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
-import org.apache.hadoop.hive.ql.plan.VectorDesc;
-import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.VectorFilterDesc;
-import org.apache.hadoop.hive.ql.plan.VectorPTFDesc;
-import org.apache.hadoop.hive.ql.plan.VectorPTFInfo;
-import org.apache.hadoop.hive.ql.plan.VectorPTFDesc.SupportedFunctionType;
-import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
-import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
-import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
-import org.apache.hadoop.hive.ql.plan.VectorSparkPartitionPruningSinkDesc;
-import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
-import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
-import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
-import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TopNKeyDesc;
+import org.apache.hadoop.hive.ql.plan.VectorAppMasterEventDesc;
+import org.apache.hadoop.hive.ql.plan.VectorDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorFilterDesc;
 import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc.ProcessingMode;
+import org.apache.hadoop.hive.ql.plan.VectorLimitDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableImplementationType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKeyType;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.HashTableKind;
 import org.apache.hadoop.hive.ql.plan.VectorMapJoinDesc.VectorMapJoinVariation;
+import org.apache.hadoop.hive.ql.plan.VectorMapJoinInfo;
+import org.apache.hadoop.hive.ql.plan.VectorPTFDesc;
+import org.apache.hadoop.hive.ql.plan.VectorPTFDesc.SupportedFunctionType;
+import org.apache.hadoop.hive.ql.plan.VectorPTFInfo;
+import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.VectorReduceSinkInfo;
-import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
+import org.apache.hadoop.hive.ql.plan.VectorSMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.VectorSelectDesc;
+import org.apache.hadoop.hive.ql.plan.VectorSparkHashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorSparkPartitionPruningSinkDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTableScanDesc;
+import org.apache.hadoop.hive.ql.plan.VectorTopNKeyDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
 import org.apache.hadoop.hive.ql.plan.ptf.OrderExpressionDef;
@@ -209,11 +231,74 @@
 import org.apache.hadoop.hive.ql.udf.UDFToShort;
 import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
 import org.apache.hadoop.hive.ql.udf.UDFYear;
-import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBRound;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCase;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCeil;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCharacterLength;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFCoalesce;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFDateAdd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFDateDiff;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFDateSub;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFElt;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFloor;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIf;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInBloomFilter;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFInitCap;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLTrim;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLag;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLead;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLength;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLower;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFNvl;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPDivide;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPMinus;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPMod;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPMultiply;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNegative;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPPlus;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPPositive;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOctetLength;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFPosMod;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFPower;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFRTrim;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFRegExp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFRound;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToIntervalDayTime;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToIntervalYearMonth;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToString;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFTrim;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFWhen;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.NullStructSerDe;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -228,17 +313,17 @@
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hive.common.util.AnnotationUtils;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hive.common.util.AnnotationUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableSet;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 
 public class Vectorizer implements PhysicalPlanResolver {
 
@@ -710,7 +795,7 @@ public OperatorType getType() {
 
     public DummyVectorOperator(VectorizationContext vContext) {
       super();
-      this.conf = (DummyRootVectorDesc) new DummyRootVectorDesc();
+      this.conf = new DummyRootVectorDesc();
       this.vContext = vContext;
     }
 
@@ -2056,7 +2141,7 @@ private void validateAndVectorizeMapOperators(TableScanOperator tableScanOperato
   private void vectorizeTableScanOperatorInPlace(TableScanOperator tableScanOperator,
       VectorTaskColumnInfo vectorTaskColumnInfo) {
 
-    TableScanDesc tableScanDesc = (TableScanDesc) tableScanOperator.getConf();
+    TableScanDesc tableScanDesc = tableScanOperator.getConf();
     VectorTableScanDesc vectorTableScanDesc = new VectorTableScanDesc();
     tableScanDesc.setVectorDesc(vectorTableScanDesc);
 
@@ -2738,7 +2823,7 @@ private boolean validatePTFOperator(PTFOperator op, VectorizationContext vContex
           HiveConf.ConfVars.HIVE_VECTORIZATION_PTF_ENABLED.varname + " IS false)");
       return false;
     }
-    PTFDesc ptfDesc = (PTFDesc) op.getConf();
+    PTFDesc ptfDesc = op.getConf();
     boolean isMapSide = ptfDesc.isMapSide();
     if (isMapSide) {
       setOperatorIssue("PTF Mapper not supported");
@@ -3513,7 +3598,8 @@ private boolean canSpecializeMapJoin(Operator<? extends OperatorDesc> op, MapJoi
 
     boolean isFastHashTableEnabled =
         HiveConf.getBoolVar(hiveConf,
-            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED);
+            HiveConf.ConfVars.HIVE_VECTORIZATION_MAPJOIN_NATIVE_FAST_HASHTABLE_ENABLED) &&
+        desc.getUseFastHashTables();
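+    // getUseFastHashTables() carries the planner's estimate from
+    // ConvertJoinMapJoin that the fast hash table layout also fits in memory.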
 
     // Especially since LLAP is prone to turn it off in the MapJoinDesc in later
     // physical optimizer stages...
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
index dc4f085203..83f6ae953d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
@@ -390,6 +390,8 @@ public void setDynamicPartitionHashJoin(boolean isDistributedHashJoin) {
   private static final Set<String> vectorizableMapJoinNativeEngines =
       new LinkedHashSet<String>(Arrays.asList("tez", "spark"));
 
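+  /** Whether the planner estimated that the fast hash table fits in memory for this join. */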
+  private boolean useFastHashTables;
+
   public class MapJoinOperatorExplainVectorization extends OperatorExplainVectorization {
 
     private final MapJoinDesc mapJoinDesc;
@@ -596,6 +598,14 @@ public SMBJoinOperatorExplainVectorization getSMBJoinVectorization() {
     return new SMBJoinOperatorExplainVectorization((SMBJoinDesc) this, vectorSMBJoinDesc);
   }
 
+  public void setUseFastHashTables(boolean useFastHashTables) {
+    this.useFastHashTables = useFastHashTables;
+  }
+
+  public boolean getUseFastHashTables() {
+    return useFastHashTables;
+  }
+
   @Override
   public boolean isSame(OperatorDesc other) {
     if (super.isSame(other)) {
diff --git ql/src/test/queries/clientpositive/bucket_map_join_tez2.q ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
index 2189b96154..4b2cad806e 100644
--- ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
+++ ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
@@ -79,7 +79,7 @@ set hive.convert.join.bucket.mapjoin.tez = true;
 explain
 select a.key, b.key from (select key from tab_part_n11 where key > 1) a right outer join (select key from tab_part_n11 where key > 2) b on a.key = b.key;
 
-set hive.auto.convert.join.noconditionaltask.size=2800;
+set hive.auto.convert.join.noconditionaltask.size=2000;
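+-- (threshold lowered to compensate for the smaller online data size estimates,
+-- so the conversion decision exercised by the next query stays the same)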
 set hive.convert.join.bucket.mapjoin.tez = false;
 explain select a.key, b.key from (select distinct key from tab_n10) a join tab_n10 b on b.key = a.key;
 set hive.convert.join.bucket.mapjoin.tez = true;
diff --git ql/src/test/queries/clientpositive/tez_smb_main.q ql/src/test/queries/clientpositive/tez_smb_main.q
index b88149b154..c7516b8947 100644
--- ql/src/test/queries/clientpositive/tez_smb_main.q
+++ ql/src/test/queries/clientpositive/tez_smb_main.q
@@ -70,7 +70,7 @@
 
 select count(*) from tab_n11 a join tab_part_n12 b on a.key = b.key;
 
-set hive.auto.convert.join.noconditionaltask.size=1400;
+set hive.auto.convert.join.noconditionaltask.size=1000;
 set hive.mapjoin.hybridgrace.minwbsize=125;
 set hive.mapjoin.hybridgrace.minnumpartitions=4;
 set hive.llap.memory.oversubscription.max.executors.per.query=0;
diff --git ql/src/test/results/clientpositive/llap/orc_llap.q.out ql/src/test/results/clientpositive/llap/orc_llap.q.out
index a639b687e2..f4f827827e 100644
--- ql/src/test/results/clientpositive/llap/orc_llap.q.out
+++ ql/src/test/results/clientpositive/llap/orc_llap.q.out
@@ -1021,8 +1021,8 @@ STAGE PLANS:
     Tez
 #### A masked pattern was here ####
       Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE)
-        Reducer 3 <- Reducer 2 (CUSTOM_SIMPLE_EDGE)
+        Map 2 <- Map 1 (BROADCAST_EDGE)
+        Reducer 3 <- Map 2 (CUSTOM_SIMPLE_EDGE)
 #### A masked pattern was here ####
       Vertices:
         Map 1
@@ -1046,7 +1046,7 @@ STAGE PLANS:
                         value expressions: _col2 (type: string)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Map 4
+        Map 2
             Map Operator Tree:
                 TableScan
                   alias: o2
@@ -1059,38 +1059,31 @@ STAGE PLANS:
                     expressions: csmallint (type: smallint), cstring2 (type: string)
                     outputColumnNames: _col0, _col2
                     Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: smallint)
-                      sort order: +
-                      Map-reduce partition columns: _col0 (type: smallint)
-                      Statistics: Num rows: 136968 Data size: 11042828 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col2 (type: string)
+                    Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      keys:
+                        0 _col0 (type: smallint)
+                        1 _col0 (type: smallint)
+                      outputColumnNames: _col2, _col5
+                      input vertices:
+                        0 Map 1
+                      Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
+                      Select Operator
+                        expressions: hash(_col2,_col5) (type: int)
+                        outputColumnNames: _col0
+                        Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
+                        Group By Operator
+                          aggregations: sum(_col0)
+                          mode: hash
+                          outputColumnNames: _col0
+                          Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                          Reduce Output Operator
+                            sort order:
+                            Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
+                            value expressions: _col0 (type: bigint)
             Execution mode: vectorized, llap
             LLAP IO: all inputs
-        Reducer 2
-            Execution mode: llap
-            Reduce Operator Tree:
-              Merge Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                keys:
-                  0 _col0 (type: smallint)
-                  1 _col0 (type: smallint)
-                outputColumnNames: _col2, _col5
-                Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
-                Select Operator
-                  expressions: hash(_col2,_col5) (type: int)
-                  outputColumnNames: _col0
-                  Statistics: Num rows: 636522 Data size: 114343414 Basic stats: COMPLETE Column stats: COMPLETE
-                  Group By Operator
-                    aggregations: sum(_col0)
-                    mode: hash
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                    Reduce Output Operator
-                      sort order:
-                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                      value expressions: _col0 (type: bigint)
         Reducer 3
             Execution mode: vectorized, llap
             Reduce Operator Tree:
diff --git ql/src/test/results/clientpositive/vectorized_mapjoin2.q.out ql/src/test/results/clientpositive/vectorized_mapjoin2.q.out
index 438e56475b..c43018d8e4 100644
--- ql/src/test/results/clientpositive/vectorized_mapjoin2.q.out
+++ ql/src/test/results/clientpositive/vectorized_mapjoin2.q.out
@@ -106,7 +106,7 @@ STAGE PLANS:
                     bigTableKeyExpressions: col 0:int
                     className: VectorMapJoinOperator
                     native: false
-                    nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Fast Hash Table and No Hybrid Hash Join IS true
+                    nativeConditionsMet: hive.mapjoin.optimized.hashtable IS true, hive.vectorized.execution.mapjoin.native.enabled IS true, One MapJoin Condition IS true, No nullsafe IS true, Small table vectorizes IS true, Optimized Table and Supports Key Types IS true
                     nativeConditionsNotMet: hive.execution.engine mr IN [tez, spark] IS false
                 Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                 Group By Operator