diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
index e955da3..b33f0e2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
@@ -197,12 +197,13 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr
     fs.mkdirs(emptyScratchDir);
 
     SparkCounters sparkCounters = new SparkCounters(sc, hiveConf);
-    List<String> prefixes = sparkWork.getRequiredCounterPrefix();
+    Map<String, List<String>> prefixes = sparkWork.getRequiredCounterPrefix();
     // register spark counters before submit spark job.
     if (prefixes != null) {
-      for (String prefix : prefixes) {
-        sparkCounters.createCounter(prefix, StatsSetupConst.ROW_COUNT);
-        sparkCounters.createCounter(prefix, StatsSetupConst.RAW_DATA_SIZE);
+      for (String group : prefixes.keySet()) {
+        for (String counter : prefixes.get(group)) {
+          sparkCounters.createCounter(group, counter);
+        }
       }
     }
     SparkReporter sparkReporter = new SparkReporter(sparkCounters);
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 46b04bc..2fea62d 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -22,18 +22,27 @@
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.StatsTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -97,12 +106,7 @@ public int execute(DriverContext driverContext) {
     SessionState.get().setSparkSession(sparkSession);
 
     SparkWork sparkWork = getWork();
-    // We need to pre register spark counters for table statistic collection related query.
-    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
-    StatsTask statsTask = getStatsTaskInChildTasks(this);
-    if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
-      sparkWork.setRequiredCounterPrefix(getRequiredCounterPrefix(statsTask));
-    }
+    sparkWork.setRequiredCounterPrefix(getCounterPrefixes());
 
     SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
     SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus();
@@ -225,18 +229,38 @@ private void printConfigInfo() throws IOException {
     console.printInfo("  set " + HiveConf.ConfVars.HADOOPNUMREDUCERS + "=<number>");
   }
 
+  private Map<String, List<String>> getCounterPrefixes() throws HiveException, MetaException {
+    Map<String, List<String>> counters = getOperatorCounters();
+    StatsTask statsTask = getStatsTaskInChildTasks(this);
+    String statsImpl = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
+    // fetch table prefix if SparkTask tries to gather table statistics based on counter.
+    if (statsImpl.equalsIgnoreCase("counter") && statsTask != null) {
+      List<String> prefixes = getRequiredCounterPrefix(statsTask);
+      for (String prefix : prefixes) {
+        List<String> counterGroup = counters.get(prefix);
+        if (counterGroup == null) {
+          counterGroup = new LinkedList<String>();
+          counters.put(prefix, counterGroup);
+        }
+        counterGroup.add(StatsSetupConst.ROW_COUNT);
+        counterGroup.add(StatsSetupConst.RAW_DATA_SIZE);
+      }
+    }
+    return counters;
+  }
+
   private List<String> getRequiredCounterPrefix(StatsTask statsTask) throws HiveException, MetaException {
     List<String> prefixs = new LinkedList<String>();
     StatsWork statsWork = statsTask.getWork();
-    String prefix = getPrefix(statsWork);
+    String tablePrefix = getTablePrefix(statsWork);
     List<Partition> partitions = getPartitionsList(statsWork);
     int maxPrefixLength = StatsFactory.getMaxPrefixLength(conf);
 
     if (partitions == null) {
-      prefixs.add(Utilities.getHashedStatsPrefix(prefix, maxPrefixLength));
+      prefixs.add(Utilities.getHashedStatsPrefix(tablePrefix, maxPrefixLength));
     } else {
       for (Partition partition : partitions) {
-        String prefixWithPartition = Utilities.join(prefix, Warehouse.makePartPath(partition.getSpec()));
+        String prefixWithPartition = Utilities.join(tablePrefix, Warehouse.makePartPath(partition.getSpec()));
         prefixs.add(Utilities.getHashedStatsPrefix(prefixWithPartition, maxPrefixLength));
       }
     }
@@ -244,7 +268,7 @@ private void printConfigInfo() throws IOException {
     return prefixs;
   }
 
-  private String getPrefix(StatsWork work) throws HiveException {
+  private String getTablePrefix(StatsWork work) throws HiveException {
     String tableName;
     if (work.getLoadTableDesc() != null) {
       tableName = work.getLoadTableDesc().getTable().getTableName();
@@ -326,4 +350,40 @@ private static StatsTask getStatsTaskInChildTasks(Task<? extends Serializable> r
     }
     return list;
   }
+
+  private Map<String, List<String>> getOperatorCounters() {
+    String groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
+    Map<String, List<String>> counters = new HashMap<String, List<String>>();
+    List<String> hiveCounters = new LinkedList<String>();
+    counters.put(groupName, hiveCounters);
+    hiveCounters.add(Operator.HIVECOUNTERCREATEDFILES);
+    SparkWork sparkWork = this.getWork();
+    for (BaseWork work : sparkWork.getAllWork()) {
+      for (Operator operator : work.getAllOperators()) {
+        if (operator instanceof MapOperator) {
+          for (MapOperator.Counter counter : MapOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof FileSinkOperator) {
+          for (FileSinkOperator.Counter counter : FileSinkOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof ReduceSinkOperator) {
+          for (ReduceSinkOperator.Counter counter : ReduceSinkOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof ScriptOperator) {
+          for (ScriptOperator.Counter counter : ScriptOperator.Counter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        } else if (operator instanceof JoinOperator) {
+          for (JoinOperator.SkewkeyTableCounter counter : JoinOperator.SkewkeyTableCounter.values()) {
+            hiveCounters.add(counter.toString());
+          }
+        }
+      }
+    }
+
+    return counters;
+  }
 }
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
index bb3597a..9aa770a 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java
@@ -59,28 +59,6 @@ public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf)
     this.javaSparkContext = javaSparkContext;
     this.hiveConf = hiveConf;
     sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
-    initializeSparkCounters();
-  }
-
-  /**
-   * pre-define all needed Counters here.
-   */
-  private void initializeSparkCounters() {
-    String groupName = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVECOUNTERGROUP);
-    createCounter(groupName, Operator.HIVECOUNTERCREATEDFILES);
-    createCounter(groupName, MapOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(groupName, MapOperator.Counter.RECORDS_IN);
-    createCounter(groupName, FileSinkOperator.Counter.RECORDS_OUT);
-    createCounter(groupName, ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE);
-    createCounter(groupName, ScriptOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(groupName, ScriptOperator.Counter.SERIALIZE_ERRORS);
-    createCounter(groupName, JoinOperator.SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS);
-    // TODO remove? changed due to HIVE-8429
-    createCounter(MapOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(MapOperator.Counter.RECORDS_IN);
-    createCounter(ScriptOperator.Counter.DESERIALIZE_ERRORS);
-    createCounter(ScriptOperator.Counter.SERIALIZE_ERRORS);
-    createCounter(JoinOperator.SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS);
   }
 
   public void createCounter(Enum<?> key) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
index 66fd6b6..46d02bf 100644
--- ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
+++ ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
@@ -54,7 +54,7 @@
   protected final Map<Pair<BaseWork, BaseWork>, SparkEdgeProperty> edgeProperties =
       new HashMap<Pair<BaseWork, BaseWork>, SparkEdgeProperty>();
 
-  private List<String> requiredCounterPrefix;
+  private Map<String, List<String>> requiredCounterPrefix;
 
   public SparkWork(String name) {
     this.name = name + ":" + (++counter);
@@ -176,11 +176,11 @@ public void disconnect(BaseWork a, BaseWork b) {
     return new HashSet<BaseWork>(leaves);
   }
 
-  public void setRequiredCounterPrefix(List<String> requiredCounterPrefix) {
+  public void setRequiredCounterPrefix(Map<String, List<String>> requiredCounterPrefix) {
     this.requiredCounterPrefix = requiredCounterPrefix;
   }
 
-  public List<String> getRequiredCounterPrefix() {
+  public Map<String, List<String>> getRequiredCounterPrefix() {
     return requiredCounterPrefix;
   }