diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
index 4bd3b43..a37bbe8 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java
@@ -28,10 +28,10 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener;
-import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
@@ -142,10 +142,10 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) throws Exception {
     JavaPairRDD<HiveKey, BytesWritable> finalRDD = plan.generateGraph();
     // We use Spark RDD async action to submit job as it's the only way to get jobId now.
     JavaFutureAction<Void> future = finalRDD.foreachAsync(HiveVoidFunction.getInstance());
-    // As we always use foreach action to submit RDD graph, it would only trigger on job.
+    // As we always use foreach action to submit RDD graph, it would only trigger one job.
     int jobId = future.jobIds().get(0);
-    SimpleSparkJobStatus sparkJobStatus =
-        new SimpleSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
+    LocalSparkJobStatus sparkJobStatus =
+        new LocalSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future);
     return new SparkJobRef(Integer.toString(jobId), sparkJobStatus);
   }
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 93d486f..75079c6 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -27,7 +27,7 @@
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
@@ -122,8 +122,7 @@ public Serializable call(JobContext jc) throws Exception {
         return null;
       }
     });
-    jobHandle.get();
-    return new SparkJobRef(jobHandle.getClientJobId());
+    return new SparkJobRef(jobHandle.getClientJobId(), jobHandle.getSparkJobStatus());
   }
 
   private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
index 02a5329..fdf43f1 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReporter.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hive.spark.counter.SparkCounters;
 import
org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.Reporter; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 3613784..33c24ad 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -29,7 +29,6 @@ import java.util.Map; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; @@ -46,16 +45,16 @@ import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistic; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticGroup; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hive.spark.Statistic.SparkStatistic; +import org.apache.hive.spark.Statistic.SparkStatisticGroup; +import org.apache.hive.spark.Statistic.SparkStatistics; +import org.apache.hive.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; +import org.apache.hive.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java deleted file mode 100644 index 3329ef0..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistic.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.Statistic; - -public class SparkStatistic { - private final String name; - private final String value; - - SparkStatistic(String name, String value) { - this.name = name; - this.value = value; - } - - public String getValue() { - return value; - } - - public String getName() { - return name; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java deleted file mode 100644 index 0fcc506..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticGroup.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.Statistic; - -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -public class SparkStatisticGroup { - private final String groupName; - private final List statisticList; - - SparkStatisticGroup(String groupName, List statisticList) { - this.groupName = groupName; - this.statisticList = Collections.unmodifiableList(statisticList); - } - - public String getGroupName() { - return groupName; - } - - public Iterator getStatistics() { - return this.statisticList.iterator(); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java deleted file mode 100644 index 150fbec..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatistics.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.Statistic; - - -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - -public class SparkStatistics { - private final List statisticGroups; - - SparkStatistics(List statisticGroups) { - this.statisticGroups = Collections.unmodifiableList(statisticGroups); - } - - public Iterator getStatisticGroups() { - return this.statisticGroups.iterator(); - } -} \ No newline at end of file diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java deleted file mode 100644 index b4f2038..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/Statistic/SparkStatisticsBuilder.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.Statistic; - -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounter; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounterGroup; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -public class SparkStatisticsBuilder { - - private Map> statisticMap; - - public SparkStatisticsBuilder() { - statisticMap = new HashMap>(); - } - - public SparkStatistics build() { - List statisticGroups = new LinkedList(); - for (Map.Entry> entry : statisticMap.entrySet()) { - String groupName = entry.getKey(); - List statisitcList = entry.getValue(); - statisticGroups.add(new SparkStatisticGroup(groupName, statisitcList)); - } - - return new SparkStatistics(statisticGroups); - } - - public SparkStatisticsBuilder add(SparkCounters sparkCounters) { - for (SparkCounterGroup counterGroup : sparkCounters.getSparkCounterGroups().values()) { - String groupDisplayName = counterGroup.getGroupDisplayName(); - List statisticList = statisticMap.get(groupDisplayName); - if (statisticList == null) { - statisticList = new LinkedList(); - statisticMap.put(groupDisplayName, statisticList); - } - for (SparkCounter counter : counterGroup.getSparkCounters().values()) { - String displayName = counter.getDisplayName(); - statisticList.add(new SparkStatistic(displayName, Long.toString(counter.getValue()))); - } - } - return this; - } - - public SparkStatisticsBuilder add(String groupName, String name, String value) { - List statisticList = statisticMap.get(groupName); - if (statisticList == null) { - statisticList = new LinkedList(); - statisticMap.put(groupName, statisticList); - } - statisticList.add(new SparkStatistic(name, value)); - return this; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java deleted file mode 100644 index cdf2e10..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounter.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.counter; - -import java.io.Serializable; - -import org.apache.spark.Accumulator; -import org.apache.spark.AccumulatorParam; -import org.apache.spark.api.java.JavaSparkContext; - -public class SparkCounter implements Serializable { - - private String name; - private String displayName; - private Accumulator accumulator; - - public SparkCounter( - String name, - String displayName, - String groupName, - long initValue, - JavaSparkContext sparkContext) { - - this.name = name; - this.displayName = displayName; - LongAccumulatorParam longParam = new LongAccumulatorParam(); - String accumulatorName = groupName + "_" + name; - this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longParam); - } - - public long getValue() { - return accumulator.value(); - } - - public void increment(long incr) { - accumulator.add(incr); - } - - public String getName() { - return name; - } - - public String getDisplayName() { - return displayName; - } - - public void setDisplayName(String displayName) { - this.displayName = displayName; - } - - class LongAccumulatorParam implements AccumulatorParam { - - @Override - public Long addAccumulator(Long t1, Long t2) { - return t1 + t2; - } - - @Override - public Long addInPlace(Long r1, Long r2) { - return r1 + r2; - } - - @Override - public Long zero(Long initialValue) { - return 0L; - } - } - -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java deleted file mode 100644 index e4c912d..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounterGroup.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.counter; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.spark.api.java.JavaSparkContext; - -/** - * We use group to fold all the same kind of counters. - */ -public class SparkCounterGroup implements Serializable { - private static final long serialVersionUID = 1L; - private String groupName; - private String groupDisplayName; - private Map sparkCounters; - - private transient JavaSparkContext javaSparkContext; - - public SparkCounterGroup( - String groupName, - String groupDisplayName, - JavaSparkContext javaSparkContext) { - - this.groupName = groupName; - this.groupDisplayName = groupDisplayName; - this.javaSparkContext = javaSparkContext; - sparkCounters = new HashMap(); - } - - public void createCounter(String name, long initValue) { - String displayName = ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName); - SparkCounter counter = new SparkCounter(name, displayName, groupName, initValue, javaSparkContext); - sparkCounters.put(name, counter); - } - - public SparkCounter getCounter(String name) { - return sparkCounters.get(name); - } - - public String getGroupName() { - return groupName; - } - - public String getGroupDisplayName() { - return groupDisplayName; - } - - public void setGroupDisplayName(String groupDisplayName) { - this.groupDisplayName = groupDisplayName; - } - - public Map getSparkCounters() { - return sparkCounters; - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java deleted file mode 100644 index 1753e78..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/counter/SparkCounters.java +++ /dev/null @@ -1,153 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.counter; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.FileSinkOperator; -import org.apache.hadoop.hive.ql.exec.JoinOperator; -import org.apache.hadoop.hive.ql.exec.MapOperator; -import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; -import org.apache.hadoop.hive.ql.exec.Operator; -import org.apache.hadoop.hive.ql.exec.ScriptOperator; -import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.spark.api.java.JavaSparkContext; - -/** - * SparkCounters is used to collect Hive operator metric through Spark accumulator. There are few - * limitation of Spark accumulator, like: - * 1. accumulator should be created at Spark context side. - * 2. Spark tasks can only increment metric count. - * 3. User can only get accumulator value at Spark context side. - * These Spark Counter API is designed to fit into Hive requirement, while with several access - * restriction due to Spark accumulator previous mentioned: - * 1. Counter should be created on driver side if it would be accessed in task. - * 2. increment could only be invoked task side. - * 3. Hive could only get Counter value at driver side. - */ -public class SparkCounters implements Serializable { - private static final long serialVersionUID = 1L; - - private static final Log LOG = LogFactory.getLog(SparkCounters.class); - - private Map sparkCounterGroups; - - private transient JavaSparkContext javaSparkContext; - private transient Configuration hiveConf; - - public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf) { - this.javaSparkContext = javaSparkContext; - this.hiveConf = hiveConf; - sparkCounterGroups = new HashMap(); - } - - public void createCounter(Enum key) { - createCounter(key.getDeclaringClass().getName(), key.name()); - } - - public void createCounter(String groupName, Enum key) { - createCounter(groupName, key.name(), 0L); - } - - public void createCounter(String groupName, String counterName) { - createCounter(groupName, counterName, 0L); - } - - public void createCounter(String groupName, String counterName, long initValue) { - getGroup(groupName).createCounter(counterName, initValue); - } - - public void increment(Enum key, long incrValue) { - increment(key.getDeclaringClass().getName(), key.name(), incrValue); - } - - public void increment(String groupName, String counterName, long value) { - SparkCounter counter = getGroup(groupName).getCounter(counterName); - if (counter == null) { - LOG.error( - String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); - } else { - counter.increment(value); - } - } - - public long getValue(String groupName, String counterName) { - SparkCounter counter = getGroup(groupName).getCounter(counterName); - if (counter == null) { - LOG.error( - String.format("counter[%s, %s] has not initialized before.", groupName, counterName)); - return 0; - } else { - return counter.getValue(); - } - } - - public SparkCounter 
getCounter(String groupName, String counterName) {
-    return getGroup(groupName).getCounter(counterName);
-  }
-
-  public SparkCounter getCounter(Enum key) {
-    return getCounter(key.getDeclaringClass().getName(), key.name());
-  }
-
-  private SparkCounterGroup getGroup(String groupName) {
-    SparkCounterGroup group = sparkCounterGroups.get(groupName);
-    if (group == null) {
-      String groupDisplayName =
-          ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName);
-      group = new SparkCounterGroup(groupName, groupDisplayName, javaSparkContext);
-      sparkCounterGroups.put(groupName, group);
-    }
-    return group;
-  }
-
-  public Map<String, SparkCounterGroup> getSparkCounterGroups() {
-    return sparkCounterGroups;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    Map<String, SparkCounterGroup> groups = getSparkCounterGroups();
-    if (groups != null) {
-      for(Map.Entry<String, SparkCounterGroup> groupEntry : groups.entrySet()) {
-        String groupName = groupEntry.getKey();
-        SparkCounterGroup group = groupEntry.getValue();
-        sb.append(groupName).append("\n");
-        Map<String, SparkCounter> counters = group.getSparkCounters();
-        for (Map.Entry<String, SparkCounter> counterEntry : counters.entrySet()) {
-          String counterName = counterEntry.getKey();
-          SparkCounter counter = counterEntry.getValue();
-          sb.append("\t")
-            .append(counterName)
-            .append(": ")
-            .append(counter.getValue())
-            .append("\n");
-        }
-      }
-    }
-
-    return sb.toString();
-  }
-}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 3b13d90..dd7ac9c 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -28,6 +28,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hive.spark.status.SparkJobStatus;
+import org.apache.hive.spark.status.SparkStageProgress;
 import org.apache.spark.JobExecutionStatus;
 
 /**
@@ -62,53 +64,58 @@ public int startMonitor() {
     int rc = 0;
     JobExecutionStatus lastState = null;
     Map<String, SparkStageProgress> lastProgressMap = null;
-    long startTime = 0;
+    long startTime = -1;
 
     while (true) {
       try {
         JobExecutionStatus state = sparkJobStatus.getState();
-        if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) {
+        if (state != null && state != JobExecutionStatus.UNKNOWN &&
+            (state != lastState || state == JobExecutionStatus.RUNNING)) {
           lastState = state;
           Map<String, SparkStageProgress> progressMap = sparkJobStatus.getSparkStageProgress();
 
           switch (state) {
-          case RUNNING:
-            if (!running) {
-              // print job stages.
-              console.printInfo("\nQuery Hive on Spark job[" +
-                  sparkJobStatus.getJobId() + "] stages:");
-              for (int stageId : sparkJobStatus.getStageIds()) {
-                console.printInfo(Integer.toString(stageId));
+            case RUNNING:
+              if (!running) {
+                // print job stages.
+                console.printInfo("\nQuery Hive on Spark job[" +
+                    sparkJobStatus.getJobId() + "] stages:");
+                for (int stageId : sparkJobStatus.getStageIds()) {
+                  console.printInfo(Integer.toString(stageId));
+                }
+
+                console.printInfo("\nStatus: Running (Hive on Spark job[" +
+                    sparkJobStatus.getJobId() + "])");
+                startTime = System.currentTimeMillis();
+                running = true;
+
+                console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " +
+                    "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
               }
-              console.printInfo("\nStatus: Running (Hive on Spark job[" +
-                  sparkJobStatus.getJobId() + "])");
-              startTime = System.currentTimeMillis();
-              running = true;
-              console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " +
-                  "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]");
-            }
-
-
-            printStatus(progressMap, lastProgressMap);
-            lastProgressMap = progressMap;
-            break;
-          case SUCCEEDED:
-            printStatus(progressMap, lastProgressMap);
-            lastProgressMap = progressMap;
-            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
-            console.printInfo("Status: Finished successfully in " +
-                String.format("%.2f seconds", duration));
-            running = false;
-            done = true;
-            break;
-          case FAILED:
-            console.printError("Status: Failed");
-            running = false;
-            done = true;
-            rc = 2;
-            break;
+              printStatus(progressMap, lastProgressMap);
+              lastProgressMap = progressMap;
+              break;
+            case SUCCEEDED:
+              printStatus(progressMap, lastProgressMap);
+              lastProgressMap = progressMap;
+              if (startTime < 0) {
+                console.printInfo("Status: Finished successfully within a monitor interval.");
+              } else {
+                double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+                console.printInfo("Status: Finished successfully in " +
+                    String.format("%.2f seconds", duration));
+              }
+              running = false;
+              done = true;
+              break;
+            case FAILED:
+              console.printError("Status: Failed");
+              running = false;
+              done = true;
+              rc = 2;
+              break;
           }
         }
 
         if (!done) {
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
index d16d1b4..4f86589 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark.status;
 
+import org.apache.hive.spark.status.SparkJobStatus;
+
 public class SparkJobRef {
 
   private String jobId;
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
deleted file mode 100644
index b5c1837..0000000
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.status; - -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; -import org.apache.spark.JobExecutionStatus; - -import java.util.Map; - -/** - * SparkJobStatus identify what Hive want to know about the status of a Spark job. - */ -public interface SparkJobStatus { - - public int getJobId(); - - public JobExecutionStatus getState(); - - public int[] getStageIds(); - - public Map getSparkStageProgress(); - - public SparkCounters getCounter(); - - public SparkStatistics getSparkStatistics(); - - public void cleanup(); -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java deleted file mode 100644 index cfec354..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.exec.spark.status; - -public class SparkStageProgress { - - private int totalTaskCount; - private int succeededTaskCount; - private int runningTaskCount; - private int failedTaskCount; - // TODO: remove the following two metrics as they're not available in current spark API, - // we can add them back once spark provides it -// private int killedTaskCount; -// private long cumulativeTime; - - public SparkStageProgress( - int totalTaskCount, - int succeededTaskCount, - int runningTaskCount, - int failedTaskCount) { - - this.totalTaskCount = totalTaskCount; - this.succeededTaskCount = succeededTaskCount; - this.runningTaskCount = runningTaskCount; - this.failedTaskCount = failedTaskCount; - } - - public int getTotalTaskCount() { - return totalTaskCount; - } - - public int getSucceededTaskCount() { - return succeededTaskCount; - } - - public int getRunningTaskCount() { - return runningTaskCount; - } - - public int getFailedTaskCount() { - return failedTaskCount; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof SparkStageProgress) { - SparkStageProgress other = (SparkStageProgress) obj; - return getTotalTaskCount() == other.getTotalTaskCount() - && getSucceededTaskCount() == other.getSucceededTaskCount() - && getRunningTaskCount() == other.getRunningTaskCount() - && getFailedTaskCount() == other.getFailedTaskCount(); - } - return false; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("TotalTasks: "); - sb.append(getTotalTaskCount()); - sb.append(" Succeeded: "); - sb.append(getSucceededTaskCount()); - sb.append(" Running: "); - sb.append(getRunningTaskCount()); - sb.append(" Failed: "); - sb.append(getFailedTaskCount()); - return sb.toString(); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java new file mode 100644 index 0000000..257651f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import org.apache.hive.spark.Statistic.SparkStatistics;
+import org.apache.hive.spark.Statistic.SparkStatisticsBuilder;
+import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.hive.spark.status.SparkJobStatus;
+import org.apache.hive.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.executor.ShuffleReadMetrics;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.executor.TaskMetrics;
+
+import scala.Option;
+
+public class LocalSparkJobStatus implements SparkJobStatus {
+
+  private final JavaSparkContext sparkContext;
+  private int jobId;
+  // After SPARK-2321, we only use JobMetricsListener to get job metrics
+  // TODO: remove it when the new API provides equivalent functionality
+  private JobMetricsListener jobMetricsListener;
+  private SparkCounters sparkCounters;
+  private JavaFutureAction<Void> future;
+
+  public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId,
+      JobMetricsListener jobMetricsListener, SparkCounters sparkCounters,
+      JavaFutureAction<Void> future) {
+    this.sparkContext = sparkContext;
+    this.jobId = jobId;
+    this.jobMetricsListener = jobMetricsListener;
+    this.sparkCounters = sparkCounters;
+    this.future = future;
+  }
+
+  @Override
+  public int getJobId() {
+    return jobId;
+  }
+
+  @Override
+  public JobExecutionStatus getState() {
+    // For a Spark job with empty source data, no job is actually submitted, so we would never
+    // receive JobStart/JobEnd events in JobStateListener; use the JavaFutureAction to get the
+    // current job state.
+    if (future.isDone()) {
+      return JobExecutionStatus.SUCCEEDED;
+    } else {
+      // SparkJobInfo may not be available yet
+      SparkJobInfo sparkJobInfo = getJobInfo();
+      return sparkJobInfo == null ? null : sparkJobInfo.status();
+    }
+  }
+
+  @Override
+  public int[] getStageIds() {
+    SparkJobInfo sparkJobInfo = getJobInfo();
+    return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
+  }
+
+  @Override
+  public Map<String, SparkStageProgress> getSparkStageProgress() {
+    Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>();
+    for (int stageId : getStageIds()) {
+      SparkStageInfo sparkStageInfo = getStageInfo(stageId);
+      if (sparkStageInfo != null) {
+        int runningTaskCount = sparkStageInfo.numActiveTasks();
+        int completedTaskCount = sparkStageInfo.numCompletedTasks();
+        int failedTaskCount = sparkStageInfo.numFailedTasks();
+        int totalTaskCount = sparkStageInfo.numTasks();
+        SparkStageProgress sparkStageProgress = new SparkStageProgress(
+            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+        stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" +
+            sparkStageInfo.currentAttemptId(), sparkStageProgress);
+      }
+    }
+    return stageProgresses;
+  }
+
+  @Override
+  public SparkCounters getCounter() {
+    return sparkCounters;
+  }
+
+  @Override
+  public SparkStatistics getSparkStatistics() {
+    SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder();
+    // add Hive operator level statistics.
+    sparkStatisticsBuilder.add(sparkCounters);
+    // add spark job metrics.
+    String jobIdentifier = "Spark Job[" + jobId + "] Metrics";
+    Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId);
+    if (jobMetric == null) {
+      return null;
+    }
+
+    Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric);
+    for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) {
+      sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue()));
+    }
+
+    return sparkStatisticsBuilder.build();
+  }
+
+  @Override
+  public void cleanup() {
+    jobMetricsListener.cleanup(jobId);
+  }
+
+  private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) {
+    Map<String, Long> results = Maps.newLinkedHashMap();
+
+    long executorDeserializeTime = 0;
+    long executorRunTime = 0;
+    long resultSize = 0;
+    long jvmGCTime = 0;
+    long resultSerializationTime = 0;
+    long memoryBytesSpilled = 0;
+    long diskBytesSpilled = 0;
+    long bytesRead = 0;
+    long remoteBlocksFetched = 0;
+    long localBlocksFetched = 0;
+    long fetchWaitTime = 0;
+    long remoteBytesRead = 0;
+    long shuffleBytesWritten = 0;
+    long shuffleWriteTime = 0;
+    boolean inputMetricExist = false;
+    boolean shuffleReadMetricExist = false;
+    boolean shuffleWriteMetricExist = false;
+
+    for (List<TaskMetrics> stageMetric : jobMetric.values()) {
+      if (stageMetric != null) {
+        for (TaskMetrics taskMetrics : stageMetric) {
+          if (taskMetrics != null) {
+            executorDeserializeTime += taskMetrics.executorDeserializeTime();
+            executorRunTime += taskMetrics.executorRunTime();
+            resultSize += taskMetrics.resultSize();
+            jvmGCTime += taskMetrics.jvmGCTime();
+            resultSerializationTime += taskMetrics.resultSerializationTime();
+            memoryBytesSpilled += taskMetrics.memoryBytesSpilled();
+            diskBytesSpilled += taskMetrics.diskBytesSpilled();
+            if (!taskMetrics.inputMetrics().isEmpty()) {
+              inputMetricExist = true;
+              bytesRead += taskMetrics.inputMetrics().get().bytesRead();
+            }
+            Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics();
+            if (!shuffleReadMetricsOption.isEmpty()) {
+              shuffleReadMetricExist = true;
+              remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched();
+              localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched();
+              fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime();
+              remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead();
+            }
+            Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics();
+            if (!shuffleWriteMetricsOption.isEmpty()) {
+              shuffleWriteMetricExist = true;
+              shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten();
+              shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime();
+            }
+          }
+        }
+      }
+    }
+
+    results.put("ExecutorDeserializeTime", executorDeserializeTime);
+    results.put("ExecutorRunTime", executorRunTime);
+    results.put("ResultSize", resultSize);
+    results.put("JvmGCTime", jvmGCTime);
+    results.put("ResultSerializationTime", resultSerializationTime);
+    results.put("MemoryBytesSpilled", memoryBytesSpilled);
+    results.put("DiskBytesSpilled", diskBytesSpilled);
+    if (inputMetricExist) {
+      results.put("BytesRead", bytesRead);
+    }
+    if (shuffleReadMetricExist) {
+      results.put("RemoteBlocksFetched", remoteBlocksFetched);
+      results.put("LocalBlocksFetched", localBlocksFetched);
+      results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched);
+      results.put("FetchWaitTime", fetchWaitTime);
+      results.put("RemoteBytesRead", remoteBytesRead);
+    }
+    if (shuffleWriteMetricExist) {
+      results.put("ShuffleBytesWritten", shuffleBytesWritten);
+      results.put("ShuffleWriteTime", shuffleWriteTime);
+    }
+    return results;
+  }
+
+  private SparkJobInfo
getJobInfo() { + return sparkContext.statusTracker().getJobInfo(jobId); + } + + private SparkStageInfo getStageInfo(int stageId) { + return sparkContext.statusTracker().getStageInfo(stageId); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java deleted file mode 100644 index 19fd20d..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.status.impl; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.Maps; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; -import org.apache.spark.JobExecutionStatus; -import org.apache.spark.SparkJobInfo; -import org.apache.spark.SparkStageInfo; -import org.apache.spark.api.java.JavaFutureAction; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.executor.ShuffleReadMetrics; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.executor.TaskMetrics; - -import scala.Option; - -public class SimpleSparkJobStatus implements SparkJobStatus { - - private final JavaSparkContext sparkContext; - private int jobId; - // After SPARK-2321, we only use JobMetricsListener to get job metrics - // TODO: remove it when the new API provides equivalent functionality - private JobMetricsListener jobMetricsListener; - private SparkCounters sparkCounters; - private JavaFutureAction future; - - public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId, - JobMetricsListener jobMetricsListener, SparkCounters sparkCounters, - JavaFutureAction future) { - this.sparkContext = sparkContext; - this.jobId = jobId; - this.jobMetricsListener = jobMetricsListener; - this.sparkCounters = sparkCounters; - this.future = future; - } - - @Override - public int getJobId() { - return jobId; - } - - @Override - public JobExecutionStatus getState() { - // For spark job with empty source data, it's not submitted actually, so we would never - // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current - // job state. 
- if (future.isDone()) { - return JobExecutionStatus.SUCCEEDED; - } else { - // SparkJobInfo may not be available yet - SparkJobInfo sparkJobInfo = getJobInfo(); - return sparkJobInfo == null ? null : sparkJobInfo.status(); - } - } - - @Override - public int[] getStageIds() { - SparkJobInfo sparkJobInfo = getJobInfo(); - return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds(); - } - - @Override - public Map getSparkStageProgress() { - Map stageProgresses = new HashMap(); - for (int stageId : getStageIds()) { - SparkStageInfo sparkStageInfo = getStageInfo(stageId); - if (sparkStageInfo != null) { - int runningTaskCount = sparkStageInfo.numActiveTasks(); - int completedTaskCount = sparkStageInfo.numCompletedTasks(); - int failedTaskCount = sparkStageInfo.numFailedTasks(); - int totalTaskCount = sparkStageInfo.numTasks(); - SparkStageProgress sparkStageProgress = new SparkStageProgress( - totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" + - sparkStageInfo.currentAttemptId(), sparkStageProgress); - } - } - return stageProgresses; - } - - @Override - public SparkCounters getCounter() { - return sparkCounters; - } - - @Override - public SparkStatistics getSparkStatistics() { - SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); - // add Hive operator level statistics. - sparkStatisticsBuilder.add(sparkCounters); - // add spark job metrics. - String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; - Map> jobMetric = jobMetricsListener.getJobMetric(jobId); - if (jobMetric == null) { - return null; - } - - Map flatJobMetric = combineJobLevelMetrics(jobMetric); - for (Map.Entry entry : flatJobMetric.entrySet()) { - sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); - } - - return sparkStatisticsBuilder.build(); - } - - @Override - public void cleanup() { - jobMetricsListener.cleanup(jobId); - } - - private Map combineJobLevelMetrics(Map> jobMetric) { - Map results = Maps.newLinkedHashMap(); - - long executorDeserializeTime = 0; - long executorRunTime = 0; - long resultSize = 0; - long jvmGCTime = 0; - long resultSerializationTime = 0; - long memoryBytesSpilled = 0; - long diskBytesSpilled = 0; - long bytesRead = 0; - long remoteBlocksFetched = 0; - long localBlocksFetched = 0; - long fetchWaitTime = 0; - long remoteBytesRead = 0; - long shuffleBytesWritten = 0; - long shuffleWriteTime = 0; - boolean inputMetricExist = false; - boolean shuffleReadMetricExist = false; - boolean shuffleWriteMetricExist = false; - - for (List stageMetric : jobMetric.values()) { - if (stageMetric != null) { - for (TaskMetrics taskMetrics : stageMetric) { - if (taskMetrics != null) { - executorDeserializeTime += taskMetrics.executorDeserializeTime(); - executorRunTime += taskMetrics.executorRunTime(); - resultSize += taskMetrics.resultSize(); - jvmGCTime += taskMetrics.jvmGCTime(); - resultSerializationTime += taskMetrics.resultSerializationTime(); - memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); - diskBytesSpilled += taskMetrics.diskBytesSpilled(); - if (!taskMetrics.inputMetrics().isEmpty()) { - inputMetricExist = true; - bytesRead += taskMetrics.inputMetrics().get().bytesRead(); - } - Option shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); - if (!shuffleReadMetricsOption.isEmpty()) { - shuffleReadMetricExist = true; - remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); - localBlocksFetched += 
shuffleReadMetricsOption.get().localBlocksFetched(); - fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); - remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); - } - Option shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); - if (!shuffleWriteMetricsOption.isEmpty()) { - shuffleWriteMetricExist = true; - shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); - shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); - } - } - } - } - } - - results.put("EexcutorDeserializeTime", executorDeserializeTime); - results.put("ExecutorRunTime", executorRunTime); - results.put("ResultSize", resultSize); - results.put("JvmGCTime", jvmGCTime); - results.put("ResultSerializationTime", resultSerializationTime); - results.put("MemoryBytesSpilled", memoryBytesSpilled); - results.put("DiskBytesSpilled", diskBytesSpilled); - if (inputMetricExist) { - results.put("BytesRead", bytesRead); - } - if (shuffleReadMetricExist) { - results.put("RemoteBlocksFetched", remoteBlocksFetched); - results.put("LocalBlocksFetched", localBlocksFetched); - results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); - results.put("FetchWaitTime", fetchWaitTime); - results.put("RemoteBytesRead", remoteBytesRead); - } - if (shuffleWriteMetricExist) { - results.put("ShuffleBytesWritten", shuffleBytesWritten); - results.put("ShuffleWriteTime", shuffleWriteTime); - } - return results; - } - - private SparkJobInfo getJobInfo() { - return sparkContext.statusTracker().getJobInfo(jobId); - } - - private SparkStageInfo getStageInfo(int stageId) { - return sparkContext.statusTracker().getStageInfo(stageId); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java index 0847ce7..03355a3 100644 --- ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java +++ ql/src/java/org/apache/hadoop/hive/ql/stats/CounterStatsAggregatorSpark.java @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hive.spark.counter.SparkCounters; public class CounterStatsAggregatorSpark implements StatsAggregator, StatsCollectionTaskIndependent { diff --git spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatistic.java spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatistic.java new file mode 100644 index 0000000..a6fe720 --- /dev/null +++ spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatistic.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.spark.Statistic; + +public class SparkStatistic { + private final String name; + private final String value; + + SparkStatistic(String name, String value) { + this.name = name; + this.value = value; + } + + public String getValue() { + return value; + } + + public String getName() { + return name; + } +} diff --git spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatisticGroup.java spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatisticGroup.java new file mode 100644 index 0000000..1487d0e --- /dev/null +++ spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatisticGroup.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.spark.Statistic; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class SparkStatisticGroup { + private final String groupName; + private final List statisticList; + + SparkStatisticGroup(String groupName, List statisticList) { + this.groupName = groupName; + this.statisticList = Collections.unmodifiableList(statisticList); + } + + public String getGroupName() { + return groupName; + } + + public Iterator getStatistics() { + return this.statisticList.iterator(); + } +} diff --git spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatistics.java spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatistics.java new file mode 100644 index 0000000..aac2a5f --- /dev/null +++ spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatistics.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hive.spark.Statistic; + + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class SparkStatistics { + private final List statisticGroups; + + SparkStatistics(List statisticGroups) { + this.statisticGroups = Collections.unmodifiableList(statisticGroups); + } + + public Iterator getStatisticGroups() { + return this.statisticGroups.iterator(); + } +} \ No newline at end of file diff --git spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatisticsBuilder.java spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatisticsBuilder.java new file mode 100644 index 0000000..18194fe --- /dev/null +++ spark-client/src/main/java/org/apache/hive/spark/Statistic/SparkStatisticsBuilder.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.Statistic;
+
+import org.apache.hive.spark.counter.SparkCounter;
+import org.apache.hive.spark.counter.SparkCounterGroup;
+import org.apache.hive.spark.counter.SparkCounters;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class SparkStatisticsBuilder {
+
+  private Map<String, List<SparkStatistic>> statisticMap;
+
+  public SparkStatisticsBuilder() {
+    statisticMap = new HashMap<String, List<SparkStatistic>>();
+  }
+
+  public SparkStatistics build() {
+    List<SparkStatisticGroup> statisticGroups = new LinkedList<SparkStatisticGroup>();
+    for (Map.Entry<String, List<SparkStatistic>> entry : statisticMap.entrySet()) {
+      String groupName = entry.getKey();
+      List<SparkStatistic> statisticList = entry.getValue();
+      statisticGroups.add(new SparkStatisticGroup(groupName, statisticList));
+    }
+
+    return new SparkStatistics(statisticGroups);
+  }
+
+  public SparkStatisticsBuilder add(SparkCounters sparkCounters) {
+    for (SparkCounterGroup counterGroup : sparkCounters.getSparkCounterGroups().values()) {
+      String groupDisplayName = counterGroup.getGroupDisplayName();
+      List<SparkStatistic> statisticList = statisticMap.get(groupDisplayName);
+      if (statisticList == null) {
+        statisticList = new LinkedList<SparkStatistic>();
+        statisticMap.put(groupDisplayName, statisticList);
+      }
+      for (SparkCounter counter : counterGroup.getSparkCounters().values()) {
+        String displayName = counter.getDisplayName();
+        statisticList.add(new SparkStatistic(displayName, Long.toString(counter.getValue())));
+      }
+    }
+    return this;
+  }
+
+  public SparkStatisticsBuilder add(String groupName, String name, String value) {
+    List<SparkStatistic> statisticList = statisticMap.get(groupName);
+    if (statisticList == null) {
+      statisticList = new LinkedList<SparkStatistic>();
+      statisticMap.put(groupName, statisticList);
+    }
+    statisticList.add(new SparkStatistic(name, value));
+    return this;
+  }
+}
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
index 4a43a01..543ea3c 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hive.spark.status.SparkJobStatus;
 
 /**
  * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
@@ -42,6 +43,11 @@
    */
   MetricsCollection getMetrics();
 
+  /**
+   * Get a SparkJobStatus for this job.
+   */
+  SparkJobStatus getSparkJobStatus();
+
   // TODO: expose job status?
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
index 4a43a01..543ea3c 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
+import org.apache.hive.spark.status.SparkJobStatus;
 
 /**
  * A handle to a submitted job. Allows for monitoring and controlling of the running remote job.
@@ -42,6 +43,11 @@
    */
   MetricsCollection getMetrics();
 
+  /**
+   * Get a SparkJobStatus for this job.
+   */
+  SparkJobStatus getSparkJobStatus();
+
   // TODO: expose job status?
 }
diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
index 054f5ec..3a63621 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.hive.spark.client;
 
+import org.apache.hive.spark.status.SparkJobStatus;
+
 import java.io.Serializable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
@@ -38,6 +40,8 @@
   private T result;
   private Throwable error;
 
+  private final SparkJobStatus sparkJobStatus;
+
   JobHandleImpl(SparkClientImpl client, String jobId) {
     this.client = client;
     this.jobId = jobId;
@@ -45,6 +49,7 @@
     this.metrics = new MetricsCollection();
     this.cancelled = new AtomicBoolean();
     this.completed = false;
+    sparkJobStatus = new RemoteSparkJobStatus(client, jobId);
   }
 
   /** Requests a running job to be cancelled. */
@@ -103,6 +108,11 @@ public MetricsCollection getMetrics() {
     return metrics;
   }
 
+  @Override
+  public SparkJobStatus getSparkJobStatus() {
+    return sparkJobStatus;
+  }
+
   private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException {
     long deadline = System.currentTimeMillis() + timeout;
     synchronized (monitor) {
diff --git spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
index 4212634..7e104a3 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java
@@ -125,4 +125,56 @@
   }
 
+  static class GetJobExecutionStatus implements Serializable {
+    final String clientJobId;
+    final Integer sparkJobId;
+
+    GetJobExecutionStatus(String clientJobId, Integer sparkJobId) {
+      this.clientJobId = clientJobId;
+      this.sparkJobId = sparkJobId;
+    }
+
+    GetJobExecutionStatus() {
+      this(null, null);
+    }
+  }
+
+  static class GetStageIds implements Serializable {
+    final Integer sparkJobId;
+
+    GetStageIds(Integer sparkJobId) {
+      this.sparkJobId = sparkJobId;
+    }
+
+    GetStageIds() {
+      this(null);
+    }
+  }
+
+  static class GetSparkStageProgress implements Serializable {
+    final Integer sparkJobId;
+
+    GetSparkStageProgress(Integer sparkJobId) {
+      this.sparkJobId = sparkJobId;
+    }
+
+    GetSparkStageProgress() {
+      this(null);
+    }
+  }
+
+  static class JobSubmitted implements Serializable {
+    final String clientJobId;
+    final Integer sparkJobId;
+
+    JobSubmitted(String clientJobId, Integer sparkJobId) {
+      this.clientJobId = clientJobId;
+      this.sparkJobId = sparkJobId;
+    }
+
+    JobSubmitted() {
+      this(null, null);
+    }
+  }
+
 }
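Each of these message classes is a plain Serializable bean with a no-arg constructor for deserialization, and the driver answers every query by calling getSender().tell(...). A client-side round trip therefore reduces to an Akka ask plus a blocking await, sketched here assuming the remoteRef ActorSelection and ASK_TIMEOUT field that SparkClientImpl introduces later in this patch:

    // Send a status query to the remote driver and block for its reply.
    scala.concurrent.Future<Object> reply =
        ask(remoteRef, new Protocol.GetStageIds(sparkJobId), ASK_TIMEOUT);
    int[] stageIds = (int[]) Await.result(reply, ASK_TIMEOUT.duration());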
diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
index 3de1fec..8936dc3 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java
@@ -18,6 +18,7 @@
 package org.apache.hive.spark.client;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +29,10 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hive.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
 import scala.Tuple2;
 
 import akka.actor.ActorRef;
@@ -185,6 +190,51 @@ public void onReceive(Object message) throws Exception {
         JobWrapper wrapper = new JobWrapper(req);
         activeJobs.put(req.id, wrapper);
         wrapper.submit();
+      } else if (message instanceof Protocol.GetJobExecutionStatus) {
+        Protocol.GetJobExecutionStatus getJobExecutionStatus =
+          (Protocol.GetJobExecutionStatus) message;
+        SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(
+          getJobExecutionStatus.sparkJobId);
+        JobExecutionStatus jobExecutionStatus = jobInfo == null ?
+          JobExecutionStatus.UNKNOWN : jobInfo.status();
+        if (activeJobs.containsKey(getJobExecutionStatus.clientJobId)) {
+          JobWrapper jobWrapper = activeJobs.get(getJobExecutionStatus.clientJobId);
+          if (jobWrapper.jobs.size() == 1) {
+            JavaFutureAction javaFutureAction = (JavaFutureAction) jobWrapper.jobs.get(0);
+            if (javaFutureAction.isDone()) {
+              jobExecutionStatus = JobExecutionStatus.SUCCEEDED;
+            }
+          }
+        }
+        getSender().tell(jobExecutionStatus, getSelf());
+      } else if (message instanceof Protocol.GetStageIds) {
+        Protocol.GetStageIds getStageIds = (Protocol.GetStageIds) message;
+        SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo(getStageIds.sparkJobId);
+        int[] stageIds = sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds();
+        getSender().tell(stageIds, getSelf());
+      } else if (message instanceof Protocol.GetSparkStageProgress) {
+        Protocol.GetSparkStageProgress getSparkStageProgress =
+          (Protocol.GetSparkStageProgress) message;
+        SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo(
+          getSparkStageProgress.sparkJobId);
+        Map<String, SparkStageProgress> map = new HashMap<String, SparkStageProgress>();
+        if (sparkJobInfo != null) {
+          for (int stageId : sparkJobInfo.stageIds()) {
+            SparkStageInfo sparkStageInfo = jc.sc().statusTracker().getStageInfo(stageId);
+            if (sparkStageInfo != null) {
+              String stageName = Integer.toString(sparkStageInfo.stageId()) + "_"
+                + sparkStageInfo.currentAttemptId();
+              int runningTaskCount = sparkStageInfo.numActiveTasks();
+              int completedTaskCount = sparkStageInfo.numCompletedTasks();
+              int failedTaskCount = sparkStageInfo.numFailedTasks();
+              int totalTaskCount = sparkStageInfo.numTasks();
+              SparkStageProgress sparkStageProgress = new SparkStageProgress(
+                totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+              map.put(stageName, sparkStageProgress);
+            }
+          }
+        }
+        getSender().tell(map, getSelf());
       }
     }
@@ -249,8 +299,8 @@ void jobDone() {
 
     private void monitorJob(JavaFutureAction job) {
       jobs.add(job);
+      client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor);
     }
-
   }
 
   private class ClientListener implements SparkListener {
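The progress handler keys each entry by "stageId_attemptId", so a retried stage reports under a fresh name instead of overwriting its failed attempt. A hypothetical helper, not part of the patch, showing how the same SparkStageInfo fields the handler reads reduce to a completion fraction:

    // Completion fraction from the Spark status-tracker fields read above.
    static double completionFraction(SparkStageInfo info) {
      int total = info.numTasks();
      // Guard against stages that report zero tasks.
      return total == 0 ? 0.0 : (double) info.numCompletedTasks() / total;
    }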
diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteSparkJobStatus.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteSparkJobStatus.java
new file mode 100644
index 0000000..ea63474
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteSparkJobStatus.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.spark.client;
+
+import org.apache.hive.spark.Statistic.SparkStatistics;
+import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.hive.spark.status.SparkJobStatus;
+import org.apache.hive.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Job status implementation used in remote Spark context mode.
+ */
+public class RemoteSparkJobStatus implements SparkJobStatus {
+  private final SparkClientImpl client;
+  private final String clientJobId;
+  private volatile Integer sparkJobId;
+
+  public RemoteSparkJobStatus(SparkClientImpl client, String clientJobId) {
+    this.client = client;
+    this.clientJobId = clientJobId;
+  }
+
+  @Override
+  public int getJobId() {
+    return sparkJobId != null ? sparkJobId : -1;
+  }
+
+  @Override
+  public JobExecutionStatus getState() {
+    return sparkJobId != null ? client.getJobExecutionStatus(clientJobId, sparkJobId) : null;
+  }
+
+  @Override
+  public int[] getStageIds() {
+    return sparkJobId != null ? client.getStageIds(sparkJobId) : new int[0];
+  }
+
+  @Override
+  public Map<String, SparkStageProgress> getSparkStageProgress() {
+    return sparkJobId != null ? client.getSparkStageProgress(sparkJobId) :
+      new HashMap<String, SparkStageProgress>();
+  }
+
+  @Override
+  public SparkCounters getCounter() {
+    return null;
+  }
+
+  @Override
+  public SparkStatistics getSparkStatistics() {
+    return null;
+  }
+
+  @Override
+  public void cleanup() {
+
+  }
+
+  public void setSparkJobId(Integer sparkJobId) {
+    this.sparkJobId = sparkJobId;
+  }
+}
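The Spark-side job id only arrives asynchronously, via the JobSubmitted message handled in SparkClientImpl below, so every accessor must tolerate a not-yet-assigned id: getJobId() returns -1, getState() returns null, and the stage methods return empty results. A caller-side sketch of that life cycle; the polling interval is an arbitrary choice:

    // Wait until the driver has reported the Spark job id before polling state.
    static void awaitSparkJobId(RemoteSparkJobStatus status) throws InterruptedException {
      while (status.getJobId() == -1) {
        Thread.sleep(100);  // JobSubmitted has not been routed to this handle yet
      }
    }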
diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 161182f..25c58c0 100644
--- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -30,22 +30,31 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.actor.UntypedActor;
+import static akka.pattern.Patterns.ask;
+
+import akka.util.Timeout;
 import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.hive.spark.status.SparkJobStatus;
+import org.apache.hive.spark.status.SparkStageProgress;
+import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkContext;
 import org.apache.spark.SparkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.duration.Duration;
 
 class SparkClientImpl implements SparkClient {
@@ -57,6 +66,7 @@
   private final ActorRef clientRef;
   private final Thread driverThread;
   private final Map<String, JobHandleImpl<?>> jobs;
+  private static final Timeout ASK_TIMEOUT = new Timeout(Duration.create(10, TimeUnit.SECONDS));
 
   private volatile ActorSelection remoteRef;
@@ -122,6 +132,48 @@ public void stop() {
     return submit(new AddFileJob(url.toString()));
   }
 
+  public JobExecutionStatus getJobExecutionStatus(String clientJobId, int sparkJobId) {
+    if (remoteRef != null) {
+      try {
+        JobExecutionStatus jobExecutionStatus = (JobExecutionStatus) Await.result(
+          ask(remoteRef, new Protocol.GetJobExecutionStatus(clientJobId, sparkJobId),
+            ASK_TIMEOUT), ASK_TIMEOUT.duration());
+        return jobExecutionStatus;
+      } catch (Exception e) {
+        LOG.warn("Error asking for JobExecutionStatus.", e);
+      }
+    }
+    return null;
+  }
+
+  public int[] getStageIds(int sparkJobId) {
+    if (remoteRef != null) {
+      try {
+        int[] stageIds = (int[]) Await.result(
+          ask(remoteRef, new Protocol.GetStageIds(sparkJobId), ASK_TIMEOUT),
+          ASK_TIMEOUT.duration());
+        return stageIds;
+      } catch (Exception e) {
+        LOG.warn("Error asking for StageIds.", e);
+      }
+    }
+    return new int[0];
+  }
+
+  public Map<String, SparkStageProgress> getSparkStageProgress(int sparkJobId) {
+    if (remoteRef != null) {
+      try {
+        Map<String, SparkStageProgress> map = (Map<String, SparkStageProgress>) Await.result(
+          ask(remoteRef, new Protocol.GetSparkStageProgress(sparkJobId), ASK_TIMEOUT),
+          ASK_TIMEOUT.duration());
+        return map;
+      } catch (Exception e) {
+        LOG.warn("Error asking for SparkStageProgress.", e);
+      }
+    }
+    return null;
+  }
+
   void cancel(String jobId) {
     remoteRef.tell(new Protocol.CancelJob(jobId), clientRef);
   }
@@ -319,6 +371,20 @@ public void onReceive(Object message) throws Exception {
         } else {
           LOG.warn("Received result for unknown job {}", jr.id);
         }
+      } else if (message instanceof Protocol.JobSubmitted) {
+        Protocol.JobSubmitted jobSubmitted = (Protocol.JobSubmitted) message;
+        JobHandleImpl<?> handle = jobs.get(jobSubmitted.clientJobId);
+        if (handle != null) {
+          LOG.info("Received spark job ID: {} for {}",
+            jobSubmitted.sparkJobId, jobSubmitted.clientJobId);
+          SparkJobStatus jobStatus = handle.getSparkJobStatus();
+          if (jobStatus instanceof RemoteSparkJobStatus) {
+            ((RemoteSparkJobStatus) jobStatus).setSparkJobId(jobSubmitted.sparkJobId);
+          }
+        } else {
+          LOG.warn("Received spark job ID: {} for unknown job {}",
+            jobSubmitted.sparkJobId, jobSubmitted.clientJobId);
+        }
       }
     }
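All three accessors degrade softly: a missing remoteRef or an ask timeout is logged and surfaces as null (or an empty array), never as a thrown exception. A hypothetical caller should therefore normalize the null case rather than treat it as a failed job:

    JobExecutionStatus state = client.getJobExecutionStatus(clientJobId, sparkJobId);
    if (state == null) {
      state = JobExecutionStatus.UNKNOWN;  // lost reply or timeout, not a failure
    }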
diff --git spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounter.java spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounter.java
new file mode 100644
index 0000000..a378783
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounter.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.counter;
+
+import java.io.Serializable;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.AccumulatorParam;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class SparkCounter implements Serializable {
+
+  private String name;
+  private String displayName;
+  private Accumulator<Long> accumulator;
+
+  public SparkCounter(
+      String name,
+      String displayName,
+      String groupName,
+      long initValue,
+      JavaSparkContext sparkContext) {
+
+    this.name = name;
+    this.displayName = displayName;
+    LongAccumulatorParam longParam = new LongAccumulatorParam();
+    String accumulatorName = groupName + "_" + name;
+    this.accumulator = sparkContext.accumulator(initValue, accumulatorName, longParam);
+  }
+
+  public long getValue() {
+    return accumulator.value();
+  }
+
+  public void increment(long incr) {
+    accumulator.add(incr);
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getDisplayName() {
+    return displayName;
+  }
+
+  public void setDisplayName(String displayName) {
+    this.displayName = displayName;
+  }
+
+  class LongAccumulatorParam implements AccumulatorParam<Long> {
+
+    @Override
+    public Long addAccumulator(Long t1, Long t2) {
+      return t1 + t2;
+    }
+
+    @Override
+    public Long addInPlace(Long r1, Long r2) {
+      return r1 + r2;
+    }
+
+    @Override
+    public Long zero(Long initialValue) {
+      return 0L;
+    }
+  }
+
+}
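The counter is a thin wrapper over a named Long accumulator, which dictates where each method may legally run. An illustrative flow, where jsc is an assumed JavaSparkContext and the counter names are made up:

    SparkCounter counter =
        new SparkCounter("RECORDS_OUT", "RECORDS_OUT", "HIVE", 0L, jsc);  // driver side only
    counter.increment(1L);            // valid inside a task closure
    long total = counter.getValue();  // driver side only, after the job finishes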
diff --git spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounterGroup.java spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounterGroup.java
new file mode 100644
index 0000000..ea1b7ee
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounterGroup.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * We use a group to fold together counters of the same kind.
+ */
+public class SparkCounterGroup implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private String groupName;
+  private String groupDisplayName;
+  private Map<String, SparkCounter> sparkCounters;
+
+  private transient JavaSparkContext javaSparkContext;
+
+  public SparkCounterGroup(
+      String groupName,
+      String groupDisplayName,
+      JavaSparkContext javaSparkContext) {
+
+    this.groupName = groupName;
+    this.groupDisplayName = groupDisplayName;
+    this.javaSparkContext = javaSparkContext;
+    sparkCounters = new HashMap<String, SparkCounter>();
+  }
+
+  public void createCounter(String name, long initValue) {
+    String displayName = ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName);
+    SparkCounter counter = new SparkCounter(name, displayName, groupName, initValue, javaSparkContext);
+    sparkCounters.put(name, counter);
+  }
+
+  public SparkCounter getCounter(String name) {
+    return sparkCounters.get(name);
+  }
+
+  public String getGroupName() {
+    return groupName;
+  }
+
+  public String getGroupDisplayName() {
+    return groupDisplayName;
+  }
+
+  public void setGroupDisplayName(String groupDisplayName) {
+    this.groupDisplayName = groupDisplayName;
+  }
+
+  public Map<String, SparkCounter> getSparkCounters() {
+    return sparkCounters;
+  }
+}
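Because javaSparkContext is transient, a group deserialized inside a task carries a null context, so all counters must be created up front on the driver; calling createCounter from a task would hit a NullPointerException in the SparkCounter constructor. A sketch with assumed names:

    SparkCounterGroup group = new SparkCounterGroup("HIVE", "HIVE", jsc);     // driver side
    group.createCounter("CREATED_FILES", 0L);                                 // driver side too
    SparkCounter createdFiles = group.getCounter("CREATED_FILES");            // safe anywhere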
diff --git spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java
new file mode 100644
index 0000000..7156bff
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/counter/SparkCounters.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.counter;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * SparkCounters is used to collect Hive operator metrics through Spark accumulators. Spark
+ * accumulators have a few limitations:
+ * 1. An accumulator must be created on the Spark context (driver) side.
+ * 2. Spark tasks can only increment a metric's count.
+ * 3. Accumulator values can only be read back on the Spark context side.
+ * This Spark counter API is designed to fit Hive's requirements, with the corresponding
+ * access restrictions implied by the accumulator limitations above:
+ * 1. A counter must be created on the driver side if it will be accessed in a task.
+ * 2. increment can only be invoked on the task side.
+ * 3. Hive can only read counter values on the driver side.
+ */
+public class SparkCounters implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static final Log LOG = LogFactory.getLog(SparkCounters.class);
+
+  private Map<String, SparkCounterGroup> sparkCounterGroups;
+
+  private transient JavaSparkContext javaSparkContext;
+  private transient Configuration hiveConf;
+
+  public SparkCounters(JavaSparkContext javaSparkContext, Configuration hiveConf) {
+    this.javaSparkContext = javaSparkContext;
+    this.hiveConf = hiveConf;
+    sparkCounterGroups = new HashMap<String, SparkCounterGroup>();
+  }
+
+  public void createCounter(Enum<?> key) {
+    createCounter(key.getDeclaringClass().getName(), key.name());
+  }
+
+  public void createCounter(String groupName, Enum<?> key) {
+    createCounter(groupName, key.name(), 0L);
+  }
+
+  public void createCounter(String groupName, String counterName) {
+    createCounter(groupName, counterName, 0L);
+  }
+
+  public void createCounter(String groupName, String counterName, long initValue) {
+    getGroup(groupName).createCounter(counterName, initValue);
+  }
+
+  public void increment(Enum<?> key, long incrValue) {
+    increment(key.getDeclaringClass().getName(), key.name(), incrValue);
+  }
+
+  public void increment(String groupName, String counterName, long value) {
+    SparkCounter counter = getGroup(groupName).getCounter(counterName);
+    if (counter == null) {
+      LOG.error(
+        String.format("counter[%s, %s] has not been initialized.", groupName, counterName));
+    } else {
+      counter.increment(value);
+    }
+  }
+
+  public long getValue(String groupName, String counterName) {
+    SparkCounter counter = getGroup(groupName).getCounter(counterName);
+    if (counter == null) {
+      LOG.error(
+        String.format("counter[%s, %s] has not been initialized.", groupName, counterName));
+      return 0;
+    } else {
+      return counter.getValue();
+    }
+  }
+
+  public SparkCounter getCounter(String groupName, String counterName) {
+    return getGroup(groupName).getCounter(counterName);
+  }
+
+  public SparkCounter getCounter(Enum<?> key) {
+    return getCounter(key.getDeclaringClass().getName(), key.name());
+  }
+
+  private SparkCounterGroup getGroup(String groupName) {
+    SparkCounterGroup group = sparkCounterGroups.get(groupName);
+    if (group == null) {
+      String
groupDisplayName =
+        ShimLoader.getHadoopShims().getCounterGroupName(groupName, groupName);
+      group = new SparkCounterGroup(groupName, groupDisplayName, javaSparkContext);
+      sparkCounterGroups.put(groupName, group);
+    }
+    return group;
+  }
+
+  public Map<String, SparkCounterGroup> getSparkCounterGroups() {
+    return sparkCounterGroups;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    Map<String, SparkCounterGroup> groups = getSparkCounterGroups();
+    if (groups != null) {
+      for (Map.Entry<String, SparkCounterGroup> groupEntry : groups.entrySet()) {
+        String groupName = groupEntry.getKey();
+        SparkCounterGroup group = groupEntry.getValue();
+        sb.append(groupName).append("\n");
+        Map<String, SparkCounter> counters = group.getSparkCounters();
+        for (Map.Entry<String, SparkCounter> counterEntry : counters.entrySet()) {
+          String counterName = counterEntry.getKey();
+          SparkCounter counter = counterEntry.getValue();
+          sb.append("\t")
+            .append(counterName)
+            .append(": ")
+            .append(counter.getValue())
+            .append("\n");
+        }
+      }
+    }
+
+    return sb.toString();
+  }
+}
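Putting the pieces together, a counter keyed by an enum lives in a group named after the enum's declaring class. A hypothetical end-to-end flow, where the HiveCounter enum, jsc, and hiveConf are assumptions for illustration:

    enum HiveCounter { RECORDS_OUT }

    SparkCounters counters = new SparkCounters(jsc, hiveConf);
    counters.createCounter(HiveCounter.RECORDS_OUT);  // driver side; group = enum class name
    counters.increment(HiveCounter.RECORDS_OUT, 1L);  // task side
    long written = counters.getCounter(HiveCounter.RECORDS_OUT).getValue();  // driver side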
diff --git spark-client/src/main/java/org/apache/hive/spark/status/SparkJobStatus.java spark-client/src/main/java/org/apache/hive/spark/status/SparkJobStatus.java
new file mode 100644
index 0000000..4656f11
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/status/SparkJobStatus.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.status;
+
+import org.apache.hive.spark.Statistic.SparkStatistics;
+import org.apache.hive.spark.counter.SparkCounters;
+import org.apache.spark.JobExecutionStatus;
+
+import java.util.Map;
+
+/**
+ * SparkJobStatus describes what Hive wants to know about the status of a Spark job.
+ */
+public interface SparkJobStatus {
+
+  public int getJobId();
+
+  public JobExecutionStatus getState();
+
+  public int[] getStageIds();
+
+  public Map<String, SparkStageProgress> getSparkStageProgress();
+
+  public SparkCounters getCounter();
+
+  public SparkStatistics getSparkStatistics();
+
+  public void cleanup();
+}
diff --git spark-client/src/main/java/org/apache/hive/spark/status/SparkStageProgress.java spark-client/src/main/java/org/apache/hive/spark/status/SparkStageProgress.java
new file mode 100644
index 0000000..3c40d31
--- /dev/null
+++ spark-client/src/main/java/org/apache/hive/spark/status/SparkStageProgress.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.spark.status;
+
+import java.io.Serializable;
+
+public class SparkStageProgress implements Serializable {
+
+  private int totalTaskCount;
+  private int succeededTaskCount;
+  private int runningTaskCount;
+  private int failedTaskCount;
+  // TODO: remove the following two metrics as they're not available in the current Spark API;
+  // we can add them back once Spark provides them.
+//  private int killedTaskCount;
+//  private long cumulativeTime;
+
+  public SparkStageProgress(
+      int totalTaskCount,
+      int succeededTaskCount,
+      int runningTaskCount,
+      int failedTaskCount) {
+
+    this.totalTaskCount = totalTaskCount;
+    this.succeededTaskCount = succeededTaskCount;
+    this.runningTaskCount = runningTaskCount;
+    this.failedTaskCount = failedTaskCount;
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount;
+  }
+
+  public int getSucceededTaskCount() {
+    return succeededTaskCount;
+  }
+
+  public int getRunningTaskCount() {
+    return runningTaskCount;
+  }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof SparkStageProgress) {
+      SparkStageProgress other = (SparkStageProgress) obj;
+      return getTotalTaskCount() == other.getTotalTaskCount()
+        && getSucceededTaskCount() == other.getSucceededTaskCount()
+        && getRunningTaskCount() == other.getRunningTaskCount()
+        && getFailedTaskCount() == other.getFailedTaskCount();
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("TotalTasks: ");
+    sb.append(getTotalTaskCount());
+    sb.append(" Succeeded: ");
+    sb.append(getSucceededTaskCount());
+    sb.append(" Running: ");
+    sb.append(getRunningTaskCount());
+    sb.append(" Failed: ");
+    sb.append(getFailedTaskCount());
+    return sb.toString();
+  }
+}
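A consumer of these two types is a polling monitor: read the state, aggregate per-stage progress, and repeat until a terminal state. A sketch under the assumption that the caller already holds a SparkJobStatus; the interval and output format are arbitrary choices:

    static void monitor(SparkJobStatus status) throws InterruptedException {
      JobExecutionStatus state;
      while ((state = status.getState()) != JobExecutionStatus.SUCCEEDED
          && state != JobExecutionStatus.FAILED) {
        int total = 0;
        int done = 0;
        // Sum task counts across all stages reported for this job.
        for (SparkStageProgress p : status.getSparkStageProgress().values()) {
          total += p.getTotalTaskCount();
          done += p.getSucceededTaskCount();
        }
        System.out.println("tasks finished: " + done + "/" + total);
        Thread.sleep(1000);
      }
      status.cleanup();
    }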