diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
index 69004dc..0804955 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
@@ -285,6 +285,23 @@ private SparkTask createSparkTask(SparkTask originalTask,
     return resultTask;
   }
 
+  private void updateBackupTask(Task<? extends Serializable> task) {
+    if (task.getBackupTask() != null) {
+      updateBackupTask(task, task.getBackupTask(), task.getBackupChildrenTasks());
+    }
+  }
+
+  private void updateBackupTask(Task<? extends Serializable> task, Task<? extends Serializable> backupTask,
+      List<Task<? extends Serializable>> childrenBackupTasks) {
+    task.setBackupTask(backupTask);
+    task.setBackupChildrenTasks(childrenBackupTasks);
+    if (task.getParentTasks() != null) {
+      for (Task<? extends Serializable> parentTask : task.getParentTasks()) {
+        updateBackupTask(parentTask, backupTask, childrenBackupTasks);
+      }
+    }
+  }
+
   @Override
   public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
       throws SemanticException {
@@ -339,6 +356,8 @@ private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditional
     for (SparkWork work : dependencyGraph.keySet()) {
       createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
     }
+
+    updateBackupTask(sparkTask);
   }
 
   /**
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java
index 79c3e02..4c4c416 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/StageIDsRearranger.java
@@ -167,7 +167,14 @@ protected void rejected(Task<? extends Serializable> child) {
       if (task instanceof ConditionalTask) {
         return ((ConditionalTask) task).getListTasks();
       }
-      return task.getChildTasks();
+      List<Task<? extends Serializable>> result = new ArrayList<Task<? extends Serializable>>();
+      if (task.getChildTasks() != null) {
+        result.addAll(task.getChildTasks());
+      }
+      if (task.getBackupTask() != null) {
+        result.add(task.getBackupTask());
+      }
+      return result;
     }
   }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java
index d57ceff..f64bf34 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkJoinOptimizer.java
@@ -43,7 +43,8 @@ public SparkJoinOptimizer(ParseContext procCtx) {
   }
 
   @Override
-  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
     OptimizeSparkProcContext context = (OptimizeSparkProcContext) procCtx;
     HiveConf conf = context.getConf();
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
index 9ff47c7..31f0d36 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkMapJoinOptimizer.java
@@ -33,7 +33,9 @@
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -69,6 +71,7 @@
       throws SemanticException {
 
     OptimizeSparkProcContext context = (OptimizeSparkProcContext) procCtx;
+    ParseContext parseCtx = context.getParseContext();
     HiveConf conf = context.getConf();
 
     JoinOperator joinOp = (JoinOperator) nd;
@@ -84,6 +87,19 @@
       return null;
     }
 
+    // For creating backup task for MJ. We need to save replaced operators (MJ, RS, etc).
+    // They are needed for creating back up task at a later stage.
+    if (parseCtx.getClonedRootsForBackupTask() == null) {
+      List<Operator<?>> roots = new ArrayList<Operator<?>>(parseCtx.getTopOps().values());
+      List<Operator<?>> clonedRoots = Utilities.cloneOperatorTree(parseCtx.getConf(), roots);
+      parseCtx.setClonedRootsForBackupTask(clonedRoots);
+      for (int i = 0; i < roots.size(); i++) {
+        TableScanOperator ts = (TableScanOperator) roots.get(i);
+        TableScanOperator clonedTs = (TableScanOperator) clonedRoots.get(i);
+        clonedTs.getConf().setTableMetadata(ts.getConf().getTableMetadata());
+      }
+    }
+
     int numBuckets = -1;
     List<List<String>> bucketColNames = null;
@@ -91,8 +107,7 @@
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
     if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
       LOG.info("Check if it can be converted to bucketed map join");
-      numBuckets = convertJoinBucketMapJoin(joinOp, mapJoinOp,
-        context, mapJoinConversionPos);
+      numBuckets = convertJoinBucketMapJoin(joinOp, mapJoinOp, context, mapJoinConversionPos);
       if (numBuckets > 1) {
         LOG.info("Converted to map join with " + numBuckets + " buckets");
         bucketColNames = joinOp.getOpTraits().getBucketColNames();
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
index 6e0ac38..562ac7c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
@@ -135,8 +135,8 @@ private static void initSMBJoinPlan(MapWork mapWork,
   }
 
   private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
-    for (String alias : opProcCtx.topOps.keySet()) {
-      if (opProcCtx.topOps.get(alias) == ts) {
+    for (String alias : opProcCtx.parseContext.getTopOps().keySet()) {
+      if (opProcCtx.parseContext.getTopOps().get(alias) == ts) {
         return alias;
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
index b838bff..3e5ef1e 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
@@ -107,6 +107,11 @@
   private Operator fetchSource;
   private ListSinkOperator fetchSink;
 
+  // In Spark branch, we create backup task for MJ task. To do that, we need
+  // to save cloned root operators before converting JOIN to MAPJOIN.
+  // They are used later for creating backup task.
+  private List<Operator<?>> clonedRootsForBackupTask;
+
   public ParseContext() {
   }
 
@@ -600,4 +605,13 @@ public ListSinkOperator getFetchSink() {
   public void setFetchSink(ListSinkOperator fetchSink) {
     this.fetchSink = fetchSink;
   }
+
+  public List<Operator<?>> getClonedRootsForBackupTask() {
+    return clonedRootsForBackupTask;
+  }
+
+  public void setClonedRootsForBackupTask(List<Operator<?>> clonedRootsForBackupTask) {
+    this.clonedRootsForBackupTask = clonedRootsForBackupTask;
+  }
+
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
index 773cfbd..302bc8c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
@@ -134,25 +134,19 @@
   public final Set<FileSinkOperator> fileSinkSet;
   public final Map<FileSinkOperator, List<FileSinkOperator>> fileSinkMap;
 
-  // Alias to operator map, from the semantic analyzer.
-  // This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias.
-  public final Map<String, Operator<? extends OperatorDesc>> topOps;
-
   @SuppressWarnings("unchecked")
   public GenSparkProcContext(HiveConf conf,
       ParseContext parseContext,
       List<Task<MoveWork>> moveTask,
       List<Task<? extends Serializable>> rootTasks,
       Set<ReadEntity> inputs,
-      Set<WriteEntity> outputs,
-      Map<String, Operator<? extends OperatorDesc>> topOps) {
+      Set<WriteEntity> outputs) {
     this.conf = conf;
     this.parseContext = parseContext;
     this.moveTask = moveTask;
     this.rootTasks = rootTasks;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.topOps = topOps;
     this.currentTask = (SparkTask) TaskFactory.get(
         new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
     this.rootTasks.add(currentTask);
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
index f7586a4..3b71af1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/OptimizeSparkProcContext.java
@@ -20,15 +20,12 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
-import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -49,19 +46,12 @@
   private final Set<ReduceSinkOperator> visitedReduceSinks = new HashSet<ReduceSinkOperator>();
   private final Map<MapJoinOperator, Long> mjOpSizes = new HashMap<MapJoinOperator, Long>();
 
-  // rootOperators are all the table scan operators in sequence
-  // of traversal
-  private final Deque<Operator<? extends OperatorDesc>> rootOperators;
-
   public OptimizeSparkProcContext(HiveConf conf, ParseContext parseContext,
-                                  Set<ReadEntity> inputs, Set<WriteEntity> outputs,
-                                  Deque<Operator<? extends OperatorDesc>> rootOperators) {
-
+                                  Set<ReadEntity> inputs, Set<WriteEntity> outputs) {
     this.conf = conf;
     this.parseContext = parseContext;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.rootOperators = rootOperators;
   }
 
   public ParseContext getParseContext() {
@@ -84,10 +74,6 @@ public HiveConf getConf() {
     return visitedReduceSinks;
   }
 
-  public Deque<Operator<? extends OperatorDesc>> getRootOperators() {
-    return rootOperators;
-  }
-
   public Map<MapJoinOperator, Long> getMjOpSizes() {
     return mjOpSizes;
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 3a7477a..224a317 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -19,10 +19,8 @@
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -58,7 +56,6 @@
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.lib.TypeRule;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
@@ -82,7 +79,6 @@
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
@@ -101,31 +97,31 @@ public SparkCompiler() {
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
-    // Sequence of TableScan operators to be walked
-    Deque<Operator<? extends OperatorDesc>> deque = new LinkedList<Operator<? extends OperatorDesc>>();
-    deque.addAll(pCtx.getTopOps().values());
 
-    OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, deque);
+    OptimizeSparkProcContext procCtx = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs);
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
-        ReduceSinkOperator.getOperatorName() + "%"),
-        new SetSparkReducerParallelism());
-
-    opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
-
-    opRules.put(new TypeRule(MapJoinOperator.class), new SparkJoinHintOptimizer(pCtx));
+        ReduceSinkOperator.getOperatorName() + "%"), new SetSparkReducerParallelism());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
     GraphWalker ogw = new DefaultGraphWalker(disp);
+    ArrayList<Node> topNodes = new ArrayList<Node>(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
 
-    // Create a list of topop nodes
-    ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
+    // We need to use two-passes, otherwise cloned RS will not get # of reducers
+    // set by SetSparkReduceParallelism.
+    opRules.clear();
+    opRules.put(new TypeRule(JoinOperator.class), new SparkJoinOptimizer(pCtx));
+    opRules.put(new TypeRule(MapJoinOperator.class), new SparkJoinHintOptimizer(pCtx));
+
+    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    ogw = new DefaultGraphWalker(disp);
     ogw.startWalking(topNodes, null);
+
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_OPERATOR_TREE);
   }
 
@@ -137,13 +133,35 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
       List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
     PERF_LOGGER.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+    GenSparkUtils.getUtils().resetSequenceNumber();
 
-    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
-    GenSparkWork genSparkWork = new GenSparkWork(GenSparkUtils.getUtils());
+    SparkTask mainTask = generateWorkTree(rootTasks, pCtx, mvTask, inputs, outputs,
+        new ArrayList<Node>(pCtx.getTopOps().values()), true);
+
+    if (pCtx.getClonedRootsForBackupTask() != null) {
+      // Generate work tree for the backup task
+      SparkTask backupTask = generateWorkTree(rootTasks, pCtx, mvTask, inputs, outputs,
+          new ArrayList<Node>(pCtx.getClonedRootsForBackupTask()), false);
+      rootTasks.remove(backupTask);
+      mainTask.setBackupTask(backupTask);
+      mainTask.setBackupChildrenTasks(mainTask.getChildTasks());
+      backupTask.setChildTasks(mainTask.getChildTasks());
+    }
+
+    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+  }
 
+  private SparkTask generateWorkTree(List<Task<? extends Serializable>> rootTasks, ParseContext pCtx,
+      List<Task<MoveWork>> mvTask, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs, List<Node> topNodes,
+      boolean addFileSink) throws SemanticException {
+    ParseContext tempParseContext = getParseContext(pCtx, rootTasks);
     GenSparkProcContext procCtx = new GenSparkProcContext(
-        conf, tempParseContext, mvTask, rootTasks, inputs, outputs, pCtx.getTopOps());
+        conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
+
+    GenSparkUtils utils = GenSparkUtils.getUtils();
+    GenSparkWork genSparkWork = new GenSparkWork(utils);
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher generates the plan from the operator tree
@@ -153,12 +171,14 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
     opRules.put(new TypeRule(MapJoinOperator.class), new SparkReduceSinkMapJoinProc());
 
+    NodeProcessor fileSinkProc = addFileSink ?
+        new CompositeProcessor(new SparkFileSinkProcessor(), genSparkWork) : genSparkWork;
+
     opRules.put(new RuleRegExp("Split Work + Move/Merge - FileSink",
-        FileSinkOperator.getOperatorName() + "%"),
-        new CompositeProcessor(new SparkFileSinkProcessor(), genSparkWork));
+        FileSinkOperator.getOperatorName() + "%"), fileSinkProc);
 
     opRules.put(new RuleRegExp("Handle Analyze Command",
-        TableScanOperator.getOperatorName() + "%"),
+      TableScanOperator.getOperatorName() + "%"),
       new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
 
     opRules.put(new RuleRegExp("Remember union",
         UnionOperator.getOperatorName() + "%"),
@@ -185,28 +205,27 @@ public Object process(Node n, Stack<Node> s,
     * SMBJoinOP
     *
     * Some of the other processors are expecting only one traversal beyond SMBJoinOp.
-    * We need to traverse from the big-table path only, and stop traversing on the small-table path once we reach SMBJoinOp.
+    * We need to traverse from the big-table path only, and stop traversing on the
+    * small-table path once we reach SMBJoinOp.
     */
     opRules.put(new TypeRule(SMBMapJoinOperator.class),
-      new NodeProcessor() {
-        @Override
-        public Object process(Node currNode, Stack<Node> stack,
-            NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-          for (Node stackNode : stack) {
-            if (stackNode instanceof DummyStoreOperator) {
-              return true;
+        new NodeProcessor() {
+          @Override
+          public Object process(Node currNode, Stack<Node> stack,
+              NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+            for (Node stackNode : stack) {
+              if (stackNode instanceof DummyStoreOperator) {
+                return true;
+              }
             }
+            return false;
           }
-          return false;
         }
-      }
     );
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    List<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
@@ -215,25 +234,23 @@ public Object process(Node currNode, Stack<Node> stack,
     // SMB Join optimizations to add the "localWork" and bucketing data structures to MapWork.
     opRules.clear();
     opRules.put(new TypeRule(SMBMapJoinOperator.class),
-      SparkSortMergeJoinFactory.getTableScanMapJoin());
+        SparkSortMergeJoinFactory.getTableScanMapJoin());
 
     disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
     ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
-      GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);
+      utils.removeUnionOperators(conf, procCtx, w);
     }
 
     // finally make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
-      GenSparkUtils.getUtils().processFileSink(procCtx, fileSink);
+      utils.processFileSink(procCtx, fileSink);
     }
 
-    PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_GENERATE_TASK_TREE);
+    return procCtx.currentTask;
   }
 
   @Override
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
index 0e85990..49ae06c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
@@ -94,7 +94,7 @@
 
   private boolean isMetadataOnly = false;
 
-  private transient final Table tableMetadata;
+  private transient Table tableMetadata;
 
   public TableScanDesc() {
 
@@ -268,4 +268,8 @@ public boolean getIsMetadataOnly() {
   public Table getTableMetadata() {
     return tableMetadata;
   }
+
+  public void setTableMetadata(Table tableMetadata) {
+    this.tableMetadata = tableMetadata;
+  }
 }
diff --git ql/src/test/results/clientpositive/spark/auto_join25.q.out ql/src/test/results/clientpositive/spark/auto_join25.q.out
index ab01b8a..4c39927 100644
--- ql/src/test/results/clientpositive/spark/auto_join25.q.out
+++ ql/src/test/results/clientpositive/spark/auto_join25.q.out
@@ -20,6 +20,9 @@ PREHOOK: Input: default@srcpart@ds=2008-04-08/hr=12
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=11
 PREHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 PREHOOK: Output: default@dest1
+Status: Failed
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.spark.SparkTask
 POSTHOOK: query: FROM srcpart src1 JOIN src src2 ON (src1.key = src2.key)
 INSERT OVERWRITE TABLE dest1 SELECT src1.key, src2.value
 where (src1.ds = '2008-04-08' or src1.ds = '2008-04-09' )and (src1.hr = '12' or src1.hr = '11')
@@ -33,7 +36,8 @@ POSTHOOK: Input: default@srcpart@ds=2008-04-09/hr=12
 POSTHOOK: Output: default@dest1
 POSTHOOK: Lineage: dest1.key EXPRESSION [(srcpart)src1.FieldSchema(name:key, type:string, comment:default), ]
 POSTHOOK: Lineage: dest1.value SIMPLE [(src)src2.FieldSchema(name:value, type:string, comment:default), ]
-RUN: Stage-1:MAPRED
+RUN: Stage-4:MAPRED
+RUN: Stage-3:MAPRED
 RUN: Stage-0:MOVE
 RUN: Stage-2:STATS
 PREHOOK: query: SELECT sum(hash(dest1.key,dest1.value)) FROM dest1
@@ -60,14 +64,18 @@ INSERT OVERWRITE TABLE dest_j2 SELECT src1.key, src3.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j2
+Status: Failed
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.spark.SparkTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key) JOIN src src3 ON (src1.key + src2.key = src3.key)
 INSERT OVERWRITE TABLE dest_j2 SELECT src1.key, src3.value
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@dest_j2
 POSTHOOK: Lineage: dest_j2.key EXPRESSION [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
-POSTHOOK: Lineage: dest_j2.value SIMPLE [(src)src3.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: dest_j2.value SIMPLE [(src)src1.FieldSchema(name:value, type:string, comment:default), ]
-RUN: Stage-1:MAPRED
+RUN: Stage-5:MAPRED
+RUN: Stage-3:MAPRED
 RUN: Stage-0:MOVE
 RUN: Stage-2:STATS
 PREHOOK: query: SELECT sum(hash(dest_j2.key,dest_j2.value)) FROM dest_j2
@@ -94,6 +102,9 @@ INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
 PREHOOK: Output: default@dest_j1
+Status: Failed
+FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.spark.SparkTask
+ATTEMPT: Execute BackupTask: org.apache.hadoop.hive.ql.exec.spark.SparkTask
 POSTHOOK: query: FROM src src1 JOIN src src2 ON (src1.key = src2.key)
 INSERT OVERWRITE TABLE dest_j1 SELECT src1.key, src2.value
 POSTHOOK: type: QUERY
@@ -101,7 +112,8 @@ POSTHOOK: Input: default@src
 POSTHOOK: Output: default@dest_j1
 POSTHOOK: Lineage: dest_j1.key EXPRESSION [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
 POSTHOOK: Lineage: dest_j1.value SIMPLE [(src)src2.FieldSchema(name:value, type:string, comment:default), ]
-RUN: Stage-1:MAPRED
+RUN: Stage-4:MAPRED
+RUN: Stage-3:MAPRED
 RUN: Stage-0:MOVE
 RUN: Stage-2:STATS
 PREHOOK: query: SELECT sum(hash(dest_j1.key,dest_j1.value)) FROM dest_j1