diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 91d7c40..45eff67 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -24,15 +24,20 @@ import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.stats.StatsFactory;
+import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -89,7 +94,20 @@ public SparkPlan generate(SparkWork sparkWork) throws Exception {
     return sc.hadoopRDD(jobConf, ifClass, WritableComparable.class, Writable.class);
   }
 
-  private SparkTran generate(BaseWork bw) throws IOException {
+  private SparkTran generate(BaseWork bw) throws IOException, HiveException {
+    // initialize stats publisher if necessary
+    if (bw.isGatheringStats()) {
+      StatsPublisher statsPublisher;
+      StatsFactory factory = StatsFactory.newFactory(jobConf);
+      if (factory != null) {
+        statsPublisher = factory.getStatsPublisher();
+        if (!statsPublisher.init(jobConf)) { // creating stats table if not exists
+          if (HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
+            throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
+          }
+        }
+      }
+    }
     if (bw instanceof MapWork) {
       return generate((MapWork)bw);
     } else if (bw instanceof ReduceWork) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 6eac4e7..e8ec928 100644
--- ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -856,13 +856,10 @@ public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> ta
         }
       }
     } else if (task instanceof SparkTask) {
-      SparkWork sw = ((SparkTask)task).getWork();
-      sw.getMapWork().deriveExplainAttributes();
-      HashMap<String, Operator<? extends OperatorDesc>> opMap = sw
-          .getMapWork().getAliasToWork();
-      if (opMap != null && !opMap.isEmpty()) {
-        for (Operator<? extends OperatorDesc> op : opMap.values()) {
-          setKeyAndValueDesc(sw.getReduceWork(), op);
+      SparkWork work = (SparkWork)task.getWork();
+      for (BaseWork w : work.getAllWorkUnsorted()) {
+        if (w instanceof MapWork) {
+          ((MapWork)w).deriveExplainAttributes();
         }
       }
     }
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 3840318..9400fd3 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -144,9 +145,9 @@ protected void generateTaskTree(List<Task<? extends Serializable>> rootTasks, Pa
         FileSinkOperator.getOperatorName() + "%"),
         new CompositeProcessor(new SparkFileSinkProcessor(), genSparkWork));
 
-//    opRules.put(new RuleRegExp("Handle Potential Analyze Command",
-//        TableScanOperator.getOperatorName() + "%"),
-//        new ProcessAnalyzeTable(GenSparkUtils.getUtils()));
+    opRules.put(new RuleRegExp("Handle Potential Analyze Command",
+        TableScanOperator.getOperatorName() + "%"),
+        new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
 
 //    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
 //        new NodeProcessor() {
diff --git ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
index c6bc03b..5fcaf64 100644
--- ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
+++ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
@@ -87,11 +87,10 @@ public Object process(Node nd, Stack<Node> stack,
     QBParseInfo parseInfo = parseContext.getQB().getParseInfo();
 
     if (parseInfo.isAnalyzeCommand()) {
-      Preconditions.checkArgument(tableScan.getChildOperators() == null,
-          "AssertionError: expected tableScan.getChildOperators() to be null");
-      int childOpSize = tableScan.getChildOperators().size();
-      Preconditions.checkArgument(childOpSize == 0,
-          "AssertionError: expected tableScan.getChildOperators().size() to be 0, but was " + childOpSize);
+      Preconditions.checkArgument(tableScan.getChildOperators() == null ||
+          tableScan.getChildOperators().size() == 0,
+          "AssertionError: expected tableScan.getChildOperators() to be null, " +
+          "or tableScan.getChildOperators().size() to be 0");
 
       String alias = null;
       for (String a: parseContext.getTopOps().keySet()) {
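
Reviewer note (not part of the patch): the stats-publisher guard added to SparkPlanGenerator.generate(BaseWork) can be read in isolation as the sketch below. The wrapping StatsPublisherGuard class, its constructor, and the method name initStatsPublisherIfNeeded are hypothetical scaffolding introduced only for illustration; the Hive types and calls (StatsFactory, StatsPublisher, HiveConf, ErrorMsg, BaseWork) are the ones the diff itself imports and uses.

    // Minimal, self-contained restatement of the guard the patch adds before a BaseWork is
    // translated into a SparkTran. Class and method names here are hypothetical.
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.ErrorMsg;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.stats.StatsFactory;
    import org.apache.hadoop.hive.ql.stats.StatsPublisher;
    import org.apache.hadoop.mapred.JobConf;

    class StatsPublisherGuard {
      private final JobConf jobConf;

      StatsPublisherGuard(JobConf jobConf) {
        this.jobConf = jobConf;
      }

      // Initializes the stats publisher (creating the stats table if it does not already exist)
      // when the work gathers stats. A failed init only aborts the query when
      // hive.stats.reliable is enabled; otherwise plan generation continues without stats.
      void initStatsPublisherIfNeeded(BaseWork work) throws HiveException {
        if (!work.isGatheringStats()) {
          return;
        }
        StatsFactory factory = StatsFactory.newFactory(jobConf);
        if (factory == null) {
          return;
        }
        StatsPublisher statsPublisher = factory.getStatsPublisher();
        if (!statsPublisher.init(jobConf)
            && HiveConf.getBoolVar(jobConf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
          throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
        }
      }
    }

The design choice mirrors the diff: a publisher that fails to initialize is tolerated by default, and the query is failed fast only when hive.stats.reliable demands reliable statistics.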