diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index 4bd3b43..246cc49 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener; -import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.SparkWork; @@ -142,10 +142,10 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr JavaPairRDD finalRDD = plan.generateGraph(); // We use Spark RDD async action to submit job as it's the only way to get jobId now. JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); - // As we always use foreach action to submit RDD graph, it would only trigger on job. + // As we always use foreach action to submit RDD graph, it would only trigger one job. int jobId = future.jobIds().get(0); - SimpleSparkJobStatus sparkJobStatus = - new SimpleSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future); + LocalSparkJobStatus sparkJobStatus = + new LocalSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future); return new SparkJobRef(Integer.toString(jobId), sparkJobStatus); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 93d486f..a7017fa 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.SparkWork; @@ -122,8 +123,9 @@ public Serializable call(JobContext jc) throws Exception { return null; } }); - jobHandle.get(); - return new SparkJobRef(jobHandle.getClientJobId()); + RemoteSparkJobStatus remoteSparkJobStatus = + new RemoteSparkJobStatus(jobHandle); + return new SparkJobRef(jobHandle.getClientJobId(), remoteSparkJobStatus); } private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 3b13d90..08d0250 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -28,6 +28,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.apache.hive.spark.status.SparkStageProgress; import org.apache.spark.JobExecutionStatus; /** @@ -62,53 +63,58 @@ public int 
startMonitor() { int rc = 0; JobExecutionStatus lastState = null; Map lastProgressMap = null; - long startTime = 0; + long startTime = -1; while (true) { try { JobExecutionStatus state = sparkJobStatus.getState(); - if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) { + if (state != null && state != JobExecutionStatus.UNKNOWN && + (state != lastState || state == JobExecutionStatus.RUNNING)) { lastState = state; Map progressMap = sparkJobStatus.getSparkStageProgress(); switch (state) { - case RUNNING: - if (!running) { - // print job stages. - console.printInfo("\nQuery Hive on Spark job[" + - sparkJobStatus.getJobId() + "] stages:"); - for (int stageId : sparkJobStatus.getStageIds()) { - console.printInfo(Integer.toString(stageId)); + case RUNNING: + if (!running) { + // print job stages. + console.printInfo("\nQuery Hive on Spark job[" + + sparkJobStatus.getJobId() + "] stages:"); + for (int stageId : sparkJobStatus.getStageIds()) { + console.printInfo(Integer.toString(stageId)); + } + + console.printInfo("\nStatus: Running (Hive on Spark job[" + + sparkJobStatus.getJobId() + "])"); + startTime = System.currentTimeMillis(); + running = true; + + console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " + + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]"); } - console.printInfo("\nStatus: Running (Hive on Spark job[" + - sparkJobStatus.getJobId() + "])"); - startTime = System.currentTimeMillis(); - running = true; - console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " + - "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]"); - } - - - printStatus(progressMap, lastProgressMap); - lastProgressMap = progressMap; - break; - case SUCCEEDED: - printStatus(progressMap, lastProgressMap); - lastProgressMap = progressMap; - double duration = (System.currentTimeMillis() - startTime) / 1000.0; - console.printInfo("Status: Finished successfully in " + - String.format("%.2f seconds", duration)); - running = false; - done = true; - break; - case FAILED: - console.printError("Status: Failed"); - running = false; - done = true; - rc = 2; - break; + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; + break; + case SUCCEEDED: + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; + if (startTime < 0) { + console.printInfo("Status: Finished successfully within a monitor interval."); + } else { + double duration = (System.currentTimeMillis() - startTime) / 1000.0; + console.printInfo("Status: Finished successfully in " + + String.format("%.2f seconds", duration)); + } + running = false; + done = true; + break; + case FAILED: + console.printError("Status: Failed"); + running = false; + done = true; + rc = 2; + break; } } if (!done) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java index b5c1837..0b16298 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java @@ -19,6 +19,7 @@ import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hive.spark.status.SparkStageProgress; import org.apache.spark.JobExecutionStatus; import java.util.Map; diff --git 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java deleted file mode 100644 index cfec354..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkStageProgress.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.status; - -public class SparkStageProgress { - - private int totalTaskCount; - private int succeededTaskCount; - private int runningTaskCount; - private int failedTaskCount; - // TODO: remove the following two metrics as they're not available in current spark API, - // we can add them back once spark provides it -// private int killedTaskCount; -// private long cumulativeTime; - - public SparkStageProgress( - int totalTaskCount, - int succeededTaskCount, - int runningTaskCount, - int failedTaskCount) { - - this.totalTaskCount = totalTaskCount; - this.succeededTaskCount = succeededTaskCount; - this.runningTaskCount = runningTaskCount; - this.failedTaskCount = failedTaskCount; - } - - public int getTotalTaskCount() { - return totalTaskCount; - } - - public int getSucceededTaskCount() { - return succeededTaskCount; - } - - public int getRunningTaskCount() { - return runningTaskCount; - } - - public int getFailedTaskCount() { - return failedTaskCount; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof SparkStageProgress) { - SparkStageProgress other = (SparkStageProgress) obj; - return getTotalTaskCount() == other.getTotalTaskCount() - && getSucceededTaskCount() == other.getSucceededTaskCount() - && getRunningTaskCount() == other.getRunningTaskCount() - && getFailedTaskCount() == other.getFailedTaskCount(); - } - return false; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("TotalTasks: "); - sb.append(getTotalTaskCount()); - sb.append(" Succeeded: "); - sb.append(getSucceededTaskCount()); - sb.append(" Running: "); - sb.append(getRunningTaskCount()); - sb.append(" Failed: "); - sb.append(getFailedTaskCount()); - return sb.toString(); - } -} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java new file mode 100644 index 0000000..ab40d39 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status.impl; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; +import org.apache.hive.spark.status.SparkStageProgress; +import org.apache.spark.JobExecutionStatus; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.executor.ShuffleReadMetrics; +import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.executor.TaskMetrics; + +import scala.Option; + +public class LocalSparkJobStatus implements SparkJobStatus { + + private final JavaSparkContext sparkContext; + private int jobId; + // After SPARK-2321, we only use JobMetricsListener to get job metrics + // TODO: remove it when the new API provides equivalent functionality + private JobMetricsListener jobMetricsListener; + private SparkCounters sparkCounters; + private JavaFutureAction<Void> future; + + public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId, + JobMetricsListener jobMetricsListener, SparkCounters sparkCounters, + JavaFutureAction<Void> future) { + this.sparkContext = sparkContext; + this.jobId = jobId; + this.jobMetricsListener = jobMetricsListener; + this.sparkCounters = sparkCounters; + this.future = future; + } + + @Override + public int getJobId() { + return jobId; + } + + @Override + public JobExecutionStatus getState() { + // For a Spark job with empty source data, no job is actually submitted, so we never + // receive JobStart/JobEnd events in JobStateListener; use the JavaFutureAction to get the + // current job state instead. + if (future.isDone()) { + return JobExecutionStatus.SUCCEEDED; + } else { + // SparkJobInfo may not be available yet + SparkJobInfo sparkJobInfo = getJobInfo(); + return sparkJobInfo == null ? null : sparkJobInfo.status(); + } + } + + @Override + public int[] getStageIds() { + SparkJobInfo sparkJobInfo = getJobInfo(); + return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds(); + } + + @Override + public Map<String, SparkStageProgress> getSparkStageProgress() { + Map<String, SparkStageProgress> stageProgresses = new HashMap<String, SparkStageProgress>(); + for (int stageId : getStageIds()) { + SparkStageInfo sparkStageInfo = getStageInfo(stageId); + if (sparkStageInfo != null) { + int runningTaskCount = sparkStageInfo.numActiveTasks(); + int completedTaskCount = sparkStageInfo.numCompletedTasks(); + int failedTaskCount = sparkStageInfo.numFailedTasks(); + int totalTaskCount = sparkStageInfo.numTasks(); + SparkStageProgress sparkStageProgress = new SparkStageProgress( + totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); + stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" + + sparkStageInfo.currentAttemptId(), sparkStageProgress); + } + } + return stageProgresses; + } + + @Override + public SparkCounters getCounter() { + return sparkCounters; + } + + @Override + public SparkStatistics getSparkStatistics() { + SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); + // add Hive operator level statistics. + sparkStatisticsBuilder.add(sparkCounters); + // add spark job metrics. + String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; + Map<String, List<TaskMetrics>> jobMetric = jobMetricsListener.getJobMetric(jobId); + if (jobMetric == null) { + return null; + } + + Map<String, Long> flatJobMetric = combineJobLevelMetrics(jobMetric); + for (Map.Entry<String, Long> entry : flatJobMetric.entrySet()) { + sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); + } + + return sparkStatisticsBuilder.build(); + } + + @Override + public void cleanup() { + jobMetricsListener.cleanup(jobId); + } + + private Map<String, Long> combineJobLevelMetrics(Map<String, List<TaskMetrics>> jobMetric) { + Map<String, Long> results = Maps.newLinkedHashMap(); + + long executorDeserializeTime = 0; + long executorRunTime = 0; + long resultSize = 0; + long jvmGCTime = 0; + long resultSerializationTime = 0; + long memoryBytesSpilled = 0; + long diskBytesSpilled = 0; + long bytesRead = 0; + long remoteBlocksFetched = 0; + long localBlocksFetched = 0; + long fetchWaitTime = 0; + long remoteBytesRead = 0; + long shuffleBytesWritten = 0; + long shuffleWriteTime = 0; + boolean inputMetricExist = false; + boolean shuffleReadMetricExist = false; + boolean shuffleWriteMetricExist = false; + + for (List<TaskMetrics> stageMetric : jobMetric.values()) { + if (stageMetric != null) { + for (TaskMetrics taskMetrics : stageMetric) { + if (taskMetrics != null) { + executorDeserializeTime += taskMetrics.executorDeserializeTime(); + executorRunTime += taskMetrics.executorRunTime(); + resultSize += taskMetrics.resultSize(); + jvmGCTime += taskMetrics.jvmGCTime(); + resultSerializationTime += taskMetrics.resultSerializationTime(); + memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); + diskBytesSpilled += taskMetrics.diskBytesSpilled(); + if (!taskMetrics.inputMetrics().isEmpty()) { + inputMetricExist = true; + bytesRead += taskMetrics.inputMetrics().get().bytesRead(); + } + Option<ShuffleReadMetrics> shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); + if (!shuffleReadMetricsOption.isEmpty()) { + shuffleReadMetricExist = true; + remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); + localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); + fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); + remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); + } + Option<ShuffleWriteMetrics> shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); + if (!shuffleWriteMetricsOption.isEmpty()) { + shuffleWriteMetricExist = true; + shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); + shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); + } + } + } + } + } + + results.put("ExecutorDeserializeTime", executorDeserializeTime); + results.put("ExecutorRunTime", executorRunTime); + results.put("ResultSize", resultSize); + results.put("JvmGCTime", jvmGCTime); + results.put("ResultSerializationTime", resultSerializationTime); + results.put("MemoryBytesSpilled", memoryBytesSpilled); + results.put("DiskBytesSpilled", diskBytesSpilled); + if (inputMetricExist) { + results.put("BytesRead", bytesRead); + } + if (shuffleReadMetricExist) { + results.put("RemoteBlocksFetched", remoteBlocksFetched); + results.put("LocalBlocksFetched", localBlocksFetched); + results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); + results.put("FetchWaitTime", fetchWaitTime); + results.put("RemoteBytesRead", remoteBytesRead); + } + if (shuffleWriteMetricExist) { + results.put("ShuffleBytesWritten", shuffleBytesWritten); + results.put("ShuffleWriteTime", shuffleWriteTime); + } + return results; + } + + private SparkJobInfo getJobInfo() { + return sparkContext.statusTracker().getJobInfo(jobId); + } + + private SparkStageInfo getStageInfo(int stageId) { + return sparkContext.statusTracker().getStageInfo(stageId); + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java new file mode 100644 index 0000000..64fe68b --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark.status.impl; + +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; + import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; + import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; + import org.apache.hive.spark.client.JobHandle; + import org.apache.hive.spark.status.SparkStageProgress; + import org.apache.spark.JobExecutionStatus; + + import java.io.Serializable; + import java.util.Map; + + /** + * Used with remote Spark context mode. + */ + public class RemoteSparkJobStatus implements SparkJobStatus { + private JobHandle jobHandle; + + public RemoteSparkJobStatus(JobHandle jobHandle) { + this.jobHandle = jobHandle; + } + + @Override + public int getJobId() { + Integer sparkJobId = jobHandle.getSparkJobId(); + return sparkJobId != null ?
sparkJobId : -1; + } + + @Override + public JobExecutionStatus getState() { + return jobHandle.getJobExecutionStatus(); + } + + @Override + public int[] getStageIds() { + return jobHandle.getStageIds(); + } + + @Override + public Map getSparkStageProgress() { + return jobHandle.getSparkStageProgress(); + } + + @Override + public SparkCounters getCounter() { + return null; + } + + @Override + public SparkStatistics getSparkStatistics() { + return null; + } + + @Override + public void cleanup() { + + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java deleted file mode 100644 index 19fd20d..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.status.impl; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.Maps; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; -import org.apache.spark.JobExecutionStatus; -import org.apache.spark.SparkJobInfo; -import org.apache.spark.SparkStageInfo; -import org.apache.spark.api.java.JavaFutureAction; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.executor.ShuffleReadMetrics; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.executor.TaskMetrics; - -import scala.Option; - -public class SimpleSparkJobStatus implements SparkJobStatus { - - private final JavaSparkContext sparkContext; - private int jobId; - // After SPARK-2321, we only use JobMetricsListener to get job metrics - // TODO: remove it when the new API provides equivalent functionality - private JobMetricsListener jobMetricsListener; - private SparkCounters sparkCounters; - private JavaFutureAction future; - - public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId, - JobMetricsListener jobMetricsListener, SparkCounters sparkCounters, - JavaFutureAction future) { - this.sparkContext = sparkContext; - this.jobId = jobId; - this.jobMetricsListener = jobMetricsListener; - this.sparkCounters = sparkCounters; - this.future = future; - } - - @Override - public int getJobId() { - return jobId; - } - - @Override - public JobExecutionStatus getState() { - // For spark job with empty source data, it's not submitted 
actually, so we would never - // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current - // job state. - if (future.isDone()) { - return JobExecutionStatus.SUCCEEDED; - } else { - // SparkJobInfo may not be available yet - SparkJobInfo sparkJobInfo = getJobInfo(); - return sparkJobInfo == null ? null : sparkJobInfo.status(); - } - } - - @Override - public int[] getStageIds() { - SparkJobInfo sparkJobInfo = getJobInfo(); - return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds(); - } - - @Override - public Map getSparkStageProgress() { - Map stageProgresses = new HashMap(); - for (int stageId : getStageIds()) { - SparkStageInfo sparkStageInfo = getStageInfo(stageId); - if (sparkStageInfo != null) { - int runningTaskCount = sparkStageInfo.numActiveTasks(); - int completedTaskCount = sparkStageInfo.numCompletedTasks(); - int failedTaskCount = sparkStageInfo.numFailedTasks(); - int totalTaskCount = sparkStageInfo.numTasks(); - SparkStageProgress sparkStageProgress = new SparkStageProgress( - totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" + - sparkStageInfo.currentAttemptId(), sparkStageProgress); - } - } - return stageProgresses; - } - - @Override - public SparkCounters getCounter() { - return sparkCounters; - } - - @Override - public SparkStatistics getSparkStatistics() { - SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); - // add Hive operator level statistics. - sparkStatisticsBuilder.add(sparkCounters); - // add spark job metrics. - String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; - Map> jobMetric = jobMetricsListener.getJobMetric(jobId); - if (jobMetric == null) { - return null; - } - - Map flatJobMetric = combineJobLevelMetrics(jobMetric); - for (Map.Entry entry : flatJobMetric.entrySet()) { - sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); - } - - return sparkStatisticsBuilder.build(); - } - - @Override - public void cleanup() { - jobMetricsListener.cleanup(jobId); - } - - private Map combineJobLevelMetrics(Map> jobMetric) { - Map results = Maps.newLinkedHashMap(); - - long executorDeserializeTime = 0; - long executorRunTime = 0; - long resultSize = 0; - long jvmGCTime = 0; - long resultSerializationTime = 0; - long memoryBytesSpilled = 0; - long diskBytesSpilled = 0; - long bytesRead = 0; - long remoteBlocksFetched = 0; - long localBlocksFetched = 0; - long fetchWaitTime = 0; - long remoteBytesRead = 0; - long shuffleBytesWritten = 0; - long shuffleWriteTime = 0; - boolean inputMetricExist = false; - boolean shuffleReadMetricExist = false; - boolean shuffleWriteMetricExist = false; - - for (List stageMetric : jobMetric.values()) { - if (stageMetric != null) { - for (TaskMetrics taskMetrics : stageMetric) { - if (taskMetrics != null) { - executorDeserializeTime += taskMetrics.executorDeserializeTime(); - executorRunTime += taskMetrics.executorRunTime(); - resultSize += taskMetrics.resultSize(); - jvmGCTime += taskMetrics.jvmGCTime(); - resultSerializationTime += taskMetrics.resultSerializationTime(); - memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); - diskBytesSpilled += taskMetrics.diskBytesSpilled(); - if (!taskMetrics.inputMetrics().isEmpty()) { - inputMetricExist = true; - bytesRead += taskMetrics.inputMetrics().get().bytesRead(); - } - Option shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); - if (!shuffleReadMetricsOption.isEmpty()) { - 
shuffleReadMetricExist = true; - remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); - localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); - fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); - remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); - } - Option shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); - if (!shuffleWriteMetricsOption.isEmpty()) { - shuffleWriteMetricExist = true; - shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); - shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); - } - } - } - } - } - - results.put("EexcutorDeserializeTime", executorDeserializeTime); - results.put("ExecutorRunTime", executorRunTime); - results.put("ResultSize", resultSize); - results.put("JvmGCTime", jvmGCTime); - results.put("ResultSerializationTime", resultSerializationTime); - results.put("MemoryBytesSpilled", memoryBytesSpilled); - results.put("DiskBytesSpilled", diskBytesSpilled); - if (inputMetricExist) { - results.put("BytesRead", bytesRead); - } - if (shuffleReadMetricExist) { - results.put("RemoteBlocksFetched", remoteBlocksFetched); - results.put("LocalBlocksFetched", localBlocksFetched); - results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); - results.put("FetchWaitTime", fetchWaitTime); - results.put("RemoteBytesRead", remoteBytesRead); - } - if (shuffleWriteMetricExist) { - results.put("ShuffleBytesWritten", shuffleBytesWritten); - results.put("ShuffleWriteTime", shuffleWriteTime); - } - return results; - } - - private SparkJobInfo getJobInfo() { - return sparkContext.statusTracker().getJobInfo(jobId); - } - - private SparkStageInfo getStageInfo(int stageId) { - return sparkContext.statusTracker().getStageInfo(stageId); - } -} diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java index 4a43a01..c546239 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java @@ -18,9 +18,12 @@ package org.apache.hive.spark.client; import java.io.Serializable; +import java.util.Map; import java.util.concurrent.Future; import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import org.apache.hive.spark.status.SparkStageProgress; +import org.apache.spark.JobExecutionStatus; /** * A handle to a submitted job. Allows for monitoring and controlling of the running remote job. @@ -42,6 +45,26 @@ */ MetricsCollection getMetrics(); + /** + * Get the JobExecutionStatus for this job + */ + JobExecutionStatus getJobExecutionStatus(); + + /** + * Get stageIds for this job + */ + int[] getStageIds(); + + /** + * Get SparkStageProgress for each stage in this job + */ + Map getSparkStageProgress(); + + /** + * The spark job ID. Currently only support the case where one SparkTask triggers one spark job + */ + Integer getSparkJobId(); + // TODO: expose job status? 
} diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java index 054f5ec..d96fcf6 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java @@ -17,7 +17,12 @@ package org.apache.hive.spark.client; +import org.apache.hive.spark.status.SparkStageProgress; +import org.apache.spark.JobExecutionStatus; + import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; @@ -30,6 +35,7 @@ private final SparkClientImpl client; private final String jobId; + private volatile Integer sparkJobId = null; private final MetricsCollection metrics; private final Object monitor; @@ -103,6 +109,27 @@ public MetricsCollection getMetrics() { return metrics; } + @Override + public JobExecutionStatus getJobExecutionStatus() { + return sparkJobId != null ? client.getJobExecutionStatus(getClientJobId(), sparkJobId) : null; + } + + @Override + public int[] getStageIds() { + return sparkJobId != null ? client.getStageIds(sparkJobId) : new int[0]; + } + + @Override + public Map getSparkStageProgress() { + return sparkJobId != null ? client.getSparkStageProgress(sparkJobId) : + new HashMap(); + } + + @Override + public Integer getSparkJobId() { + return sparkJobId; + } + private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException { long deadline = System.currentTimeMillis() + timeout; synchronized (monitor) { @@ -140,4 +167,7 @@ void complete(Object result, Throwable error) { } } + public void setSparkJobId(Integer sparkJobId) { + this.sparkJobId = sparkJobId; + } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java index 4212634..7e104a3 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java +++ spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java @@ -125,4 +125,56 @@ } + static class GetJobExecutionStatus implements Serializable { + final String clientJobId; + final Integer sparkJobId; + + GetJobExecutionStatus(String clientJobId, Integer sparkJobId) { + this.clientJobId = clientJobId; + this.sparkJobId = sparkJobId; + } + + GetJobExecutionStatus() { + this(null, null); + } + } + + static class GetStageIds implements Serializable { + final Integer sparkJobId; + + GetStageIds(Integer sparkJobId) { + this.sparkJobId = sparkJobId; + } + + GetStageIds() { + this(null); + } + } + + static class GetSparkStageProgress implements Serializable { + final Integer sparkJobId; + + GetSparkStageProgress(Integer sparkJobId) { + this.sparkJobId = sparkJobId; + } + + GetSparkStageProgress() { + this(null); + } + } + + static class JobSubmitted implements Serializable { + final String clientJobId; + final Integer sparkJobId; + + JobSubmitted(String clientJobId, Integer sparkJobId) { + this.clientJobId = clientJobId; + this.sparkJobId = sparkJobId; + } + + JobSubmitted() { + this(null, null); + } + } + } diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index 3de1fec..8936dc3 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ 
spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -18,6 +18,7 @@ package org.apache.hive.spark.client; import java.io.Serializable; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -28,6 +29,10 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hive.spark.status.SparkStageProgress; +import org.apache.spark.JobExecutionStatus; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; import scala.Tuple2; import akka.actor.ActorRef; @@ -185,6 +190,51 @@ public void onReceive(Object message) throws Exception { JobWrapper wrapper = new JobWrapper(req); activeJobs.put(req.id, wrapper); wrapper.submit(); + } else if (message instanceof Protocol.GetJobExecutionStatus) { + Protocol.GetJobExecutionStatus getJobExecutionStatus = + (Protocol.GetJobExecutionStatus) message; + SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo( + getJobExecutionStatus.sparkJobId); + JobExecutionStatus jobExecutionStatus = jobInfo == null ? + JobExecutionStatus.UNKNOWN : jobInfo.status(); + if (activeJobs.containsKey(getJobExecutionStatus.clientJobId)) { + JobWrapper jobWrapper = activeJobs.get(getJobExecutionStatus.clientJobId); + if (jobWrapper.jobs.size() == 1) { + JavaFutureAction javaFutureAction = (JavaFutureAction) jobWrapper.jobs.get(0); + if (javaFutureAction.isDone()) { + jobExecutionStatus = JobExecutionStatus.SUCCEEDED; + } + } + } + getSender().tell(jobExecutionStatus, getSelf()); + } else if (message instanceof Protocol.GetStageIds) { + Protocol.GetStageIds getStageIds = (Protocol.GetStageIds) message; + SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo(getStageIds.sparkJobId); + int[] stageIds = sparkJobInfo == null ? 
new int[0] : sparkJobInfo.stageIds(); + getSender().tell(stageIds, getSelf()); + } else if (message instanceof Protocol.GetSparkStageProgress) { + Protocol.GetSparkStageProgress getSparkStageProgress = + (Protocol.GetSparkStageProgress) message; + SparkJobInfo sparkJobInfo = jc.sc().statusTracker().getJobInfo( + getSparkStageProgress.sparkJobId); + Map map = new HashMap(); + if (sparkJobInfo != null) { + for (int stageId : sparkJobInfo.stageIds()) { + SparkStageInfo sparkStageInfo = jc.sc().statusTracker().getStageInfo(stageId); + if (sparkStageInfo != null) { + String stageName = Integer.toString(sparkStageInfo.stageId()) + "_" + + sparkStageInfo.currentAttemptId(); + int runningTaskCount = sparkStageInfo.numActiveTasks(); + int completedTaskCount = sparkStageInfo.numCompletedTasks(); + int failedTaskCount = sparkStageInfo.numFailedTasks(); + int totalTaskCount = sparkStageInfo.numTasks(); + SparkStageProgress sparkStageProgress = new SparkStageProgress( + totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); + map.put(stageName, sparkStageProgress); + } + } + } + getSender().tell(map, getSelf()); } } @@ -249,8 +299,8 @@ void jobDone() { private void monitorJob(JavaFutureAction job) { jobs.add(job); + client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor); } - } private class ClientListener implements SparkListener { diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 161182f..1492c4b 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -30,22 +30,30 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Props; import akka.actor.UntypedActor; +import static akka.pattern.Patterns.ask; + +import akka.util.Timeout; import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.hive.spark.status.SparkStageProgress; +import org.apache.spark.JobExecutionStatus; import org.apache.spark.SparkContext; import org.apache.spark.SparkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; class SparkClientImpl implements SparkClient { @@ -57,6 +65,7 @@ private final ActorRef clientRef; private final Thread driverThread; private final Map> jobs; + private static final Timeout ASK_TIMEOUT = new Timeout(Duration.create(10, TimeUnit.SECONDS)); private volatile ActorSelection remoteRef; @@ -122,6 +131,48 @@ public void stop() { return submit(new AddFileJob(url.toString())); } + public JobExecutionStatus getJobExecutionStatus(String clientJobId, int sparkJobId) { + if (remoteRef != null) { + try { + JobExecutionStatus jobExecutionStatus = (JobExecutionStatus) Await.result( + ask(remoteRef, new Protocol.GetJobExecutionStatus(clientJobId, sparkJobId), + ASK_TIMEOUT), ASK_TIMEOUT.duration()); + return jobExecutionStatus; + } catch (Exception e) { + LOG.warn("Error asking for JobExecutionStatus.", e); + } + } + return null; + } + + public int[] getStageIds(int 
sparkJobId) { + if (remoteRef != null) { + try { + int[] stageIds = (int[]) Await.result( + ask(remoteRef, new Protocol.GetStageIds(sparkJobId), ASK_TIMEOUT), + ASK_TIMEOUT.duration()); + return stageIds; + } catch (Exception e) { + LOG.warn("Error asking for StageIds.", e); + } + } + return new int[0]; + } + + public Map getSparkStageProgress(int sparkJobId) { + if (remoteRef != null) { + try { + Map map = (Map) Await.result( + ask(remoteRef, new Protocol.GetSparkStageProgress(sparkJobId), ASK_TIMEOUT), + ASK_TIMEOUT.duration()); + return map; + } catch (Exception e) { + LOG.warn("Error asking for SparkStageProgress.", e); + } + } + return null; + } + void cancel(String jobId) { remoteRef.tell(new Protocol.CancelJob(jobId), clientRef); } @@ -319,6 +370,17 @@ public void onReceive(Object message) throws Exception { } else { LOG.warn("Received result for unknown job {}", jr.id); } + } else if (message instanceof Protocol.JobSubmitted) { + Protocol.JobSubmitted jobSubmitted = (Protocol.JobSubmitted) message; + JobHandleImpl handle = jobs.get(jobSubmitted.clientJobId); + if (handle != null) { + LOG.info("Received spark job ID: {} for {}", + jobSubmitted.sparkJobId, jobSubmitted.clientJobId); + handle.setSparkJobId(jobSubmitted.sparkJobId); + } else { + LOG.warn("Received spark job ID: {} for unknown job {}", + jobSubmitted.sparkJobId, jobSubmitted.clientJobId); + } } } diff --git spark-client/src/main/java/org/apache/hive/spark/status/SparkStageProgress.java spark-client/src/main/java/org/apache/hive/spark/status/SparkStageProgress.java new file mode 100644 index 0000000..3c40d31 --- /dev/null +++ spark-client/src/main/java/org/apache/hive/spark/status/SparkStageProgress.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hive.spark.status; + +import java.io.Serializable; + +public class SparkStageProgress implements Serializable { + + private int totalTaskCount; + private int succeededTaskCount; + private int runningTaskCount; + private int failedTaskCount; + // TODO: remove the following two metrics as they're not available in the current Spark API; + // we can add them back once Spark provides them +// private int killedTaskCount; +// private long cumulativeTime; + + public SparkStageProgress( + int totalTaskCount, + int succeededTaskCount, + int runningTaskCount, + int failedTaskCount) { + + this.totalTaskCount = totalTaskCount; + this.succeededTaskCount = succeededTaskCount; + this.runningTaskCount = runningTaskCount; + this.failedTaskCount = failedTaskCount; + } + + public int getTotalTaskCount() { + return totalTaskCount; + } + + public int getSucceededTaskCount() { + return succeededTaskCount; + } + + public int getRunningTaskCount() { + return runningTaskCount; + } + + public int getFailedTaskCount() { + return failedTaskCount; + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof SparkStageProgress) { + SparkStageProgress other = (SparkStageProgress) obj; + return getTotalTaskCount() == other.getTotalTaskCount() + && getSucceededTaskCount() == other.getSucceededTaskCount() + && getRunningTaskCount() == other.getRunningTaskCount() + && getFailedTaskCount() == other.getFailedTaskCount(); + } + return false; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("TotalTasks: "); + sb.append(getTotalTaskCount()); + sb.append(" Succeeded: "); + sb.append(getSucceededTaskCount()); + sb.append(" Running: "); + sb.append(getRunningTaskCount()); + sb.append(" Failed: "); + sb.append(getFailedTaskCount()); + return sb.toString(); + } +}
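
Reviewer note: to make the new monitoring surface easier to evaluate end to end, here is a minimal consumption sketch for the remote path — wrap the JobHandle returned by the spark-client in the new RemoteSparkJobStatus and poll it until a terminal state, which is the role SparkJobMonitor plays inside Hive. The class name, the one-second poll interval, and the System.out reporting are illustrative assumptions, not part of this patch.

import java.io.Serializable;
import java.util.Map;

import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus;
import org.apache.hive.spark.client.JobHandle;
import org.apache.hive.spark.status.SparkStageProgress;
import org.apache.spark.JobExecutionStatus;

public class RemoteStatusPollingSketch {
  public static void poll(JobHandle<Serializable> handle) throws InterruptedException {
    RemoteSparkJobStatus status = new RemoteSparkJobStatus(handle);
    while (true) {
      // getState() stays null until the JobSubmitted message delivers the spark job ID.
      JobExecutionStatus state = status.getState();
      if (state == JobExecutionStatus.SUCCEEDED || state == JobExecutionStatus.FAILED) {
        break;
      }
      // The progress map is empty before the job ID arrives, and may be null if the
      // Akka ask in SparkClientImpl times out.
      Map<String, SparkStageProgress> progress = status.getSparkStageProgress();
      if (progress != null) {
        for (Map.Entry<String, SparkStageProgress> e : progress.entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue());
        }
      }
      Thread.sleep(1000); // illustrative poll interval
    }
  }
}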
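
Reviewer note: SparkJobMonitor now advertises the progress format "CurrentTime StageId_StageAttemptId: SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]", but printStatus itself is outside this diff. A hypothetical formatter for one entry of the getSparkStageProgress() map, just to make that contract concrete (the class and method names are assumptions):

import org.apache.hive.spark.status.SparkStageProgress;

public final class ProgressLineSketch {
  // Hypothetical helper, not part of this patch: renders one stage entry in the
  // format documented by SparkJobMonitor (StageCost omitted).
  static String formatStageLine(String stageIdAndAttempt, SparkStageProgress p) {
    return String.format("%1$tF %1$tT %2$s: %3$d(+%4$d-%5$d)/%6$d",
        System.currentTimeMillis(), stageIdAndAttempt,
        p.getSucceededTaskCount(), p.getRunningTaskCount(),
        p.getFailedTaskCount(), p.getTotalTaskCount());
  }
  // e.g. formatStageLine("0_0", new SparkStageProgress(10, 4, 3, 0))
  //   -> "2014-12-08 12:00:00 0_0: 4(+3-0)/10"
}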
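
Reviewer note: SparkStageProgress is now Serializable because it crosses the actor boundary in RemoteDriver's GetSparkStageProgress reply, and it overrides equals() without hashCode(). The patch only stores instances as map values, so nothing breaks today, but any future use as a HashMap key or in a HashSet would violate the equals/hashCode contract. A matching override covering the same four fields would be a cheap follow-up, e.g.:

  @Override
  public int hashCode() {
    // Combine exactly the fields that equals() compares.
    int result = totalTaskCount;
    result = 31 * result + succeededTaskCount;
    result = 31 * result + runningTaskCount;
    result = 31 * result + failedTaskCount;
    return result;
  }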