diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 04323bb..7ab9a34 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -36,6 +37,9 @@ import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager; @@ -103,7 +107,10 @@ public int execute(DriverContext driverContext) { sparkCounters = jobRef.getSparkJobStatus().getCounter(); SparkJobMonitor monitor = new SparkJobMonitor(jobRef.getSparkJobStatus()); monitor.startMonitor(); - console.printInfo(sparkCounters.toString()); + SparkStatistics sparkStatistics = jobRef.getSparkJobStatus().getSparkStatistics(); + if (LOG.isInfoEnabled() && sparkStatistics != null) { + logSparkStatistic(sparkStatistics); + } rc = 0; } catch (Exception e) { LOG.error("Failed to execute spark task.", e); @@ -121,6 +128,19 @@ public int execute(DriverContext driverContext) { return rc; } + private void logSparkStatistic(SparkStatistics sparkStatistic) { + Iterator groupIterator = sparkStatistic.getStatisticGroups(); + while (groupIterator.hasNext()) { + SparkStatisticGroup group = groupIterator.next(); + LOG.info(group.getGroupName()); + Iterator statisticIterator = group.getStatistics(); + while (statisticIterator.hasNext()) { + SparkStatistic statistic = statisticIterator.next(); + LOG.info("\t" + statistic.getName() + ": " + statistic.getValue()); + } + } + } + /** * close will move the temp files into the right place for the fetch * task. If the job has failed it will clean up the files. diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java new file mode 100644 index 0000000..3329ef0 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.Statistic; + +public class SparkStatistic { + private final String name; + private final String value; + + SparkStatistic(String name, String value) { + this.name = name; + this.value = value; + } + + public String getValue() { + return value; + } + + public String getName() { + return name; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java new file mode 100644 index 0000000..0fcc506 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.Statistic; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class SparkStatisticGroup { + private final String groupName; + private final List statisticList; + + SparkStatisticGroup(String groupName, List statisticList) { + this.groupName = groupName; + this.statisticList = Collections.unmodifiableList(statisticList); + } + + public String getGroupName() { + return groupName; + } + + public Iterator getStatistics() { + return this.statisticList.iterator(); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java new file mode 100644 index 0000000..150fbec --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.Statistic; + + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class SparkStatistics { + private final List statisticGroups; + + SparkStatistics(List statisticGroups) { + this.statisticGroups = Collections.unmodifiableList(statisticGroups); + } + + public Iterator getStatisticGroups() { + return this.statisticGroups.iterator(); + } +} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java new file mode 100644 index 0000000..b4f2038 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.Statistic; + +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounter; +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounterGroup; +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +public class SparkStatisticsBuilder { + + private Map> statisticMap; + + public SparkStatisticsBuilder() { + statisticMap = new HashMap>(); + } + + public SparkStatistics build() { + List statisticGroups = new LinkedList(); + for (Map.Entry> entry : statisticMap.entrySet()) { + String groupName = entry.getKey(); + List statisitcList = entry.getValue(); + statisticGroups.add(new SparkStatisticGroup(groupName, statisitcList)); + } + + return new SparkStatistics(statisticGroups); + } + + public SparkStatisticsBuilder add(SparkCounters sparkCounters) { + for (SparkCounterGroup counterGroup : sparkCounters.getSparkCounterGroups().values()) { + String groupDisplayName = counterGroup.getGroupDisplayName(); + List statisticList = statisticMap.get(groupDisplayName); + if (statisticList == null) { + statisticList = new LinkedList(); + statisticMap.put(groupDisplayName, statisticList); + } + for (SparkCounter counter : counterGroup.getSparkCounters().values()) { + String displayName = counter.getDisplayName(); + statisticList.add(new SparkStatistic(displayName, Long.toString(counter.getValue()))); + } + } + return this; + } + + public SparkStatisticsBuilder add(String groupName, String name, String value) { + List statisticList = statisticMap.get(groupName); + if (statisticList == null) { + statisticList = new LinkedList(); + statisticMap.put(groupName, statisticList); + } + statisticList.add(new SparkStatistic(name, value)); + return this; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index a450af4..f6cc581 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; import java.util.Map; @@ -36,4 +37,6 @@ public SparkCounters getCounter(); + public SparkStatistics getSparkStatistics(); + } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java index db2eca4..78e16c5 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java @@ -22,6 +22,8 @@ import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; @@ -120,6 +122,11 @@ public SparkCounters getCounter() { return sparkCounters; } + @Override + public SparkStatistics getSparkStatistics() { + return new SparkStatisticsBuilder().add(sparkCounters).build(); + } + private List getStageInfo(int stageId) { List stageInfos = new LinkedList();