diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index fe1ef37..e9c14b1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -319,9 +319,9 @@ private void processAllSideEventsSetParallelism(String inputName,
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
     // the bucket to task map should have been setup by the big table.
     LOG.info("Processing events for input " + inputName);
-    if (bucketToTaskMap.isEmpty()) {
-      LOG.info("We don't have a routing table yet. Will need to wait for the main input"
-          + " initialization");
+    if (inputNameInputSpecMap.get(mainWorkName) == null) {
+      LOG.info("We don't have a routing table yet. Will need to wait for the main input "
+          + mainWorkName + " initialization");
       inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
       return;
     }
@@ -351,6 +351,9 @@ private void processAllSideEvents(String inputName,
 
     for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
       Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
+      if ((destTasks == null) || (destTasks.isEmpty())) {
+        continue;
+      }
       for (Integer task : destTasks) {
         int count = 0;
         for (ByteBuffer buf : entry.getValue()) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
index 1d645a0..914b4e7 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
@@ -201,7 +201,19 @@ public Object call() {
           jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
           mergeMapOp.initialize(jconf, null);
           // if there are no files/partitions to read, we need to skip trying to read
-          boolean skipRead = mergeMapOp.getConf().getPathToAliases().isEmpty();
+          MultiMRInput multiMRInput = multiMRInputMap.get(mergeMapWork.getName());
+          boolean skipRead = false;
+          if (multiMRInput == null) {
+            l4j.info("Multi MR Input for work " + mergeMapWork.getName() + " is null. Skipping read.");
+            skipRead = true;
+          } else {
+            Collection<KeyValueReader> keyValueReaders = multiMRInput.getKeyValueReaders();
+            if ((keyValueReaders == null) || (keyValueReaders.isEmpty())) {
+              l4j.info("Key value readers are null or empty and hence skipping read. "
+                  + "KeyValueReaders = " + keyValueReaders);
+              skipRead = true;
+            }
+          }
           if (skipRead) {
             List<Operator<? extends OperatorDesc>> children = new ArrayList<Operator<? extends OperatorDesc>>();
             children.addAll(mergeMapOp.getConf().getAliasToWork().values());
diff --git ql/src/test/queries/clientpositive/tez_smb_empty.q ql/src/test/queries/clientpositive/tez_smb_empty.q
index 196cc97..2427377 100644
--- ql/src/test/queries/clientpositive/tez_smb_empty.q
+++ ql/src/test/queries/clientpositive/tez_smb_empty.q
@@ -53,3 +53,16 @@ explain
 select count(*) from tab s1 left outer join empty s2 on s1.key=s2.key join tab s3 on s1.key = s3.key;
 
 select count(*) from tab s1 left outer join empty s2 on s1.key=s2.key join tab s3 on s1.key = s3.key;
+
+explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key;
+
+select count(*) from empty s1 join empty s3 on s1.key=s3.key;
+
+set hive.auto.convert.sortmerge.join.bigtable.selection.policy = org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ;
+
+explain
+select count(*) from empty s1 join tab s3 on s1.key=s3.key;
+
+select count(*) from empty s1 join tab s3 on s1.key=s3.key;
+
diff --git ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
index 82ec31d..a32fb12 100644
--- ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
+++ ql/src/test/results/clientpositive/tez/tez_smb_empty.q.out
@@ -674,3 +674,171 @@ POSTHOOK: Input: default@tab
 POSTHOOK: Input: default@tab@ds=2008-04-08
 #### A masked pattern was here ####
 480
+PREHOOK: query: explain
+select count(*) from empty s1 join empty s3 on s1.key=s3.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from empty s1 join empty s3 on s1.key=s3.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Select Operator + expressions: key (type: int) + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + 
File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from empty s1 join empty s3 on s1.key=s3.key +PREHOOK: type: QUERY +PREHOOK: Input: default@empty +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from empty s1 join empty s3 on s1.key=s3.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@empty +#### A masked pattern was here #### +0 +PREHOOK: query: explain +select count(*) from empty s1 join tab s3 on s1.key=s3.key +PREHOOK: type: QUERY +POSTHOOK: query: explain +select count(*) from empty s1 join tab s3 on s1.key=s3.key +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: s3 + Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE + Map Operator Tree: + TableScan + alias: s1 + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 key (type: int) + 1 key (type: int) + Statistics: Num rows: 133 Data size: 1411 Basic stats: COMPLETE Column stats: NONE + Group By Operator + aggregations: count() + mode: hash + outputColumnNames: 
_col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + Reduce Output Operator + sort order: + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + value expressions: _col0 (type: bigint) + Reducer 2 + Reduce Operator Tree: + Group By Operator + aggregations: count(VALUE._col0) + mode: mergepartial + outputColumnNames: _col0 + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + File Output Operator + compressed: false + Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select count(*) from empty s1 join tab s3 on s1.key=s3.key +PREHOOK: type: QUERY +PREHOOK: Input: default@empty +PREHOOK: Input: default@tab +PREHOOK: Input: default@tab@ds=2008-04-08 +#### A masked pattern was here #### +POSTHOOK: query: select count(*) from empty s1 join tab s3 on s1.key=s3.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@empty +POSTHOOK: Input: default@tab +POSTHOOK: Input: default@tab@ds=2008-04-08 +#### A masked pattern was here #### +0