diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
index 5078a3a..1674d4b 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
@@ -20,7 +20,9 @@
 import java.util.Iterator;
 
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
@@ -47,7 +49,15 @@ public HiveMapFunction(byte[] buffer) {
       jobConf = KryoSerializer.deserializeJobConf(this.buffer);
     }
 
-    SparkMapRecordHandler mapRecordHandler = new SparkMapRecordHandler();
+    SparkRecordHandler mapRecordHandler;
+
+    // need different record handler for MergeFileWork
+    if (MergeFileMapper.class.getName().equals(jobConf.get(Utilities.MAPRED_MAPPER_CLASS))) {
+      mapRecordHandler = new SparkMergeFileRecordHandler();
+    } else {
+      mapRecordHandler = new SparkMapRecordHandler();
+    }
+
     HiveMapFunctionResultList result = new HiveMapFunctionResultList(jobConf, it, mapRecordHandler);
     //TODO we need to implement a Spark specified Reporter to collect stats, refer to HIVE-7709.
     mapRecordHandler.init(jobConf, result, Reporter.NULL);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
index c54bffe..74650e8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
@@ -27,7 +27,7 @@
 public class HiveMapFunctionResultList extends HiveBaseFunctionResultList> {
-  private final SparkMapRecordHandler recordHandler;
+  private final SparkRecordHandler recordHandler;
 
   /**
    * Instantiate result set Iterable for Map function output.
@@ -36,7 +36,7 @@
    * @param handler Initialized {@link SparkMapRecordHandler} instance.
    */
   public HiveMapFunctionResultList(Configuration conf,
-      Iterator> inputIterator, SparkMapRecordHandler handler) {
+      Iterator> inputIterator, SparkRecordHandler handler) {
     super(conf, inputIterator);
     recordHandler = handler;
   }
@@ -44,7 +44,7 @@ public HiveMapFunctionResultList(Configuration conf,
   @Override
   protected void processNextRecord(Tuple2 inputRecord) throws IOException {
-    recordHandler.processRow(inputRecord._2());
+    recordHandler.processRow(inputRecord._1(), inputRecord._2());
   }
 
   @Override
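Note on the two hunks above: HiveMapFunction now picks the record handler based on the MapReduce mapper class that the planner writes into the cloned JobConf, and the result list forwards both halves of the input tuple. As a reading aid only, the dispatch boils down to the sketch below; the RecordHandlerSelector class and select() method are illustrative names invented for this note and are not part of the patch.

    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
    import org.apache.hadoop.mapred.JobConf;

    final class RecordHandlerSelector {
      private RecordHandlerSelector() {
      }

      /** Pick the record handler the same way HiveMapFunction.call() now does. */
      static SparkRecordHandler select(JobConf jobConf) {
        // MergeFileWork is compiled with MergeFileMapper as its mapper class, so the
        // configured mapper class identifies a file-merge map task.
        String mapperClass = jobConf.get(Utilities.MAPRED_MAPPER_CLASS);
        if (MergeFileMapper.class.getName().equals(mapperClass)) {
          return new SparkMergeFileRecordHandler();
        }
        return new SparkMapRecordHandler();
      }
    }
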
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
index 2537789..e1bd8fc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
@@ -54,7 +54,7 @@
  * - Catch and handle errors during execution of the operators.
  *
  */
-public class SparkMapRecordHandler extends SparkRecordHandler{
+public class SparkMapRecordHandler extends SparkRecordHandler {
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
@@ -130,7 +130,7 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
   }
 
   @Override
-  public void processRow(Object value) throws IOException {
+  public void processRow(Object key, Object value) throws IOException {
     // reset the execContext for each new row
     execContext.resetRow();
@@ -158,6 +158,7 @@ public void processRow(Object key, Iterator values) throws IOException {
     throw new UnsupportedOperationException("Do not support this method in SparkMapRecordHandler.");
   }
 
+  @Override
   public void close() {
     // No row was processed
     if (oc == null) {
@@ -202,6 +203,7 @@ public void close() {
     }
   }
 
+  @Override
   public boolean getDone() {
     return mo.getDone();
   }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
new file mode 100644
index 0000000..27f6464
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMergeFileRecordHandler.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Copied from MergeFileMapper.
+ *
+ * As MergeFileMapper is very similar to ExecMapper, this class is
+ * very similar to SparkMapRecordHandler.
+ */
+public class SparkMergeFileRecordHandler extends SparkRecordHandler {
+
+  private static final String PLAN_KEY = "__MAP_PLAN__";
+  private static final Log l4j = LogFactory.getLog(SparkMergeFileRecordHandler.class);
+  private Operator op;
+  private AbstractFileMergeOperator mergeOp;
+  private Object[] row;
+
+  @Override
+  public void init(JobConf job, OutputCollector output, Reporter reporter) {
+    super.init(job, output, reporter);
+
+    ObjectCache cache = ObjectCacheFactory.getCache(job);
+
+    try {
+      jc = job;
+      MapWork mapWork = (MapWork) cache.retrieve(PLAN_KEY);
+
+      if (mapWork == null) {
+        mapWork = Utilities.getMapWork(job);
+        cache.cache(PLAN_KEY, mapWork);
+      } else {
+        Utilities.setMapWork(job, mapWork);
+      }
+
+      if (mapWork instanceof MergeFileWork) {
+        MergeFileWork mergeFileWork = (MergeFileWork) mapWork;
+        String alias = mergeFileWork.getAliasToWork().keySet().iterator().next();
+        op = mergeFileWork.getAliasToWork().get(alias);
+        if (op instanceof AbstractFileMergeOperator) {
+          mergeOp = (AbstractFileMergeOperator) op;
+          mergeOp.initializeOp(jc);
+          row = new Object[2];
+          abort = false;
+        } else {
+          abort = true;
+          throw new RuntimeException(
+              "Merge file work's top operator should be an" +
+                  " instance of AbstractFileMergeOperator");
+        }
+      } else {
+        abort = true;
+        throw new RuntimeException("Map work should be a merge file work.");
+      }
+
+      l4j.info(mergeOp.dump(0));
+    } catch (HiveException e) {
+      abort = true;
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void processRow(Object key, Object value) throws IOException {
+    row[0] = key;
+    row[1] = value;
+    try {
+      mergeOp.processOp(row, 0);
+    } catch (HiveException e) {
+      abort = true;
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void processRow(Object key, Iterator values) throws IOException {
+    throw new UnsupportedOperationException("Do not support this method in " +
+        this.getClass().getSimpleName());
+  }
+
+  @Override
+  public void close() {
+    l4j.info("Closing Merge Operator " + mergeOp.getName());
+    try {
+      mergeOp.closeOp(abort);
+    } catch (HiveException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public boolean getDone() {
+    return mergeOp.getDone();
+  }
+}
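For orientation, every SparkRecordHandler, including the new merge handler above, is driven through the same init / processRow / close lifecycle. The sketch below makes that lifecycle explicit; RecordHandlerDriver is a hypothetical stand-in written only for this note (in the patch the real driver is HiveMapFunctionResultList), the BytesWritable pair type is assumed for illustration, and the null collector is acceptable here only because nothing is collected.

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    import scala.Tuple2;

    final class RecordHandlerDriver {
      static void drive(SparkRecordHandler handler, JobConf jobConf,
          Iterator<Tuple2<BytesWritable, BytesWritable>> input) throws IOException {
        // init() loads the cached MapWork/MergeFileWork plan and initializes the operator tree.
        handler.init(jobConf, null /* collector is supplied by the real result list */, Reporter.NULL);
        try {
          // Each (key, value) pair from the input RDD is forwarded as-is; the merge handler
          // packs the pair into a two-element row for its AbstractFileMergeOperator.
          while (input.hasNext() && !handler.getDone()) {
            Tuple2<BytesWritable, BytesWritable> record = input.next();
            handler.processRow(record._1(), record._2());
          }
        } finally {
          // close() finalizes the merged output file (or the map operator tree).
          handler.close();
        }
      }
    }
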
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 9b11fe4..7ab2ca0 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -28,7 +28,12 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
@@ -84,8 +89,8 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
       }
       MapWork mapWork = (MapWork) w;
       JobConf newJobConf = cloneJobConf(mapWork);
-      SparkTran tran = generate(newJobConf, mapWork);
       JavaPairRDD input = generateRDD(newJobConf, mapWork);
+      SparkTran tran = generate(newJobConf, mapWork);
       trans.addRootTranWithInput(tran, input);
 
       while (sparkWork.getChildren(w).size() > 0) {
@@ -155,13 +160,14 @@ private ReduceTran generateRTWithEdge(SparkWork sparkWork, BaseWork parent, Base
 
   private JavaPairRDD generateRDD(JobConf jobConf, MapWork mapWork) throws Exception {
-    Class ifClass = getInputFormat(mapWork);
+    Class ifClass = getInputFormat(jobConf, mapWork);
 
     return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
   }
 
-  private Class getInputFormat(MapWork mWork) throws HiveException {
+  private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
+    // MergeFileWork is sub-class of MapWork, we don't need to distinguish here
     if (mWork.getInputformat() != null) {
       HiveConf.setVar(jobConf, HiveConf.ConfVars.HIVEINPUTFORMAT,
           mWork.getInputformat());
@@ -190,6 +196,20 @@ private Class getInputFormat(MapWork mWork) throws HiveException {
   }
 
   private MapTran generate(JobConf jobConf, MapWork mw) throws Exception {
+    // Create tmp dir for MergeFileWork
+    if (mw instanceof MergeFileWork) {
+      Path outputPath = ((MergeFileWork) mw).getOutputDir();
+      Path tempOutPath = Utilities.toTempPath(outputPath);
+      FileSystem fs = outputPath.getFileSystem(jobConf);
+      try {
+        if (!fs.exists(tempOutPath)) {
+          fs.mkdirs(tempOutPath);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(
+            "Can't make path " + outputPath + " : " + e.getMessage());
+      }
+    }
     initStatsPublisher(mw);
     MapTran result = new MapTran();
     byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
@@ -238,11 +258,18 @@ private JobConf cloneJobConf(BaseWork work) throws Exception {
       Utilities.setInputPaths(cloned, inputPaths);
       Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (MapWork) work);
-      cloned.set("mapred.mapper.class", ExecMapper.class.getName());
+      if (work instanceof MergeFileWork) {
+        MergeFileWork mergeFileWork = (MergeFileWork) work;
+        cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName());
+        cloned.set("mapred.input.format.class", mergeFileWork.getInputformat());
+        cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class, FileOutputFormat.class);
+      } else {
+        cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+      }
     } else if (work instanceof ReduceWork) {
       Utilities.setReduceWork(cloned, (ReduceWork) work, scratchDir, false);
       Utilities.createTmpDirs(cloned, (ReduceWork) work);
-      cloned.set("mapred.reducer.class", ExecReducer.class.getName());
+      cloned.set(Utilities.MAPRED_REDUCER_CLASS, ExecReducer.class.getName());
     }
     return cloned;
   }
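The cloneJobConf() change above is what connects the planner to the runtime dispatch shown earlier: for MergeFileWork it records the merge mapper and the merge input/output formats in the cloned JobConf. The sketch below isolates just that wiring; the MergeWorkConfHelper class and configureForMergeWork() method are names invented for this note, not part of the patch.

    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
    import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
    import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    final class MergeWorkConfHelper {
      static void configureForMergeWork(JobConf cloned, MergeFileWork mergeFileWork) {
        // HiveMapFunction later reads this property to choose SparkMergeFileRecordHandler.
        cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName());
        // MergeFileWork carries its own input format (plain concatenation or block-level merge).
        cloned.set("mapred.input.format.class", mergeFileWork.getInputformat());
        // Output is written through the merge-specific output format.
        cloned.setClass("mapred.output.format.class", MergeFileOutputFormat.class,
            FileOutputFormat.class);
      }
    }
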
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
index 3eea26a..e67210f 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
@@ -67,9 +67,9 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
   }
 
   /**
-   * Process row with single value.
+   * Process row with key and single value.
    */
-  public abstract void processRow(Object value) throws IOException;
+  public abstract void processRow(Object key, Object value) throws IOException;
 
   /**
    * Process row with key and value collection.
@@ -89,7 +89,8 @@ protected void logMemoryInfo() {
     }
   }
 
-  abstract void close();
+  public abstract void close();
+  public abstract boolean getDone();
 
   /**
    * Log information to be logged at the end
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 94ebcdd..c24e374 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -151,7 +151,7 @@ public void init(JobConf job, OutputCollector output, Reporter reporter) {
   }
 
   @Override
-  public void processRow(Object value) throws IOException {
+  public void processRow(Object key, Object value) throws IOException {
     throw new UnsupportedOperationException("Do not support this method in SparkReduceRecordHandler.");
   }
 
@@ -278,4 +278,9 @@ public void close() {
       Utilities.clearWorkMap();
     }
   }
+
+  @Override
+  public boolean getDone() {
+    return reducer.getDone();
+  }
 }
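With the base-class changes above, every record handler must now implement the keyed processRow overload as well as the iterator-based one, and both close() and getDone() are public and abstract. The skeleton below shows the shape of that contract; LoggingRecordHandler is a made-up example class, and, as in the patch text itself, generic type parameters are elided.

    import java.io.IOException;
    import java.util.Iterator;

    public class LoggingRecordHandler extends SparkRecordHandler {
      private boolean done = false;

      @Override
      public void processRow(Object key, Object value) throws IOException {
        // Map-side shape: one (key, value) pair per input record. The key was added to the
        // signature so the merge handler can hand both halves to its file-merge operator.
        System.out.println("row: key=" + key + ", value=" + value);
      }

      @Override
      public void processRow(Object key, Iterator values) throws IOException {
        // Reduce-side shape: one key with all of its grouped values.
        while (values.hasNext()) {
          System.out.println("key=" + key + ", value=" + values.next());
        }
      }

      @Override
      public void close() {
        done = true; // close() is public now, so the caller can finalize the handler directly.
      }

      @Override
      public boolean getDone() {
        return done;
      }
    }
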
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index b0a9407..70060fc 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -1274,6 +1274,10 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
       work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
       cplan.setName("Tez Merge File Work");
       ((TezWork) work).add(cplan);
+    } else if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+      work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
+      cplan.setName("Spark Merge File Work");
+      ((SparkWork) work).add(cplan);
     } else {
       work = cplan;
     }
@@ -1285,8 +1289,8 @@ public static void createMRWorkForMergingFiles (FileSinkOperator fsInput,
       ((TezWork)work).add(cplan);
     } else if (conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
       work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
-      cplan.setName("Merge");
-      ((SparkWork)work).add(cplan);
+      cplan.setName("Spark Merge File Work");
+      ((SparkWork) work).add(cplan);
     } else {
       work = new MapredWork();
       ((MapredWork)work).setMapWork(cplan);
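GenMapRedUtils now wraps the merge-file MapWork differently per execution engine. The sketch below isolates that branch as a reading aid; wrapMergeWork() is a hypothetical helper (the patch does this inline), cplan stands for the merge MapWork built earlier in createMRWorkForMergingFiles(), and the "tez" comparison is assumed from the surrounding code rather than shown in the hunks.

    import java.io.Serializable;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.plan.MapredWork;
    import org.apache.hadoop.hive.ql.plan.SparkWork;
    import org.apache.hadoop.hive.ql.plan.TezWork;

    final class MergeWorkWrapper {
      static Serializable wrapMergeWork(HiveConf conf, MapWork cplan) {
        String engine = conf.getVar(ConfVars.HIVE_EXECUTION_ENGINE);
        if (engine.equals("tez")) {
          TezWork work = new TezWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
          cplan.setName("Tez Merge File Work");
          work.add(cplan);
          return work;
        } else if (engine.equals("spark")) {
          SparkWork work = new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID));
          cplan.setName("Spark Merge File Work");
          work.add(cplan);
          return work;
        }
        // MapReduce path: either keep the bare MapWork or wrap it in a MapredWork,
        // matching the two branches touched by the hunks above.
        MapredWork mrWork = new MapredWork();
        mrWork.setMapWork(cplan);
        return mrWork;
      }
    }
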
diff --git ql/src/test/queries/clientpositive/disable_merge_for_bucketing.q ql/src/test/queries/clientpositive/disable_merge_for_bucketing.q
index 471d296..7baca1a 100644
--- ql/src/test/queries/clientpositive/disable_merge_for_bucketing.q
+++ ql/src/test/queries/clientpositive/disable_merge_for_bucketing.q
@@ -2,6 +2,7 @@ set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
 set hive.enforce.bucketing = true;
 set hive.exec.reducers.max = 1;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 
 CREATE TABLE bucket2_1(key int, value string) CLUSTERED BY (key) INTO 2 BUCKETS;
 
diff --git ql/src/test/queries/clientpositive/merge1.q ql/src/test/queries/clientpositive/merge1.q
index c7249af..3000262 100644
--- ql/src/test/queries/clientpositive/merge1.q
+++ ql/src/test/queries/clientpositive/merge1.q
@@ -1,4 +1,5 @@
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 
 -- SORT_QUERY_RESULTS
 
diff --git ql/src/test/queries/clientpositive/merge2.q ql/src/test/queries/clientpositive/merge2.q
index bb86dc2..b0f01ce 100644
--- ql/src/test/queries/clientpositive/merge2.q
+++ ql/src/test/queries/clientpositive/merge2.q
@@ -1,5 +1,6 @@
 set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
+set hive.merge.sparkfiles=true;
 set mapred.min.split.size=256;
 set mapred.min.split.size.per.node=256;
 set mapred.min.split.size.per.rack=256;
diff --git ql/src/test/results/clientpositive/spark/merge1.q.out ql/src/test/results/clientpositive/spark/merge1.q.out
index 772984d..5384b8f 100644
--- ql/src/test/results/clientpositive/spark/merge1.q.out
+++ ql/src/test/results/clientpositive/spark/merge1.q.out
@@ -20,9 +20,14 @@ select key, count(1) from src group by key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
+  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+  Stage-5
+  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
+  Stage-4
+  Stage-6
+  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -73,6 +78,15 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
+  Stage: Stage-8
+    Conditional Operator
+
+  Stage: Stage-5
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
   Stage: Stage-2
     Dependency Collection
 
@@ -89,6 +103,42 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.dest1
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.dest1
+
+  Stage: Stage-7
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: insert overwrite table dest1
 select key, count(1) from src group by key
 PREHOOK: type: QUERY
@@ -473,9 +523,14 @@ insert overwrite table dest1 select key from test_src
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
+  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+  Stage-5
+  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
+  Stage-4
+  Stage-6
+  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -500,6 +555,15 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
+  Stage: Stage-8
+    Conditional Operator
+
+  Stage: Stage-5
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
   Stage: Stage-2
     Dependency Collection
 
@@ -516,6 +580,42 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.dest1
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.dest1
+
+  Stage: Stage-7
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: insert overwrite table dest1
 select key from test_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@test_src
@@ -538,9 +638,14 @@ insert overwrite table dest1 select key from test_src
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
+  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+  Stage-5
+  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
+  Stage-4
+  Stage-6
+  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -565,6 +670,15 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
+  Stage: Stage-8
+    Conditional Operator
+
+  Stage: Stage-5
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
   Stage: Stage-2
     Dependency Collection
 
@@ -581,6 +695,42 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.dest1
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.dest1
+
+  Stage: Stage-7
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: insert overwrite table dest1
 select key from test_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@test_src
diff --git ql/src/test/results/clientpositive/spark/merge2.q.out ql/src/test/results/clientpositive/spark/merge2.q.out
index 8d8dcb8..bbedb65 100644
--- ql/src/test/results/clientpositive/spark/merge2.q.out
+++ ql/src/test/results/clientpositive/spark/merge2.q.out
@@ -20,9 +20,14 @@ select key, count(1) from src group by key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
+  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+  Stage-5
+  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
+  Stage-4
+  Stage-6
+  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -73,6 +78,15 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.test1
 
+  Stage: Stage-8
+    Conditional Operator
+
+  Stage: Stage-5
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
   Stage: Stage-2
     Dependency Collection
 
@@ -89,6 +103,42 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.test1
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.test1
+
+  Stage: Stage-7
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: insert overwrite table test1
 select key, count(1) from src group by key
 PREHOOK: type: QUERY
@@ -473,9 +523,14 @@ insert overwrite table test1 select key from test_src
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
+  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+  Stage-5
+  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
+  Stage-4
+  Stage-6
+  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -500,6 +555,15 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.test1
 
+  Stage: Stage-8
+    Conditional Operator
+
+  Stage: Stage-5
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
   Stage: Stage-2
     Dependency Collection
 
@@ -516,6 +580,42 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.test1
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.test1
+
+  Stage: Stage-7
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: insert overwrite table test1
 select key from test_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@test_src
@@ -538,9 +638,14 @@ insert overwrite table test1 select key from test_src
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-2 depends on stages: Stage-1
+  Stage-8 depends on stages: Stage-1 , consists of Stage-5, Stage-4, Stage-6
+  Stage-5
+  Stage-2 depends on stages: Stage-5, Stage-4, Stage-7
   Stage-0 depends on stages: Stage-2
   Stage-3 depends on stages: Stage-0
+  Stage-4
+  Stage-6
+  Stage-7 depends on stages: Stage-6
 
 STAGE PLANS:
   Stage: Stage-1
@@ -565,6 +670,15 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.test1
 
+  Stage: Stage-8
+    Conditional Operator
+
+  Stage: Stage-5
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
   Stage: Stage-2
     Dependency Collection
 
@@ -581,6 +695,42 @@ STAGE PLANS:
   Stage: Stage-3
     Stats-Aggr Operator
 
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.test1
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Map Operator Tree:
+              TableScan
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.test1
+
+  Stage: Stage-7
+    Move Operator
+      files:
+          hdfs directory: true
+#### A masked pattern was here ####
+
 PREHOOK: query: insert overwrite table test1
 select key from test_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@test_src
diff --git ql/src/test/results/clientpositive/spark/union_remove_10.q.out ql/src/test/results/clientpositive/spark/union_remove_10.q.out
index f561fdf..d3a1f46 100644
--- ql/src/test/results/clientpositive/spark/union_remove_10.q.out
+++ ql/src/test/results/clientpositive/spark/union_remove_10.q.out
@@ -201,18 +201,26 @@ STAGE PLANS:
                       name: default.outputtbl1
 
   Stage: Stage-3
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-5
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-6
     Move Operator
diff --git ql/src/test/results/clientpositive/spark/union_remove_11.q.out ql/src/test/results/clientpositive/spark/union_remove_11.q.out
index 10b0c9c..c436c30 100644
--- ql/src/test/results/clientpositive/spark/union_remove_11.q.out
+++ ql/src/test/results/clientpositive/spark/union_remove_11.q.out
@@ -176,18 +176,26 @@ STAGE PLANS:
                       name: default.outputtbl1
 
   Stage: Stage-3
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-5
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-6
     Move Operator
diff --git ql/src/test/results/clientpositive/spark/union_remove_16.q.out ql/src/test/results/clientpositive/spark/union_remove_16.q.out
index a59a352..881291c 100644
--- ql/src/test/results/clientpositive/spark/union_remove_16.q.out
+++ ql/src/test/results/clientpositive/spark/union_remove_16.q.out
@@ -202,18 +202,26 @@ STAGE PLANS:
                       name: default.outputtbl1
 
   Stage: Stage-3
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-5
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-6
     Move Operator
diff --git ql/src/test/results/clientpositive/spark/union_remove_4.q.out ql/src/test/results/clientpositive/spark/union_remove_4.q.out
index 518dc24..281fead 100644
--- ql/src/test/results/clientpositive/spark/union_remove_4.q.out
+++ ql/src/test/results/clientpositive/spark/union_remove_4.q.out
@@ -190,7 +190,7 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Merge
+        Spark Merge File Work
           Map Operator Tree:
              TableScan
                File Output Operator
@@ -205,7 +205,7 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Merge
+        Spark Merge File Work
           Map Operator Tree:
              TableScan
                File Output Operator
diff --git ql/src/test/results/clientpositive/spark/union_remove_5.q.out ql/src/test/results/clientpositive/spark/union_remove_5.q.out
index f7f9627..07a4a95 100644
--- ql/src/test/results/clientpositive/spark/union_remove_5.q.out
+++ ql/src/test/results/clientpositive/spark/union_remove_5.q.out
@@ -190,7 +190,7 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Merge
+        Spark Merge File Work
           Map Operator Tree:
              TableScan
                File Output Operator
@@ -205,7 +205,7 @@ STAGE PLANS:
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Merge
+        Spark Merge File Work
           Map Operator Tree:
              TableScan
                File Output Operator
diff --git ql/src/test/results/clientpositive/spark/union_remove_9.q.out ql/src/test/results/clientpositive/spark/union_remove_9.q.out
index 0ec55de..93541db 100644
--- ql/src/test/results/clientpositive/spark/union_remove_9.q.out
+++ ql/src/test/results/clientpositive/spark/union_remove_9.q.out
@@ -197,18 +197,26 @@ STAGE PLANS:
                       name: default.outputtbl1
 
   Stage: Stage-3
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-5
-    Merge File Operator
-      Map Operator Tree:
-          RCFile Merge Operator
-      merge level: block
-      input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Spark Merge File Work
+          Merge File Operator
+            Map Operator Tree:
+                RCFile Merge Operator
+            merge level: block
+            input format: org.apache.hadoop.hive.ql.io.RCFileInputFormat
 
   Stage: Stage-6
     Move Operator