diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index b35f075..387f47d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -87,6 +87,7 @@
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
     JoinOperator joinOp = (JoinOperator) nd;
+    long maxSize = context.conf.getLongVar(HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
     TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
 
     if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
@@ -110,7 +111,7 @@
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize);
     if (mapJoinConversionPos < 0) {
       Object retval = checkAndConvertSMBJoin(context, joinOp, tezBucketJoinProcCtx);
       if (retval == null) {
@@ -134,7 +135,7 @@
     // check if we can convert to map join no bucket scaling.
     LOG.info("Convert to non-bucketed map join");
     if (numBuckets != 1) {
-      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
+      mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1, false, maxSize);
     }
     if (mapJoinConversionPos < 0) {
       // we are just converting to a common merge join operator. The shuffle
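Note: the three hunks above hoist the noconditionaltask size threshold out of
getMapJoinConversionPos and thread it through as an explicit argument. A minimal
sketch of the resulting call pattern, assembled from calls that appear in this
patch (a fragment for orientation, not compilable on its own):

    // Read the broadcast-size threshold once per visit of the join operator.
    long maxSize = context.conf.getLongVar(
        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);

    // Normal conversion path: keep the join-type checks, honor the limit.
    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets, false, maxSize);

    // Fallback path (see the fallbackToReduceSideJoin hunk below): skip the
    // join-type checks and lift the limit so a position is always considered.
    int fallbackPos = getMapJoinConversionPos(joinOp, context,
        estimateNumBuckets(joinOp, false), true, Long.MAX_VALUE);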
@@ -359,7 +360,7 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
     // MapRecordProcessor and ReduceRecordProcessor with respect to the sources.
     @SuppressWarnings({"rawtypes","unchecked"})
     Set<ReduceSinkOperator> set =
-        OperatorUtils.findOperatorsUpstream((Collection)parentOp.getParentOperators(),
+        OperatorUtils.findOperatorsUpstream(parentOp.getParentOperators(),
             ReduceSinkOperator.class);
     if (size < 0) {
       size = set.size();
@@ -505,44 +506,42 @@ private boolean checkColEquality(List<List<String>> grandParentColNames,
 
   public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
-      int buckets) throws SemanticException {
-    /*
-     * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
-     * an outer join down the join tree that requires filterTag. We disable this conversion to map
-     * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
-     * new operation to be able to support this. This seems like a corner case enough to special
-     * case this for now.
-     */
-    if (joinOp.getConf().getConds().length > 1) {
-      boolean hasOuter = false;
-      for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
-        switch (joinCondDesc.getType()) {
-        case JoinDesc.INNER_JOIN:
-        case JoinDesc.LEFT_SEMI_JOIN:
-        case JoinDesc.UNIQUE_JOIN:
-          hasOuter = false;
-          break;
-
-        case JoinDesc.FULL_OUTER_JOIN:
-        case JoinDesc.LEFT_OUTER_JOIN:
-        case JoinDesc.RIGHT_OUTER_JOIN:
-          hasOuter = true;
-          break;
-
-        default:
-          throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+      int buckets, boolean skipJoinTypeChecks, long maxSize) throws SemanticException {
+    if (!skipJoinTypeChecks) {
+      /*
+       * HIVE-9038: Join tests fail in tez when we have more than 1 join on the same key and there is
+       * an outer join down the join tree that requires filterTag. We disable this conversion to map
+       * join here now. We need to emulate the behavior of HashTableSinkOperator as in MR or create a
+       * new operation to be able to support this. This seems like a corner case enough to special
+       * case this for now.
+       */
+      if (joinOp.getConf().getConds().length > 1) {
+        boolean hasOuter = false;
+        for (JoinCondDesc joinCondDesc : joinOp.getConf().getConds()) {
+          switch (joinCondDesc.getType()) {
+          case JoinDesc.INNER_JOIN:
+          case JoinDesc.LEFT_SEMI_JOIN:
+          case JoinDesc.UNIQUE_JOIN:
+            hasOuter = false;
+            break;
+
+          case JoinDesc.FULL_OUTER_JOIN:
+          case JoinDesc.LEFT_OUTER_JOIN:
+          case JoinDesc.RIGHT_OUTER_JOIN:
+            hasOuter = true;
+            break;
+
+          default:
+            throw new SemanticException("Unknown join type " + joinCondDesc.getType());
+          }
+        }
+        if (hasOuter) {
+          return -1;
         }
-      }
-      if (hasOuter) {
-        return -1;
       }
     }
 
     Set<Integer> bigTableCandidateSet =
         MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
-
-    long maxSize = context.conf.getLongVar(
-        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
     int bigTablePosition = -1;
     // big input cumulative row count
     long bigInputCumulativeCardinality = -1L;
@@ -576,7 +575,7 @@ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext c
         // on size and there's another one that's bigger.
         return -1;
       }
-
+
       if (inputSize/buckets > maxSize) {
         if (!bigTableCandidateSet.contains(pos)) {
           // can't use the current table as the big table, but it's too
@@ -826,7 +825,9 @@ private boolean convertJoinDynamicPartitionedHashJoin(JoinOperator joinOp, Optim
     // Since we don't have big table index yet, must start with estimate of numReducers
     int numReducers = estimateNumBuckets(joinOp, false);
     LOG.info("Try dynamic partitioned hash join with estimated " + numReducers + " reducers");
-    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers);
+    int bigTablePos = getMapJoinConversionPos(joinOp, context, numReducers, false,
+        context.conf.getLongVar(
+            HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD));
     if (bigTablePos >= 0) {
       // Now that we have the big table index, get real numReducers value based on big table RS
       ReduceSinkOperator bigTableParentRS =
@@ -869,9 +870,14 @@ private void fallbackToReduceSideJoin(JoinOperator joinOp, OptimizeTezProcContex
       }
     }
 
+    int pos = getMapJoinConversionPos(joinOp, context, estimateNumBuckets(joinOp, false),
+        true, Long.MAX_VALUE);
+    if (pos < 0) {
+      LOG.info("Could not get a valid join position. Defaulting to position 0");
+      pos = 0;
+    }
     // we are just converting to a common merge join operator. The shuffle
     // join in map-reduce case.
-    int pos = 0; // it doesn't matter which position we use in this case.
     LOG.info("Fallback to common merge join operator");
     convertJoinSMBJoin(joinOp, context, pos, 0, false);
   }
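Note: with skipJoinTypeChecks=true and maxSize=Long.MAX_VALUE, fallbackToReduceSideJoin
no longer hard-codes position 0; it reuses the same selection logic to pick a real
big-table side before degrading to a merge join. A self-contained sketch of that
selection rule (a hypothetical helper for illustration only; the actual
getMapJoinConversionPos also weighs cumulative input cardinality):

    import java.util.Set;

    final class BigTablePos {
      /**
       * Picks the largest eligible input as the big (streamed) side, provided the
       * remaining inputs together fit under maxSize bytes per bucket; -1 if none.
       */
      static int pick(long[] inputSizes, Set<Integer> candidates, int buckets, long maxSize) {
        long total = 0;
        for (long s : inputSizes) {
          total += s;
        }
        int bigPos = -1;
        long bigSize = -1L;
        for (int pos = 0; pos < inputSizes.length; pos++) {
          if (!candidates.contains(pos)) {
            continue; // outer-join semantics can forbid streaming this side
          }
          long smallSides = total - inputSizes[pos];
          if (smallSides / buckets <= maxSize && inputSizes[pos] > bigSize) {
            bigPos = pos;
            bigSize = inputSizes[pos];
          }
        }
        return bigPos;
      }
    }

With maxSize = Long.MAX_VALUE the size guard cannot fail, so the fallback's
"Could not get a valid join position" branch should only fire when the candidate
set itself is empty (for example, a full outer join admits no streamed side).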
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 46d279e..461ba37 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -168,7 +168,8 @@ public Object process(Node nd, Stack<Node> stack,
           getParentFromStack(context.currentMergeJoinOperator, stack);
       // Set the big table position. Both the reduce work and merge join operator
       // should be set with the same value.
-      int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+//      int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+      int pos = context.currentMergeJoinOperator.getConf().getBigTablePosition();
       work.setTag(pos);
       context.currentMergeJoinOperator.getConf().setBigTablePosition(pos);
       tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
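Note: GenTezWork previously re-derived the big table position from the parent
operator's tag; since ConvertJoinMapJoin now records a concrete position on the
merge join descriptor, plan generation reads that value back so the reduce work
and the merge join operator stay in agreement. In sketch form (names from the
patch; the commented-out line above is the old derivation, kept for reference):

    // The position was decided during join conversion; reuse it here.
    int pos = context.currentMergeJoinOperator.getConf().getBigTablePosition();
    work.setTag(pos); // the reduce work must carry the same tag

The subsequent setBigTablePosition(pos) call then rewrites the value it just
read, which is harmless but redundant.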
diff --git ql/src/test/results/clientpositive/tez/explainuser_3.q.out ql/src/test/results/clientpositive/tez/explainuser_3.q.out
index f444058..c88ad9e 100644
--- ql/src/test/results/clientpositive/tez/explainuser_3.q.out
+++ ql/src/test/results/clientpositive/tez/explainuser_3.q.out
@@ -31,13 +31,13 @@ Stage-0
   Stage-1
     Reducer 2 vectorized
       File Output Operator [FS_8]
-        Select Operator [SEL_7] (rows=10 width=170)
+        Select Operator [SEL_7] (rows=10 width=171)
           Output:["_col0","_col1"]
         <-Map 1 [SIMPLE_EDGE] vectorized
           SHUFFLE [RS_6]
-            Select Operator [SEL_5] (rows=10 width=170)
+            Select Operator [SEL_5] (rows=10 width=171)
               Output:["_col0","_col1"]
-              TableScan [TS_0] (rows=10 width=170)
+              TableScan [TS_0] (rows=10 width=171)
                 default@acid_vectorized,acid_vectorized, ACID table,Tbl:COMPLETE,Col:NONE,Output:["a","b"]
 
 PREHOOK: query: explain select key, value
diff --git ql/src/test/results/clientpositive/tez/metadataonly1.q.out ql/src/test/results/clientpositive/tez/metadataonly1.q.out
index 15f5ed5..4075b81 100644
--- ql/src/test/results/clientpositive/tez/metadataonly1.q.out
+++ ql/src/test/results/clientpositive/tez/metadataonly1.q.out
@@ -772,7 +772,7 @@ STAGE PLANS:
                   keys:
                     0 _col0 (type: string)
                     1 _col0 (type: string)
-                  Position of Big Table: 0
+                  Position of Big Table: 1
                   Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                   Group By Operator
                     aggregations: count()
diff --git ql/src/test/results/clientpositive/tez/vector_complex_all.q.out ql/src/test/results/clientpositive/tez/vector_complex_all.q.out
index 1d59a3d..04ff49f 100644
--- ql/src/test/results/clientpositive/tez/vector_complex_all.q.out
+++ ql/src/test/results/clientpositive/tez/vector_complex_all.q.out
@@ -111,9 +111,9 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
 orc_create_complex.str	orc_create_complex.mp	orc_create_complex.lst	orc_create_complex.strct
-line1	{"key11":"value11","key12":"value12","key13":"value13"}	["a","b","c"]	{"a":"one","b":"two"}
-line2	{"key21":"value21","key22":"value22","key23":"value23"}	["d","e","f"]	{"a":"three","b":"four"}
-line3	{"key31":"value31","key32":"value32","key33":"value33"}	["g","h","i"]	{"a":"five","b":"six"}
+line1	{"key13":"value13","key12":"value12","key11":"value11"}	["a","b","c"]	{"a":"one","b":"two"}
+line2	{"key21":"value21","key23":"value23","key22":"value22"}	["d","e","f"]	{"a":"three","b":"four"}
+line3	{"key33":"value33","key31":"value31","key32":"value32"}	["g","h","i"]	{"a":"five","b":"six"}
 PREHOOK: query: -- However, since this query is not referencing the complex fields, it should vectorize.
 EXPLAIN SELECT COUNT(*) FROM orc_create_complex
diff --git ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
index 0a62262..0435d28 100644
--- ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
+++ ql/src/test/results/clientpositive/tez/vectorized_ptf.q.out
@@ -2117,7 +2117,7 @@ STAGE PLANS:
                 0 p_partkey (type: int)
                 1 _col0 (type: int)
               outputColumnNames: _col12, _col13, _col14, _col15, _col16, _col17, _col18, _col19, _col20
-              Position of Big Table: 0
+              Position of Big Table: 1
               Statistics: Num rows: 28 Data size: 17646 Basic stats: COMPLETE Column stats: NONE
               Select Operator
                 expressions: _col12 (type: int), _col13 (type: string), _col14 (type: string), _col15 (type: string), _col16 (type: string), _col17 (type: int), _col18 (type: string), _col19 (type: double), _col20 (type: string)
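Note: the golden-file churn above is consistent with the new big-table selection
rather than with any query change: "Position of Big Table" flips from 0 to 1
exactly where the second join input is the larger side. Using the hypothetical
BigTablePos.pick() sketch from earlier (the sizes here are made-up stand-ins,
with input 1 the larger one, as in the vectorized_ptf plan):

    long[] sizes = { 100L, 17646L };   // assumed per-input sizes; pos 1 is larger
    int pos = BigTablePos.pick(sizes, Set.of(0, 1), 1, Long.MAX_VALUE); // -> 1

The reshuffled map keys in vector_complex_all.q.out and the width=170 -> 171
statistics deltas look like unordered-map rendering and stats rounding from
regenerating the outputs, not behavior changes from this patch.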