diff --git build-common.xml build-common.xml
index 3ca7f87..670d1a3 100644
--- build-common.xml
+++ build-common.xml
@@ -61,7 +61,7 @@
-
+
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 0a2f976..203defb 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -134,16 +134,18 @@ public void setJobId(String jobId) {
this.jobId = jobId;
}
-
- public HadoopJobExecHelper() {
- }
-
public HadoopJobExecHelper(JobConf job, LogHelper console,
Task<? extends Serializable> task, HadoopJobExecHook hookCallBack) {
this.job = job;
this.console = console;
this.task = task;
this.callBackObj = hookCallBack;
+
+ if (job != null) {
+ // even with tez on some jobs are run as MR. disable the flag in
+ // the conf, so that the backend runs fully as MR.
+ HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ, false);
+ }
}
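
Note on the hunk above: the flag is flipped on the JobConf itself, so anything downstream that reads HIVE_OPTIMIZE_TEZ from that conf sees a plain MR job. A minimal sketch of the read side (the wrapper class is hypothetical; the ConfVars name is the one used in the hunk):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public final class TezFlagCheck {
      private TezFlagCheck() {
      }

      // Returns true when a conf prepared by HadoopJobExecHelper should be executed
      // as a regular MapReduce job rather than through the Tez code path.
      public static boolean runsAsPlainMapReduce(JobConf job) {
        return !HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_OPTIMIZE_TEZ);
      }
    }
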
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 652a264..272fc48 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -45,10 +45,13 @@
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.Hadoop20Shims.NullOutputCommitter;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
@@ -613,6 +616,22 @@ public static Vertex createVertex(JobConf conf, BaseWork work,
return null;
}
+ // initialize stats publisher if necessary
+ if (work.isGatheringStats()) {
+ StatsPublisher statsPublisher;
+ String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+ if (StatsFactory.setImplementation(statsImplementationClass, conf)) {
+ statsPublisher = StatsFactory.getStatsPublisher();
+ if (!statsPublisher.init(conf)) { // creating stats table if not exists
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+ throw
+ new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+ }
+ }
+ }
+ }
+
+
// final vertices need to have at least one output
if (!hasChildren) {
v.addOutput("out_"+work.getName(),
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 4804f1d..fd1671b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -117,6 +117,11 @@ public void open(String sessionId, HiveConf conf)
LOG.info("Opening new Tez Session (id: "+sessionId+", scratch dir: "+tezScratchDir+")");
session.start();
+
+ // In case we need to run some MR jobs, we'll run them under tez MR emulation. The session
+ // id is used for tez to reuse the current session rather than start a new one.
+ conf.set("mapreduce.framework.name", "yarn-tez");
+ conf.set("tez.session.id", session.getApplicationId().toString());
}
/**
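
A sketch of what those two properties buy (the property keys are the ones set in the hunk; the helper below is illustrative, not part of the patch): any JobConf derived from the session's HiveConf submits MR-style work through Tez's MR emulation and attaches to the already-running session AM instead of starting a new application.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    public final class TezMrEmulationConf {
      private TezMrEmulationConf() {
      }

      public static JobConf forMrFallback(HiveConf sessionConf, String tezApplicationId) {
        JobConf job = new JobConf(sessionConf);
        // route the MR job through Tez's MR emulation layer
        job.set("mapreduce.framework.name", "yarn-tez");
        // reuse the session AM identified by the application id rather than starting a new one
        job.set("tez.session.id", tezApplicationId);
        return job;
      }
    }
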
diff --git ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
index 45a49c5..8cbf32f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -37,12 +38,19 @@
protected synchronized IOContext initialValue() { return new IOContext(); }
};
+ private static IOContext ioContext = new IOContext();
+
public static IOContext get() {
+ if (SessionState.get() == null) {
+ // this happens on the backend. only one io context needed.
+ return ioContext;
+ }
return IOContext.threadLocal.get();
}
public static void clear() {
IOContext.threadLocal.remove();
+ ioContext = new IOContext();
}
long currentBlockStart;
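
A small illustration of the new lookup rule (illustrative class, not part of the patch): with no SessionState on the calling threads, which is the backend case this change targets, every thread sees the same shared IOContext; on the client the ThreadLocal path is unchanged.

    import org.apache.hadoop.hive.ql.io.IOContext;

    public class IOContextSharingCheck {
      public static void main(String[] args) throws InterruptedException {
        final IOContext[] seen = new IOContext[2];
        Thread t1 = new Thread(new Runnable() {
          @Override
          public void run() {
            seen[0] = IOContext.get();
          }
        });
        Thread t2 = new Thread(new Runnable() {
          @Override
          public void run() {
            seen[1] = IOContext.get();
          }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // prints true: both threads got the single static instance
        System.out.println("shared instance: " + (seen[0] == seen[1]));
      }
    }
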
diff --git ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java
new file mode 100644
index 0000000..d7d5e80
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/lib/CompositeProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lib;
+
+import java.util.Stack;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+/**
+ * CompositeProcessor. Holds a list of node processors to be fired by the same
+ * rule.
+ *
+ */
+public class CompositeProcessor implements NodeProcessor {
+
+ NodeProcessor[] procs;
+
+ public CompositeProcessor(NodeProcessor...nodeProcessors) {
+ procs = nodeProcessors;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+ for (NodeProcessor proc: procs) {
+ proc.process(nd, stack, procCtx, nodeOutputs);
+ }
+ return null;
+ }
+}
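
Usage sketch (this mirrors how TezCompiler wires it up at the end of this patch): both processors fire, in registration order, when the single FileSink rule matches.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.Rule;
    import org.apache.hadoop.hive.ql.lib.RuleRegExp;
    import org.apache.hadoop.hive.ql.parse.FileSinkProcessor;

    public final class CompositeProcessorUsage {
      private CompositeProcessorUsage() {
      }

      // genTezWork is whichever processor splits the plan into Tez work units; it is
      // passed in because its construction is outside the scope of this sketch.
      public static Map<Rule, NodeProcessor> fileSinkRule(NodeProcessor genTezWork) {
        Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
        opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
            FileSinkOperator.getOperatorName() + "%"),
            new CompositeProcessor(new FileSinkProcessor(), genTezWork));
        return opRules;
      }
    }

Note that CompositeProcessor.process() returns null, so the delegates' individual return values are not recorded as node outputs.
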
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
index 686a380..529dcb5 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
@@ -21,51 +21,25 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
-import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
-import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
-import org.apache.hadoop.hive.ql.plan.ConditionalWork;
-import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
-import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.StatsWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.InputFormat;
/**
* Processor for the rule - table scan followed by reduce sink.
@@ -95,8 +69,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
FileSinkOperator fsOp = (FileSinkOperator) nd;
boolean isInsertTable = // is INSERT OVERWRITE TABLE
- fsOp.getConf().getTableInfo().getTableName() != null &&
- parseCtx.getQB().getParseInfo().isInsertToTable();
+ GenMapRedUtils.isInsertInto(parseCtx, fsOp);
HiveConf hconf = parseCtx.getConf();
// Mark this task as a final map reduce task (ignoring the optional merge task)
@@ -111,49 +84,12 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
return true;
}
- // Has the user enabled merging of files for map-only jobs or for all jobs
- if ((ctx.getMvTask() != null) && (!ctx.getMvTask().isEmpty())) {
- List<Task<MoveWork>> mvTasks = ctx.getMvTask();
-
- // In case of unions or map-joins, it is possible that the file has
- // already been seen.
- // So, no need to attempt to merge the files again.
- if ((ctx.getSeenFileSinkOps() == null)
- || (!ctx.getSeenFileSinkOps().contains(nd))) {
-
- // no need of merging if the move is to a local file system
- MoveTask mvTask = (MoveTask) findMoveTask(mvTasks, fsOp);
-
- if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
- addStatsTask(fsOp, mvTask, currTask, parseCtx.getConf());
- }
-
- if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
- if (fsOp.getConf().isLinkedFileSink()) {
- // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the
- // number of reducers are few, so the number of files anyway are small.
- // However, with this optimization, we are increasing the number of files
- // possibly by a big margin. So, merge aggresively.
- if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
- chDir = true;
- }
- } else {
- // There are separate configuration parameters to control whether to
- // merge for a map-only job
- // or for a map-reduce job
- MapredWork currWork = (MapredWork) currTask.getWork();
- boolean mergeMapOnly =
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && currWork.getReduceWork() == null;
- boolean mergeMapRed =
- hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
- currWork.getReduceWork() != null;
- if (mergeMapOnly || mergeMapRed) {
- chDir = true;
- }
- }
- }
- }
+ // In case of unions or map-joins, it is possible that the file has
+ // already been seen.
+ // So, no need to attempt to merge the files again.
+ if ((ctx.getSeenFileSinkOps() == null)
+ || (!ctx.getSeenFileSinkOps().contains(nd))) {
+ chDir = GenMapRedUtils.isMergeRequired(ctx.getMvTask(), hconf, fsOp, currTask, isInsertTable);
}
String finalName = processFS(fsOp, stack, opProcCtx, chDir);
@@ -162,7 +98,9 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx opProcCtx,
// Merge the files in the destination table/partitions by creating Map-only merge job
// If underlying data is RCFile a RCFileBlockMerge task would be created.
LOG.info("using CombineHiveInputformat for the merge job");
- createMRWorkForMergingFiles(fsOp, ctx, finalName);
+ GenMapRedUtils.createMRWorkForMergingFiles(fsOp, finalName,
+ ctx.getDependencyTaskForMultiInsert(), ctx.getMvTask(),
+ hconf, currTask);
}
FileSinkDesc fileSinkDesc = fsOp.getConf();
@@ -205,435 +143,6 @@ private void processLinkedFileDesc(GenMRProcContext ctx,
}
/**
- * Add the StatsTask as a dependent task of the MoveTask
- * because StatsTask will change the Table/Partition metadata. For atomicity, we
- * should not change it before the data is actually there done by MoveTask.
- *
- * @param nd
- * the FileSinkOperator whose results are taken care of by the MoveTask.
- * @param mvTask
- * The MoveTask that moves the FileSinkOperator's results.
- * @param currTask
- * The MapRedTask that the FileSinkOperator belongs to.
- * @param hconf
- * HiveConf
- */
- private void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
- Task<? extends Serializable> currTask, HiveConf hconf) {
-
- MoveWork mvWork = ((MoveTask) mvTask).getWork();
- StatsWork statsWork = null;
- if (mvWork.getLoadTableWork() != null) {
- statsWork = new StatsWork(mvWork.getLoadTableWork());
- } else if (mvWork.getLoadFileWork() != null) {
- statsWork = new StatsWork(mvWork.getLoadFileWork());
- }
- assert statsWork != null : "Error when genereting StatsTask";
- statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
- MapredWork mrWork = (MapredWork) currTask.getWork();
-
- // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
- // in FileSinkDesc is used for stats publishing. They should be consistent.
- statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
- Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
-
- // mark the MapredWork and FileSinkOperator for gathering stats
- nd.getConf().setGatherStats(true);
- mrWork.getMapWork().setGatheringStats(true);
- if (mrWork.getReduceWork() != null) {
- mrWork.getReduceWork().setGatheringStats(true);
- }
- nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
- nd.getConf().setMaxStatsKeyPrefixLength(
- hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
- // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
-
- // subscribe feeds from the MoveTask so that MoveTask can forward the list
- // of dynamic partition list to the StatsTask
- mvTask.addDependentTask(statsTask);
- statsTask.subscribeFeed(mvTask);
- }
-
- /**
- * @param fsInput The FileSink operator.
- * @param ctx The MR processing context.
- * @param finalName the final destination path the merge job should output.
- * @throws SemanticException
-
- * create a Map-only merge job using CombineHiveInputFormat for all partitions with
- * following operators:
- * MR job J0:
- * ...
- * |
- * v
- * FileSinkOperator_1 (fsInput)
- * |
- * v
- * Merge job J1:
- * |
- * v
- * TableScan (using CombineHiveInputFormat) (tsMerge)
- * |
- * v
- * FileSinkOperator (fsMerge)
- *
- * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths
- * do
- * not contain the dynamic partitions (their parent). So after the dynamic partitions are
- * created (after the first job finished before the moveTask or ConditionalTask start),
- * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic
- * partition
- * directories.
- *
- */
- private void createMRWorkForMergingFiles (FileSinkOperator fsInput, GenMRProcContext ctx,
- String finalName) throws SemanticException {
-
- //
- // 1. create the operator tree
- //
- HiveConf conf = ctx.getParseCtx().getConf();
- FileSinkDesc fsInputDesc = fsInput.getConf();
-
- // Create a TableScan operator
- RowSchema inputRS = fsInput.getSchema();
- Operator<? extends OperatorDesc> tsMerge =
- GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
-
- // Create a FileSink operator
- TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
- FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
- conf.getBoolVar(ConfVars.COMPRESSRESULT));
- FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
- fsOutputDesc, inputRS, tsMerge);
-
- // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema
- // needs to include the partition column, and the fsOutput should have
- // a DynamicPartitionCtx to indicate that it needs to dynamically partitioned.
- DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
- if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
- // adding DP ColumnInfo to the RowSchema signature
- ArrayList<ColumnInfo> signature = inputRS.getSignature();
- String tblAlias = fsInputDesc.getTableInfo().getTableName();
- LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
- StringBuilder partCols = new StringBuilder();
- for (String dpCol : dpCtx.getDPColNames()) {
- ColumnInfo colInfo = new ColumnInfo(dpCol,
- TypeInfoFactory.stringTypeInfo, // all partition column type should be string
- tblAlias, true); // partition column is virtual column
- signature.add(colInfo);
- colMap.put(dpCol, dpCol); // input and output have the same column name
- partCols.append(dpCol).append('/');
- }
- partCols.setLength(partCols.length() - 1); // remove the last '/'
- inputRS.setSignature(signature);
-
- // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
- DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
- dpCtx2.setInputToDPCols(colMap);
- fsOutputDesc.setDynPartCtx(dpCtx2);
-
- // update the FileSinkOperator to include partition columns
- fsInputDesc.getTableInfo().getProperties().setProperty(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
- partCols.toString()); // list of dynamic partition column names
- } else {
- // non-partitioned table
- fsInputDesc.getTableInfo().getProperties().remove(
- org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
- }
-
- //
- // 2. Constructing a conditional task consisting of a move task and a map reduce task
- //
- MoveWork dummyMv = new MoveWork(null, null, null,
- new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
- MapWork cplan;
- Serializable work;
-
- if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
- fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
-
- // Check if InputFormatClass is valid
- String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
- try {
- Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
-
- LOG.info("RCFile format- Using block level merge");
- cplan = createRCFileMergeTask(fsInputDesc, finalName,
- dpCtx != null && dpCtx.getNumDPCols() > 0);
- work = cplan;
- } catch (ClassNotFoundException e) {
- String msg = "Illegal input format class: " + inputFormatClass;
- throw new SemanticException(msg);
- }
-
- } else {
- cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
- work = new MapredWork();
- ((MapredWork)work).setMapWork(cplan);
- // use CombineHiveInputFormat for map-only merging
- }
- cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
- // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
- // know if merge MR2 will be triggered at execution time
- ConditionalTask cndTsk = createCondTask(conf, ctx.getCurrTask(), dummyMv, work,
- fsInputDesc.getFinalDirName());
-
- // keep the dynamic partition context in conditional task resolver context
- ConditionalResolverMergeFilesCtx mrCtx =
- (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
- mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
- mrCtx.setLbCtx(fsInputDesc.getLbCtx());
-
- //
- // 3. add the moveTask as the children of the conditional task
- //
- linkMoveTask(ctx, fsOutput, cndTsk);
- }
-
- /**
- * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
- * possible subtrees branching from the ConditionalTask.
- *
- * @param ctx
- * @param newOutput
- * @param cndTsk
- */
- private void linkMoveTask(GenMRProcContext ctx, FileSinkOperator newOutput,
- ConditionalTask cndTsk) {
-
- List<Task<MoveWork>> mvTasks = ctx.getMvTask();
- Task<MoveWork> mvTask = findMoveTask(mvTasks, newOutput);
-
- for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
- linkMoveTask(ctx, mvTask, tsk);
- }
- }
-
- /**
- * Follows the task tree down from task and makes all leaves parents of mvTask
- *
- * @param ctx
- * @param mvTask
- * @param task
- */
- private void linkMoveTask(GenMRProcContext ctx, Task<MoveWork> mvTask,
- Task<? extends Serializable> task) {
-
- if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
- // If it's a leaf, add the move task as a child
- addDependentMoveTasks(ctx, mvTask, task);
- } else {
- // Otherwise, for each child run this method recursively
- for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
- linkMoveTask(ctx, mvTask, childTask);
- }
- }
- }
-
- /**
- * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
- * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of
- * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as
- * well.
- *
- * @param ctx
- * @param mvTask
- * @param parentTask
- */
- private void addDependentMoveTasks(GenMRProcContext ctx, Task<MoveWork> mvTask,
- Task<? extends Serializable> parentTask) {
-
- if (mvTask != null) {
- if (ctx.getConf().getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) {
- DependencyCollectionTask dependencyTask = ctx.getDependencyTaskForMultiInsert();
- parentTask.addDependentTask(dependencyTask);
- if (mvTask.getWork().getLoadTableWork() != null) {
- // Moving tables/partitions depend on the dependencyTask
- dependencyTask.addDependentTask(mvTask);
- } else {
- // Moving files depends on the parentTask (we still want the dependencyTask to depend
- // on the parentTask)
- parentTask.addDependentTask(mvTask);
- }
- } else {
- parentTask.addDependentTask(mvTask);
- }
- }
- }
-
- /**
- * Create a MapredWork based on input path, the top operator and the input
- * table descriptor.
- *
- * @param conf
- * @param topOp
- * the table scan operator that is the root of the MapReduce task.
- * @param fsDesc
- * the file sink descriptor that serves as the input to this merge task.
- * @param parentMR
- * the parent MapReduce work
- * @param parentFS
- * the last FileSinkOperator in the parent MapReduce work
- * @return the MapredWork
- */
- private MapWork createMRWorkForMergingFiles (HiveConf conf,
- Operator<? extends OperatorDesc> topOp, FileSinkDesc fsDesc) {
-
- ArrayList<String> aliases = new ArrayList<String>();
- String inputDir = fsDesc.getFinalDirName();
- TableDesc tblDesc = fsDesc.getTableInfo();
- aliases.add(inputDir); // dummy alias: just use the input path
-
- // constructing the default MapredWork
- MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
- MapWork cplan = cMrPlan.getMapWork();
- cplan.getPathToAliases().put(inputDir, aliases);
- cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
- cplan.getAliasToWork().put(inputDir, topOp);
- cplan.setMapperCannotSpanPartns(true);
-
- return cplan;
- }
-
- /**
- * Create a block level merge task for RCFiles.
- *
- * @param fsInputDesc
- * @param finalName
- * @return MergeWork if table is stored as RCFile,
- * null otherwise
- */
- private MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
- String finalName, boolean hasDynamicPartitions) throws SemanticException {
-
- String inputDir = fsInputDesc.getFinalDirName();
- TableDesc tblDesc = fsInputDesc.getTableInfo();
-
- if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
- ArrayList<String> inputDirs = new ArrayList<String>();
- if (!hasDynamicPartitions
- && !isSkewedStoredAsDirs(fsInputDesc)) {
- inputDirs.add(inputDir);
- }
-
- MergeWork work = new MergeWork(inputDirs, finalName,
- hasDynamicPartitions, fsInputDesc.getDynPartCtx());
- LinkedHashMap<String, ArrayList<String>> pathToAliases =
- new LinkedHashMap<String, ArrayList<String>>();
- pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
- work.setMapperCannotSpanPartns(true);
- work.setPathToAliases(pathToAliases);
- work.setAliasToWork(
- new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
- if (hasDynamicPartitions
- || isSkewedStoredAsDirs(fsInputDesc)) {
- work.getPathToPartitionInfo().put(inputDir,
- new PartitionDesc(tblDesc, null));
- }
- work.setListBucketingCtx(fsInputDesc.getLbCtx());
-
- return work;
- }
-
- throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
- }
-
- /**
- * check if it is skewed table and stored as dirs.
- *
- * @param fsInputDesc
- * @return
- */
- private boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
- return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
- .isSkewedStoredAsDir();
- }
-
- /**
- * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
- *
- * @param conf
- * HiveConf
- * @param currTask
- * current leaf task
- * @param mvWork
- * MoveWork for the move task
- * @param mergeWork
- * MapredWork for the merge task.
- * @param inputPath
- * the input directory of the merge/move task
- * @return The conditional task
- */
- private ConditionalTask createCondTask(HiveConf conf,
- Task<? extends Serializable> currTask, MoveWork mvWork,
- Serializable mergeWork, String inputPath) {
-
- // There are 3 options for this ConditionalTask:
- // 1) Merge the partitions
- // 2) Move the partitions (i.e. don't merge the partitions)
- // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
- // merge others) in this case the merge is done first followed by the move to prevent
- // conflicts.
- Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
- Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
- Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
- Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
-
- // NOTE! It is necessary merge task is the parent of the move task, and not
- // the other way around, for the proper execution of the execute method of
- // ConditionalTask
- mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
-
- List<Serializable> listWorks = new ArrayList<Serializable>();
- listWorks.add(mvWork);
- listWorks.add(mergeWork);
-
- ConditionalWork cndWork = new ConditionalWork(listWorks);
-
- List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
- listTasks.add(moveOnlyMoveTask);
- listTasks.add(mergeOnlyMergeTask);
- listTasks.add(mergeAndMoveMergeTask);
-
- ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
- cndTsk.setListTasks(listTasks);
-
- // create resolver
- cndTsk.setResolver(new ConditionalResolverMergeFiles());
- ConditionalResolverMergeFilesCtx mrCtx =
- new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
- cndTsk.setResolverCtx(mrCtx);
-
- // make the conditional task as the child of the current leaf task
- currTask.addDependentTask(cndTsk);
-
- return cndTsk;
- }
-
- private Task<MoveWork> findMoveTask(
- List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
- // find the move task
- for (Task<MoveWork> mvTsk : mvTasks) {
- MoveWork mvWork = mvTsk.getWork();
- String srcDir = null;
- if (mvWork.getLoadFileWork() != null) {
- srcDir = mvWork.getLoadFileWork().getSourceDir();
- } else if (mvWork.getLoadTableWork() != null) {
- srcDir = mvWork.getLoadTableWork().getSourceDir();
- }
-
- String fsOpDirName = fsOp.getConf().getFinalDirName();
- if ((srcDir != null)
- && (srcDir.equalsIgnoreCase(fsOpDirName))) {
- return mvTsk;
- }
- }
- return null;
- }
-
- /**
* Process the FileSink operator to generate a MoveTask if necessary.
*
* @param fsOp
@@ -651,6 +160,11 @@ private String processFS(FileSinkOperator fsOp, Stack<Node> stack,
NodeProcessorCtx opProcCtx, boolean chDir) throws SemanticException {
GenMRProcContext ctx = (GenMRProcContext) opProcCtx;
+ Task<? extends Serializable> currTask = ctx.getCurrTask();
+
+ // If the directory needs to be changed, send the new directory
+ String dest = null;
+
List<FileSinkOperator> seenFSOps = ctx.getSeenFileSinkOps();
if (seenFSOps == null) {
seenFSOps = new ArrayList<FileSinkOperator>();
@@ -660,49 +174,14 @@ private String processFS(FileSinkOperator fsOp, Stack<Node> stack,
}
ctx.setSeenFileSinkOps(seenFSOps);
- Task<? extends Serializable> currTask = ctx.getCurrTask();
-
- // If the directory needs to be changed, send the new directory
- String dest = null;
-
- if (chDir) {
- dest = fsOp.getConf().getFinalDirName();
-
- // generate the temporary file
- // it must be on the same file system as the current destination
- ParseContext parseCtx = ctx.getParseCtx();
- Context baseCtx = parseCtx.getContext();
- String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri());
-
- FileSinkDesc fileSinkDesc = fsOp.getConf();
- // Change all the linked file sink descriptors
- if (fileSinkDesc.isLinkedFileSink()) {
- for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
- String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
- fsConf.setParentDir(tmpDir);
- fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
- }
- } else {
- fileSinkDesc.setDirName(tmpDir);
- }
- }
-
- Task<MoveWork> mvTask = null;
-
- if (!chDir) {
- mvTask = findMoveTask(ctx.getMvTask(), fsOp);
- }
+ dest = GenMapRedUtils.createMoveTask(ctx.getCurrTask(), chDir, fsOp, ctx.getParseCtx(),
+ ctx.getMvTask(), ctx.getConf(), ctx.getDependencyTaskForMultiInsert());
Operator<? extends OperatorDesc> currTopOp = ctx.getCurrTopOp();
String currAliasId = ctx.getCurrAliasId();
HashMap<Operator<? extends OperatorDesc>, Task<? extends Serializable>> opTaskMap =
ctx.getOpTaskMap();
- // Set the move task to be dependent on the current task
- if (mvTask != null) {
- addDependentMoveTasks(ctx, mvTask, currTask);
- }
-
// In case of multi-table insert, the path to alias mapping is needed for
// all the sources. Since there is no
// reducer, treat it as a plan with null reducer
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index ec51d59..4ad1699 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -33,11 +33,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -50,6 +54,8 @@
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.optimizer.GenMRProcContext.GenMRUnionCtx;
@@ -61,20 +67,31 @@
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverMergeFiles.ConditionalResolverMergeFilesCtx;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc.sampleDesc;
+import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.StatsWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.InputFormat;
/**
* General utility common functions for the Processor to convert operator into
@@ -1107,6 +1124,571 @@ public static void replaceMapWork(String sourceAlias, String targetAlias,
}
}
+ /**
+ * @param fsInput The FileSink operator.
+ * @param ctx The MR processing context.
+ * @param finalName the final destination path the merge job should output.
+ * @param dependencyTask
+ * @param mvTasks
+ * @param conf
+ * @param currTask
+ * @throws SemanticException
+
+ * create a Map-only merge job using CombineHiveInputFormat for all partitions with
+ * following operators:
+ * MR job J0:
+ * ...
+ * |
+ * v
+ * FileSinkOperator_1 (fsInput)
+ * |
+ * v
+ * Merge job J1:
+ * |
+ * v
+ * TableScan (using CombineHiveInputFormat) (tsMerge)
+ * |
+ * v
+ * FileSinkOperator (fsMerge)
+ *
+ * Here the pathToPartitionInfo & pathToAlias will remain the same, which means the paths
+ * do
+ * not contain the dynamic partitions (their parent). So after the dynamic partitions are
+ * created (after the first job finished before the moveTask or ConditionalTask start),
+ * we need to change the pathToPartitionInfo & pathToAlias to include the dynamic
+ * partition
+ * directories.
+ *
+ */
+ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
+ String finalName, DependencyCollectionTask dependencyTask,
+ List<Task<MoveWork>> mvTasks, HiveConf conf,
+ Task<? extends Serializable> currTask) throws SemanticException {
+
+ //
+ // 1. create the operator tree
+ //
+ FileSinkDesc fsInputDesc = fsInput.getConf();
+
+ // Create a TableScan operator
+ RowSchema inputRS = fsInput.getSchema();
+ Operator<? extends OperatorDesc> tsMerge =
+ GenMapRedUtils.createTemporaryTableScanOperator(inputRS);
+
+ // Create a FileSink operator
+ TableDesc ts = (TableDesc) fsInputDesc.getTableInfo().clone();
+ FileSinkDesc fsOutputDesc = new FileSinkDesc(finalName, ts,
+ conf.getBoolVar(ConfVars.COMPRESSRESULT));
+ FileSinkOperator fsOutput = (FileSinkOperator) OperatorFactory.getAndMakeChild(
+ fsOutputDesc, inputRS, tsMerge);
+
+ // If the input FileSinkOperator is a dynamic partition enabled, the tsMerge input schema
+ // needs to include the partition column, and the fsOutput should have
+ // a DynamicPartitionCtx to indicate that it needs to dynamically partitioned.
+ DynamicPartitionCtx dpCtx = fsInputDesc.getDynPartCtx();
+ if (dpCtx != null && dpCtx.getNumDPCols() > 0) {
+ // adding DP ColumnInfo to the RowSchema signature
+ ArrayList<ColumnInfo> signature = inputRS.getSignature();
+ String tblAlias = fsInputDesc.getTableInfo().getTableName();
+ LinkedHashMap<String, String> colMap = new LinkedHashMap<String, String>();
+ StringBuilder partCols = new StringBuilder();
+ for (String dpCol : dpCtx.getDPColNames()) {
+ ColumnInfo colInfo = new ColumnInfo(dpCol,
+ TypeInfoFactory.stringTypeInfo, // all partition column type should be string
+ tblAlias, true); // partition column is virtual column
+ signature.add(colInfo);
+ colMap.put(dpCol, dpCol); // input and output have the same column name
+ partCols.append(dpCol).append('/');
+ }
+ partCols.setLength(partCols.length() - 1); // remove the last '/'
+ inputRS.setSignature(signature);
+
+ // create another DynamicPartitionCtx, which has a different input-to-DP column mapping
+ DynamicPartitionCtx dpCtx2 = new DynamicPartitionCtx(dpCtx);
+ dpCtx2.setInputToDPCols(colMap);
+ fsOutputDesc.setDynPartCtx(dpCtx2);
+
+ // update the FileSinkOperator to include partition columns
+ fsInputDesc.getTableInfo().getProperties().setProperty(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS,
+ partCols.toString()); // list of dynamic partition column names
+ } else {
+ // non-partitioned table
+ fsInputDesc.getTableInfo().getProperties().remove(
+ org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+ }
+
+ //
+ // 2. Constructing a conditional task consisting of a move task and a map reduce task
+ //
+ MoveWork dummyMv = new MoveWork(null, null, null,
+ new LoadFileDesc(fsInputDesc.getFinalDirName(), finalName, true, null, null), false);
+ MapWork cplan;
+ Serializable work;
+
+ if (conf.getBoolVar(ConfVars.HIVEMERGERCFILEBLOCKLEVEL) &&
+ fsInputDesc.getTableInfo().getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+
+ // Check if InputFormatClass is valid
+ String inputFormatClass = conf.getVar(ConfVars.HIVEMERGEINPUTFORMATBLOCKLEVEL);
+ try {
+ Class c = (Class<? extends InputFormat>) Class.forName(inputFormatClass);
+
+ LOG.info("RCFile format- Using block level merge");
+ cplan = GenMapRedUtils.createRCFileMergeTask(fsInputDesc, finalName,
+ dpCtx != null && dpCtx.getNumDPCols() > 0);
+ work = cplan;
+ } catch (ClassNotFoundException e) {
+ String msg = "Illegal input format class: " + inputFormatClass;
+ throw new SemanticException(msg);
+ }
+
+ } else {
+ cplan = createMRWorkForMergingFiles(conf, tsMerge, fsInputDesc);
+ work = new MapredWork();
+ ((MapredWork)work).setMapWork(cplan);
+ // use CombineHiveInputFormat for map-only merging
+ }
+ cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
+ // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't
+ // know if merge MR2 will be triggered at execution time
+ ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work,
+ fsInputDesc.getFinalDirName());
+
+ // keep the dynamic partition context in conditional task resolver context
+ ConditionalResolverMergeFilesCtx mrCtx =
+ (ConditionalResolverMergeFilesCtx) cndTsk.getResolverCtx();
+ mrCtx.setDPCtx(fsInputDesc.getDynPartCtx());
+ mrCtx.setLbCtx(fsInputDesc.getLbCtx());
+
+ //
+ // 3. add the moveTask as the children of the conditional task
+ //
+ linkMoveTask(fsOutput, cndTsk, mvTasks, conf, dependencyTask);
+ }
+
+ /**
+ * Make the move task in the GenMRProcContext following the FileSinkOperator a dependent of all
+ * possible subtrees branching from the ConditionalTask.
+ *
+ * @param newOutput
+ * @param cndTsk
+ * @param mvTasks
+ * @param hconf
+ * @param dependencyTask
+ */
+ public static void linkMoveTask(FileSinkOperator newOutput,
+ ConditionalTask cndTsk, List<Task<MoveWork>> mvTasks, HiveConf hconf,
+ DependencyCollectionTask dependencyTask) {
+
+ Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, newOutput);
+
+ for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
+ linkMoveTask(mvTask, tsk, hconf, dependencyTask);
+ }
+ }
+
+ /**
+ * Follows the task tree down from task and makes all leaves parents of mvTask
+ *
+ * @param mvTask
+ * @param task
+ * @param hconf
+ * @param dependencyTask
+ */
+ public static void linkMoveTask(Task<MoveWork> mvTask,
+ Task<? extends Serializable> task, HiveConf hconf,
+ DependencyCollectionTask dependencyTask) {
+
+ if (task.getDependentTasks() == null || task.getDependentTasks().isEmpty()) {
+ // If it's a leaf, add the move task as a child
+ addDependentMoveTasks(mvTask, hconf, task, dependencyTask);
+ } else {
+ // Otherwise, for each child run this method recursively
+ for (Task<? extends Serializable> childTask : task.getDependentTasks()) {
+ linkMoveTask(mvTask, childTask, hconf, dependencyTask);
+ }
+ }
+ }
+
+ /**
+ * Adds the dependencyTaskForMultiInsert in ctx as a dependent of parentTask. If mvTask is a
+ * load table, and HIVE_MULTI_INSERT_ATOMIC_OUTPUTS is set, adds mvTask as a dependent of
+ * dependencyTaskForMultiInsert in ctx, otherwise adds mvTask as a dependent of parentTask as
+ * well.
+ *
+ * @param mvTask
+ * @param hconf
+ * @param parentTask
+ * @param dependencyTask
+ */
+ public static void addDependentMoveTasks(Task<MoveWork> mvTask, HiveConf hconf,
+ Task<? extends Serializable> parentTask, DependencyCollectionTask dependencyTask) {
+
+ if (mvTask != null) {
+ if (hconf.getBoolVar(ConfVars.HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES)) {
+ parentTask.addDependentTask(dependencyTask);
+ if (mvTask.getWork().getLoadTableWork() != null) {
+ // Moving tables/partitions depend on the dependencyTask
+ dependencyTask.addDependentTask(mvTask);
+ } else {
+ // Moving files depends on the parentTask (we still want the dependencyTask to depend
+ // on the parentTask)
+ parentTask.addDependentTask(mvTask);
+ }
+ } else {
+ parentTask.addDependentTask(mvTask);
+ }
+ }
+ }
+
+
+ /**
+ * Add the StatsTask as a dependent task of the MoveTask
+ * because StatsTask will change the Table/Partition metadata. For atomicity, we
+ * should not change it before the data is actually there done by MoveTask.
+ *
+ * @param nd
+ * the FileSinkOperator whose results are taken care of by the MoveTask.
+ * @param mvTask
+ * The MoveTask that moves the FileSinkOperator's results.
+ * @param currTask
+ * The MapRedTask that the FileSinkOperator belongs to.
+ * @param hconf
+ * HiveConf
+ */
+ public static void addStatsTask(FileSinkOperator nd, MoveTask mvTask,
+ Task<? extends Serializable> currTask, HiveConf hconf) {
+
+ MoveWork mvWork = ((MoveTask) mvTask).getWork();
+ StatsWork statsWork = null;
+ if (mvWork.getLoadTableWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadTableWork());
+ } else if (mvWork.getLoadFileWork() != null) {
+ statsWork = new StatsWork(mvWork.getLoadFileWork());
+ }
+ assert statsWork != null : "Error when generating StatsTask";
+ statsWork.setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+
+ if (currTask.getWork() instanceof MapredWork) {
+ MapredWork mrWork = (MapredWork) currTask.getWork();
+ mrWork.getMapWork().setGatheringStats(true);
+ if (mrWork.getReduceWork() != null) {
+ mrWork.getReduceWork().setGatheringStats(true);
+ }
+ } else {
+ TezWork work = (TezWork) currTask.getWork();
+ for (BaseWork w: work.getAllWork()) {
+ w.setGatheringStats(true);
+ }
+ }
+
+ // AggKey in StatsWork is used for stats aggregation while StatsAggPrefix
+ // in FileSinkDesc is used for stats publishing. They should be consistent.
+ statsWork.setAggKey(((FileSinkOperator) nd).getConf().getStatsAggPrefix());
+ Task<? extends Serializable> statsTask = TaskFactory.get(statsWork, hconf);
+
+ // mark the MapredWork and FileSinkOperator for gathering stats
+ nd.getConf().setGatherStats(true);
+ nd.getConf().setStatsReliable(hconf.getBoolVar(ConfVars.HIVE_STATS_RELIABLE));
+ nd.getConf().setMaxStatsKeyPrefixLength(
+ hconf.getIntVar(ConfVars.HIVE_STATS_KEY_PREFIX_MAX_LENGTH));
+ // mrWork.addDestinationTable(nd.getConf().getTableInfo().getTableName());
+
+ // subscribe feeds from the MoveTask so that MoveTask can forward the list
+ // of dynamic partition list to the StatsTask
+ mvTask.addDependentTask(statsTask);
+ statsTask.subscribeFeed(mvTask);
+ }
+
+ /**
+ * Returns true iff current query is an insert into for the given file sink
+ *
+ * @param parseCtx
+ * @param fsOp
+ * @return
+ */
+ public static boolean isInsertInto(ParseContext parseCtx, FileSinkOperator fsOp) {
+ return fsOp.getConf().getTableInfo().getTableName() != null &&
+ parseCtx.getQB().getParseInfo().isInsertToTable();
+ }
+
+ /**
+ * Create a MapredWork based on input path, the top operator and the input
+ * table descriptor.
+ *
+ * @param conf
+ * @param topOp
+ * the table scan operator that is the root of the MapReduce task.
+ * @param fsDesc
+ * the file sink descriptor that serves as the input to this merge task.
+ * @param parentMR
+ * the parent MapReduce work
+ * @param parentFS
+ * the last FileSinkOperator in the parent MapReduce work
+ * @return the MapredWork
+ */
+ private static MapWork createMRWorkForMergingFiles (HiveConf conf,
+ Operator<? extends OperatorDesc> topOp, FileSinkDesc fsDesc) {
+
+ ArrayList<String> aliases = new ArrayList<String>();
+ String inputDir = fsDesc.getFinalDirName();
+ TableDesc tblDesc = fsDesc.getTableInfo();
+ aliases.add(inputDir); // dummy alias: just use the input path
+
+ // constructing the default MapredWork
+ MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf);
+ MapWork cplan = cMrPlan.getMapWork();
+ cplan.getPathToAliases().put(inputDir, aliases);
+ cplan.getPathToPartitionInfo().put(inputDir, new PartitionDesc(tblDesc, null));
+ cplan.getAliasToWork().put(inputDir, topOp);
+ cplan.setMapperCannotSpanPartns(true);
+
+ return cplan;
+ }
+
+ /**
+ * Create a block level merge task for RCFiles.
+ *
+ * @param fsInputDesc
+ * @param finalName
+ * @return MergeWork if table is stored as RCFile,
+ * null otherwise
+ */
+ public static MapWork createRCFileMergeTask(FileSinkDesc fsInputDesc,
+ String finalName, boolean hasDynamicPartitions) throws SemanticException {
+
+ String inputDir = fsInputDesc.getFinalDirName();
+ TableDesc tblDesc = fsInputDesc.getTableInfo();
+
+ if (tblDesc.getInputFileFormatClass().equals(RCFileInputFormat.class)) {
+ ArrayList<String> inputDirs = new ArrayList<String>();
+ if (!hasDynamicPartitions
+ && !GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ inputDirs.add(inputDir);
+ }
+
+ MergeWork work = new MergeWork(inputDirs, finalName,
+ hasDynamicPartitions, fsInputDesc.getDynPartCtx());
+ LinkedHashMap<String, ArrayList<String>> pathToAliases =
+ new LinkedHashMap<String, ArrayList<String>>();
+ pathToAliases.put(inputDir, (ArrayList<String>) inputDirs.clone());
+ work.setMapperCannotSpanPartns(true);
+ work.setPathToAliases(pathToAliases);
+ work.setAliasToWork(
+ new LinkedHashMap<String, Operator<? extends OperatorDesc>>());
+ if (hasDynamicPartitions
+ || GenMapRedUtils.isSkewedStoredAsDirs(fsInputDesc)) {
+ work.getPathToPartitionInfo().put(inputDir,
+ new PartitionDesc(tblDesc, null));
+ }
+ work.setListBucketingCtx(fsInputDesc.getLbCtx());
+
+ return work;
+ }
+
+ throw new SemanticException("createRCFileMergeTask called on non-RCFile table");
+ }
+
+ /**
+ * Construct a conditional task given the current leaf task, the MoveWork and the MapredWork.
+ *
+ * @param conf
+ * HiveConf
+ * @param currTask
+ * current leaf task
+ * @param mvWork
+ * MoveWork for the move task
+ * @param mergeWork
+ * MapredWork for the merge task.
+ * @param inputPath
+ * the input directory of the merge/move task
+ * @return The conditional task
+ */
+ @SuppressWarnings("unchecked")
+ public static ConditionalTask createCondTask(HiveConf conf,
+ Task<? extends Serializable> currTask, MoveWork mvWork,
+ Serializable mergeWork, String inputPath) {
+
+ // There are 3 options for this ConditionalTask:
+ // 1) Merge the partitions
+ // 2) Move the partitions (i.e. don't merge the partitions)
+ // 3) Merge some partitions and move other partitions (i.e. merge some partitions and don't
+ // merge others) in this case the merge is done first followed by the move to prevent
+ // conflicts.
+ Task<? extends Serializable> mergeOnlyMergeTask = TaskFactory.get(mergeWork, conf);
+ Task<? extends Serializable> moveOnlyMoveTask = TaskFactory.get(mvWork, conf);
+ Task<? extends Serializable> mergeAndMoveMergeTask = TaskFactory.get(mergeWork, conf);
+ Task<? extends Serializable> mergeAndMoveMoveTask = TaskFactory.get(mvWork, conf);
+
+ // NOTE! It is necessary merge task is the parent of the move task, and not
+ // the other way around, for the proper execution of the execute method of
+ // ConditionalTask
+ mergeAndMoveMergeTask.addDependentTask(mergeAndMoveMoveTask);
+
+ List<Serializable> listWorks = new ArrayList<Serializable>();
+ listWorks.add(mvWork);
+ listWorks.add(mergeWork);
+
+ ConditionalWork cndWork = new ConditionalWork(listWorks);
+
+ List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
+ listTasks.add(moveOnlyMoveTask);
+ listTasks.add(mergeOnlyMergeTask);
+ listTasks.add(mergeAndMoveMergeTask);
+
+ ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, conf);
+ cndTsk.setListTasks(listTasks);
+
+ // create resolver
+ cndTsk.setResolver(new ConditionalResolverMergeFiles());
+ ConditionalResolverMergeFilesCtx mrCtx =
+ new ConditionalResolverMergeFilesCtx(listTasks, inputPath);
+ cndTsk.setResolverCtx(mrCtx);
+
+ // make the conditional task as the child of the current leaf task
+ currTask.addDependentTask(cndTsk);
+
+ return cndTsk;
+ }
+
+ /**
+ * check if it is skewed table and stored as dirs.
+ *
+ * @param fsInputDesc
+ * @return
+ */
+ public static boolean isSkewedStoredAsDirs(FileSinkDesc fsInputDesc) {
+ return (fsInputDesc.getLbCtx() == null) ? false : fsInputDesc.getLbCtx()
+ .isSkewedStoredAsDir();
+ }
+
+ public static Task<MoveWork> findMoveTask(
+ List<Task<MoveWork>> mvTasks, FileSinkOperator fsOp) {
+ // find the move task
+ for (Task<MoveWork> mvTsk : mvTasks) {
+ MoveWork mvWork = mvTsk.getWork();
+ String srcDir = null;
+ if (mvWork.getLoadFileWork() != null) {
+ srcDir = mvWork.getLoadFileWork().getSourceDir();
+ } else if (mvWork.getLoadTableWork() != null) {
+ srcDir = mvWork.getLoadTableWork().getSourceDir();
+ }
+
+ String fsOpDirName = fsOp.getConf().getFinalDirName();
+ if ((srcDir != null)
+ && (srcDir.equalsIgnoreCase(fsOpDirName))) {
+ return mvTsk;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Returns true iff the fsOp requires a merge
+ * @param mvTasks
+ * @param hconf
+ * @param fsOp
+ * @param currTask
+ * @param isInsertTable
+ * @return
+ */
+ public static boolean isMergeRequired(List<Task<MoveWork>> mvTasks, HiveConf hconf, FileSinkOperator fsOp,
+ Task<? extends Serializable> currTask, boolean isInsertTable) {
+
+ // Has the user enabled merging of files for map-only jobs or for all jobs
+ if ((mvTasks != null) && (!mvTasks.isEmpty())) {
+
+ // no need of merging if the move is to a local file system
+ MoveTask mvTask = (MoveTask) GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+
+ if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
+ GenMapRedUtils.addStatsTask(fsOp, mvTask, currTask, hconf);
+ }
+
+ if ((mvTask != null) && !mvTask.isLocal() && fsOp.getConf().canBeMerged()) {
+ if (fsOp.getConf().isLinkedFileSink()) {
+ // If the user has HIVEMERGEMAPREDFILES set to false, the idea was the
+ // number of reducers are few, so the number of files anyway are small.
+ // However, with this optimization, we are increasing the number of files
+ // possibly by a big margin. So, merge aggressively.
+ if (hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) ||
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES)) {
+ return true;
+ }
+ } else {
+ // There are separate configuration parameters to control whether to
+ // merge for a map-only job
+ // or for a map-reduce job
+ ReduceWork reduceWork = currTask.getWork() instanceof MapredWork
+ ? ((MapredWork) currTask.getWork()).getReduceWork() : null;
+ boolean mergeMapOnly =
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
+ boolean mergeMapRed =
+ hconf.getBoolVar(ConfVars.HIVEMERGEMAPREDFILES) &&
+ reduceWork != null;
+ if (mergeMapOnly || mergeMapRed) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Create and add any dependent move tasks
+ *
+ * @param currTask
+ * @param chDir
+ * @param fsOp
+ * @param parseCtx
+ * @param mvTasks
+ * @param hconf
+ * @param dependencyTask
+ * @return
+ */
+ public static String createMoveTask(Task<? extends Serializable> currTask, boolean chDir,
+ FileSinkOperator fsOp, ParseContext parseCtx, List<Task<MoveWork>> mvTasks,
+ HiveConf hconf, DependencyCollectionTask dependencyTask) {
+
+ String dest = null;
+
+ if (chDir) {
+ dest = fsOp.getConf().getFinalDirName();
+
+ // generate the temporary file
+ // it must be on the same file system as the current destination
+ Context baseCtx = parseCtx.getContext();
+ String tmpDir = baseCtx.getExternalTmpFileURI((new Path(dest)).toUri());
+
+ FileSinkDesc fileSinkDesc = fsOp.getConf();
+ // Change all the linked file sink descriptors
+ if (fileSinkDesc.isLinkedFileSink()) {
+ for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
+ String fileName = Utilities.getFileNameFromDirName(fsConf.getDirName());
+ fsConf.setParentDir(tmpDir);
+ fsConf.setDirName(tmpDir + Path.SEPARATOR + fileName);
+ }
+ } else {
+ fileSinkDesc.setDirName(tmpDir);
+ }
+ }
+
+ Task<MoveWork> mvTask = null;
+
+ if (!chDir) {
+ mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOp);
+ }
+
+ // Set the move task to be dependent on the current task
+ if (mvTask != null) {
+ GenMapRedUtils.addDependentMoveTasks(mvTask, hconf, currTask, dependencyTask);
+ }
+
+ return dest;
+ }
+
private GenMapRedUtils() {
// prevent instantiation
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
new file mode 100644
index 0000000..b1e6f04
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/FileSinkProcessor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse;
+
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+
+/**
+ * FileSinkProcessor handles addition of merge, move and stats tasks for filesinks
+ *
+ */
+public class FileSinkProcessor implements NodeProcessor {
+
+ static final private Log LOG = LogFactory.getLog(FileSinkProcessor.class.getName());
+
+ @Override
+ /*
+ * (non-Javadoc)
+ * we should ideally not modify the tree we traverse.
+ * However, since we need to walk the tree at any time when we modify the
+ * operator, we might as well do it here.
+ */
+ public Object process(Node nd, Stack<Node> stack,
+ NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
+
+ GenTezProcContext context = (GenTezProcContext) procCtx;
+ FileSinkOperator fileSink = (FileSinkOperator) nd;
+ ParseContext parseContext = context.parseContext;
+
+
+ boolean isInsertTable = // is INSERT OVERWRITE TABLE
+ GenMapRedUtils.isInsertInto(parseContext, fileSink);
+ HiveConf hconf = parseContext.getConf();
+
+ boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
+ hconf, fileSink, context.currentTask, isInsertTable);
+
+ String finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+ chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
+
+ if (chDir) {
+ // Merge the files in the destination table/partitions by creating Map-only merge job
+ // If underlying data is RCFile a RCFileBlockMerge task would be created.
+ LOG.info("using CombineHiveInputformat for the merge job");
+ GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
+ context.dependencyTask, context.moveTask,
+ hconf, context.currentTask);
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index d78f39a..1f07594 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -26,6 +26,7 @@
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -35,6 +36,7 @@
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
@@ -97,6 +99,9 @@
// remember the dummy ops we created
public final Map<Operator<?>, List<Operator<?>>> linkChildOpWithDummyOp;
+ // used to group dependent tasks for multi table inserts
+ public final DependencyCollectionTask dependencyTask;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -115,5 +120,7 @@ public GenTezProcContext(HiveConf conf, ParseContext parseContext,
this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
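+ // a single dependency collection task is shared by all file sinks of the
+ // query so that their dependent move tasks can be grouped together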
+ this.dependencyTask = (DependencyCollectionTask)
+ TaskFactory.get(new DependencyCollectionWork(), conf);
}
}
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
index 41fb7d1..fe7359e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
@@ -71,6 +71,12 @@ public Object process(Node nd, Stack<Node> stack,
// packing into a vertex, typically a table scan, union or join
Operator<?> root = context.currentRootOperator;
if (root == null) {
+ // if there are no more rootOperators we're dealing with multiple
+ // file sinks off of the same table scan. Bail.
+ if (context.rootOperators.isEmpty()) {
+ return null;
+ }
+
// null means that we're starting with a new table scan
// the graph walker walks the rootOperators in the same
// order so we can just take the next
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
index fdc072d..1fe52d2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.lib.CompositeProcessor;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
import org.apache.hadoop.hive.ql.lib.GraphWalker;
@@ -126,16 +127,17 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
// the operator stack.
// The dispatcher generates the plan from the operator tree
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp(new String("Split Work - ReduceSink"),
+ opRules.put(new RuleRegExp("Split Work - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"),
genTezWork);
- opRules.put(new RuleRegExp(new String("No more walking on ReduceSink-MapJoin"),
+
+ opRules.put(new RuleRegExp("No more walking on ReduceSink-MapJoin",
ReduceSinkOperator.getOperatorName() + "%" +
MapJoinOperator.getOperatorName() + "%"), new ReduceSinkMapJoinProc());
- opRules.put(new RuleRegExp(new String("Split Work - FileSink"),
- FileSinkOperator.getOperatorName() + "%"),
- genTezWork);
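+ // the composite runs the file sink processing (move/merge/stats setup)
+ // together with genTezWork, which splits the operator plan at the file sink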
+ opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
+ FileSinkOperator.getOperatorName() + "%"),
+ new CompositeProcessor(new FileSinkProcessor(), genTezWork));
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
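
The FileSink rule above now fires two processors on the same node. As a rough sketch of how such a chaining processor can be built on the NodeProcessor interface used throughout this patch (illustrative only: the ChainedProcessor name is made up here, and Hive's own CompositeProcessor in ql.lib, which the rule actually uses, may differ in detail):

    import java.util.Stack;

    import org.apache.hadoop.hive.ql.lib.Node;
    import org.apache.hadoop.hive.ql.lib.NodeProcessor;
    import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
    import org.apache.hadoop.hive.ql.parse.SemanticException;

    // Sketch: invoke a fixed list of NodeProcessors, one after another, for a node.
    public class ChainedProcessor implements NodeProcessor {

      private final NodeProcessor[] delegates;

      public ChainedProcessor(NodeProcessor... delegates) {
        this.delegates = delegates;
      }

      @Override
      public Object process(Node nd, Stack<Node> stack,
          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
        Object result = null;
        // every delegate sees the same node, operator stack and context
        for (NodeProcessor delegate : delegates) {
          result = delegate.process(nd, stack, procCtx, nodeOutputs);
        }
        return result;
      }
    }

In the rule registered above, FileSinkProcessor and genTezWork play the role of the delegates.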
diff --git ql/src/test/queries/clientpositive/tez_dml.q ql/src/test/queries/clientpositive/tez_dml.q
new file mode 100644
index 0000000..b4ca8be
--- /dev/null
+++ ql/src/test/queries/clientpositive/tez_dml.q
@@ -0,0 +1,37 @@
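+-- Exercises DML on Tez: a CTAS, a dynamic partition insert and a multi table
+-- insert, all of which go through the new file sink move/merge handling.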
+set hive.optimize.tez=true;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
+-- CTAS
+EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt;
+CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt;
+
+SELECT * FROM tmp_src;
+
+-- dyn partitions
+CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int);
+EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src;
+INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src;
+
+SELECT * FROM tmp_src_part;
+
+-- multi insert
+CREATE TABLE even (c int, d string);
+CREATE TABLE odd (c int, d string);
+
+EXPLAIN
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1;
+
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1;
+
+SELECT * FROM even;
+SELECT * FROM odd;
+
+-- drop the tables
+DROP TABLE even;
+DROP TABLE odd;
+DROP TABLE tmp_src;
+DROP TABLE tmp_src_part;
diff --git ql/src/test/results/clientpositive/tez_dml.q.out ql/src/test/results/clientpositive/tez_dml.q.out
new file mode 100644
index 0000000..e5a140e
--- /dev/null
+++ ql/src/test/results/clientpositive/tez_dml.q.out
@@ -0,0 +1,1778 @@
+PREHOOK: query: -- CTAS
+EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt
+PREHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: query: -- CTAS
+EXPLAIN CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt
+POSTHOOK: type: CREATETABLE_AS_SELECT
+ABSTRACT SYNTAX TREE:
+ (TOK_CREATETABLE (TOK_TABNAME tmp_src) TOK_LIKETABLE (TOK_QUERY (TOK_FROM (TOK_SUBQUERY (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL value)) (TOK_SELEXPR (TOK_FUNCTION count (TOK_TABLE_OR_COL value)) cnt)) (TOK_GROUPBY (TOK_TABLE_OR_COL value)))) f1)) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF)) (TOK_ORDERBY (TOK_TABSORTCOLNAMEASC (TOK_TABLE_OR_COL cnt))))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+ Stage-5
+ Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
+ Stage-9 depends on stages: Stage-0
+ Stage-3 depends on stages: Stage-9
+ Stage-4
+ Stage-6
+ Stage-7 depends on stages: Stage-6
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Select Operator
+ expressions:
+ expr: value
+ type: string
+ outputColumnNames: value
+ Group By Operator
+ aggregations:
+ expr: count(value)
+ bucketGroup: false
+ keys:
+ expr: value
+ type: string
+ mode: hash
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: string
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: string
+ tag: -1
+ value expressions:
+ expr: _col1
+ type: bigint
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(VALUE._col0)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: string
+ mode: mergepartial
+ outputColumnNames: _col0, _col1
+ Select Operator
+ expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: bigint
+ outputColumnNames: _col0, _col1
+ Reduce Output Operator
+ key expressions:
+ expr: _col1
+ type: bigint
+ sort order: +
+ tag: -1
+ value expressions:
+ expr: _col0
+ type: string
+ expr: _col1
+ type: bigint
+ Reduce Operator Tree:
+ Extract
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmp_src
+
+ Stage: Stage-8
+ Conditional Operator
+
+ Stage: Stage-5
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-0
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-9
+ Create Table Operator:
+ Create Table
+ columns: value string, cnt bigint
+ if not exists: false
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ # buckets: -1
+ output format: org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat
+ name: tmp_src
+ isExternal: false
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+ Stage: Stage-4
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmp_src
+
+ Stage: Stage-6
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmp_src
+
+ Stage: Stage-7
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+
+PREHOOK: query: CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@src
+POSTHOOK: query: CREATE TABLE tmp_src AS SELECT * FROM (SELECT value, count(value) AS cnt FROM src GROUP BY value) f1 ORDER BY cnt
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@tmp_src
+PREHOOK: query: SELECT * FROM tmp_src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tmp_src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM tmp_src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tmp_src
+#### A masked pattern was here ####
+val_490 1
+val_287 1
+val_286 1
+val_285 1
+val_284 1
+val_283 1
+val_114 1
+val_487 1
+val_485 1
+val_28 1
+val_484 1
+val_181 1
+val_275 1
+val_274 1
+val_183 1
+val_483 1
+val_27 1
+val_266 1
+val_482 1
+val_263 1
+val_262 1
+val_260 1
+val_481 1
+val_258 1
+val_257 1
+val_116 1
+val_479 1
+val_252 1
+val_249 1
+val_248 1
+val_247 1
+val_244 1
+val_92 1
+val_241 1
+val_477 1
+val_475 1
+val_472 1
+val_470 1
+val_235 1
+val_47 1
+val_186 1
+val_126 1
+val_228 1
+val_226 1
+val_131 1
+val_467 1
+val_222 1
+val_133 1
+val_82 1
+val_218 1
+val_80 1
+val_460 1
+val_214 1
+val_8 1
+val_78 1
+val_189 1
+val_457 1
+val_455 1
+val_136 1
+val_202 1
+val_201 1
+val_453 1
+val_20 1
+val_2 1
+val_19 1
+val_452 1
+val_196 1
+val_449 1
+val_194 1
+val_190 1
+val_192 1
+val_448 1
+val_446 1
+val_444 1
+val_443 1
+val_44 1
+val_77 1
+val_143 1
+val_437 1
+val_436 1
+val_435 1
+val_432 1
+val_145 1
+val_150 1
+val_43 1
+val_10 1
+val_427 1
+val_74 1
+val_421 1
+val_9 1
+val_419 1
+val_418 1
+val_153 1
+val_105 1
+val_69 1
+val_411 1
+val_41 1
+val_155 1
+val_407 1
+val_156 1
+val_87 1
+val_157 1
+val_402 1
+val_158 1
+val_400 1
+val_4 1
+val_66 1
+val_65 1
+val_160 1
+val_64 1
+val_394 1
+val_393 1
+val_392 1
+val_389 1
+val_386 1
+val_162 1
+val_86 1
+val_379 1
+val_378 1
+val_377 1
+val_375 1
+val_374 1
+val_373 1
+val_57 1
+val_163 1
+val_368 1
+val_54 1
+val_366 1
+val_365 1
+val_364 1
+val_362 1
+val_360 1
+val_356 1
+val_53 1
+val_351 1
+val_166 1
+val_168 1
+val_345 1
+val_85 1
+val_11 1
+val_341 1
+val_34 1
+val_339 1
+val_338 1
+val_336 1
+val_335 1
+val_111 1
+val_332 1
+val_497 1
+val_33 1
+val_17 1
+val_496 1
+val_323 1
+val_495 1
+val_494 1
+val_170 1
+val_493 1
+val_177 1
+val_315 1
+val_178 1
+val_310 1
+val_96 1
+val_308 1
+val_491 1
+val_306 1
+val_305 1
+val_302 1
+val_30 1
+val_180 1
+val_296 1
+val_292 1
+val_291 1
+val_289 1
+val_98 2
+val_97 2
+val_95 2
+val_84 2
+val_83 2
+val_76 2
+val_72 2
+val_67 2
+val_58 2
+val_51 2
+val_492 2
+val_478 2
+val_463 2
+val_462 2
+val_459 2
+val_458 2
+val_439 2
+val_429 2
+val_424 2
+val_42 2
+val_414 2
+val_413 2
+val_404 2
+val_399 2
+val_397 2
+val_395 2
+val_382 2
+val_37 2
+val_367 2
+val_353 2
+val_344 2
+val_342 2
+val_333 2
+val_331 2
+val_325 2
+val_322 2
+val_321 2
+val_317 2
+val_309 2
+val_307 2
+val_288 2
+val_282 2
+val_281 2
+val_280 2
+val_278 2
+val_272 2
+val_265 2
+val_26 2
+val_256 2
+val_255 2
+val_242 2
+val_24 2
+val_239 2
+val_238 2
+val_237 2
+val_233 2
+val_229 2
+val_224 2
+val_223 2
+val_221 2
+val_219 2
+val_217 2
+val_216 2
+val_213 2
+val_209 2
+val_207 2
+val_205 2
+val_203 2
+val_200 2
+val_197 2
+val_195 2
+val_191 2
+val_18 2
+val_179 2
+val_176 2
+val_175 2
+val_174 2
+val_172 2
+val_165 2
+val_164 2
+val_152 2
+val_15 2
+val_149 2
+val_146 2
+val_137 2
+val_134 2
+val_129 2
+val_125 2
+val_120 2
+val_12 2
+val_118 2
+val_113 2
+val_104 2
+val_103 2
+val_100 2
+val_498 3
+val_369 3
+val_384 3
+val_396 3
+val_403 3
+val_409 3
+val_417 3
+val_5 3
+val_430 3
+val_70 3
+val_119 3
+val_0 3
+val_431 3
+val_438 3
+val_480 3
+val_193 3
+val_199 3
+val_208 3
+val_187 3
+val_273 3
+val_298 3
+val_454 3
+val_311 3
+val_316 3
+val_466 3
+val_90 3
+val_128 3
+val_318 3
+val_327 3
+val_167 3
+val_35 3
+val_468 4
+val_489 4
+val_406 4
+val_169 4
+val_138 4
+val_277 4
+val_469 5
+val_401 5
+val_230 5
+val_348 5
+PREHOOK: query: -- dyn partitions
+CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- dyn partitions
+CREATE TABLE tmp_src_part (c string) PARTITIONED BY (d int)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@tmp_src_part
+PREHOOK: query: EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME tmp_src))) (TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME tmp_src_part) (TOK_PARTSPEC (TOK_PARTVAL d)))) (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))))
+
+STAGE DEPENDENCIES:
+ Stage-1 is a root stage
+ Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+ Stage-5
+ Stage-0 depends on stages: Stage-5, Stage-4, Stage-7
+ Stage-3 depends on stages: Stage-0
+ Stage-4
+ Stage-6
+ Stage-7 depends on stages: Stage-6
+
+STAGE PLANS:
+ Stage: Stage-1
+ Tez
+ Alias -> Map Operator Tree:
+ tmp_src
+ TableScan
+ alias: tmp_src
+ Select Operator
+ expressions:
+ expr: value
+ type: string
+ expr: cnt
+ type: bigint
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmp_src_part
+
+ Stage: Stage-8
+ Conditional Operator
+
+ Stage: Stage-5
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ partition:
+ d
+ replace: false
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmp_src_part
+
+ Stage: Stage-3
+ Stats-Aggr Operator
+
+ Stage: Stage-4
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmp_src_part
+
+ Stage: Stage-6
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.tmp_src_part
+
+ Stage: Stage-7
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+
+PREHOOK: query: INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tmp_src
+PREHOOK: Output: default@tmp_src_part
+POSTHOOK: query: INSERT INTO TABLE tmp_src_part PARTITION (d) SELECT * FROM tmp_src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tmp_src
+POSTHOOK: Output: default@tmp_src_part@d=1
+POSTHOOK: Output: default@tmp_src_part@d=2
+POSTHOOK: Output: default@tmp_src_part@d=3
+POSTHOOK: Output: default@tmp_src_part@d=4
+POSTHOOK: Output: default@tmp_src_part@d=5
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: SELECT * FROM tmp_src_part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tmp_src_part
+PREHOOK: Input: default@tmp_src_part@d=1
+PREHOOK: Input: default@tmp_src_part@d=2
+PREHOOK: Input: default@tmp_src_part@d=3
+PREHOOK: Input: default@tmp_src_part@d=4
+PREHOOK: Input: default@tmp_src_part@d=5
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM tmp_src_part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tmp_src_part
+POSTHOOK: Input: default@tmp_src_part@d=1
+POSTHOOK: Input: default@tmp_src_part@d=2
+POSTHOOK: Input: default@tmp_src_part@d=3
+POSTHOOK: Input: default@tmp_src_part@d=4
+POSTHOOK: Input: default@tmp_src_part@d=5
+#### A masked pattern was here ####
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+val_490 1
+val_287 1
+val_286 1
+val_285 1
+val_284 1
+val_283 1
+val_114 1
+val_487 1
+val_485 1
+val_28 1
+val_484 1
+val_181 1
+val_275 1
+val_274 1
+val_183 1
+val_483 1
+val_27 1
+val_266 1
+val_482 1
+val_263 1
+val_262 1
+val_260 1
+val_481 1
+val_258 1
+val_257 1
+val_116 1
+val_479 1
+val_252 1
+val_249 1
+val_248 1
+val_247 1
+val_244 1
+val_92 1
+val_241 1
+val_477 1
+val_475 1
+val_472 1
+val_470 1
+val_235 1
+val_47 1
+val_186 1
+val_126 1
+val_228 1
+val_226 1
+val_131 1
+val_467 1
+val_222 1
+val_133 1
+val_82 1
+val_218 1
+val_80 1
+val_460 1
+val_214 1
+val_8 1
+val_78 1
+val_189 1
+val_457 1
+val_455 1
+val_136 1
+val_202 1
+val_201 1
+val_453 1
+val_20 1
+val_2 1
+val_19 1
+val_452 1
+val_196 1
+val_449 1
+val_194 1
+val_190 1
+val_192 1
+val_448 1
+val_446 1
+val_444 1
+val_443 1
+val_44 1
+val_77 1
+val_143 1
+val_437 1
+val_436 1
+val_435 1
+val_432 1
+val_145 1
+val_150 1
+val_43 1
+val_10 1
+val_427 1
+val_74 1
+val_421 1
+val_9 1
+val_419 1
+val_418 1
+val_153 1
+val_105 1
+val_69 1
+val_411 1
+val_41 1
+val_155 1
+val_407 1
+val_156 1
+val_87 1
+val_157 1
+val_402 1
+val_158 1
+val_400 1
+val_4 1
+val_66 1
+val_65 1
+val_160 1
+val_64 1
+val_394 1
+val_393 1
+val_392 1
+val_389 1
+val_386 1
+val_162 1
+val_86 1
+val_379 1
+val_378 1
+val_377 1
+val_375 1
+val_374 1
+val_373 1
+val_57 1
+val_163 1
+val_368 1
+val_54 1
+val_366 1
+val_365 1
+val_364 1
+val_362 1
+val_360 1
+val_356 1
+val_53 1
+val_351 1
+val_166 1
+val_168 1
+val_345 1
+val_85 1
+val_11 1
+val_341 1
+val_34 1
+val_339 1
+val_338 1
+val_336 1
+val_335 1
+val_111 1
+val_332 1
+val_497 1
+val_33 1
+val_17 1
+val_496 1
+val_323 1
+val_495 1
+val_494 1
+val_170 1
+val_493 1
+val_177 1
+val_315 1
+val_178 1
+val_310 1
+val_96 1
+val_308 1
+val_491 1
+val_306 1
+val_305 1
+val_302 1
+val_30 1
+val_180 1
+val_296 1
+val_292 1
+val_291 1
+val_289 1
+val_98 2
+val_97 2
+val_95 2
+val_84 2
+val_83 2
+val_76 2
+val_72 2
+val_67 2
+val_58 2
+val_51 2
+val_492 2
+val_478 2
+val_463 2
+val_462 2
+val_459 2
+val_458 2
+val_439 2
+val_429 2
+val_424 2
+val_42 2
+val_414 2
+val_413 2
+val_404 2
+val_399 2
+val_397 2
+val_395 2
+val_382 2
+val_37 2
+val_367 2
+val_353 2
+val_344 2
+val_342 2
+val_333 2
+val_331 2
+val_325 2
+val_322 2
+val_321 2
+val_317 2
+val_309 2
+val_307 2
+val_288 2
+val_282 2
+val_281 2
+val_280 2
+val_278 2
+val_272 2
+val_265 2
+val_26 2
+val_256 2
+val_255 2
+val_242 2
+val_24 2
+val_239 2
+val_238 2
+val_237 2
+val_233 2
+val_229 2
+val_224 2
+val_223 2
+val_221 2
+val_219 2
+val_217 2
+val_216 2
+val_213 2
+val_209 2
+val_207 2
+val_205 2
+val_203 2
+val_200 2
+val_197 2
+val_195 2
+val_191 2
+val_18 2
+val_179 2
+val_176 2
+val_175 2
+val_174 2
+val_172 2
+val_165 2
+val_164 2
+val_152 2
+val_15 2
+val_149 2
+val_146 2
+val_137 2
+val_134 2
+val_129 2
+val_125 2
+val_120 2
+val_12 2
+val_118 2
+val_113 2
+val_104 2
+val_103 2
+val_100 2
+val_498 3
+val_369 3
+val_384 3
+val_396 3
+val_403 3
+val_409 3
+val_417 3
+val_5 3
+val_430 3
+val_70 3
+val_119 3
+val_0 3
+val_431 3
+val_438 3
+val_480 3
+val_193 3
+val_199 3
+val_208 3
+val_187 3
+val_273 3
+val_298 3
+val_454 3
+val_311 3
+val_316 3
+val_466 3
+val_90 3
+val_128 3
+val_318 3
+val_327 3
+val_167 3
+val_35 3
+val_468 4
+val_489 4
+val_406 4
+val_169 4
+val_138 4
+val_277 4
+val_469 5
+val_401 5
+val_230 5
+val_348 5
+PREHOOK: query: -- multi insert
+CREATE TABLE even (c int, d string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- multi insert
+CREATE TABLE even (c int, d string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@even
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: CREATE TABLE odd (c int, d string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE odd (c int, d string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@odd
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: EXPLAIN
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+ABSTRACT SYNTAX TREE:
+ (TOK_QUERY (TOK_FROM (TOK_TABREF (TOK_TABNAME src))) (TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME even))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value))) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 2) 0))) (TOK_INSERT (TOK_INSERT_INTO (TOK_TAB (TOK_TABNAME odd))) (TOK_SELECT (TOK_SELEXPR (TOK_TABLE_OR_COL key)) (TOK_SELEXPR (TOK_TABLE_OR_COL value))) (TOK_WHERE (= (% (TOK_TABLE_OR_COL key) 2) 1))))
+
+STAGE DEPENDENCIES:
+ Stage-2 is a root stage
+ Stage-9 depends on stages: Stage-2 , consists of Stage-6, Stage-5, Stage-7
+ Stage-6
+ Stage-0 depends on stages: Stage-6, Stage-5, Stage-8
+ Stage-4 depends on stages: Stage-0
+ Stage-5
+ Stage-7
+ Stage-8 depends on stages: Stage-7
+ Stage-15 depends on stages: Stage-2 , consists of Stage-12, Stage-11, Stage-13
+ Stage-12
+ Stage-1 depends on stages: Stage-12, Stage-11, Stage-14
+ Stage-10 depends on stages: Stage-1
+ Stage-11
+ Stage-13
+ Stage-14 depends on stages: Stage-13
+
+STAGE PLANS:
+ Stage: Stage-2
+ Tez
+ Alias -> Map Operator Tree:
+ src
+ TableScan
+ alias: src
+ Filter Operator
+ predicate:
+ expr: ((key % 2) = 0)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: UDFToInteger(key)
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.even
+ Filter Operator
+ predicate:
+ expr: ((key % 2) = 1)
+ type: boolean
+ Select Operator
+ expressions:
+ expr: UDFToInteger(key)
+ type: int
+ expr: value
+ type: string
+ outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ GlobalTableId: 2
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.odd
+
+ Stage: Stage-9
+ Conditional Operator
+
+ Stage: Stage-6
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-0
+ Move Operator
+ tables:
+ replace: false
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.even
+
+ Stage: Stage-4
+ Stats-Aggr Operator
+
+ Stage: Stage-5
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.even
+
+ Stage: Stage-7
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.even
+
+ Stage: Stage-8
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-15
+ Conditional Operator
+
+ Stage: Stage-12
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+ Stage: Stage-1
+ Move Operator
+ tables:
+ replace: false
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.odd
+
+ Stage: Stage-10
+ Stats-Aggr Operator
+
+ Stage: Stage-11
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.odd
+
+ Stage: Stage-13
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ TableScan
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.odd
+
+ Stage: Stage-14
+ Move Operator
+ files:
+ hdfs directory: true
+#### A masked pattern was here ####
+
+
+PREHOOK: query: FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@even
+PREHOOK: Output: default@odd
+POSTHOOK: query: FROM src
+INSERT INTO TABLE even SELECT key, value WHERE key % 2 = 0
+INSERT INTO TABLE odd SELECT key, value WHERE key % 2 = 1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@even
+POSTHOOK: Output: default@odd
+POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: SELECT * FROM even
+PREHOOK: type: QUERY
+PREHOOK: Input: default@even
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM even
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@even
+#### A masked pattern was here ####
+POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+238 val_238
+86 val_86
+278 val_278
+98 val_98
+484 val_484
+150 val_150
+224 val_224
+66 val_66
+128 val_128
+146 val_146
+406 val_406
+374 val_374
+152 val_152
+82 val_82
+166 val_166
+430 val_430
+252 val_252
+292 val_292
+338 val_338
+446 val_446
+394 val_394
+482 val_482
+174 val_174
+494 val_494
+466 val_466
+208 val_208
+174 val_174
+396 val_396
+162 val_162
+266 val_266
+342 val_342
+0 val_0
+128 val_128
+316 val_316
+302 val_302
+438 val_438
+170 val_170
+20 val_20
+378 val_378
+92 val_92
+72 val_72
+4 val_4
+280 val_280
+208 val_208
+356 val_356
+382 val_382
+498 val_498
+386 val_386
+192 val_192
+286 val_286
+176 val_176
+54 val_54
+138 val_138
+216 val_216
+430 val_430
+278 val_278
+176 val_176
+318 val_318
+332 val_332
+180 val_180
+284 val_284
+12 val_12
+230 val_230
+260 val_260
+404 val_404
+384 val_384
+272 val_272
+138 val_138
+84 val_84
+348 val_348
+466 val_466
+58 val_58
+8 val_8
+230 val_230
+208 val_208
+348 val_348
+24 val_24
+172 val_172
+42 val_42
+158 val_158
+496 val_496
+0 val_0
+322 val_322
+468 val_468
+454 val_454
+100 val_100
+298 val_298
+418 val_418
+96 val_96
+26 val_26
+230 val_230
+120 val_120
+404 val_404
+436 val_436
+156 val_156
+468 val_468
+308 val_308
+196 val_196
+288 val_288
+98 val_98
+282 val_282
+318 val_318
+318 val_318
+470 val_470
+316 val_316
+0 val_0
+490 val_490
+364 val_364
+118 val_118
+134 val_134
+282 val_282
+138 val_138
+238 val_238
+118 val_118
+72 val_72
+90 val_90
+10 val_10
+306 val_306
+224 val_224
+242 val_242
+392 val_392
+272 val_272
+242 val_242
+452 val_452
+226 val_226
+402 val_402
+396 val_396
+58 val_58
+336 val_336
+168 val_168
+34 val_34
+472 val_472
+322 val_322
+498 val_498
+160 val_160
+42 val_42
+430 val_430
+458 val_458
+78 val_78
+76 val_76
+492 val_492
+218 val_218
+228 val_228
+138 val_138
+30 val_30
+64 val_64
+468 val_468
+76 val_76
+74 val_74
+342 val_342
+230 val_230
+368 val_368
+296 val_296
+216 val_216
+344 val_344
+274 val_274
+116 val_116
+256 val_256
+70 val_70
+480 val_480
+288 val_288
+244 val_244
+438 val_438
+128 val_128
+432 val_432
+202 val_202
+316 val_316
+280 val_280
+2 val_2
+80 val_80
+44 val_44
+104 val_104
+466 val_466
+366 val_366
+406 val_406
+190 val_190
+406 val_406
+114 val_114
+258 val_258
+90 val_90
+262 val_262
+348 val_348
+424 val_424
+12 val_12
+396 val_396
+164 val_164
+454 val_454
+478 val_478
+298 val_298
+164 val_164
+424 val_424
+382 val_382
+70 val_70
+480 val_480
+24 val_24
+104 val_104
+70 val_70
+438 val_438
+414 val_414
+200 val_200
+360 val_360
+248 val_248
+444 val_444
+120 val_120
+230 val_230
+478 val_478
+178 val_178
+468 val_468
+310 val_310
+460 val_460
+480 val_480
+136 val_136
+172 val_172
+214 val_214
+462 val_462
+406 val_406
+454 val_454
+384 val_384
+256 val_256
+26 val_26
+134 val_134
+384 val_384
+18 val_18
+462 val_462
+492 val_492
+100 val_100
+298 val_298
+498 val_498
+146 val_146
+458 val_458
+362 val_362
+186 val_186
+348 val_348
+18 val_18
+344 val_344
+84 val_84
+28 val_28
+448 val_448
+152 val_152
+348 val_348
+194 val_194
+414 val_414
+222 val_222
+126 val_126
+90 val_90
+400 val_400
+200 val_200
+PREHOOK: query: SELECT * FROM odd
+PREHOOK: type: QUERY
+PREHOOK: Input: default@odd
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM odd
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@odd
+#### A masked pattern was here ####
+POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+311 val_311
+27 val_27
+165 val_165
+409 val_409
+255 val_255
+265 val_265
+193 val_193
+401 val_401
+273 val_273
+369 val_369
+213 val_213
+429 val_429
+469 val_469
+145 val_145
+495 val_495
+37 val_37
+327 val_327
+281 val_281
+277 val_277
+209 val_209
+15 val_15
+403 val_403
+417 val_417
+219 val_219
+287 val_287
+153 val_153
+193 val_193
+459 val_459
+237 val_237
+413 val_413
+207 val_207
+199 val_199
+399 val_399
+247 val_247
+417 val_417
+489 val_489
+377 val_377
+397 val_397
+309 val_309
+365 val_365
+439 val_439
+367 val_367
+325 val_325
+167 val_167
+195 val_195
+475 val_475
+17 val_17
+113 val_113
+155 val_155
+203 val_203
+339 val_339
+455 val_455
+311 val_311
+57 val_57
+205 val_205
+149 val_149
+345 val_345
+129 val_129
+489 val_489
+157 val_157
+221 val_221
+111 val_111
+47 val_47
+35 val_35
+427 val_427
+277 val_277
+399 val_399
+169 val_169
+125 val_125
+437 val_437
+469 val_469
+187 val_187
+459 val_459
+51 val_51
+103 val_103
+239 val_239
+213 val_213
+289 val_289
+221 val_221
+65 val_65
+311 val_311
+275 val_275
+137 val_137
+241 val_241
+83 val_83
+333 val_333
+181 val_181
+67 val_67
+489 val_489
+353 val_353
+373 val_373
+217 val_217
+411 val_411
+463 val_463
+431 val_431
+179 val_179
+129 val_129
+119 val_119
+197 val_197
+393 val_393
+199 val_199
+191 val_191
+165 val_165
+327 val_327
+205 val_205
+131 val_131
+51 val_51
+43 val_43
+469 val_469
+95 val_95
+481 val_481
+457 val_457
+197 val_197
+187 val_187
+409 val_409
+137 val_137
+369 val_369
+169 val_169
+413 val_413
+85 val_85
+77 val_77
+87 val_87
+179 val_179
+395 val_395
+419 val_419
+15 val_15
+307 val_307
+19 val_19
+435 val_435
+277 val_277
+273 val_273
+309 val_309
+389 val_389
+327 val_327
+369 val_369
+331 val_331
+401 val_401
+177 val_177
+5 val_5
+497 val_497
+317 val_317
+395 val_395
+35 val_35
+95 val_95
+11 val_11
+229 val_229
+233 val_233
+143 val_143
+195 val_195
+321 val_321
+119 val_119
+489 val_489
+41 val_41
+223 val_223
+149 val_149
+449 val_449
+453 val_453
+209 val_209
+69 val_69
+33 val_33
+103 val_103
+113 val_113
+367 val_367
+167 val_167
+219 val_219
+239 val_239
+485 val_485
+223 val_223
+263 val_263
+487 val_487
+401 val_401
+191 val_191
+5 val_5
+467 val_467
+229 val_229
+469 val_469
+463 val_463
+35 val_35
+283 val_283
+331 val_331
+235 val_235
+193 val_193
+321 val_321
+335 val_335
+175 val_175
+403 val_403
+483 val_483
+53 val_53
+105 val_105
+257 val_257
+409 val_409
+401 val_401
+203 val_203
+201 val_201
+217 val_217
+431 val_431
+125 val_125
+431 val_431
+187 val_187
+5 val_5
+397 val_397
+291 val_291
+351 val_351
+255 val_255
+163 val_163
+119 val_119
+491 val_491
+237 val_237
+439 val_439
+479 val_479
+305 val_305
+417 val_417
+199 val_199
+429 val_429
+169 val_169
+443 val_443
+323 val_323
+325 val_325
+277 val_277
+317 val_317
+333 val_333
+493 val_493
+207 val_207
+249 val_249
+265 val_265
+83 val_83
+353 val_353
+233 val_233
+133 val_133
+175 val_175
+189 val_189
+375 val_375
+401 val_401
+421 val_421
+407 val_407
+67 val_67
+379 val_379
+9 val_9
+341 val_341
+285 val_285
+167 val_167
+273 val_273
+183 val_183
+281 val_281
+97 val_97
+469 val_469
+315 val_315
+37 val_37
+307 val_307
+477 val_477
+169 val_169
+403 val_403
+97 val_97
+PREHOOK: query: -- drop the tables
+DROP TABLE even
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@even
+PREHOOK: Output: default@even
+POSTHOOK: query: -- drop the tables
+DROP TABLE even
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@even
+POSTHOOK: Output: default@even
+POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: DROP TABLE odd
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@odd
+PREHOOK: Output: default@odd
+POSTHOOK: query: DROP TABLE odd
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@odd
+POSTHOOK: Output: default@odd
+POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: DROP TABLE tmp_src
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@tmp_src
+PREHOOK: Output: default@tmp_src
+POSTHOOK: query: DROP TABLE tmp_src
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@tmp_src
+POSTHOOK: Output: default@tmp_src
+POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: DROP TABLE tmp_src_part
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@tmp_src_part
+PREHOOK: Output: default@tmp_src_part
+POSTHOOK: query: DROP TABLE tmp_src_part
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@tmp_src_part
+POSTHOOK: Output: default@tmp_src_part
+POSTHOOK: Lineage: even.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: even.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.c EXPRESSION [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: odd.d SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=1).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=2).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=3).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=4).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: tmp_src_part PARTITION(d=5).c SIMPLE [(tmp_src)tmp_src.FieldSchema(name:value, type:string, comment:null), ]