diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index 2859d3b..dd695f2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -271,7 +271,7 @@ public class GenMRFileSink1 implements NodeProcessor {
     statsTask.subscribeFeed(mvTask);
   }

-  private void createMapReduce4Merge(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
+  private static void createMapReduce4Merge(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
       throws SemanticException {
     Task currTask = ctx.getCurrTask();
     RowSchema inputRS = fsOp.getSchema();
@@ -357,7 +357,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    *          the final destination path the merge job should output.
    * @throws SemanticException
    */
-  private void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
+  public static void createMergeJob(FileSinkOperator fsOp, GenMRProcContext ctx, String finalName)
       throws SemanticException {

     // if the hadoop version support CombineFileInputFormat (version >= 0.20),
@@ -408,7 +408,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    * directories.
    *
    */
-  private void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName)
+  private static void createMap4Merge(FileSinkOperator fsInput, GenMRProcContext ctx, String finalName)
       throws SemanticException {

     //
@@ -522,7 +522,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    * @param newOutput
    * @param cndTsk
    */
-  private void linkMoveTask(GenMRProcContext ctx, FileSinkOperator newOutput,
+  private static void linkMoveTask(GenMRProcContext ctx, FileSinkOperator newOutput,
       ConditionalTask cndTsk) {

     List> mvTasks = ctx.getMvTask();
@@ -540,7 +540,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    * @param mvTask
    * @param task
    */
-  private void linkMoveTask(GenMRProcContext ctx, Task mvTask,
+  private static void linkMoveTask(GenMRProcContext ctx, Task mvTask,
       Task task) {
     if (task.getDependentTasks() == null
         || task.getDependentTasks().isEmpty()) {
@@ -564,7 +564,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    * @param mvTask
    * @param parentTask
    */
-  private void addDependentMoveTasks(GenMRProcContext ctx, Task mvTask,
+  private static void addDependentMoveTasks(GenMRProcContext ctx, Task mvTask,
       Task parentTask) {

     if (mvTask != null) {
@@ -602,7 +602,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    *          the last FileSinkOperator in the parent MapReduce work
    * @return the MapredWork
    */
-  private MapredWork createMergeTask(HiveConf conf, Operator topOp,
+  private static MapredWork createMergeTask(HiveConf conf, Operator topOp,
       FileSinkDesc fsDesc) {

     ArrayList aliases = new ArrayList();
@@ -629,7 +629,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    * @return MergeWork if table is stored as RCFile,
    *         null otherwise
    */
-  private MapredWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+  private static MapredWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
       String finalName, boolean hasDynamicPartitions) throws SemanticException {

     String inputDir = fsInputDesc.getFinalDirName();
@@ -670,7 +670,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    * @param fsInputDesc
    * @return
    */
-  private boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+  private static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
     return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
         .isSkewedStoredAsDir();
   }
@@ -690,7 +690,7 @@ public class GenMRFileSink1 implements NodeProcessor {
    *          the input directory of the merge/move task
    * @return The conditional task
    */
-  private ConditionalTask createCondTask(HiveConf conf,
+  private static ConditionalTask createCondTask(HiveConf conf,
       Task currTask, MoveWork mvWork, MapredWork mergeWork,
       String inputPath) {

@@ -736,7 +736,7 @@ public class GenMRFileSink1 implements NodeProcessor {
     return cndTsk;
   }

-  private Task findMoveTask(
+  private static Task findMoveTask(
       List> mvTasks, FileSinkOperator fsOp) {
     // find the move task
     for (Task mvTsk : mvTasks) {
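Reviewer note (not part of the patch): the switch to static methods and the widened visibility of createMergeJob above exist so the conditional merge job can be built from outside GenMRFileSink1's own operator walk. A minimal sketch of the intended call pattern, mirroring the MapMergeResolver code added below; the variables conf, currTask, parseCtx, moveTasks, fsOp and finalName are assumed to be in scope:

    // Illustrative sketch only; variable names are assumptions, while the
    // setters and the static createMergeJob call are the ones used by this patch.
    GenMRProcContext ctx = new GenMRProcContext();
    ctx.setConf(conf);           // HiveConf of the current session
    ctx.setCurrTask(currTask);   // the map-only MapRedTask being resolved
    ctx.setParseCtx(parseCtx);   // ParseContext taken from the physical context
    ctx.setMvTask(moveTasks);    // MoveTasks that consume the task's output
    GenMRFileSink1.createMergeJob(fsOp, ctx, finalName);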
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapMergeResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapMergeResolver.java
new file mode 100644
index 0000000..a4a05cc
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/MapMergeResolver.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.optimizer.physical;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapRedTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.optimizer.GenMRFileSink1;
+import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
+
+
+public class MapMergeResolver implements PhysicalPlanResolver {
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    // create dispatcher and graph walker
+    Dispatcher disp = new MapOnlyMergeDispatcher(pctx);
+    return PhysicalOptimizer.resolveUsingWalker(pctx, disp);
+  }
+
+
+  /**
+   * Iterate over the tasks. If a task is a {@link MapRedTask} which is map-only
+   * and it contains a final {@link FileSinkOperator} (i.e. one writing to a real
+   * table and NOT to an intermediate output directory used between MR jobs),
+   * AND {@link HiveConf.ConfVars#HIVEMERGEMAPFILES} is true, then create
+   * a {@link ConditionalTask} to conditionally merge map files.
+   * Take care to ensure that the {@link FileSinkOperator} is not already a
+   * child of a ConditionalTask handling merging - in that case, do not create
+   * an additional ConditionalTask for merging.
+   */
+  class MapOnlyMergeDispatcher implements Dispatcher {
+
+    private final PhysicalContext physicalContext;
+
+    public MapOnlyMergeDispatcher(PhysicalContext context) {
+      physicalContext = context;
+    }
+
+    @Override
+    public Object dispatch(Node nd, Stack stack, Object... nodeOutputs)
+        throws SemanticException {
+      @SuppressWarnings("unchecked")
+      Task currTask = (Task) nd;
+      // if this is not a map-reduce task, just skip it
+      if (currTask instanceof MapRedTask) {
+        // check whether this task is a child of a ConditionalTask;
+        // if so, it is likely already a merging map-only task, so ignore it
+        List> parents = currTask.getParentTasks();
+        if (parents != null && parents.size() == 1 &&
+            parents.get(0) instanceof ConditionalTask) {
+          // ignore
+          return null;
+        } else {
+          // check if the configuration to merge map-only files is true
+          if (physicalContext.getConf().getBoolVar(
+              HiveConf.ConfVars.HIVEMERGEMAPFILES)) {
+            // check that the children of the current MapRedTask are MoveTasks
+            List> childTasks =
+                currTask.getChildTasks();
+            boolean areChildTasksMvTasks = true;
+            if (childTasks != null) {
+              for (Task t : childTasks) {
+                if (!(t instanceof MoveTask)) {
+                  areChildTasksMvTasks = false;
+                  break;
+                }
+              }
+            } else {
+              areChildTasksMvTasks = false;
+            }
+
+            if (areChildTasksMvTasks) {
+              // process any eligible FileSinkOperator in this MapRedTask
+              FileSinkOperatorDispatcher dispatcher =
+                  new FileSinkOperatorDispatcher(currTask, childTasks);
+              GraphWalker walker = new DefaultGraphWalker(dispatcher);
+              walker.startWalking(new ArrayList(
+                  currTask.getTopOperators()), null);
+            }
+          }
+        }
+      }
+      return null;
+    }
+
+    private class FileSinkOperatorDispatcher implements Dispatcher {
+
+      Task currTask;
+      List> moveTasks;
+
+      @SuppressWarnings("unchecked")
+      public FileSinkOperatorDispatcher(Task currTask,
+          List> moveTasks) {
+        this.currTask = currTask;
+        this.moveTasks = new ArrayList>();
+        for (Task t : moveTasks) {
+          this.moveTasks.add((Task) t);
+        }
+      }
+
+      @Override
+      public Object dispatch(Node nd, Stack stack, Object... nodeOutputs)
+          throws SemanticException {
+        if (nd instanceof FileSinkOperator && nd.getChildren() == null) {
+
+          // check the stack to see if this FileSinkOperator is on the map
+          // side or the reduce side
+          MapredWork currWork = (MapredWork) currTask.getWork();
+          Operator currReducer = currWork.getReducer();
+
+          boolean isInReducer = false;
+          if (currReducer != null) {
+            // check whether the FileSinkOperator we are processing is in the
+            // reducer part - this will influence how we honor the merge
+            // configuration parameters
+            if (stack != null) {
+              // traverse the stack in a read-only LIFO fashion, ignoring the
+              // top of the stack since that is the FileSinkOperator we are
+              // currently processing, and check whether a ReduceSinkOperator
+              // is found before any other FileSinkOperator
+              for (int i = stack.size() - 2; i >= 0; i--) {
+                if (stack.get(i) instanceof ReduceSinkOperator) {
+                  isInReducer = true;
+                  break;
+                } else if (stack.get(i) instanceof FileSinkOperator) {
+                  break;
+                }
+              }
+            }
+          }
+          if (!isInReducer) {
+            FileSinkOperator fsOp = (FileSinkOperator) nd;
+            ParseContext parseCtx = physicalContext.getParseContext();
+            // check if the FileSinkOperator writes to a real table
+
+            boolean isInsertTable = // is INSERT OVERWRITE TABLE
+                fsOp.getConf().getTableInfo().getTableName() != null &&
+                parseCtx.getQB().getParseInfo().isInsertToTable();
+            if (isInsertTable) {
+              String finalName = processFS(nd);
+              GenMRProcContext ctx = new GenMRProcContext();
+              ctx.setConf(physicalContext.getConf());
+              ctx.setCurrTask(currTask);
+              ctx.setParseCtx(parseCtx);
+              ctx.setMvTask(moveTasks);
+              GenMRFileSink1.createMergeJob(fsOp, ctx, finalName);
+            }
+          }
+        }
+        return null;
+      }
+
+      /**
+       * Redirect the FileSink operator to a temporary directory so that a
+       * conditional merge job can later produce the real destination.
+       *
+       * @param nd
+       *          current FileSink operator
+       * @return the final destination path the FileSinkOperator output should
+       *         be moved/merged to.
+       * @throws SemanticException
+       */
+      private String processFS(Node nd) throws SemanticException {
+        FileSinkOperator fsOp = (FileSinkOperator) nd;
+        String dest = fsOp.getConf().getFinalDirName();
+
+        // generate the temporary file;
+        // it must be on the same file system as the current destination
+        ParseContext parseCtx = physicalContext.getParseContext();
+        Context baseCtx = parseCtx.getContext();
+        String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri());
+
+        FileSinkDesc fileSinkDesc = fsOp.getConf();
+        // Change all the linked file sink descriptors
+        if (fileSinkDesc.isLinkedFileSink()) {
+          for (FileSinkDesc fsConf : fileSinkDesc.getLinkedFileSinkDesc()) {
+            String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
+            fsConf.setParentDir(tmpDir);
+            fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
+          }
+        } else {
+          fileSinkDesc.setDirName(tmpDir);
+        }
+        return dest;
+      }
+
+    }
+  }
+}
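Reviewer note (not part of the patch): the key hand-off in MapMergeResolver above is that processFS() re-points the FileSinkOperator at a temporary directory and returns the original destination, which is then passed to GenMRFileSink1.createMergeJob() as the merge target. A condensed sketch of that effect for the non-linked case; variable names are assumptions:

    // Illustrative sketch only.
    FileSinkDesc desc = fsOp.getConf();
    String dest = desc.getFinalDirName();       // original destination, kept as the merge target
    String tmpDir = baseCtx.getExternalTmpFileURI(new Path(dest).toUri());
    desc.setDirName(tmpDir);                    // the map task now writes to the tmp dir instead
    GenMRFileSink1.createMergeJob(fsOp, ctx, dest);  // conditional merge/move later produces 'dest'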
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
index 451ff71..2401258 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
@@ -22,6 +22,9 @@
 import java.util.ArrayList;
 import java.util.List;

 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -59,6 +62,8 @@ public class PhysicalOptimizer {
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEMETADATAONLYQUERIES)) {
       resolvers.add(new MetadataOnlyOptimizer());
     }
+    // optimize any map-only tasks to merge files if so indicated
+    resolvers.add(new MapMergeResolver());
   }

   /**
@@ -74,4 +79,17 @@ public class PhysicalOptimizer {
     return pctx;
   }

+  public static PhysicalContext resolveUsingWalker(PhysicalContext pctx, Dispatcher disp)
+      throws SemanticException {
+    TaskGraphWalker ogw = new TaskGraphWalker(disp);
+
+    // get all the task nodes from the root tasks
+    ArrayList topNodes = new ArrayList();
+    topNodes.addAll(pctx.rootTasks);
+
+    // begin to walk through the task tree
+    ogw.startWalking(topNodes, null);
+    return pctx;
+  }
+
 }
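Reviewer note (not part of the patch): with the resolveUsingWalker() helper added above, a physical resolver only needs to supply a Dispatcher; MapMergeResolver.resolve() in this patch follows exactly this shape. A minimal sketch of the pattern, where SomeResolver and SomeDispatcher are hypothetical names:

    // Illustrative sketch only; SomeDispatcher stands for any
    // org.apache.hadoop.hive.ql.lib.Dispatcher implementation.
    public class SomeResolver implements PhysicalPlanResolver {
      @Override
      public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
        Dispatcher disp = new SomeDispatcher(pctx);
        // shared TaskGraphWalker traversal over pctx.rootTasks
        return PhysicalOptimizer.resolveUsingWalker(pctx, disp);
      }
    }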
diff --git ql/src/test/queries/clientpositive/optimal_merge.q ql/src/test/queries/clientpositive/optimal_merge.q
new file mode 100644
index 0000000..d08e854
--- /dev/null
+++ ql/src/test/queries/clientpositive/optimal_merge.q
@@ -0,0 +1,28 @@
+-- drop any tables remaining from previous run
+drop table optimal_merge_table_src;
+drop table optimal_merge_table1;
+drop table optimal_merge_table2;
+
+create table if not exists optimal_merge_table_src (key string, value string);
+create table if not exists optimal_merge_table1 like optimal_merge_table_src;
+create table if not exists optimal_merge_table2 (key string, value string) stored as RCFILE;
+
+set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
+set hive.merge.mapfiles=true;
+set hive.merge.mapredfiles=false;
+
+load data local inpath '../data/files/T1.txt' into table optimal_merge_table_src;
+load data local inpath '../data/files/T2.txt' into table optimal_merge_table_src;
+
+explain
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *;
+
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *;
+
+-- the output should show that the table only has 1 file
+show table extended like optimal_merge_table2;
+
diff --git ql/src/test/results/clientpositive/optimal_merge.q.out ql/src/test/results/clientpositive/optimal_merge.q.out
new file mode 100644
index 0000000..5a095d9
--- /dev/null
+++ ql/src/test/results/clientpositive/optimal_merge.q.out
@@ -0,0 +1,229 @@
+PREHOOK: query: -- drop any tables remaining from previous run
+drop table optimal_merge_table_src
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: -- drop any tables remaining from previous run
+drop table optimal_merge_table_src
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table optimal_merge_table1
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table optimal_merge_table1
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table optimal_merge_table2
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table optimal_merge_table2
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create table if not exists optimal_merge_table_src (key string, value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table if not exists optimal_merge_table_src (key string, value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@optimal_merge_table_src
+PREHOOK: query: create table if not exists optimal_merge_table1 like optimal_merge_table_src
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table if not exists optimal_merge_table1 like optimal_merge_table_src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@optimal_merge_table1
+PREHOOK: query: create table if not exists optimal_merge_table2 (key string, value string) stored as RCFILE
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table if not exists optimal_merge_table2 (key string, value string) stored as RCFILE
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@optimal_merge_table2
+PREHOOK: query: load data local inpath '../data/files/T1.txt' into table optimal_merge_table_src
+PREHOOK: type: LOAD
+PREHOOK: Output: default@optimal_merge_table_src
+POSTHOOK: query: load data local inpath '../data/files/T1.txt' into table optimal_merge_table_src
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@optimal_merge_table_src
+PREHOOK: query: load data local inpath '../data/files/T2.txt' into table optimal_merge_table_src
+PREHOOK: type: LOAD
+PREHOOK: Output: default@optimal_merge_table_src
+POSTHOOK: query: load data local inpath '../data/files/T2.txt' into table optimal_merge_table_src
+POSTHOOK: type: LOAD
+POSTHOOK: Output: default@optimal_merge_table_src
+PREHOOK: query: explain
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+PREHOOK: type: QUERY
+POSTHOOK: query: explain
+from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME optimal_merge_table_src))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME optimal_merge_table1))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_FUNCTIONSTAR count))) (TOK_GROUPBY (TOK_TABLE_OR_COL key))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME optimal_merge_table2))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+  Stage-2 is a root stage
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+  Stage-1 depends on stages: Stage-2, Stage-6, Stage-5, Stage-8
+  Stage-4 depends on stages: Stage-1
+  Stage-9 depends on stages: Stage-2 , consists of Stage-6, Stage-5, Stage-7
+  Stage-6
+  Stage-5
+  Stage-7
+  Stage-8 depends on stages: Stage-7
+
+STAGE PLANS:
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+        optimal_merge_table_src
+          TableScan
+            alias: optimal_merge_table_src
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Group By Operator
+                aggregations:
+                      expr: count()
+                bucketGroup: false
+                keys:
+                      expr: key
+                      type: string
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Reduce Output Operator
+                  key expressions:
+                        expr: _col0
+                        type: string
+                  sort order: +
+                  Map-reduce partition columns:
+                        expr: _col0
+                        type: string
+                  tag: -1
+                  value expressions:
+                        expr: _col1
+                        type: bigint
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+                    expr: value
+                    type: string
+              outputColumnNames: _col0, _col1
+              File Output Operator
+                compressed: false
+                GlobalTableId: 2
+                table:
+                    input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+                    name: default.optimal_merge_table2
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: string
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: bigint
+            outputColumnNames: _col0, _col1
+            File Output Operator
+              compressed: false
+              GlobalTableId: 1
+              table:
+                  input format: org.apache.hadoop.mapred.TextInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                  name: default.optimal_merge_table1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.optimal_merge_table1
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+  Stage: Stage-1
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+              output format: org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+              serde: org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
+              name: default.optimal_merge_table2
+
+  Stage: Stage-4
+    Stats-Aggr Operator
+
+  Stage: Stage-9
+    Conditional Operator
+
+  Stage: Stage-6
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-5
+    Block level merge
+
+  Stage: Stage-7
+    Block level merge
+
+  Stage: Stage-8
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+
+PREHOOK: query: from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+PREHOOK: type: QUERY
+PREHOOK: Input: default@optimal_merge_table_src
+PREHOOK: Output: default@optimal_merge_table1
+PREHOOK: Output: default@optimal_merge_table2
+POSTHOOK: query: from optimal_merge_table_src
+insert overwrite table optimal_merge_table1 select key, count(*) group by key
+insert overwrite table optimal_merge_table2 select *
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@optimal_merge_table_src
+POSTHOOK: Output: default@optimal_merge_table1
+POSTHOOK: Output: default@optimal_merge_table2
+POSTHOOK: Lineage: optimal_merge_table1.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table1.value EXPRESSION [(optimal_merge_table_src)optimal_merge_table_src.null, ]
+POSTHOOK: Lineage: optimal_merge_table2.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table2.value SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: -- the output should show that the table only has 1 file
+show table extended like optimal_merge_table2
+PREHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: query: -- the output should show that the table only has 1 file
+show table extended like optimal_merge_table2
+POSTHOOK: type: SHOW_TABLESTATUS
+POSTHOOK: Lineage: optimal_merge_table1.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table1.value EXPRESSION [(optimal_merge_table_src)optimal_merge_table_src.null, ]
+POSTHOOK: Lineage: optimal_merge_table2.key SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:key, type:string, comment:null), ]
+POSTHOOK: Lineage: optimal_merge_table2.value SIMPLE [(optimal_merge_table_src)optimal_merge_table_src.FieldSchema(name:value, type:string, comment:null), ]
+tableName:optimal_merge_table2
+#### A masked pattern was here ####
+inputformat:org.apache.hadoop.hive.ql.io.RCFileInputFormat
+outputformat:org.apache.hadoop.hive.ql.io.RCFileOutputFormat
+columns:struct columns { string key, string value}
+partitioned:false
+partitionColumns:
+totalNumberFiles:1
+totalFileSize:138
+maxFileSize:138
+minFileSize:138
+#### A masked pattern was here ####
+
diff --git ql/src/test/results/clientpositive/union19.q.out ql/src/test/results/clientpositive/union19.q.out
index e3f33cb..3537504 100644
--- ql/src/test/results/clientpositive/union19.q.out
+++ ql/src/test/results/clientpositive/union19.q.out
@@ -34,8 +34,13 @@ STAGE DEPENDENCIES:
   Stage-3 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-3
   Stage-4 depends on stages: Stage-0
-  Stage-1 depends on stages: Stage-3
+  Stage-1 depends on stages: Stage-3, Stage-8, Stage-7, Stage-10
   Stage-5 depends on stages: Stage-1
+  Stage-11 depends on stages: Stage-3 , consists of Stage-8, Stage-7, Stage-9
+  Stage-8
+  Stage-7
+  Stage-9
+  Stage-10 depends on stages: Stage-9

 STAGE PLANS:
   Stage: Stage-2
@@ -237,6 +242,47 @@ STAGE PLANS:
   Stage: Stage-5
     Stats-Aggr Operator

+  Stage: Stage-11
+    Conditional Operator
+
+  Stage: Stage-8
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
+  Stage: Stage-7
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: default.dest2
+
+  Stage: Stage-9
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+          File Output Operator
+            compressed: false
+            GlobalTableId: 0
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: default.dest2
+
+  Stage: Stage-10
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
                          UNION  ALL