diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
index 8974e9b79b..5d76d5695c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
@@ -120,14 +120,15 @@ public int compare(InputSplit inp1, InputSplit inp2) {
   private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
 
   private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
-      new HashMap<String, Multimap<Integer, InputSplit>>();
+      new HashMap<>();
 
   private int numInputsAffectingRootInputSpecUpdate = 1;
   private int numInputsSeenSoFar = 0;
   private final Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
   private final List<InputSplit> finalSplits = Lists.newLinkedList();
   private final Map<String, InputSpecUpdate> inputNameInputSpecMap =
-      new HashMap<String, InputSpecUpdate>();
+      new HashMap<>();
+  private Map<String, Integer> inputToBucketMap;
 
   public CustomPartitionVertex(VertexManagerPluginContext context) {
     super(context);
@@ -149,6 +150,7 @@ public void initialize() {
     this.mainWorkName = vertexConf.getInputName();
     this.vertexType = vertexConf.getVertexType();
     this.numInputsAffectingRootInputSpecUpdate = vertexConf.getNumInputs();
+    this.inputToBucketMap = vertexConf.getInputToBucketMap();
   }
 
   @Override
@@ -252,7 +254,7 @@ public void onRootVertexInitialized(String inputName, InputDescriptor inputDescr
     LOG.info("Path file splits map for input name: " + inputName + " is " + pathFileSplitsMap);
 
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
-        getBucketSplitMapForPath(pathFileSplitsMap);
+        getBucketSplitMapForPath(inputName, pathFileSplitsMap);
 
     try {
       int totalResource = context.getTotalAvailableResource().getMemory();
@@ -521,20 +523,21 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws
   /*
    * This method generates the map of bucket to file splits.
   */
-  private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
+  private Multimap<Integer, InputSplit> getBucketSplitMapForPath(String inputName,
       Map<String, Set<FileSplit>> pathFileSplitsMap) {
 
-    int bucketNum = 0;
-
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
-        ArrayListMultimap.<Integer, InputSplit> create();
+        ArrayListMultimap.create();
+    List<Integer> bucketIds = new ArrayList<>();
 
     for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
-      int bucketId = bucketNum % numBuckets;
+      // Extract the bucketId from the file name key in pathFileSplitsMap
+      String bucketStr = entry.getKey().substring(0, entry.getKey().length() - 2);
+      int bucketId = Integer.parseInt(bucketStr);
+      bucketIds.add(bucketId);
       for (FileSplit fsplit : entry.getValue()) {
         bucketToInitialSplitMap.put(bucketId, fsplit);
       }
-      bucketNum++;
     }
 
     // this is just for SMB join use-case. The numBuckets would be equal to that of the big table
@@ -542,13 +545,19 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws
     // data from the right buckets to the big table side. For e.g. Big table has 8 buckets and small
     // table has 4 buckets, bucket 0 of small table needs to be sent to bucket 4 of the big table as
     // well.
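For illustration only (not part of the patch): the loop added in this hunk keeps each small-table bucket's splits in their own bucket id and also replicates them to bucketIdBase + bucketId for every multiple of the small table's bucket count below the big table's. The sketch below reproduces that routing with plain java.util collections standing in for Guava's Multimap and with strings standing in for FileSplits; the class and method names are invented, and, like the i < numBuckets/inputNumBuckets bound, it assumes the big table's bucket count is a multiple of the small table's.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: mirrors the bucketIdBase + bucketId replication above,
// with a plain Map<Integer, List<String>> in place of Multimap<Integer, InputSplit>.
public class BucketRoutingSketch {

  static Map<Integer, List<String>> route(Map<Integer, List<String>> smallTableSplits,
      int smallNumBuckets, int bigNumBuckets) {
    Map<Integer, List<String>> routed = new HashMap<>();
    // Every small-table split first stays in its own bucket id.
    for (Map.Entry<Integer, List<String>> e : smallTableSplits.entrySet()) {
      routed.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
    }
    // Then it is replicated to the higher big-table bucket ids, assuming
    // bigNumBuckets is a multiple of smallNumBuckets.
    for (int i = 1; i < bigNumBuckets / smallNumBuckets; i++) {
      int bucketIdBase = i * smallNumBuckets;
      for (Map.Entry<Integer, List<String>> e : smallTableSplits.entrySet()) {
        routed.computeIfAbsent(bucketIdBase + e.getKey(), k -> new ArrayList<>())
            .addAll(e.getValue());
      }
    }
    return routed;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> small = new HashMap<>();
    small.put(0, Arrays.asList("000000_0"));
    small.put(1, Arrays.asList("000001_0"));
    small.put(2, Arrays.asList("000002_0"));
    small.put(3, Arrays.asList("000003_0"));
    // Big table: 8 buckets, small table: 4 buckets. Bucket 0 of the small table
    // is routed to big-table buckets 0 and 4, bucket 1 to 1 and 5, and so on.
    System.out.println(route(small, 4, 8));
  }
}

The removed code assigned bucket ids round-robin (bucketNum % numBuckets) in map iteration order; the new code instead appears to parse the bucket id out of the file name key before replicating, which is why the per-input bucket counts carried in inputToBucketMap become necessary.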
-    if (bucketNum < numBuckets) {
-      int loopedBucketId = 0;
-      for (; bucketNum < numBuckets; bucketNum++) {
-        for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) {
-          bucketToInitialSplitMap.put(bucketNum, fsplit);
+    if (inputName.compareTo(mainWorkName) != 0) {
+      // small table
+      int inputNumBuckets = inputToBucketMap.get(inputName);
+      if (inputNumBuckets < numBuckets) {
+        // Need to send the splits to multiple buckets
+        for (int i = 1; i < numBuckets/inputNumBuckets; i++) {
+          int bucketIdBase = i * inputNumBuckets;
+          for (Integer bucketId : bucketIds) {
+            for (InputSplit fsplit : bucketToInitialSplitMap.get(bucketId)) {
+              bucketToInitialSplitMap.put(bucketIdBase + bucketId, fsplit);
+            }
+          }
         }
-        loopedBucketId++;
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
index 5dd7bf3f1c..18f27d3edb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
@@ -21,6 +21,8 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.io.Writable;
@@ -39,22 +41,24 @@
   private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES;
   private int numInputs;
   private String inputName;
+  private Map<String, Integer> inputToBucketMap;
 
   public CustomVertexConfiguration() {
   }
 
   // this is the constructor to use for the Bucket map join case.
   public CustomVertexConfiguration(int numBuckets, VertexType vertexType) {
-    this(numBuckets, vertexType, "", 1);
+    this(numBuckets, vertexType, "", 1, null);
   }
 
   // this is the constructor to use for SMB.
   public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName,
-      int numInputs) {
+      int numInputs, Map<String, Integer> inputToBucketMap) {
     this.numBuckets = numBuckets;
     this.vertexType = vertexType;
     this.numInputs = numInputs;
     this.inputName = inputName;
+    this.inputToBucketMap = inputToBucketMap;
   }
 
   @Override
@@ -63,6 +67,14 @@ public void write(DataOutput out) throws IOException {
     out.writeInt(this.numBuckets);
     out.writeInt(numInputs);
     out.writeUTF(inputName);
+    int sz = inputToBucketMap != null ? inputToBucketMap.size() : 0;
+    out.writeInt(sz);
+    if (sz > 0) {
+      for (Map.Entry<String, Integer> entry : inputToBucketMap.entrySet()) {
+        out.writeUTF(entry.getKey());
+        out.writeInt(entry.getValue());
+      }
+    }
   }
 
   @Override
@@ -71,6 +83,13 @@ public void readFields(DataInput in) throws IOException {
     this.numBuckets = in.readInt();
     this.numInputs = in.readInt();
     this.inputName = in.readUTF();
+    int sz = in.readInt();
+    if (sz > 0) {
+      this.inputToBucketMap = new HashMap<>();
+      for (int i = 0; i < sz; i++) {
+        this.inputToBucketMap.put(in.readUTF(), in.readInt());
+      }
+    }
   }
 
   public int getNumBuckets() {
@@ -88,4 +107,8 @@ public String getInputName() {
   public int getNumInputs() {
     return numInputs;
   }
+
+  public Map<String, Integer> getInputToBucketMap() {
+    return inputToBucketMap;
+  }
 }
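For illustration only (not part of the patch): the write()/readFields() additions serialize the input-name-to-bucket-count map as a size prefix followed by writeUTF/writeInt pairs. The sketch below replays that encoding with plain DataOutputStream/DataInputStream so it can be run standalone; the class and method names are invented. As in readFields() above, a size of 0 leaves the map null rather than empty, so getInputToBucketMap() may return null and callers have to tolerate that.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative round trip of the size-prefixed String-to-int map encoding.
public class InputToBucketMapCodecSketch {

  static void writeMap(DataOutputStream out, Map<String, Integer> m) throws IOException {
    int sz = m != null ? m.size() : 0;
    out.writeInt(sz);                  // size prefix, 0 when the map is null or empty
    if (sz > 0) {
      for (Map.Entry<String, Integer> e : m.entrySet()) {
        out.writeUTF(e.getKey());      // input (vertex) name
        out.writeInt(e.getValue());    // bucket count of that input
      }
    }
  }

  static Map<String, Integer> readMap(DataInputStream in) throws IOException {
    int sz = in.readInt();
    if (sz == 0) {
      return null;                     // mirrors readFields(): stays null when size is 0
    }
    Map<String, Integer> m = new HashMap<>();
    for (int i = 0; i < sz; i++) {
      m.put(in.readUTF(), in.readInt());
    }
    return m;
  }

  public static void main(String[] args) throws IOException {
    Map<String, Integer> original = new LinkedHashMap<>();
    original.put("Map 1", 4);
    original.put("Map 2", 8);
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    writeMap(new DataOutputStream(bos), original);
    Map<String, Integer> decoded =
        readMap(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(decoded); // {Map 1=4, Map 2=8} (iteration order may differ)
  }
}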
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index e4a6f627d1..1bdc5689a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -563,13 +563,28 @@ private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSyste
             MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build());
       }
 
+    // To be populated for SMB joins only for all the small tables
+    Map<String, Integer> inputToBucketMap = new HashMap<>();
+    if (mergeJoinWork.getMergeJoinOperator().getParentOperators().size() == 1
+        && mergeJoinWork.getMergeJoinOperator().getOpTraits() != null) {
+      // This is an SMB join.
+      for (BaseWork work : mapWorkList) {
+        MapWork mw = (MapWork) work;
+        Map<String, Operator<? extends OperatorDesc>> aliasToWork = mw.getAliasToWork();
+        if (aliasToWork.size() > 1) {
+          LOG.warn("More than 1 alias in SMB mapwork, assert");
+          assert false;
+        }
+        inputToBucketMap.put(mw.getName(), mw.getWorks().get(0).getOpTraits().getNumBuckets());
+      }
+    }
     VertexManagerPluginDescriptor desc =
         VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
     // the +1 to the size is because of the main work.
     CustomVertexConfiguration vertexConf =
         new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
             .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias(),
-            mapWorkList.size() + 1);
+            mapWorkList.size() + 1, inputToBucketMap);
     DataOutputBuffer dob = new DataOutputBuffer();
     vertexConf.write(dob);
     byte[] userPayload = dob.getData();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 0c6e1e0288..c0cad1de42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -503,6 +503,14 @@ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcCont
         return false;
       }
       ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
+      // If the chosen big table has fewer buckets than any of the small tables,
+      // then some small table buckets will have no mapping to any of the
+      // big table buckets, resulting in wrong results.
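For illustration only (not part of the patch): the new guard added below rejects SMB conversion whenever the prospective big table has fewer buckets than one of the small tables, which is the 12-versus-24 bucket case exercised by the new queries in auto_sortmerge_join_16.q. The sketch restates that guard as a standalone method, plus a separate, purely hypothetical divisibility check reflecting the usual bucketed-join assumption (and the numBuckets/inputNumBuckets division in CustomPartitionVertex) that the big table's bucket count is a multiple of each small table's; the patch itself does not add such a check.

// Illustrative only: summarizes the bucket-count conditions around SMB conversion.
public class SmbBucketCheckSketch {

  // Mirrors the new guard in checkConvertJoinSMBJoin(): if the chosen big table
  // has fewer buckets than a small table, some small-table buckets would have no
  // big-table bucket to map to, so SMB conversion is rejected.
  static boolean bigTableBucketCountOk(int bigTableBuckets, int smallTableBuckets) {
    if (bigTableBuckets > 0 && bigTableBuckets < smallTableBuckets) {
      return false; // "Small table has more buckets than big table."
    }
    return true;
  }

  // Hypothetical stricter check, not in the patch: the replication loop in
  // CustomPartitionVertex divides the big table's bucket count by the small
  // table's, in line with the usual requirement that one count be a multiple
  // of the other for bucketed joins.
  static boolean evenlyDivides(int bigTableBuckets, int smallTableBuckets) {
    return smallTableBuckets > 0 && bigTableBuckets % smallTableBuckets == 0;
  }

  public static void main(String[] args) {
    System.out.println(bigTableBucketCountOk(12, 24)); // false: the new q-file case
    System.out.println(bigTableBucketCountOk(8, 4));   // true: the example in the comments
    System.out.println(evenlyDivides(8, 4));           // true
  }
}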
+      if (numBuckets > 0 && numBuckets < rsOp.getOpTraits().getNumBuckets()) {
+        LOG.info("Small table has more buckets than big table.");
+        return false;
+      }
+
       if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
           .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx, false) == false) {
         LOG.info("We cannot convert to SMB because the sort column names do not match.");
diff --git a/ql/src/test/queries/clientpositive/auto_sortmerge_join_16.q b/ql/src/test/queries/clientpositive/auto_sortmerge_join_16.q
index 12ab1fa1d1..8e9bc37967 100644
--- a/ql/src/test/queries/clientpositive/auto_sortmerge_join_16.q
+++ b/ql/src/test/queries/clientpositive/auto_sortmerge_join_16.q
@@ -56,41 +56,110 @@
 from
 stage_bucket_small
 where file_tag between 1 and 2;
 
-load data local inpath '../../data/files/smallsrcsortbucket1outof4.txt' overwrite into table stage_bucket_big partition (file_tag='1'); 
+load data local inpath '../../data/files/smallsrcsortbucket1outof4.txt' overwrite into table stage_bucket_big partition (file_tag='1');
 
 insert overwrite table bucket_big partition(day,pri)
-select
-key,
-value,
-'day1' as day,
-1 as pri
-from
-stage_bucket_big
-where
-file_tag='1';
+select key, value, 'day1' as day, 1 as pri
+from stage_bucket_big
+where file_tag='1';
 
-select
-a.key ,
-a.value ,
-b.value ,
-'day1' as day,
-1 as pri
+select a.key , a.value , b.value , 'day1' as day, 1 as pri
 from
-(
-select
-key,
-value
-from bucket_big where day='day1'
-) a
-left outer join
-(
-select
-key,
-value
-from bucket_small
-where pri between 1 and 2
-) b
+( select key, value
+  from bucket_big where day='day1' ) a
+left outer join
+( select key, value
+  from bucket_small
+  where pri between 1 and 2 ) b
 on
 (a.key = b.key)
-;
+;
+
+set hive.auto.convert.join.noconditionaltask.size=1;
+
+explain select a.key , a.value , b.value , 'day1' as day, 1 as pri
+from
+( select key, value
+  from bucket_big where day='day1' ) a
+left outer join
+( select key, value
+  from bucket_small
+  where pri between 1 and 2 ) b
+on
+(a.key = b.key)
+;
+
+select a.key , a.value , b.value , 'day1' as day, 1 as pri
+from
+( select key, value
+  from bucket_big where day='day1' ) a
+left outer join
+( select key, value
+  from bucket_small
+  where pri between 1 and 2 ) b
+on
+(a.key = b.key)
+;
+
+drop table bucket_big;
+drop table bucket_small;
+
+-- Test to make sure SMB is not kicked in when small table has more buckets than big table
+
+CREATE TABLE bucket_big
+(
+key BIGINT,
+value STRING
+)
+PARTITIONED BY (day STRING, pri bigint)
+clustered by (key) sorted by (key) into 12 buckets
+stored as RCFile;
+
+CREATE TABLE bucket_small
+(
+key BIGINT,
+value string
+)
+PARTITIONED BY (pri bigint)
+clustered by (key) sorted by (key) into 24 buckets
+stored as RCFile;
+
+insert overwrite table bucket_small partition(pri)
+select
+key,
+value,
+file_tag as pri
+from
+stage_bucket_small
+where file_tag between 1 and 2;
+
+insert overwrite table bucket_big partition(day,pri)
+select key, value, 'day1' as day, 1 as pri
+from stage_bucket_big
+where file_tag='1';
+
+
+explain select a.key , a.value , b.value , 'day1' as day, 1 as pri
+  from
+  ( select key, value
+    from bucket_big where day='day1' ) a
+  left outer join
+  ( select key, value
+    from bucket_small
+    where pri between 1 and 2 ) b
+  on
+  (a.key = b.key)
+;
+
+select a.key , a.value , b.value , 'day1' as day, 1 as pri
+from
+( select key, value
+  from bucket_big where day='day1' ) a
+left outer join
+( select key, value
+  from bucket_small
+  where pri
between 1 and 2 ) b +on +(a.key = b.key) +; \ No newline at end of file diff --git a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_16.q.out b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_16.q.out index cb8564fd78..0220169fcb 100644 --- a/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_16.q.out +++ b/ql/src/test/results/clientpositive/llap/auto_sortmerge_join_16.q.out @@ -137,56 +137,31 @@ POSTHOOK: type: LOAD POSTHOOK: Output: default@stage_bucket_big POSTHOOK: Output: default@stage_bucket_big@file_tag=1 PREHOOK: query: insert overwrite table bucket_big partition(day,pri) -select -key, -value, -'day1' as day, -1 as pri -from -stage_bucket_big -where -file_tag='1' +select key, value, 'day1' as day, 1 as pri +from stage_bucket_big +where file_tag='1' PREHOOK: type: QUERY PREHOOK: Input: default@stage_bucket_big PREHOOK: Input: default@stage_bucket_big@file_tag=1 PREHOOK: Output: default@bucket_big POSTHOOK: query: insert overwrite table bucket_big partition(day,pri) -select -key, -value, -'day1' as day, -1 as pri -from -stage_bucket_big -where -file_tag='1' +select key, value, 'day1' as day, 1 as pri +from stage_bucket_big +where file_tag='1' POSTHOOK: type: QUERY POSTHOOK: Input: default@stage_bucket_big POSTHOOK: Input: default@stage_bucket_big@file_tag=1 POSTHOOK: Output: default@bucket_big@day=day1/pri=1 POSTHOOK: Lineage: bucket_big PARTITION(day=day1,pri=1).key SIMPLE [(stage_bucket_big)stage_bucket_big.FieldSchema(name:key, type:bigint, comment:null), ] POSTHOOK: Lineage: bucket_big PARTITION(day=day1,pri=1).value SIMPLE [(stage_bucket_big)stage_bucket_big.FieldSchema(name:value, type:string, comment:null), ] -PREHOOK: query: select -a.key , -a.value , -b.value , -'day1' as day, -1 as pri +PREHOOK: query: select a.key , a.value , b.value , 'day1' as day, 1 as pri from -( -select -key, -value -from bucket_big where day='day1' -) a -left outer join -( -select -key, -value -from bucket_small -where pri between 1 and 2 -) b +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b on (a.key = b.key) PREHOOK: type: QUERY @@ -196,27 +171,14 @@ PREHOOK: Input: default@bucket_small PREHOOK: Input: default@bucket_small@pri=1 PREHOOK: Input: default@bucket_small@pri=2 #### A masked pattern was here #### -POSTHOOK: query: select -a.key , -a.value , -b.value , -'day1' as day, -1 as pri +POSTHOOK: query: select a.key , a.value , b.value , 'day1' as day, 1 as pri from -( -select -key, -value -from bucket_big where day='day1' -) a -left outer join -( -select -key, -value -from bucket_small -where pri between 1 and 2 -) b +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b on (a.key = b.key) POSTHOOK: type: QUERY @@ -250,3 +212,400 @@ POSTHOOK: Input: default@bucket_small@pri=2 172 val_172 val_172 day1 1 374 val_374 val_374 day1 1 374 val_374 val_374 day1 1 +PREHOOK: query: explain select a.key , a.value , b.value , 'day1' as day, 1 as pri +from +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b +on +(a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain select a.key , a.value , b.value , 'day1' as day, 1 as pri +from +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b 
+on +(a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: bucket_small + Statistics: Num rows: 236 Data size: 23364 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: bigint), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 236 Data size: 23364 Basic stats: COMPLETE Column stats: COMPLETE + Map Operator Tree: + TableScan + alias: bucket_big + Statistics: Num rows: 5 Data size: 495 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: bigint), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 5 Data size: 495 Basic stats: COMPLETE Column stats: COMPLETE + Merge Join Operator + condition map: + Left Outer Join 0 to 1 + keys: + 0 _col0 (type: bigint) + 1 _col0 (type: bigint) + outputColumnNames: _col0, _col1, _col3 + Statistics: Num rows: 15 Data size: 2850 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: bigint), _col1 (type: string), _col3 (type: string), 'day1' (type: string), 1 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 15 Data size: 4230 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 15 Data size: 4230 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + Execution mode: llap + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select a.key , a.value , b.value , 'day1' as day, 1 as pri +from +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b +on +(a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_big +PREHOOK: Input: default@bucket_big@day=day1/pri=1 +PREHOOK: Input: default@bucket_small +PREHOOK: Input: default@bucket_small@pri=1 +PREHOOK: Input: default@bucket_small@pri=2 +#### A masked pattern was here #### +POSTHOOK: query: select a.key , a.value , b.value , 'day1' as day, 1 as pri +from +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b +on +(a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucket_big +POSTHOOK: Input: default@bucket_big@day=day1/pri=1 +POSTHOOK: Input: default@bucket_small +POSTHOOK: Input: default@bucket_small@pri=1 +POSTHOOK: Input: default@bucket_small@pri=2 +#### A masked pattern was here #### +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +103 val_103 val_103 day1 1 +103 val_103 val_103 day1 1 +103 val_103 val_103 day1 1 +103 val_103 val_103 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +172 val_172 val_172 day1 1 +172 val_172 val_172 day1 1 +172 val_172 val_172 day1 1 +172 val_172 val_172 day1 1 +374 val_374 val_374 day1 1 +374 val_374 
val_374 day1 1 +PREHOOK: query: drop table bucket_big +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@bucket_big +PREHOOK: Output: default@bucket_big +POSTHOOK: query: drop table bucket_big +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@bucket_big +POSTHOOK: Output: default@bucket_big +PREHOOK: query: drop table bucket_small +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@bucket_small +PREHOOK: Output: default@bucket_small +POSTHOOK: query: drop table bucket_small +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@bucket_small +POSTHOOK: Output: default@bucket_small +PREHOOK: query: CREATE TABLE bucket_big +( +key BIGINT, +value STRING +) +PARTITIONED BY (day STRING, pri bigint) +clustered by (key) sorted by (key) into 12 buckets +stored as RCFile +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@bucket_big +POSTHOOK: query: CREATE TABLE bucket_big +( +key BIGINT, +value STRING +) +PARTITIONED BY (day STRING, pri bigint) +clustered by (key) sorted by (key) into 12 buckets +stored as RCFile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@bucket_big +PREHOOK: query: CREATE TABLE bucket_small +( +key BIGINT, +value string +) +PARTITIONED BY (pri bigint) +clustered by (key) sorted by (key) into 24 buckets +stored as RCFile +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@bucket_small +POSTHOOK: query: CREATE TABLE bucket_small +( +key BIGINT, +value string +) +PARTITIONED BY (pri bigint) +clustered by (key) sorted by (key) into 24 buckets +stored as RCFile +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@bucket_small +PREHOOK: query: insert overwrite table bucket_small partition(pri) +select +key, +value, +file_tag as pri +from +stage_bucket_small +where file_tag between 1 and 2 +PREHOOK: type: QUERY +PREHOOK: Input: default@stage_bucket_small +PREHOOK: Input: default@stage_bucket_small@file_tag=1 +PREHOOK: Input: default@stage_bucket_small@file_tag=2 +PREHOOK: Output: default@bucket_small +POSTHOOK: query: insert overwrite table bucket_small partition(pri) +select +key, +value, +file_tag as pri +from +stage_bucket_small +where file_tag between 1 and 2 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@stage_bucket_small +POSTHOOK: Input: default@stage_bucket_small@file_tag=1 +POSTHOOK: Input: default@stage_bucket_small@file_tag=2 +POSTHOOK: Output: default@bucket_small@pri=1 +POSTHOOK: Output: default@bucket_small@pri=2 +POSTHOOK: Lineage: bucket_small PARTITION(pri=1).key SIMPLE [(stage_bucket_small)stage_bucket_small.FieldSchema(name:key, type:bigint, comment:null), ] +POSTHOOK: Lineage: bucket_small PARTITION(pri=1).value SIMPLE [(stage_bucket_small)stage_bucket_small.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: bucket_small PARTITION(pri=2).key SIMPLE [(stage_bucket_small)stage_bucket_small.FieldSchema(name:key, type:bigint, comment:null), ] +POSTHOOK: Lineage: bucket_small PARTITION(pri=2).value SIMPLE [(stage_bucket_small)stage_bucket_small.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: insert overwrite table bucket_big partition(day,pri) +select key, value, 'day1' as day, 1 as pri +from stage_bucket_big +where file_tag='1' +PREHOOK: type: QUERY +PREHOOK: Input: default@stage_bucket_big +PREHOOK: Input: default@stage_bucket_big@file_tag=1 +PREHOOK: Output: default@bucket_big +POSTHOOK: query: insert overwrite table bucket_big partition(day,pri) +select key, 
value, 'day1' as day, 1 as pri +from stage_bucket_big +where file_tag='1' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@stage_bucket_big +POSTHOOK: Input: default@stage_bucket_big@file_tag=1 +POSTHOOK: Output: default@bucket_big@day=day1/pri=1 +POSTHOOK: Lineage: bucket_big PARTITION(day=day1,pri=1).key SIMPLE [(stage_bucket_big)stage_bucket_big.FieldSchema(name:key, type:bigint, comment:null), ] +POSTHOOK: Lineage: bucket_big PARTITION(day=day1,pri=1).value SIMPLE [(stage_bucket_big)stage_bucket_big.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: explain select a.key , a.value , b.value , 'day1' as day, 1 as pri + from + ( select key, value + from bucket_big where day='day1' ) a + left outer join + ( select key, value + from bucket_small + where pri between 1 and 2 ) b + on + (a.key = b.key) +PREHOOK: type: QUERY +POSTHOOK: query: explain select a.key , a.value , b.value , 'day1' as day, 1 as pri + from + ( select key, value + from bucket_big where day='day1' ) a + left outer join + ( select key, value + from bucket_small + where pri between 1 and 2 ) b + on + (a.key = b.key) +POSTHOOK: type: QUERY +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: bucket_big + Statistics: Num rows: 5 Data size: 495 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: bigint), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 5 Data size: 495 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 5 Data size: 495 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: llap + LLAP IO: no inputs + Map 3 + Map Operator Tree: + TableScan + alias: bucket_small + Statistics: Num rows: 236 Data size: 23364 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: bigint), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 236 Data size: 23364 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: bigint) + sort order: + + Map-reduce partition columns: _col0 (type: bigint) + Statistics: Num rows: 236 Data size: 23364 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: llap + LLAP IO: no inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Left Outer Join 0 to 1 + keys: + 0 _col0 (type: bigint) + 1 _col0 (type: bigint) + outputColumnNames: _col0, _col1, _col3 + Statistics: Num rows: 15 Data size: 2850 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: bigint), _col1 (type: string), _col3 (type: string), 'day1' (type: string), 1 (type: int) + outputColumnNames: _col0, _col1, _col2, _col3, _col4 + Statistics: Num rows: 15 Data size: 4230 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 15 Data size: 4230 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: 
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: select a.key , a.value , b.value , 'day1' as day, 1 as pri +from +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b +on +(a.key = b.key) +PREHOOK: type: QUERY +PREHOOK: Input: default@bucket_big +PREHOOK: Input: default@bucket_big@day=day1/pri=1 +PREHOOK: Input: default@bucket_small +PREHOOK: Input: default@bucket_small@pri=1 +PREHOOK: Input: default@bucket_small@pri=2 +#### A masked pattern was here #### +POSTHOOK: query: select a.key , a.value , b.value , 'day1' as day, 1 as pri +from +( select key, value + from bucket_big where day='day1' ) a +left outer join +( select key, value + from bucket_small + where pri between 1 and 2 ) b +on +(a.key = b.key) +POSTHOOK: type: QUERY +POSTHOOK: Input: default@bucket_big +POSTHOOK: Input: default@bucket_big@day=day1/pri=1 +POSTHOOK: Input: default@bucket_small +POSTHOOK: Input: default@bucket_small@pri=1 +POSTHOOK: Input: default@bucket_small@pri=2 +#### A masked pattern was here #### +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +0 val_0 val_0 day1 1 +103 val_103 val_103 day1 1 +103 val_103 val_103 day1 1 +103 val_103 val_103 day1 1 +103 val_103 val_103 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +169 val_169 val_169 day1 1 +172 val_172 val_172 day1 1 +172 val_172 val_172 day1 1 +172 val_172 val_172 day1 1 +172 val_172 val_172 day1 1 +374 val_374 val_374 day1 1 +374 val_374 val_374 day1 1