diff --git build-common.xml build-common.xml index 3ca7f87..670d1a3 100644 --- build-common.xml +++ build-common.xml @@ -61,7 +61,7 @@ - + diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 0a2f976..203defb 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -134,16 +134,18 @@ public void setJobId(String jobId) { this.jobId = jobId; } - - public HadoopJobExecHelper() { - } - public HadoopJobExecHelper(JobConf job, LogHelper console, Task task, HadoopJobExecHook hookCallBack) { this.job = job; this.console = console; this.task = task; this.callBackObj = hookCallBack; + + if (job != null) { + // even with tez on some jobs are run as MR. disable the flag in + // the conf, so that the backend runs fully as MR. + HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ, false); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 652a264..272fc48 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -45,10 +45,13 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType; +import org.apache.hadoop.hive.ql.stats.StatsFactory; +import org.apache.hadoop.hive.ql.stats.StatsPublisher; import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.BytesWritable; @@ -613,6 +616,22 @@ public static Vertex createVertex(JobConf conf, BaseWork work, return null; } + // initialize stats publisher if necessary + if (work.isGatheringStats()) { + StatsPublisher statsPublisher; + String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS); + if (StatsFactory.setImplementation(statsImplementationClass, conf)) { + statsPublisher = StatsFactory.getStatsPublisher(); + if (!statsPublisher.init(conf)) { // creating stats table if not exists + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) { + throw + new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg()); + } + } + } + } + + // final vertices need to have at least one output if (!hasChildren) { v.addOutput("out_"+work.getName(), diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 4804f1d..fd1671b 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -117,6 +117,11 @@ public void open(String sessionId, HiveConf conf) LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")"); session.start(); + + // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session + // id is used for tez to reuse the current session rather than start a new one. 
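The two conf.set calls that follow implement the comment above. As a stand-alone sketch (not part of the patch) of what those settings amount to, using Hadoop's plain Configuration; the property names come from the patch itself, while the helper class and the hard-coded application id are illustrative only:

import org.apache.hadoop.conf.Configuration;

public class TezMrEmulationSketch {
  /** Point already-compiled MR jobs at the Tez runtime and pin them to an existing session. */
  public static void configureForTezEmulation(Configuration conf, String applicationId) {
    conf.set("mapreduce.framework.name", "yarn-tez"); // run MR jobs through Tez's MR emulation
    conf.set("tez.session.id", applicationId);        // reuse the already-open Tez session
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);    // false: skip loading *-site.xml defaults
    configureForTezEmulation(conf, "application_1400000000000_0001");
    System.out.println(conf.get("mapreduce.framework.name") + " / " + conf.get("tez.session.id"));
  }
}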
+ conf.set("mapreduce.framework.name", "yarn-tez"); + conf.set("tez.session.id", session.getApplicationId().toString()); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java index 45a49c5..8cbf32f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.session.SessionState; /** @@ -37,12 +38,19 @@ protected synchronized IOContext initialValue() { return new IOContext(); } }; + private static IOContext ioContext = new IOContext(); + public static IOContext get() { + if (SessionState.get() == null) { + // this happens on the backend. only one io context needed. + return ioContext; + } return IOContext.threadLocal.get(); } public static void clear() { IOContext.threadLocal.remove(); + ioContext = new IOContext(); } long currentBlockStart; diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java new file mode 100644 index 0000000..d7d5e80 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.lib; + +import java.util.Stack; + +import org.apache.hadoop.hive.ql.parse.SemanticException; + +/** + * CompositeProcessor. Holds a list of node processors to be fired by the same + * rule. + * + */ +public class CompositeProcessor implements NodeProcessor { + + NodeProcessor[] procs; + + public CompositeProcessor(NodeProcessor...nodeProcessors) { + procs = nodeProcessors; + } + + @Override + public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx, Object... 
nodeOutputs) + throws SemanticException { + for (NodeProcessor proc: procs) { + proc.process(nd, stack, procCtx, nodeOutputs); + } + return null; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java index 686a380..529dcb5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java @@ -21,51 +21,25 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Stack; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; -import org.apache.hadoop.hive.ql.Context; -import org.apache.hadoop.hive.ql.exec.ColumnInfo; -import org.apache.hadoop.hive.ql.exec.ConditionalTask; -import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.MoveTask; import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.OperatorFactory; -import org.apache.hadoop.hive.ql.exec.RowSchema; import org.apache.hadoop.hive.ql.exec.Task; -import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.UnionOperator; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.io.RCFileInputFormat; -import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.lib.NodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.SemanticException; -import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles; -import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx; -import org.apache.hadoop.hive.ql.plan.ConditionalWork; -import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; -import org.apache.hadoop.hive.ql.plan.LoadFileDesc; -import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredWork; -import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; -import org.apache.hadoop.hive.ql.plan.PartitionDesc; -import org.apache.hadoop.hive.ql.plan.StatsWork; -import org.apache.hadoop.hive.ql.plan.TableDesc; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.mapred.InputFormat; /** * Processor for the rule - table scan followed by reduce sink. 
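The new CompositeProcessor above simply fires a list of NodeProcessors, in order, whenever a single dispatcher rule matches. A stand-alone illustration of that composite pattern (not part of the patch; the Processor interface and the lambdas are stand-ins for Hive's NodeProcessor, FileSinkProcessor and GenTezWork):

import java.util.Arrays;
import java.util.List;

public class CompositeProcessorSketch {
  interface Processor {
    void process(String node);
  }

  static class Composite implements Processor {
    private final List<Processor> delegates;

    Composite(Processor... delegates) {
      this.delegates = Arrays.asList(delegates);
    }

    @Override
    public void process(String node) {
      for (Processor p : delegates) {
        p.process(node); // fire every delegate, in registration order, for the same node
      }
    }
  }

  public static void main(String[] args) {
    Processor fileSinkHandling = node -> System.out.println("move/merge/stats handling for " + node);
    Processor workGeneration = node -> System.out.println("vertex/work generation for " + node);
    // TezCompiler registers the FileSink rule the same way: FileSinkProcessor first, then GenTezWork.
    new Composite(fileSinkHandling, workGeneration).process("FS_2");
  }
}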
@@ -95,8 +69,7 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, FileSinkOperator fsOp = (FileSinkOperator) nd; boolean isInsertTable = // is INSERT OVERWRITE TABLE - fsOp.getConf().getTableInfo().getTableName() != null && - parseCtx.getQB().getParseInfo().isInsertToTable(); + GenMapRedUtils.isInsertInto(parseCtx, fsOp); HiveConf hconf = parseCtx.getConf(); // Mark this task as a final map reduce task (ignoring the optional merge task) @@ -111,49 +84,12 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, return true; } - // Has the user enabled merging of files for map-only jobs or for all jobs - if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty())) { - List> mvTasks = ctx.getMvTask(); - - // In case of unions or map-joins, it is possible that the file has - // already been seen. - // So, no need to attempt to merge the files again. - if ((ctx.getSeenFileSinkOps() == null) - || (!ctx.getSeenFileSinkOps().contains(nd))) { - - // no need of merging if the move is to a local file system - MoveTask mvTask = (MoveTask) findMoveTask(mvTasks, fsOp); - - if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) { - addStatsTask(fsOp, mvTask, currTask, parseCtx.getConf()); - } - - if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) { - if (fsOp.getConf().isLinkedFileSink()) { - // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the - // number of reducers are few, so the number of files anyway are small. - // However, with this optimization, we are increasing the number of files - // possibly by a big margin. So, merge aggresively. - if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) || - hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) { - chDir = true; - } - } else { - // There are separate configuration parameters to control whether to - // merge for a map-only job - // or for a map-reduce job - MapredWork currWork = (MapredWork) currTask.getWork(); - boolean mergeMapOnly = - hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReduceWork() == null; - boolean mergeMapRed = - hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) && - currWork.getReduceWork() != null; - if (mergeMapOnly || mergeMapRed) { - chDir = true; - } - } - } - } + // In case of unions or map-joins, it is possible that the file has + // already been seen. + // So, no need to attempt to merge the files again. + if ((ctx.getSeenFileSinkOps() == null) + || (!ctx.getSeenFileSinkOps().contains(nd))) { + chDir = GenMapRedUtils.isMergeRequired(ctx.getMvTask(), hconf, fsOp, currTask, isInsertTable); } String finalName = processFS(fsOp, stack, opProcCtx, chDir); @@ -162,7 +98,9 @@ public Object process(Node nd, Stack stack, NodeProcessorCtx opProcCtx, // Merge the files in the destination table/partitions by creating Map-only merge job // If underlying data is RCFile a RCFileBlockMerge task would be created. LOG.info("using CombineHiveInputformat for the merge job"); - createMRWorkForMergingFiles(fsOp, ctx, finalName); + GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName, + ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(), + hconf, currTask); } FileSinkDesc fileSinkDesc = fsOp.getConf(); @@ -205,435 +143,6 @@ private void processLinkedFileDesc(GenMRProcContext ctx, } /** - * Add the StatsTask as a dependent task of the MoveTask - * because StatsTask will change the Table/Partition metadata. For atomicity, we - * should not change it before the data is actually there done by MoveTask. 
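The removed addStatsTask javadoc above describes an ordering constraint that the code enforces purely through task dependencies: the StatsTask is made a dependent of the MoveTask, so metadata is only touched once the data has landed. A stand-alone sketch of the same ordering using plain JDK futures (names and messages are illustrative only, not part of the patch):

import java.util.concurrent.CompletableFuture;

public class StatsAfterMoveSketch {
  public static void main(String[] args) {
    CompletableFuture<Void> move = CompletableFuture.runAsync(
        () -> System.out.println("MoveTask: data lands in the final table/partition location"));
    // The stats step is chained strictly after the move, mirroring
    // mvTask.addDependentTask(statsTask) in the patch: metadata is never
    // updated before the data is actually there.
    CompletableFuture<Void> stats = move.thenRun(
        () -> System.out.println("StatsTask: table/partition statistics updated"));
    stats.join();
  }
}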
- * - * @param nd - * the FileSinkOperator whose results are taken care of by the MoveTask. - * @param mvTask - * The MoveTask that moves the FileSinkOperator's results. - * @param currTask - * The MapRedTask that the FileSinkOperator belongs to. - * @param hconf - * HiveConf - */ - private void addStatsTask(FileSinkOperator nd, MoveTask mvTask, - Task currTask, HiveConf hconf) { - - MoveWork mvWork = ((MoveTask) mvTask).getWork(); - StatsWork statsWork = null; - if (mvWork.getLoadTableWork() != null) { - statsWork = new StatsWork(mvWork.getLoadTableWork()); - } else if (mvWork.getLoadFileWork() != null) { - statsWork = new StatsWork(mvWork.getLoadFileWork()); - } - assert statsWork != null : "Error when genereting StatsTask"; - statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE)); - MapredWork mrWork = (MapredWork) currTask.getWork(); - - // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix - // in FileSinkDesc is used for stats publishing. They should be consistent. - statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix()); - Task statsTask = TaskFactory.get(statsWork, hconf); - - // mark the MapredWork and FileSinkOperator for gathering stats - nd.getConf().setGatherStats(true); - mrWork.getMapWork().setGatheringStats(true); - if (mrWork.getReduceWork() != null) { - mrWork.getReduceWork().setGatheringStats(true); - } - nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE)); - nd.getConf().setMaxStatsKeyPrefixLength( - hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH)); - // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName()); - - // subscribe feeds from the MoveTask so that MoveTask can forward the list - // of dynamic partition list to the StatsTask - mvTask.addDependentTask(statsTask); - statsTask.subscribeFeed(mvTask); - } - - /** - * @param fsInput The FileSink operator. - * @param ctx The MR processing context. - * @param finalName the final destination path the merge job should output. - * @throws SemanticException - - * create a Map-only merge job using CombineHiveInputFormat for all partitions with - * following operators: - * MR job J0: - * ... - * | - * v - * FileSinkOperator_1 (fsInput) - * | - * v - * Merge job J1: - * | - * v - * TableScan (using CombineHiveInputFormat) (tsMerge) - * | - * v - * FileSinkOperator (fsMerge) - * - * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths - * do - * not contain the dynamic partitions (their parent). So after the dynamic partitions are - * created (after the first job finished before the moveTask or ConditionalTask start), - * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic - * partition - * directories. - * - */ - private void createMRWorkForMergingFiles (FileSinkOperator fsInput, GenMRProcContext ctx, - String finalName) throws SemanticException { - - // - // 1. 
create the operator tree - // - HiveConf conf = ctx.getParseCtx().getConf(); - FileSinkDesc fsInputDesc = fsInput.getConf(); - - // Create a TableScan operator - RowSchema inputRS = fsInput.getSchema(); - Operator tsMerge = - GenMapRedUtils.createTemporaryTableScanOperator(inputRS); - - // Create a FileSink operator - TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone(); - FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts, - conf.getBoolVar(ConfVars.COMPRESSRESULT)); - FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild( - fsOutputDesc, inputRS, tsMerge); - - // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema - // needs to include the partition column, and the fsOutput should have - // a DynamicPartitionCtx to indicate that it needs to dynamically partitioned. - DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx(); - if (dpCtx != null && dpCtx.getNumDPCols() > 0) { - // adding DP ColumnInfo to the RowSchema signature - ArrayList signature = inputRS.getSignature(); - String tblAlias = fsInputDesc.getTableInfo().getTableName(); - LinkedHashMap colMap = new LinkedHashMap(); - StringBuilder partCols = new StringBuilder(); - for (String dpCol : dpCtx.getDPColNames()) { - ColumnInfo colInfo = new ColumnInfo(dpCol, - TypeInfoFactory.stringTypeInfo, // all partition column type should be string - tblAlias, true); // partition column is virtual column - signature.add(colInfo); - colMap.put(dpCol, dpCol); // input and output have the same column name - partCols.append(dpCol).append('/'); - } - partCols.setLength(partCols.length() - 1); // remove the last '/' - inputRS.setSignature(signature); - - // create another DynamicPartitionCtx, which has a different input-to-DP column mapping - DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx); - dpCtx2.setInputToDPCols(colMap); - fsOutputDesc.setDynPartCtx(dpCtx2); - - // update the FileSinkOperator to include partition columns - fsInputDesc.getTableInfo().getProperties().setProperty( - org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, - partCols.toString()); // list of dynamic partition column names - } else { - // non-partitioned table - fsInputDesc.getTableInfo().getProperties().remove( - org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); - } - - // - // 2. 
Constructing a conditional task consisting of a move task and a map reduce task - // - MoveWork dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); - MapWork cplan; - Serializable work; - - if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) && - fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) { - - // Check if InputFormatClass is valid - String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL); - try { - Class c = (Class) Class.forName(inputFormatClass); - - LOG.info("RCFile format- Using block level merge"); - cplan = createRCFileMergeTask(fsInputDesc, finalName, - dpCtx != null && dpCtx.getNumDPCols() > 0); - work = cplan; - } catch (ClassNotFoundException e) { - String msg = "Illegal input format class: " + inputFormatClass; - throw new SemanticException(msg); - } - - } else { - cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc); - work = new MapredWork(); - ((MapredWork)work).setMapWork(cplan); - // use CombineHiveInputFormat for map-only merging - } - cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"); - // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't - // know if merge MR2 will be triggered at execution time - ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, work, - fsInputDesc.getFinalDirName()); - - // keep the dynamic partition context in conditional task resolver context - ConditionalResolverMergeFilesCtx mrCtx = - (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx(); - mrCtx.setDPCtx(fsInputDesc.getDynPartCtx()); - mrCtx.setLbCtx(fsInputDesc.getLbCtx()); - - // - // 3. add the moveTask as the children of the conditional task - // - linkMoveTask(ctx, fsOutput, cndTsk); - } - - /** - * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all - * possible subtrees branching from the ConditionalTask. - * - * @param ctx - * @param newOutput - * @param cndTsk - */ - private void linkMoveTask(GenMRProcContext ctx, FileSinkOperator newOutput, - ConditionalTask cndTsk) { - - List> mvTasks = ctx.getMvTask(); - Task mvTask = findMoveTask(mvTasks, newOutput); - - for (Task tsk : cndTsk.getListTasks()) { - linkMoveTask(ctx, mvTask, tsk); - } - } - - /** - * Follows the task tree down from task and makes all leaves parents of mvTask - * - * @param ctx - * @param mvTask - * @param task - */ - private void linkMoveTask(GenMRProcContext ctx, Task mvTask, - Task task) { - - if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) { - // If it's a leaf, add the move task as a child - addDependentMoveTasks(ctx, mvTask, task); - } else { - // Otherwise, for each child run this method recursively - for (Task childTask : task.getDependentTasks()) { - linkMoveTask(ctx, mvTask, childTask); - } - } - } - - /** - * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a - * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of - * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as - * well. 
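linkMoveTask above recurses down the task tree and attaches the move task under every leaf, so the move runs no matter which branch of the conditional subtree executes. A stand-alone sketch of that leaf-linking walk (the Node class is a stand-in for Hive's Task; not part of the patch):

import java.util.ArrayList;
import java.util.List;

public class LinkMoveToLeavesSketch {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  /** Hang 'move' under every leaf of the tree rooted at 'task', mirroring linkMoveTask. */
  static void linkToLeaves(Node task, Node move) {
    if (task.children.isEmpty()) {
      task.children.add(move); // leaf: the move runs after this branch finishes
      return;
    }
    for (Node child : task.children) {
      linkToLeaves(child, move); // otherwise recurse into each child
    }
  }

  static void print(Node n, String indent) {
    System.out.println(indent + n.name);
    for (Node c : n.children) {
      print(c, indent + "  ");
    }
  }

  public static void main(String[] args) {
    Node cond = new Node("ConditionalTask");
    Node mergeAndMove = new Node("merge (merge-and-move branch)");
    mergeAndMove.children.add(new Node("move (merge-and-move branch)"));
    cond.children.add(new Node("move only"));
    cond.children.add(new Node("merge only"));
    cond.children.add(mergeAndMove);
    linkToLeaves(cond, new Node("final MoveTask"));
    print(cond, "");
  }
}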
- * - * @param ctx - * @param mvTask - * @param parentTask - */ - private void addDependentMoveTasks(GenMRProcContext ctx, Task mvTask, - Task parentTask) { - - if (mvTask != null) { - if (ctx.getConf().getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) { - DependencyCollectionTask dependencyTask = ctx.getDependencyTaskForMultiInsert(); - parentTask.addDependentTask(dependencyTask); - if (mvTask.getWork().getLoadTableWork() != null) { - // Moving tables/partitions depend on the dependencyTask - dependencyTask.addDependentTask(mvTask); - } else { - // Moving files depends on the parentTask (we still want the dependencyTask to depend - // on the parentTask) - parentTask.addDependentTask(mvTask); - } - } else { - parentTask.addDependentTask(mvTask); - } - } - } - - /** - * Create a MapredWork based on input path, the top operator and the input - * table descriptor. - * - * @param conf - * @param topOp - * the table scan operator that is the root of the MapReduce task. - * @param fsDesc - * the file sink descriptor that serves as the input to this merge task. - * @param parentMR - * the parent MapReduce work - * @param parentFS - * the last FileSinkOperator in the parent MapReduce work - * @return the MapredWork - */ - private MapWork createMRWorkForMergingFiles (HiveConf conf, - Operator topOp, FileSinkDesc fsDesc) { - - ArrayList aliases = new ArrayList(); - String inputDir = fsDesc.getFinalDirName(); - TableDesc tblDesc = fsDesc.getTableInfo(); - aliases.add(inputDir); // dummy alias: just use the input path - - // constructing the default MapredWork - MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf); - MapWork cplan = cMrPlan.getMapWork(); - cplan.getPathToAliases().put(inputDir, aliases); - cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null)); - cplan.getAliasToWork().put(inputDir, topOp); - cplan.setMapperCannotSpanPartns(true); - - return cplan; - } - - /** - * Create a block level merge task for RCFiles. - * - * @param fsInputDesc - * @param finalName - * @return MergeWork if table is stored as RCFile, - * null otherwise - */ - private MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc, - String finalName, boolean hasDynamicPartitions) throws SemanticException { - - String inputDir = fsInputDesc.getFinalDirName(); - TableDesc tblDesc = fsInputDesc.getTableInfo(); - - if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) { - ArrayList inputDirs = new ArrayList(); - if (!hasDynamicPartitions - && !isSkewedStoredAsDirs(fsInputDesc)) { - inputDirs.add(inputDir); - } - - MergeWork work = new MergeWork(inputDirs, finalName, - hasDynamicPartitions, fsInputDesc.getDynPartCtx()); - LinkedHashMap> pathToAliases = - new LinkedHashMap>(); - pathToAliases.put(inputDir, (ArrayList) inputDirs.clone()); - work.setMapperCannotSpanPartns(true); - work.setPathToAliases(pathToAliases); - work.setAliasToWork( - new LinkedHashMap>()); - if (hasDynamicPartitions - || isSkewedStoredAsDirs(fsInputDesc)) { - work.getPathToPartitionInfo().put(inputDir, - new PartitionDesc(tblDesc, null)); - } - work.setListBucketingCtx(fsInputDesc.getLbCtx()); - - return work; - } - - throw new SemanticException("createRCFileMergeTask called on non-RCFile table"); - } - - /** - * check if it is skewed table and stored as dirs. - * - * @param fsInputDesc - * @return - */ - private boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { - return (fsInputDesc.getLbCtx() == null) ? 
false : fsInputDesc.getLbCtx() - .isSkewedStoredAsDir(); - } - - /** - * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork. - * - * @param conf - * HiveConf - * @param currTask - * current leaf task - * @param mvWork - * MoveWork for the move task - * @param mergeWork - * MapredWork for the merge task. - * @param inputPath - * the input directory of the merge/move task - * @return The conditional task - */ - private ConditionalTask createCondTask(HiveConf conf, - Task currTask, MoveWork mvWork, - Serializable mergeWork, String inputPath) { - - // There are 3 options for this ConditionalTask: - // 1) Merge the partitions - // 2) Move the partitions (i.e. don't merge the partitions) - // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't - // merge others) in this case the merge is done first followed by the move to prevent - // conflicts. - Task mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf); - Task moveOnlyMoveTask = TaskFactory.get(mvWork, conf); - Task mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf); - Task mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf); - - // NOTE! It is necessary merge task is the parent of the move task, and not - // the other way around, for the proper execution of the execute method of - // ConditionalTask - mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask); - - List listWorks = new ArrayList(); - listWorks.add(mvWork); - listWorks.add(mergeWork); - - ConditionalWork cndWork = new ConditionalWork(listWorks); - - List> listTasks = new ArrayList>(); - listTasks.add(moveOnlyMoveTask); - listTasks.add(mergeOnlyMergeTask); - listTasks.add(mergeAndMoveMergeTask); - - ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf); - cndTsk.setListTasks(listTasks); - - // create resolver - cndTsk.setResolver(new ConditionalResolverMergeFiles()); - ConditionalResolverMergeFilesCtx mrCtx = - new ConditionalResolverMergeFilesCtx(listTasks, inputPath); - cndTsk.setResolverCtx(mrCtx); - - // make the conditional task as the child of the current leaf task - currTask.addDependentTask(cndTsk); - - return cndTsk; - } - - private Task findMoveTask( - List> mvTasks, FileSinkOperator fsOp) { - // find the move task - for (Task mvTsk : mvTasks) { - MoveWork mvWork = mvTsk.getWork(); - String srcDir = null; - if (mvWork.getLoadFileWork() != null) { - srcDir = mvWork.getLoadFileWork().getSourceDir(); - } else if (mvWork.getLoadTableWork() != null) { - srcDir = mvWork.getLoadTableWork().getSourceDir(); - } - - String fsOpDirName = fsOp.getConf().getFinalDirName(); - if ((srcDir != null) - && (srcDir.equalsIgnoreCase(fsOpDirName))) { - return mvTsk; - } - } - return null; - } - - /** * Process the FileSink operator to generate a MoveTask if necessary. 
* * @param fsOp @@ -651,6 +160,11 @@ private String processFS(FileSinkOperator fsOp, Stack stack, NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException { GenMRProcContext ctx = (GenMRProcContext) opProcCtx; + Task currTask = ctx.getCurrTask(); + + // If the directory needs to be changed, send the new directory + String dest = null; + List seenFSOps = ctx.getSeenFileSinkOps(); if (seenFSOps == null) { seenFSOps = new ArrayList(); @@ -660,49 +174,14 @@ private String processFS(FileSinkOperator fsOp, Stack stack, } ctx.setSeenFileSinkOps(seenFSOps); - Task currTask = ctx.getCurrTask(); - - // If the directory needs to be changed, send the new directory - String dest = null; - - if (chDir) { - dest = fsOp.getConf().getFinalDirName(); - - // generate the temporary file - // it must be on the same file system as the current destination - ParseContext parseCtx = ctx.getParseCtx(); - Context baseCtx = parseCtx.getContext(); - String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri()); - - FileSinkDesc fileSinkDesc = fsOp.getConf(); - // Change all the linked file sink descriptors - if (fileSinkDesc.isLinkedFileSink()) { - for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { - String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName()); - fsConf.setParentDir(tmpDir); - fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName); - } - } else { - fileSinkDesc.setDirName(tmpDir); - } - } - - Task mvTask = null; - - if (!chDir) { - mvTask = findMoveTask(ctx.getMvTask(), fsOp); - } + dest = GenMapRedUtils.createMoveTask(ctx.getCurrTask(), chDir, fsOp, ctx.getParseCtx(), + ctx.getMvTask(), ctx.getConf(), ctx.getDependencyTaskForMultiInsert()); Operator currTopOp = ctx.getCurrTopOp(); String currAliasId = ctx.getCurrAliasId(); HashMap, Task> opTaskMap = ctx.getOpTaskMap(); - // Set the move task to be dependent on the current task - if (mvTask != null) { - addDependentMoveTasks(ctx, mvTask, currTask); - } - // In case of multi-table insert, the path to alias mapping is needed for // all the sources. 
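The seenFileSinkOps bookkeeping in processFS exists so that a file sink reached twice (through a union or map-join) only gets its merge/move handling once. A stand-alone sketch of that first-time-only check, using a Set in place of Hive's list-plus-contains (identifiers are illustrative only, not part of the patch):

import java.util.HashSet;
import java.util.Set;

public class SeenFileSinksSketch {
  private final Set<String> seenFileSinks = new HashSet<>();

  /** True only the first time a given file sink is offered, mirroring the seenFileSinkOps check. */
  boolean firstEncounter(String fileSinkId) {
    return seenFileSinks.add(fileSinkId); // add() returns false if the sink was already seen
  }

  public static void main(String[] args) {
    SeenFileSinksSketch ctx = new SeenFileSinksSketch();
    System.out.println(ctx.firstEncounter("FS_3")); // true: first visit, merge/move handling is added
    System.out.println(ctx.firstEncounter("FS_3")); // false: union/map-join revisit, handled already
  }
}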
Since there is no // reducer, treat it as a plan with null reducer diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index ec51d59..4ad1699 100644 --- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -33,11 +33,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.DemuxOperator; +import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.MoveTask; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.OperatorFactory; import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; @@ -50,6 +54,8 @@ import org.apache.hadoop.hive.ql.exec.mr.ExecDriver; import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx; @@ -61,20 +67,31 @@ import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; import org.apache.hadoop.hive.ql.parse.RowResolver; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles; +import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx; +import org.apache.hadoop.hive.ql.plan.ConditionalWork; +import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.FetchWork; import org.apache.hadoop.hive.ql.plan.FileSinkDesc; import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc; +import org.apache.hadoop.hive.ql.plan.LoadFileDesc; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.MapredWork; +import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; import org.apache.hadoop.hive.ql.plan.ReduceWork; +import org.apache.hadoop.hive.ql.plan.StatsWork; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.apache.hadoop.hive.ql.plan.TezWork; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.mapred.InputFormat; /** * General utility common functions for the Processor to convert operator into @@ -1107,6 +1124,571 @@ public static void replaceMapWork(String sourceAlias, String targetAlias, } } + /** + * @param fsInput The FileSink operator. + * @param ctx The MR processing context. 
+ * @param finalName the final destination path the merge job should output. + * @param dependencyTask + * @param mvTasks + * @param conf + * @param currTask + * @throws SemanticException + + * create a Map-only merge job using CombineHiveInputFormat for all partitions with + * following operators: + * MR job J0: + * ... + * | + * v + * FileSinkOperator_1 (fsInput) + * | + * v + * Merge job J1: + * | + * v + * TableScan (using CombineHiveInputFormat) (tsMerge) + * | + * v + * FileSinkOperator (fsMerge) + * + * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths + * do + * not contain the dynamic partitions (their parent). So after the dynamic partitions are + * created (after the first job finished before the moveTask or ConditionalTask start), + * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic + * partition + * directories. + * + */ + public static void createMRWorkForMergingFiles (FileSinkOperator fsInput, + String finalName, DependencyCollectionTask dependencyTask, + List> mvTasks, HiveConf conf, + Task currTask) throws SemanticException { + + // + // 1. create the operator tree + // + FileSinkDesc fsInputDesc = fsInput.getConf(); + + // Create a TableScan operator + RowSchema inputRS = fsInput.getSchema(); + Operator tsMerge = + GenMapRedUtils.createTemporaryTableScanOperator(inputRS); + + // Create a FileSink operator + TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone(); + FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts, + conf.getBoolVar(ConfVars.COMPRESSRESULT)); + FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild( + fsOutputDesc, inputRS, tsMerge); + + // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema + // needs to include the partition column, and the fsOutput should have + // a DynamicPartitionCtx to indicate that it needs to dynamically partitioned. + DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx(); + if (dpCtx != null && dpCtx.getNumDPCols() > 0) { + // adding DP ColumnInfo to the RowSchema signature + ArrayList signature = inputRS.getSignature(); + String tblAlias = fsInputDesc.getTableInfo().getTableName(); + LinkedHashMap colMap = new LinkedHashMap(); + StringBuilder partCols = new StringBuilder(); + for (String dpCol : dpCtx.getDPColNames()) { + ColumnInfo colInfo = new ColumnInfo(dpCol, + TypeInfoFactory.stringTypeInfo, // all partition column type should be string + tblAlias, true); // partition column is virtual column + signature.add(colInfo); + colMap.put(dpCol, dpCol); // input and output have the same column name + partCols.append(dpCol).append('/'); + } + partCols.setLength(partCols.length() - 1); // remove the last '/' + inputRS.setSignature(signature); + + // create another DynamicPartitionCtx, which has a different input-to-DP column mapping + DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx); + dpCtx2.setInputToDPCols(colMap); + fsOutputDesc.setDynPartCtx(dpCtx2); + + // update the FileSinkOperator to include partition columns + fsInputDesc.getTableInfo().getProperties().setProperty( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, + partCols.toString()); // list of dynamic partition column names + } else { + // non-partitioned table + fsInputDesc.getTableInfo().getProperties().remove( + org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS); + } + + // + // 2. 
Constructing a conditional task consisting of a move task and a map reduce task + // + MoveWork dummyMv = new MoveWork(null, null, null, + new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false); + MapWork cplan; + Serializable work; + + if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) && + fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) { + + // Check if InputFormatClass is valid + String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL); + try { + Class c = (Class) Class.forName(inputFormatClass); + + LOG.info("RCFile format- Using block level merge"); + cplan = GenMapRedUtils.createRCFileMergeTask(fsInputDesc, finalName, + dpCtx != null && dpCtx.getNumDPCols() > 0); + work = cplan; + } catch (ClassNotFoundException e) { + String msg = "Illegal input format class: " + inputFormatClass; + throw new SemanticException(msg); + } + + } else { + cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc); + work = new MapredWork(); + ((MapredWork)work).setMapWork(cplan); + // use CombineHiveInputFormat for map-only merging + } + cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"); + // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't + // know if merge MR2 will be triggered at execution time + ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, + fsInputDesc.getFinalDirName()); + + // keep the dynamic partition context in conditional task resolver context + ConditionalResolverMergeFilesCtx mrCtx = + (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx(); + mrCtx.setDPCtx(fsInputDesc.getDynPartCtx()); + mrCtx.setLbCtx(fsInputDesc.getLbCtx()); + + // + // 3. add the moveTask as the children of the conditional task + // + linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask); + } + + /** + * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all + * possible subtrees branching from the ConditionalTask. + * + * @param newOutput + * @param cndTsk + * @param mvTasks + * @param hconf + * @param dependencyTask + */ + public static void linkMoveTask(FileSinkOperator newOutput, + ConditionalTask cndTsk, List> mvTasks, HiveConf hconf, + DependencyCollectionTask dependencyTask) { + + Task mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput); + + for (Task tsk : cndTsk.getListTasks()) { + linkMoveTask(mvTask, tsk, hconf, dependencyTask); + } + } + + /** + * Follows the task tree down from task and makes all leaves parents of mvTask + * + * @param mvTask + * @param task + * @param hconf + * @param dependencyTask + */ + public static void linkMoveTask(Task mvTask, + Task task, HiveConf hconf, + DependencyCollectionTask dependencyTask) { + + if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) { + // If it's a leaf, add the move task as a child + addDependentMoveTasks(mvTask, hconf, task, dependencyTask); + } else { + // Otherwise, for each child run this method recursively + for (Task childTask : task.getDependentTasks()) { + linkMoveTask(mvTask, childTask, hconf, dependencyTask); + } + } + } + + /** + * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a + * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of + * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as + * well. 
+ * + * @param mvTask + * @param hconf + * @param parentTask + * @param dependencyTask + */ + public static void addDependentMoveTasks(Task mvTask, HiveConf hconf, + Task parentTask, DependencyCollectionTask dependencyTask) { + + if (mvTask != null) { + if (hconf.getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) { + parentTask.addDependentTask(dependencyTask); + if (mvTask.getWork().getLoadTableWork() != null) { + // Moving tables/partitions depend on the dependencyTask + dependencyTask.addDependentTask(mvTask); + } else { + // Moving files depends on the parentTask (we still want the dependencyTask to depend + // on the parentTask) + parentTask.addDependentTask(mvTask); + } + } else { + parentTask.addDependentTask(mvTask); + } + } + } + + + /** + * Add the StatsTask as a dependent task of the MoveTask + * because StatsTask will change the Table/Partition metadata. For atomicity, we + * should not change it before the data is actually there done by MoveTask. + * + * @param nd + * the FileSinkOperator whose results are taken care of by the MoveTask. + * @param mvTask + * The MoveTask that moves the FileSinkOperator's results. + * @param currTask + * The MapRedTask that the FileSinkOperator belongs to. + * @param hconf + * HiveConf + */ + public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask, + Task currTask, HiveConf hconf) { + + MoveWork mvWork = ((MoveTask) mvTask).getWork(); + StatsWork statsWork = null; + if (mvWork.getLoadTableWork() != null) { + statsWork = new StatsWork(mvWork.getLoadTableWork()); + } else if (mvWork.getLoadFileWork() != null) { + statsWork = new StatsWork(mvWork.getLoadFileWork()); + } + assert statsWork != null : "Error when genereting StatsTask"; + statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE)); + + if (currTask.getWork() instanceof MapredWork) { + MapredWork mrWork = (MapredWork) currTask.getWork(); + mrWork.getMapWork().setGatheringStats(true); + if (mrWork.getReduceWork() != null) { + mrWork.getReduceWork().setGatheringStats(true); + } + } else { + TezWork work = (TezWork) currTask.getWork(); + for (BaseWork w: work.getAllWork()) { + w.setGatheringStats(true); + } + } + + // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix + // in FileSinkDesc is used for stats publishing. They should be consistent. + statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix()); + Task statsTask = TaskFactory.get(statsWork, hconf); + + // mark the MapredWork and FileSinkOperator for gathering stats + nd.getConf().setGatherStats(true); + nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE)); + nd.getConf().setMaxStatsKeyPrefixLength( + hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH)); + // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName()); + + // subscribe feeds from the MoveTask so that MoveTask can forward the list + // of dynamic partition list to the StatsTask + mvTask.addDependentTask(statsTask); + statsTask.subscribeFeed(mvTask); + } + + /** + * Returns true iff current query is an insert into for the given file sink + * + * @param parseCtx + * @param fsOp + * @return + */ + public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) { + return fsOp.getConf().getTableInfo().getTableName() != null && + parseCtx.getQB().getParseInfo().isInsertToTable(); + } + + /** + * Create a MapredWork based on input path, the top operator and the input + * table descriptor. 
+ * + * @param conf + * @param topOp + * the table scan operator that is the root of the MapReduce task. + * @param fsDesc + * the file sink descriptor that serves as the input to this merge task. + * @param parentMR + * the parent MapReduce work + * @param parentFS + * the last FileSinkOperator in the parent MapReduce work + * @return the MapredWork + */ + private static MapWork createMRWorkForMergingFiles (HiveConf conf, + Operator topOp, FileSinkDesc fsDesc) { + + ArrayList aliases = new ArrayList(); + String inputDir = fsDesc.getFinalDirName(); + TableDesc tblDesc = fsDesc.getTableInfo(); + aliases.add(inputDir); // dummy alias: just use the input path + + // constructing the default MapredWork + MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf); + MapWork cplan = cMrPlan.getMapWork(); + cplan.getPathToAliases().put(inputDir, aliases); + cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null)); + cplan.getAliasToWork().put(inputDir, topOp); + cplan.setMapperCannotSpanPartns(true); + + return cplan; + } + + /** + * Create a block level merge task for RCFiles. + * + * @param fsInputDesc + * @param finalName + * @return MergeWork if table is stored as RCFile, + * null otherwise + */ + public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc, + String finalName, boolean hasDynamicPartitions) throws SemanticException { + + String inputDir = fsInputDesc.getFinalDirName(); + TableDesc tblDesc = fsInputDesc.getTableInfo(); + + if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) { + ArrayList inputDirs = new ArrayList(); + if (!hasDynamicPartitions + && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) { + inputDirs.add(inputDir); + } + + MergeWork work = new MergeWork(inputDirs, finalName, + hasDynamicPartitions, fsInputDesc.getDynPartCtx()); + LinkedHashMap> pathToAliases = + new LinkedHashMap>(); + pathToAliases.put(inputDir, (ArrayList) inputDirs.clone()); + work.setMapperCannotSpanPartns(true); + work.setPathToAliases(pathToAliases); + work.setAliasToWork( + new LinkedHashMap>()); + if (hasDynamicPartitions + || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) { + work.getPathToPartitionInfo().put(inputDir, + new PartitionDesc(tblDesc, null)); + } + work.setListBucketingCtx(fsInputDesc.getLbCtx()); + + return work; + } + + throw new SemanticException("createRCFileMergeTask called on non-RCFile table"); + } + + /** + * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork. + * + * @param conf + * HiveConf + * @param currTask + * current leaf task + * @param mvWork + * MoveWork for the move task + * @param mergeWork + * MapredWork for the merge task. + * @param inputPath + * the input directory of the merge/move task + * @return The conditional task + */ + @SuppressWarnings("unchecked") + public static ConditionalTask createCondTask(HiveConf conf, + Task currTask, MoveWork mvWork, + Serializable mergeWork, String inputPath) { + + // There are 3 options for this ConditionalTask: + // 1) Merge the partitions + // 2) Move the partitions (i.e. don't merge the partitions) + // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't + // merge others) in this case the merge is done first followed by the move to prevent + // conflicts. 
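The task construction that continues below turns the three alternatives listed above into four task objects, two backed by the merge work and two by the move work. A schematic stand-alone rendering (strings stand in for Hive tasks; not part of the patch):

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConditionalMergeMoveSketch {
  public static void main(String[] args) {
    // Four task instances back three alternatives; the resolver picks one branch at run time.
    Map<String, List<String>> branches = new LinkedHashMap<>();
    branches.put("move only", Arrays.asList("moveOnlyMoveTask"));
    branches.put("merge only", Arrays.asList("mergeOnlyMergeTask"));
    // In the combined branch the merge is the parent of the move, so merging always
    // finishes before files are moved (the ordering the NOTE in the patch insists on).
    branches.put("merge then move", Arrays.asList("mergeAndMoveMergeTask", "mergeAndMoveMoveTask"));
    branches.forEach((name, tasks) -> System.out.println(name + ": " + String.join(" -> ", tasks)));
  }
}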
+ Task mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf); + Task moveOnlyMoveTask = TaskFactory.get(mvWork, conf); + Task mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf); + Task mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf); + + // NOTE! It is necessary merge task is the parent of the move task, and not + // the other way around, for the proper execution of the execute method of + // ConditionalTask + mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask); + + List listWorks = new ArrayList(); + listWorks.add(mvWork); + listWorks.add(mergeWork); + + ConditionalWork cndWork = new ConditionalWork(listWorks); + + List> listTasks = new ArrayList>(); + listTasks.add(moveOnlyMoveTask); + listTasks.add(mergeOnlyMergeTask); + listTasks.add(mergeAndMoveMergeTask); + + ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf); + cndTsk.setListTasks(listTasks); + + // create resolver + cndTsk.setResolver(new ConditionalResolverMergeFiles()); + ConditionalResolverMergeFilesCtx mrCtx = + new ConditionalResolverMergeFilesCtx(listTasks, inputPath); + cndTsk.setResolverCtx(mrCtx); + + // make the conditional task as the child of the current leaf task + currTask.addDependentTask(cndTsk); + + return cndTsk; + } + + /** + * check if it is skewed table and stored as dirs. + * + * @param fsInputDesc + * @return + */ + public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) { + return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx() + .isSkewedStoredAsDir(); + } + + public static Task findMoveTask( + List> mvTasks, FileSinkOperator fsOp) { + // find the move task + for (Task mvTsk : mvTasks) { + MoveWork mvWork = mvTsk.getWork(); + String srcDir = null; + if (mvWork.getLoadFileWork() != null) { + srcDir = mvWork.getLoadFileWork().getSourceDir(); + } else if (mvWork.getLoadTableWork() != null) { + srcDir = mvWork.getLoadTableWork().getSourceDir(); + } + + String fsOpDirName = fsOp.getConf().getFinalDirName(); + if ((srcDir != null) + && (srcDir.equalsIgnoreCase(fsOpDirName))) { + return mvTsk; + } + } + return null; + } + + /** + * Returns true iff the fsOp requires a merge + * @param mvTasks + * @param hconf + * @param fsOp + * @param currTask + * @param isInsertTable + * @return + */ + public static boolean isMergeRequired(List> mvTasks, HiveConf hconf, FileSinkOperator fsOp, + Task currTask, boolean isInsertTable) { + + // Has the user enabled merging of files for map-only jobs or for all jobs + if ((mvTasks != null) && (!mvTasks.isEmpty())) { + + // no need of merging if the move is to a local file system + MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp); + + if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) { + GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf); + } + + if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) { + if (fsOp.getConf().isLinkedFileSink()) { + // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the + // number of reducers are few, so the number of files anyway are small. + // However, with this optimization, we are increasing the number of files + // possibly by a big margin. So, merge aggresively. 
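The check that continues below, together with the map-only/map-reduce branch after it, reduces to a small predicate over two flags plus the shape of the job. A condensed stand-alone restatement (booleans stand in for the HiveConf lookups, and it assumes the earlier move-task preconditions in isMergeRequired already passed; not part of the patch):

public class MergeDecisionSketch {
  /**
   * Condensed form of the decision in isMergeRequired: linked file sinks merge when either
   * flag is on; otherwise map-only jobs follow HIVEMERGEMAPFILES and jobs with a reduce
   * phase follow HIVEMERGEMAPREDFILES.
   */
  static boolean shouldMerge(boolean mergeMapFiles, boolean mergeMapRedFiles,
      boolean isLinkedFileSink, boolean hasReducePhase) {
    if (isLinkedFileSink) {
      return mergeMapFiles || mergeMapRedFiles; // merge aggressively for linked file sinks
    }
    return hasReducePhase ? mergeMapRedFiles : mergeMapFiles;
  }

  public static void main(String[] args) {
    System.out.println(shouldMerge(true, false, false, false)); // map-only job, map-file merge on: true
    System.out.println(shouldMerge(true, false, false, true));  // map-reduce job, mapred merge off: false
    System.out.println(shouldMerge(false, true, true, false));  // linked file sink, one flag on: true
  }
}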
+ if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) || + hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) { + return true; + } + } else { + // There are separate configuration parameters to control whether to + // merge for a map-only job + // or for a map-reduce job + ReduceWork reduceWork = currTask.getWork() instanceof MapredWork + ? ((MapredWork) currTask.getWork()).getReduceWork() : null; + boolean mergeMapOnly = + hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null; + boolean mergeMapRed = + hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) && + reduceWork != null; + if (mergeMapOnly || mergeMapRed) { + return true; + } + } + } + } + return false; + } + + /** + * Create and add any dependent move tasks + * + * @param currTask + * @param chDir + * @param fsOp + * @param parseCtx + * @param mvTasks + * @param hconf + * @param dependencyTask + * @return + */ + public static String createMoveTask(Task currTask, boolean chDir, + FileSinkOperator fsOp, ParseContext parseCtx, List> mvTasks, + HiveConf hconf, DependencyCollectionTask dependencyTask) { + + String dest = null; + + if (chDir) { + dest = fsOp.getConf().getFinalDirName(); + + // generate the temporary file + // it must be on the same file system as the current destination + Context baseCtx = parseCtx.getContext(); + String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri()); + + FileSinkDesc fileSinkDesc = fsOp.getConf(); + // Change all the linked file sink descriptors + if (fileSinkDesc.isLinkedFileSink()) { + for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) { + String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName()); + fsConf.setParentDir(tmpDir); + fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName); + } + } else { + fileSinkDesc.setDirName(tmpDir); + } + } + + Task mvTask = null; + + if (!chDir) { + mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp); + } + + // Set the move task to be dependent on the current task + if (mvTask != null) { + GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask); + } + + return dest; + } + private GenMapRedUtils() { // prevent instantiation } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java new file mode 100644 index 0000000..b1e6f04 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.ql.parse; + +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.lib.NodeProcessor; +import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils; + +/** + * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks + * + */ +public class FileSinkProcessor implements NodeProcessor { + + static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName()); + + @Override + /* + * (non-Javadoc) + * we should ideally not modify the tree we traverse. + * However, since we need to walk the tree at any time when we modify the + * operator, we might as well do it here. + */ + public Object process(Node nd, Stack stack, + NodeProcessorCtx procCtx, Object... nodeOutputs) + throws SemanticException { + + GenTezProcContext context = (GenTezProcContext) procCtx; + FileSinkOperator fileSink = (FileSinkOperator) nd; + ParseContext parseContext = context.parseContext; + + + boolean isInsertTable = // is INSERT OVERWRITE TABLE + GenMapRedUtils.isInsertInto(parseContext, fileSink); + HiveConf hconf = parseContext.getConf(); + + boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask, + hconf, fileSink, context.currentTask, isInsertTable); + + String finalName = GenMapRedUtils.createMoveTask(context.currentTask, + chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask); + + if (chDir) { + // Merge the files in the destination table/partitions by creating Map-only merge job + // If underlying data is RCFile a RCFileBlockMerge task would be created. 
+ LOG.info("using CombineHiveInputformat for the merge job"); + GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName, + context.dependencyTask, context.moveTask, + hconf, context.currentTask); + } + + return true; + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java index d78f39a..1f07594 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask; import org.apache.hadoop.hive.ql.exec.MapJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Task; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.plan.BaseWork; +import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork; import org.apache.hadoop.hive.ql.plan.MoveWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TezWork; @@ -97,6 +99,9 @@ // remember the dummy ops we created public final Map, List>> linkChildOpWithDummyOp; + // used to group dependent tasks for multi table inserts + public final DependencyCollectionTask dependencyTask; + @SuppressWarnings("unchecked") public GenTezProcContext(HiveConf conf, ParseContext parseContext, List> moveTask, List> rootTasks, @@ -115,5 +120,7 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext, this.operatorWorkMap = new HashMap, BaseWork>(); this.mapJoinParentMap = new HashMap>>(); this.linkChildOpWithDummyOp = new HashMap, List>>(); + this.dependencyTask = (DependencyCollectionTask) + TaskFactory.get(new DependencyCollectionWork(), conf); } } diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java index 41fb7d1..fe7359e 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java @@ -71,6 +71,12 @@ public Object process(Node nd, Stack stack, // packing into a vertex, typically a table scan, union or join Operator root = context.currentRootOperator; if (root == null) { + // if there are no more rootOperators we're dealing with multiple + // file sinks off of the same table scan. Bail. + if (context.rootOperators.isEmpty()) { + return null; + } + // null means that we're starting with a new table scan // the graph walker walks the rootOperators in the same // order so we can just take the next diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java index fdc072d..1fe52d2 100644 --- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java +++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.lib.CompositeProcessor; import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher; import org.apache.hadoop.hive.ql.lib.Dispatcher; import org.apache.hadoop.hive.ql.lib.GraphWalker; @@ -126,16 +127,17 @@ protected void generateTaskTree(List> rootTasks, Pa // the operator stack. 
// The dispatcher generates the plan from the operator tree Map opRules = new LinkedHashMap(); - opRules.put(new RuleRegExp(new String("Split Work - ReduceSink"), + opRules.put(new RuleRegExp("Split Work - ReduceSink", ReduceSinkOperator.getOperatorName() + "%"), genTezWork); - opRules.put(new RuleRegExp(new String("No more walking on ReduceSink-MapJoin"), + + opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin", ReduceSinkOperator.getOperatorName() + "%" + MapJoinOperator.getOperatorName() + "%"), new ReduceSinkMapJoinProc()); - opRules.put(new RuleRegExp(new String("Split Work - FileSink"), - FileSinkOperator.getOperatorName() + "%"), - genTezWork); + opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink", + FileSinkOperator.getOperatorName() + "%"), + new CompositeProcessor(new FileSinkProcessor(), genTezWork)); // The dispatcher fires the processor corresponding to the closest matching // rule and passes the context along diff --git ql/src/test/queries/clientpositive/tez_dml.q ql/src/test/queries/clientpositive/tez_dml.q new file mode 100644 index 0000000..b4ca8be --- /dev/null +++ ql/src/test/queries/clientpositive/tez_dml.q @@ -0,0 +1,37 @@ +set hive.optimize.tez=true; +set hive.exec.dynamic.partition.mode=nonstrict; + +-- CTAS +EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt; +CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt; + +SELECT * FROM tmp_src; + +-- dyn partitions +CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int); +EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src; +INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src; + +SELECT * FROM tmp_src_part; + +-- multi insert +CREATE TABLE even (c int, d string); +CREATE TABLE odd (c int, d string); + +EXPLAIN +FROM src +INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 +INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1; + +FROM src +INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 +INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1; + +SELECT * FROM even; +SELECT * FROM odd; + +-- drop the tables +DROP TABLE even; +DROP TABLE odd; +DROP TABLE tmp_src; +DROP TABLE tmp_src_part; diff --git ql/src/test/results/clientpositive/tez_dml.q.out ql/src/test/results/clientpositive/tez_dml.q.out new file mode 100644 index 0000000..e5a140e --- /dev/null +++ ql/src/test/results/clientpositive/tez_dml.q.out @@ -0,0 +1,1778 @@ +PREHOOK: query: -- CTAS +EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt +PREHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: query: -- CTAS +EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt +POSTHOOK: type: CREATETABLE_AS_SELECT +ABSTRACT SYNTAX TREE: + (TOK_CREATETABLE (TOK_TABNAME tmp_src) TOK_LIKETABLE (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL value)) (TOK_SELEXPR (TOK_FUNCTION count (TOK_TABLE_OR_COL value)) cnt)) (TOK_GROUPBY (TOK_TABLE_OR_COL value)))) f1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cnt)))))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-8 depends on stages: Stage-1 
, consists of Stage-5, Stage-4, Stage-6 + Stage-5 + Stage-0 depends on stages: Stage-5, Stage-4, Stage-7 + Stage-9 depends on stages: Stage-0 + Stage-3 depends on stages: Stage-9 + Stage-4 + Stage-6 + Stage-7 depends on stages: Stage-6 + +STAGE PLANS: + Stage: Stage-1 + Tez + Alias -> Map Operator Tree: + src + TableScan + alias: src + Select Operator + expressions: + expr: value + type: string + outputColumnNames: value + Group By Operator + aggregations: + expr: count(value) + bucketGroup: false + keys: + expr: value + type: string + mode: hash + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col0 + type: string + sort order: + + Map-reduce partition columns: + expr: _col0 + type: string + tag: -1 + value expressions: + expr: _col1 + type: bigint + Reduce Operator Tree: + Group By Operator + aggregations: + expr: count(VALUE._col0) + bucketGroup: false + keys: + expr: KEY._col0 + type: string + mode: mergepartial + outputColumnNames: _col0, _col1 + Select Operator + expressions: + expr: _col0 + type: string + expr: _col1 + type: bigint + outputColumnNames: _col0, _col1 + Reduce Output Operator + key expressions: + expr: _col1 + type: bigint + sort order: + + tag: -1 + value expressions: + expr: _col0 + type: string + expr: _col1 + type: bigint + Reduce Operator Tree: + Extract + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmp_src + + Stage: Stage-8 + Conditional Operator + + Stage: Stage-5 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-0 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-9 + Create Table Operator: + Create Table + columns: value string, cnt bigint + if not exists: false + input format: org.apache.hadoop.mapred.TextInputFormat + # buckets: -1 + output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat + name: tmp_src + isExternal: false + + Stage: Stage-3 + Stats-Aggr Operator + + Stage: Stage-4 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmp_src + + Stage: Stage-6 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmp_src + + Stage: Stage-7 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + +PREHOOK: query: CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt +PREHOOK: type: CREATETABLE_AS_SELECT +PREHOOK: Input: default@src +POSTHOOK: query: CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt +POSTHOOK: type: CREATETABLE_AS_SELECT +POSTHOOK: Input: default@src +POSTHOOK: Output: 
default@tmp_src +PREHOOK: query: SELECT * FROM tmp_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_src +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM tmp_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_src +#### A masked pattern was here #### +val_490 1 +val_287 1 +val_286 1 +val_285 1 +val_284 1 +val_283 1 +val_114 1 +val_487 1 +val_485 1 +val_28 1 +val_484 1 +val_181 1 +val_275 1 +val_274 1 +val_183 1 +val_483 1 +val_27 1 +val_266 1 +val_482 1 +val_263 1 +val_262 1 +val_260 1 +val_481 1 +val_258 1 +val_257 1 +val_116 1 +val_479 1 +val_252 1 +val_249 1 +val_248 1 +val_247 1 +val_244 1 +val_92 1 +val_241 1 +val_477 1 +val_475 1 +val_472 1 +val_470 1 +val_235 1 +val_47 1 +val_186 1 +val_126 1 +val_228 1 +val_226 1 +val_131 1 +val_467 1 +val_222 1 +val_133 1 +val_82 1 +val_218 1 +val_80 1 +val_460 1 +val_214 1 +val_8 1 +val_78 1 +val_189 1 +val_457 1 +val_455 1 +val_136 1 +val_202 1 +val_201 1 +val_453 1 +val_20 1 +val_2 1 +val_19 1 +val_452 1 +val_196 1 +val_449 1 +val_194 1 +val_190 1 +val_192 1 +val_448 1 +val_446 1 +val_444 1 +val_443 1 +val_44 1 +val_77 1 +val_143 1 +val_437 1 +val_436 1 +val_435 1 +val_432 1 +val_145 1 +val_150 1 +val_43 1 +val_10 1 +val_427 1 +val_74 1 +val_421 1 +val_9 1 +val_419 1 +val_418 1 +val_153 1 +val_105 1 +val_69 1 +val_411 1 +val_41 1 +val_155 1 +val_407 1 +val_156 1 +val_87 1 +val_157 1 +val_402 1 +val_158 1 +val_400 1 +val_4 1 +val_66 1 +val_65 1 +val_160 1 +val_64 1 +val_394 1 +val_393 1 +val_392 1 +val_389 1 +val_386 1 +val_162 1 +val_86 1 +val_379 1 +val_378 1 +val_377 1 +val_375 1 +val_374 1 +val_373 1 +val_57 1 +val_163 1 +val_368 1 +val_54 1 +val_366 1 +val_365 1 +val_364 1 +val_362 1 +val_360 1 +val_356 1 +val_53 1 +val_351 1 +val_166 1 +val_168 1 +val_345 1 +val_85 1 +val_11 1 +val_341 1 +val_34 1 +val_339 1 +val_338 1 +val_336 1 +val_335 1 +val_111 1 +val_332 1 +val_497 1 +val_33 1 +val_17 1 +val_496 1 +val_323 1 +val_495 1 +val_494 1 +val_170 1 +val_493 1 +val_177 1 +val_315 1 +val_178 1 +val_310 1 +val_96 1 +val_308 1 +val_491 1 +val_306 1 +val_305 1 +val_302 1 +val_30 1 +val_180 1 +val_296 1 +val_292 1 +val_291 1 +val_289 1 +val_98 2 +val_97 2 +val_95 2 +val_84 2 +val_83 2 +val_76 2 +val_72 2 +val_67 2 +val_58 2 +val_51 2 +val_492 2 +val_478 2 +val_463 2 +val_462 2 +val_459 2 +val_458 2 +val_439 2 +val_429 2 +val_424 2 +val_42 2 +val_414 2 +val_413 2 +val_404 2 +val_399 2 +val_397 2 +val_395 2 +val_382 2 +val_37 2 +val_367 2 +val_353 2 +val_344 2 +val_342 2 +val_333 2 +val_331 2 +val_325 2 +val_322 2 +val_321 2 +val_317 2 +val_309 2 +val_307 2 +val_288 2 +val_282 2 +val_281 2 +val_280 2 +val_278 2 +val_272 2 +val_265 2 +val_26 2 +val_256 2 +val_255 2 +val_242 2 +val_24 2 +val_239 2 +val_238 2 +val_237 2 +val_233 2 +val_229 2 +val_224 2 +val_223 2 +val_221 2 +val_219 2 +val_217 2 +val_216 2 +val_213 2 +val_209 2 +val_207 2 +val_205 2 +val_203 2 +val_200 2 +val_197 2 +val_195 2 +val_191 2 +val_18 2 +val_179 2 +val_176 2 +val_175 2 +val_174 2 +val_172 2 +val_165 2 +val_164 2 +val_152 2 +val_15 2 +val_149 2 +val_146 2 +val_137 2 +val_134 2 +val_129 2 +val_125 2 +val_120 2 +val_12 2 +val_118 2 +val_113 2 +val_104 2 +val_103 2 +val_100 2 +val_498 3 +val_369 3 +val_384 3 +val_396 3 +val_403 3 +val_409 3 +val_417 3 +val_5 3 +val_430 3 +val_70 3 +val_119 3 +val_0 3 +val_431 3 +val_438 3 +val_480 3 +val_193 3 +val_199 3 +val_208 3 +val_187 3 +val_273 3 +val_298 3 +val_454 3 +val_311 3 +val_316 3 +val_466 3 +val_90 3 +val_128 3 +val_318 3 +val_327 3 +val_167 3 +val_35 3 +val_468 4 +val_489 4 +val_406 4 +val_169 4 
+val_138 4 +val_277 4 +val_469 5 +val_401 5 +val_230 5 +val_348 5 +PREHOOK: query: -- dyn partitions +CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- dyn partitions +CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@tmp_src_part +PREHOOK: query: EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src +POSTHOOK: type: QUERY +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME tmp_src))) (TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME tmp_src_part) (TOK_PARTSPEC (TOK_PARTVAL d)))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)))) + +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6 + Stage-5 + Stage-0 depends on stages: Stage-5, Stage-4, Stage-7 + Stage-3 depends on stages: Stage-0 + Stage-4 + Stage-6 + Stage-7 depends on stages: Stage-6 + +STAGE PLANS: + Stage: Stage-1 + Tez + Alias -> Map Operator Tree: + tmp_src + TableScan + alias: tmp_src + Select Operator + expressions: + expr: value + type: string + expr: cnt + type: bigint + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmp_src_part + + Stage: Stage-8 + Conditional Operator + + Stage: Stage-5 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-0 + Move Operator + tables: + partition: + d + replace: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmp_src_part + + Stage: Stage-3 + Stats-Aggr Operator + + Stage: Stage-4 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmp_src_part + + Stage: Stage-6 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.tmp_src_part + + Stage: Stage-7 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + +PREHOOK: query: INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_src +PREHOOK: Output: default@tmp_src_part +POSTHOOK: query: INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_src +POSTHOOK: Output: default@tmp_src_part@d=1 +POSTHOOK: Output: default@tmp_src_part@d=2 +POSTHOOK: Output: default@tmp_src_part@d=3 +POSTHOOK: Output: default@tmp_src_part@d=4 +POSTHOOK: Output: 
default@tmp_src_part@d=5 +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: SELECT * FROM tmp_src_part +PREHOOK: type: QUERY +PREHOOK: Input: default@tmp_src_part +PREHOOK: Input: default@tmp_src_part@d=1 +PREHOOK: Input: default@tmp_src_part@d=2 +PREHOOK: Input: default@tmp_src_part@d=3 +PREHOOK: Input: default@tmp_src_part@d=4 +PREHOOK: Input: default@tmp_src_part@d=5 +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM tmp_src_part +POSTHOOK: type: QUERY +POSTHOOK: Input: default@tmp_src_part +POSTHOOK: Input: default@tmp_src_part@d=1 +POSTHOOK: Input: default@tmp_src_part@d=2 +POSTHOOK: Input: default@tmp_src_part@d=3 +POSTHOOK: Input: default@tmp_src_part@d=4 +POSTHOOK: Input: default@tmp_src_part@d=5 +#### A masked pattern was here #### +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +val_490 1 +val_287 1 +val_286 1 +val_285 1 +val_284 1 +val_283 1 +val_114 1 +val_487 1 +val_485 1 +val_28 1 +val_484 1 +val_181 1 +val_275 1 +val_274 1 +val_183 1 +val_483 1 +val_27 1 +val_266 1 +val_482 1 +val_263 1 +val_262 1 +val_260 1 +val_481 1 +val_258 1 +val_257 1 +val_116 1 +val_479 1 +val_252 1 +val_249 1 +val_248 1 +val_247 1 +val_244 1 +val_92 1 +val_241 1 +val_477 1 +val_475 1 +val_472 1 +val_470 1 +val_235 1 +val_47 1 +val_186 1 +val_126 1 +val_228 1 +val_226 1 +val_131 1 +val_467 1 +val_222 1 +val_133 1 +val_82 1 +val_218 1 +val_80 1 +val_460 1 +val_214 1 +val_8 1 +val_78 1 +val_189 1 +val_457 1 +val_455 1 +val_136 1 +val_202 1 +val_201 1 +val_453 1 +val_20 1 +val_2 1 +val_19 1 +val_452 1 +val_196 1 +val_449 1 +val_194 1 +val_190 1 +val_192 1 +val_448 1 +val_446 1 +val_444 1 +val_443 1 +val_44 1 +val_77 1 +val_143 1 +val_437 1 +val_436 1 +val_435 1 +val_432 1 +val_145 1 +val_150 1 +val_43 1 +val_10 1 +val_427 1 +val_74 1 +val_421 1 +val_9 1 +val_419 1 +val_418 1 +val_153 1 +val_105 1 +val_69 1 +val_411 1 +val_41 1 +val_155 1 +val_407 1 +val_156 1 +val_87 1 +val_157 1 +val_402 1 +val_158 1 +val_400 1 +val_4 1 +val_66 1 +val_65 1 +val_160 1 +val_64 1 +val_394 1 +val_393 1 +val_392 1 +val_389 1 +val_386 1 +val_162 1 +val_86 1 +val_379 1 +val_378 1 +val_377 1 +val_375 1 +val_374 1 +val_373 1 +val_57 1 +val_163 1 +val_368 1 +val_54 1 +val_366 1 +val_365 1 +val_364 1 +val_362 1 +val_360 1 +val_356 1 +val_53 1 +val_351 1 +val_166 1 +val_168 1 +val_345 1 +val_85 1 +val_11 1 +val_341 1 +val_34 1 
+val_339 1 +val_338 1 +val_336 1 +val_335 1 +val_111 1 +val_332 1 +val_497 1 +val_33 1 +val_17 1 +val_496 1 +val_323 1 +val_495 1 +val_494 1 +val_170 1 +val_493 1 +val_177 1 +val_315 1 +val_178 1 +val_310 1 +val_96 1 +val_308 1 +val_491 1 +val_306 1 +val_305 1 +val_302 1 +val_30 1 +val_180 1 +val_296 1 +val_292 1 +val_291 1 +val_289 1 +val_98 2 +val_97 2 +val_95 2 +val_84 2 +val_83 2 +val_76 2 +val_72 2 +val_67 2 +val_58 2 +val_51 2 +val_492 2 +val_478 2 +val_463 2 +val_462 2 +val_459 2 +val_458 2 +val_439 2 +val_429 2 +val_424 2 +val_42 2 +val_414 2 +val_413 2 +val_404 2 +val_399 2 +val_397 2 +val_395 2 +val_382 2 +val_37 2 +val_367 2 +val_353 2 +val_344 2 +val_342 2 +val_333 2 +val_331 2 +val_325 2 +val_322 2 +val_321 2 +val_317 2 +val_309 2 +val_307 2 +val_288 2 +val_282 2 +val_281 2 +val_280 2 +val_278 2 +val_272 2 +val_265 2 +val_26 2 +val_256 2 +val_255 2 +val_242 2 +val_24 2 +val_239 2 +val_238 2 +val_237 2 +val_233 2 +val_229 2 +val_224 2 +val_223 2 +val_221 2 +val_219 2 +val_217 2 +val_216 2 +val_213 2 +val_209 2 +val_207 2 +val_205 2 +val_203 2 +val_200 2 +val_197 2 +val_195 2 +val_191 2 +val_18 2 +val_179 2 +val_176 2 +val_175 2 +val_174 2 +val_172 2 +val_165 2 +val_164 2 +val_152 2 +val_15 2 +val_149 2 +val_146 2 +val_137 2 +val_134 2 +val_129 2 +val_125 2 +val_120 2 +val_12 2 +val_118 2 +val_113 2 +val_104 2 +val_103 2 +val_100 2 +val_498 3 +val_369 3 +val_384 3 +val_396 3 +val_403 3 +val_409 3 +val_417 3 +val_5 3 +val_430 3 +val_70 3 +val_119 3 +val_0 3 +val_431 3 +val_438 3 +val_480 3 +val_193 3 +val_199 3 +val_208 3 +val_187 3 +val_273 3 +val_298 3 +val_454 3 +val_311 3 +val_316 3 +val_466 3 +val_90 3 +val_128 3 +val_318 3 +val_327 3 +val_167 3 +val_35 3 +val_468 4 +val_489 4 +val_406 4 +val_169 4 +val_138 4 +val_277 4 +val_469 5 +val_401 5 +val_230 5 +val_348 5 +PREHOOK: query: -- multi insert +CREATE TABLE even (c int, d string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: -- multi insert +CREATE TABLE even (c int, d string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@even +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: CREATE TABLE odd (c int, d string) +PREHOOK: type: CREATETABLE +POSTHOOK: query: CREATE TABLE odd (c int, d string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: default@odd +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: EXPLAIN +FROM 
src +INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 +INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1 +PREHOOK: type: QUERY +POSTHOOK: query: EXPLAIN +FROM src +INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 +INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1 +POSTHOOK: type: QUERY +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +ABSTRACT SYNTAX TREE: + (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME even))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value))) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 2) 0))) (TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME odd))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value))) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 2) 1)))) + +STAGE DEPENDENCIES: + Stage-2 is a root stage + Stage-9 depends on stages: Stage-2 , consists of Stage-6, Stage-5, Stage-7 + Stage-6 + Stage-0 depends on stages: Stage-6, Stage-5, Stage-8 + Stage-4 depends on stages: Stage-0 + Stage-5 + Stage-7 + Stage-8 depends on stages: Stage-7 + Stage-15 depends on stages: Stage-2 , consists of Stage-12, Stage-11, Stage-13 + Stage-12 + Stage-1 depends on stages: Stage-12, Stage-11, Stage-14 + Stage-10 depends on stages: Stage-1 + Stage-11 + Stage-13 + Stage-14 depends on stages: Stage-13 + +STAGE PLANS: + Stage: Stage-2 + Tez + Alias -> Map Operator Tree: + src + TableScan + alias: src + Filter Operator + predicate: + expr: ((key % 2) = 0) + type: boolean + Select Operator + expressions: + expr: UDFToInteger(key) + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 1 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.even + Filter Operator + predicate: + expr: ((key % 2) = 1) + type: boolean + Select Operator + expressions: + expr: UDFToInteger(key) + type: int + expr: value + type: string + outputColumnNames: _col0, _col1 + File Output Operator + compressed: false + GlobalTableId: 2 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.odd + + Stage: Stage-9 + Conditional Operator + + Stage: Stage-6 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-0 + Move Operator + tables: + replace: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.even + + Stage: Stage-4 + Stats-Aggr Operator + + Stage: Stage-5 + Map Reduce 
+ Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.even + + Stage: Stage-7 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.even + + Stage: Stage-8 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-15 + Conditional Operator + + Stage: Stage-12 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + Stage: Stage-1 + Move Operator + tables: + replace: false + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.odd + + Stage: Stage-10 + Stats-Aggr Operator + + Stage: Stage-11 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.odd + + Stage: Stage-13 + Map Reduce + Alias -> Map Operator Tree: +#### A masked pattern was here #### + TableScan + File Output Operator + compressed: false + GlobalTableId: 0 + table: + input format: org.apache.hadoop.mapred.TextInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + name: default.odd + + Stage: Stage-14 + Move Operator + files: + hdfs directory: true +#### A masked pattern was here #### + + +PREHOOK: query: FROM src +INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 +INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1 +PREHOOK: type: QUERY +PREHOOK: Input: default@src +PREHOOK: Output: default@even +PREHOOK: Output: default@odd +POSTHOOK: query: FROM src +INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0 +INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@src +POSTHOOK: Output: default@even +POSTHOOK: Output: default@odd +POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] 
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: SELECT * FROM even +PREHOOK: type: QUERY +PREHOOK: Input: default@even +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM even +POSTHOOK: type: QUERY +POSTHOOK: Input: default@even +#### A masked pattern was here #### +POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +238 val_238 +86 val_86 +278 val_278 +98 val_98 +484 val_484 +150 val_150 +224 val_224 +66 val_66 +128 val_128 +146 val_146 +406 val_406 +374 val_374 +152 val_152 +82 val_82 +166 val_166 +430 val_430 +252 val_252 +292 val_292 +338 val_338 +446 val_446 +394 val_394 +482 val_482 +174 val_174 +494 val_494 +466 val_466 +208 val_208 +174 val_174 +396 val_396 +162 val_162 +266 val_266 +342 val_342 +0 val_0 +128 val_128 +316 val_316 +302 val_302 +438 val_438 +170 val_170 +20 val_20 +378 val_378 +92 val_92 +72 val_72 +4 val_4 +280 val_280 +208 val_208 +356 val_356 +382 val_382 +498 val_498 +386 val_386 +192 val_192 +286 val_286 +176 val_176 +54 val_54 +138 val_138 +216 val_216 +430 val_430 +278 val_278 +176 val_176 +318 val_318 +332 val_332 +180 val_180 +284 val_284 +12 val_12 +230 val_230 +260 val_260 +404 val_404 +384 val_384 +272 val_272 +138 val_138 +84 val_84 +348 val_348 +466 val_466 +58 val_58 +8 val_8 +230 val_230 +208 val_208 +348 val_348 +24 val_24 +172 val_172 +42 val_42 +158 val_158 +496 val_496 +0 val_0 +322 val_322 +468 val_468 +454 val_454 +100 val_100 +298 val_298 +418 val_418 +96 val_96 +26 val_26 +230 val_230 +120 val_120 +404 val_404 +436 val_436 +156 val_156 +468 val_468 +308 val_308 +196 val_196 +288 val_288 +98 val_98 +282 val_282 +318 val_318 +318 val_318 +470 val_470 +316 val_316 +0 val_0 +490 val_490 +364 val_364 +118 val_118 +134 val_134 +282 val_282 +138 val_138 +238 val_238 +118 val_118 +72 val_72 +90 val_90 +10 val_10 +306 val_306 +224 val_224 +242 val_242 +392 val_392 +272 val_272 +242 val_242 +452 val_452 +226 val_226 +402 val_402 +396 val_396 +58 val_58 +336 val_336 +168 val_168 +34 val_34 +472 val_472 +322 val_322 +498 val_498 +160 val_160 +42 val_42 +430 val_430 +458 val_458 +78 val_78 +76 val_76 +492 val_492 +218 val_218 +228 val_228 +138 val_138 +30 val_30 +64 val_64 +468 val_468 +76 val_76 +74 val_74 +342 val_342 +230 val_230 +368 val_368 +296 val_296 +216 val_216 +344 val_344 +274 val_274 +116 val_116 +256 val_256 +70 val_70 
+480 val_480 +288 val_288 +244 val_244 +438 val_438 +128 val_128 +432 val_432 +202 val_202 +316 val_316 +280 val_280 +2 val_2 +80 val_80 +44 val_44 +104 val_104 +466 val_466 +366 val_366 +406 val_406 +190 val_190 +406 val_406 +114 val_114 +258 val_258 +90 val_90 +262 val_262 +348 val_348 +424 val_424 +12 val_12 +396 val_396 +164 val_164 +454 val_454 +478 val_478 +298 val_298 +164 val_164 +424 val_424 +382 val_382 +70 val_70 +480 val_480 +24 val_24 +104 val_104 +70 val_70 +438 val_438 +414 val_414 +200 val_200 +360 val_360 +248 val_248 +444 val_444 +120 val_120 +230 val_230 +478 val_478 +178 val_178 +468 val_468 +310 val_310 +460 val_460 +480 val_480 +136 val_136 +172 val_172 +214 val_214 +462 val_462 +406 val_406 +454 val_454 +384 val_384 +256 val_256 +26 val_26 +134 val_134 +384 val_384 +18 val_18 +462 val_462 +492 val_492 +100 val_100 +298 val_298 +498 val_498 +146 val_146 +458 val_458 +362 val_362 +186 val_186 +348 val_348 +18 val_18 +344 val_344 +84 val_84 +28 val_28 +448 val_448 +152 val_152 +348 val_348 +194 val_194 +414 val_414 +222 val_222 +126 val_126 +90 val_90 +400 val_400 +200 val_200 +PREHOOK: query: SELECT * FROM odd +PREHOOK: type: QUERY +PREHOOK: Input: default@odd +#### A masked pattern was here #### +POSTHOOK: query: SELECT * FROM odd +POSTHOOK: type: QUERY +POSTHOOK: Input: default@odd +#### A masked pattern was here #### +POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +311 val_311 +27 val_27 +165 val_165 +409 val_409 +255 val_255 +265 val_265 +193 val_193 +401 val_401 +273 val_273 +369 val_369 +213 val_213 +429 val_429 +469 val_469 +145 val_145 +495 val_495 +37 val_37 +327 val_327 +281 val_281 +277 val_277 +209 val_209 +15 val_15 +403 val_403 +417 val_417 +219 val_219 +287 val_287 +153 val_153 +193 val_193 +459 val_459 +237 val_237 +413 val_413 +207 val_207 +199 val_199 +399 val_399 +247 val_247 +417 val_417 +489 val_489 +377 val_377 +397 val_397 +309 val_309 +365 val_365 +439 val_439 +367 val_367 +325 val_325 +167 val_167 +195 val_195 +475 val_475 +17 val_17 +113 val_113 +155 val_155 +203 val_203 +339 val_339 +455 val_455 +311 val_311 +57 val_57 +205 val_205 +149 val_149 +345 val_345 +129 val_129 +489 val_489 +157 val_157 +221 val_221 +111 val_111 +47 val_47 +35 val_35 +427 val_427 +277 val_277 +399 val_399 +169 val_169 +125 val_125 +437 val_437 +469 val_469 +187 val_187 +459 val_459 +51 val_51 +103 val_103 +239 val_239 +213 val_213 +289 val_289 +221 val_221 +65 val_65 +311 val_311 +275 val_275 +137 val_137 +241 val_241 +83 val_83 +333 val_333 +181 val_181 +67 val_67 +489 val_489 +353 
val_353 +373 val_373 +217 val_217 +411 val_411 +463 val_463 +431 val_431 +179 val_179 +129 val_129 +119 val_119 +197 val_197 +393 val_393 +199 val_199 +191 val_191 +165 val_165 +327 val_327 +205 val_205 +131 val_131 +51 val_51 +43 val_43 +469 val_469 +95 val_95 +481 val_481 +457 val_457 +197 val_197 +187 val_187 +409 val_409 +137 val_137 +369 val_369 +169 val_169 +413 val_413 +85 val_85 +77 val_77 +87 val_87 +179 val_179 +395 val_395 +419 val_419 +15 val_15 +307 val_307 +19 val_19 +435 val_435 +277 val_277 +273 val_273 +309 val_309 +389 val_389 +327 val_327 +369 val_369 +331 val_331 +401 val_401 +177 val_177 +5 val_5 +497 val_497 +317 val_317 +395 val_395 +35 val_35 +95 val_95 +11 val_11 +229 val_229 +233 val_233 +143 val_143 +195 val_195 +321 val_321 +119 val_119 +489 val_489 +41 val_41 +223 val_223 +149 val_149 +449 val_449 +453 val_453 +209 val_209 +69 val_69 +33 val_33 +103 val_103 +113 val_113 +367 val_367 +167 val_167 +219 val_219 +239 val_239 +485 val_485 +223 val_223 +263 val_263 +487 val_487 +401 val_401 +191 val_191 +5 val_5 +467 val_467 +229 val_229 +469 val_469 +463 val_463 +35 val_35 +283 val_283 +331 val_331 +235 val_235 +193 val_193 +321 val_321 +335 val_335 +175 val_175 +403 val_403 +483 val_483 +53 val_53 +105 val_105 +257 val_257 +409 val_409 +401 val_401 +203 val_203 +201 val_201 +217 val_217 +431 val_431 +125 val_125 +431 val_431 +187 val_187 +5 val_5 +397 val_397 +291 val_291 +351 val_351 +255 val_255 +163 val_163 +119 val_119 +491 val_491 +237 val_237 +439 val_439 +479 val_479 +305 val_305 +417 val_417 +199 val_199 +429 val_429 +169 val_169 +443 val_443 +323 val_323 +325 val_325 +277 val_277 +317 val_317 +333 val_333 +493 val_493 +207 val_207 +249 val_249 +265 val_265 +83 val_83 +353 val_353 +233 val_233 +133 val_133 +175 val_175 +189 val_189 +375 val_375 +401 val_401 +421 val_421 +407 val_407 +67 val_67 +379 val_379 +9 val_9 +341 val_341 +285 val_285 +167 val_167 +273 val_273 +183 val_183 +281 val_281 +97 val_97 +469 val_469 +315 val_315 +37 val_37 +307 val_307 +477 val_477 +169 val_169 +403 val_403 +97 val_97 +PREHOOK: query: -- drop the tables +DROP TABLE even +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@even +PREHOOK: Output: default@even +POSTHOOK: query: -- drop the tables +DROP TABLE even +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@even +POSTHOOK: Output: default@even +POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: DROP TABLE odd +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@odd +PREHOOK: Output: default@odd +POSTHOOK: query: DROP TABLE odd 
+POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@odd +POSTHOOK: Output: default@odd +POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: DROP TABLE tmp_src +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_src +PREHOOK: Output: default@tmp_src +POSTHOOK: query: DROP TABLE tmp_src +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_src +POSTHOOK: Output: default@tmp_src +POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +PREHOOK: query: DROP TABLE tmp_src_part +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@tmp_src_part +PREHOOK: Output: default@tmp_src_part +POSTHOOK: query: DROP TABLE tmp_src_part +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@tmp_src_part +POSTHOOK: Output: default@tmp_src_part +POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ] +POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: 
Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ] +POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
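
Note on the dispatcher wiring in the TezCompiler hunk above: registering the FileSink rule with a CompositeProcessor means a single rule match on a FileSinkOperator first runs FileSinkProcessor (adding the move/merge/stats tasks) and then GenTezWork (splitting the plan into Tez work units). The sketch below is a minimal, self-contained illustration of that pattern under simplified assumptions: Node, NodeProcessor and NodeProcessorCtx are stand-in interfaces rather than Hive's classes, the rule table is keyed by a plain string instead of RuleRegExp, and the "last processor's result is returned" behaviour is an assumption of the sketch, not something this patch shows.

// MergeWiringSketch.java -- illustrative only; simplified stand-ins for Hive's dispatcher types.
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Stack;

interface Node { String getName(); }

interface NodeProcessorCtx { }

interface NodeProcessor {
  Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... outputs) throws Exception;
}

// Same idea as the CompositeProcessor used above: one rule fires several processors in order.
class CompositeProcessorSketch implements NodeProcessor {
  private final NodeProcessor[] procs;

  CompositeProcessorSketch(NodeProcessor... procs) {
    this.procs = procs;
  }

  @Override
  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx ctx, Object... outputs)
      throws Exception {
    Object result = null;
    for (NodeProcessor p : procs) {
      // Assumption of this sketch: processors run in order and the last result is returned.
      result = p.process(nd, stack, ctx, outputs);
    }
    return result;
  }
}

public class MergeWiringSketch {
  public static void main(String[] args) throws Exception {
    // Stand-ins for FileSinkProcessor (move/merge/stats tasks) and GenTezWork (work splitting).
    NodeProcessor fileSinkProcessor =
        (nd, st, ctx, out) -> { System.out.println("add move/merge tasks for " + nd.getName()); return null; };
    NodeProcessor genTezWork =
        (nd, st, ctx, out) -> { System.out.println("split work at " + nd.getName()); return true; };

    // Analogous to the TezCompiler change: the FileSink rule now fires both processors.
    Map<String, NodeProcessor> opRules = new LinkedHashMap<String, NodeProcessor>();
    opRules.put("FS%", new CompositeProcessorSketch(fileSinkProcessor, genTezWork));

    Node fileSink = () -> "FS_3";
    opRules.get("FS%").process(fileSink, new Stack<Node>(), null);
  }
}

Chaining both behaviours under one rule keeps the rule table the same size while ensuring, given the constructor order used above, that the move/merge handling runs before the work split for every file sink the walker visits.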