diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index 4bd3b43..246cc49 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobMetricsListener; -import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.SparkWork; @@ -142,10 +142,10 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr JavaPairRDD finalRDD = plan.generateGraph(); // We use Spark RDD async action to submit job as it's the only way to get jobId now. JavaFutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); - // As we always use foreach action to submit RDD graph, it would only trigger on job. + // As we always use foreach action to submit RDD graph, it would only trigger one job. int jobId = future.jobIds().get(0); - SimpleSparkJobStatus sparkJobStatus = - new SimpleSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future); + LocalSparkJobStatus sparkJobStatus = + new LocalSparkJobStatus(sc, jobId, jobMetricsListener, sparkCounters, future); return new SparkJobRef(Integer.toString(jobId), sparkJobStatus); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 93d486f..24e8710 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.SparkWork; @@ -122,8 +123,7 @@ public Serializable call(JobContext jc) throws Exception { return null; } }); - jobHandle.get(); - return new SparkJobRef(jobHandle.getClientJobId()); + return new SparkJobRef(jobHandle.getClientJobId(), new RemoteSparkJobStatus(remoteClient, jobHandle)); } private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 3b13d90..72108cd 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -62,12 +62,13 @@ public int startMonitor() { int rc = 0; JobExecutionStatus lastState = null; Map lastProgressMap = null; - long startTime = 0; + long startTime = -1; while (true) { try { JobExecutionStatus state = sparkJobStatus.getState(); - if (state != null && (state != lastState || state == JobExecutionStatus.RUNNING)) { + if (state != null && state != 
JobExecutionStatus.UNKNOWN && + (state != lastState || state == JobExecutionStatus.RUNNING)) { lastState = state; Map progressMap = sparkJobStatus.getSparkStageProgress(); @@ -97,9 +98,13 @@ public int startMonitor() { case SUCCEEDED: printStatus(progressMap, lastProgressMap); lastProgressMap = progressMap; - double duration = (System.currentTimeMillis() - startTime) / 1000.0; - console.printInfo("Status: Finished successfully in " + - String.format("%.2f seconds", duration)); + if (startTime < 0) { + console.printInfo("Status: Finished successfully within a check interval."); + } else { + double duration = (System.currentTimeMillis() - startTime) / 1000.0; + console.printInfo("Status: Finished successfully in " + + String.format("%.2f seconds", duration)); + } running = false; done = true; break; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java new file mode 100644 index 0000000..09c79e4 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobStatus.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.spark.status.impl; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Maps; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; +import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; +import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; +import org.apache.spark.JobExecutionStatus; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.executor.ShuffleReadMetrics; +import org.apache.spark.executor.ShuffleWriteMetrics; +import org.apache.spark.executor.TaskMetrics; + +import scala.Option; + +public class LocalSparkJobStatus implements SparkJobStatus { + + private final JavaSparkContext sparkContext; + private int jobId; + // After SPARK-2321, we only use JobMetricsListener to get job metrics + // TODO: remove it when the new API provides equivalent functionality + private JobMetricsListener jobMetricsListener; + private SparkCounters sparkCounters; + private JavaFutureAction future; + + public LocalSparkJobStatus(JavaSparkContext sparkContext, int jobId, + JobMetricsListener jobMetricsListener, SparkCounters sparkCounters, + JavaFutureAction future) { + this.sparkContext = sparkContext; + this.jobId = jobId; + this.jobMetricsListener = jobMetricsListener; + this.sparkCounters = sparkCounters; + this.future = future; + } + + @Override + public int getJobId() { + return jobId; + } + + @Override + public JobExecutionStatus getState() { + // For spark job with empty source data, it's not submitted actually, so we would never + // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current + // job state. + if (future.isDone()) { + return JobExecutionStatus.SUCCEEDED; + } else { + // SparkJobInfo may not be available yet + SparkJobInfo sparkJobInfo = getJobInfo(); + return sparkJobInfo == null ? null : sparkJobInfo.status(); + } + } + + @Override + public int[] getStageIds() { + SparkJobInfo sparkJobInfo = getJobInfo(); + return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds(); + } + + @Override + public Map getSparkStageProgress() { + Map stageProgresses = new HashMap(); + for (int stageId : getStageIds()) { + SparkStageInfo sparkStageInfo = getStageInfo(stageId); + if (sparkStageInfo != null) { + int runningTaskCount = sparkStageInfo.numActiveTasks(); + int completedTaskCount = sparkStageInfo.numCompletedTasks(); + int failedTaskCount = sparkStageInfo.numFailedTasks(); + int totalTaskCount = sparkStageInfo.numTasks(); + SparkStageProgress sparkStageProgress = new SparkStageProgress( + totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); + stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" + + sparkStageInfo.currentAttemptId(), sparkStageProgress); + } + } + return stageProgresses; + } + + @Override + public SparkCounters getCounter() { + return sparkCounters; + } + + @Override + public SparkStatistics getSparkStatistics() { + SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); + // add Hive operator level statistics. + sparkStatisticsBuilder.add(sparkCounters); + // add spark job metrics. 
+ String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; + Map> jobMetric = jobMetricsListener.getJobMetric(jobId); + if (jobMetric == null) { + return null; + } + + Map flatJobMetric = combineJobLevelMetrics(jobMetric); + for (Map.Entry entry : flatJobMetric.entrySet()) { + sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); + } + + return sparkStatisticsBuilder.build(); + } + + @Override + public void cleanup() { + jobMetricsListener.cleanup(jobId); + } + + private Map combineJobLevelMetrics(Map> jobMetric) { + Map results = Maps.newLinkedHashMap(); + + long executorDeserializeTime = 0; + long executorRunTime = 0; + long resultSize = 0; + long jvmGCTime = 0; + long resultSerializationTime = 0; + long memoryBytesSpilled = 0; + long diskBytesSpilled = 0; + long bytesRead = 0; + long remoteBlocksFetched = 0; + long localBlocksFetched = 0; + long fetchWaitTime = 0; + long remoteBytesRead = 0; + long shuffleBytesWritten = 0; + long shuffleWriteTime = 0; + boolean inputMetricExist = false; + boolean shuffleReadMetricExist = false; + boolean shuffleWriteMetricExist = false; + + for (List stageMetric : jobMetric.values()) { + if (stageMetric != null) { + for (TaskMetrics taskMetrics : stageMetric) { + if (taskMetrics != null) { + executorDeserializeTime += taskMetrics.executorDeserializeTime(); + executorRunTime += taskMetrics.executorRunTime(); + resultSize += taskMetrics.resultSize(); + jvmGCTime += taskMetrics.jvmGCTime(); + resultSerializationTime += taskMetrics.resultSerializationTime(); + memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); + diskBytesSpilled += taskMetrics.diskBytesSpilled(); + if (!taskMetrics.inputMetrics().isEmpty()) { + inputMetricExist = true; + bytesRead += taskMetrics.inputMetrics().get().bytesRead(); + } + Option shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); + if (!shuffleReadMetricsOption.isEmpty()) { + shuffleReadMetricExist = true; + remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); + localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); + fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); + remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); + } + Option shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); + if (!shuffleWriteMetricsOption.isEmpty()) { + shuffleWriteMetricExist = true; + shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); + shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); + } + } + } + } + } + + results.put("EexcutorDeserializeTime", executorDeserializeTime); + results.put("ExecutorRunTime", executorRunTime); + results.put("ResultSize", resultSize); + results.put("JvmGCTime", jvmGCTime); + results.put("ResultSerializationTime", resultSerializationTime); + results.put("MemoryBytesSpilled", memoryBytesSpilled); + results.put("DiskBytesSpilled", diskBytesSpilled); + if (inputMetricExist) { + results.put("BytesRead", bytesRead); + } + if (shuffleReadMetricExist) { + results.put("RemoteBlocksFetched", remoteBlocksFetched); + results.put("LocalBlocksFetched", localBlocksFetched); + results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); + results.put("FetchWaitTime", fetchWaitTime); + results.put("RemoteBytesRead", remoteBytesRead); + } + if (shuffleWriteMetricExist) { + results.put("ShuffleBytesWritten", shuffleBytesWritten); + results.put("ShuffleWriteTime", shuffleWriteTime); + } + return results; + } + + private SparkJobInfo 
getJobInfo() {
+    return sparkContext.statusTracker().getJobInfo(jobId);
+  }
+
+  private SparkStageInfo getStageInfo(int stageId) {
+    return sparkContext.statusTracker().getStageInfo(stageId);
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
new file mode 100644
index 0000000..758fe92
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark.status.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics;
+import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
+import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
+import org.apache.hive.spark.client.Job;
+import org.apache.hive.spark.client.JobContext;
+import org.apache.hive.spark.client.JobHandle;
+import org.apache.hive.spark.client.SparkClient;
+import org.apache.hive.spark.client.status.HiveSparkJobInfo;
+import org.apache.hive.spark.client.status.HiveSparkStageInfo;
+import org.apache.spark.JobExecutionStatus;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used with remote spark client.
+ */
+public class RemoteSparkJobStatus implements SparkJobStatus {
+  private static final Log LOG = LogFactory.getLog(RemoteSparkJobStatus.class.getName());
+  private final SparkClient sparkClient;
+  private final JobHandle jobHandle;
+
+  public RemoteSparkJobStatus(SparkClient sparkClient, JobHandle jobHandle) {
+    this.sparkClient = sparkClient;
+    this.jobHandle = jobHandle;
+  }
+
+  @Override
+  public int getJobId() {
+    return jobHandle.getSparkJobIds().size() == 1 ? jobHandle.getSparkJobIds().get(0) : -1;
+  }
+
+  @Override
+  public JobExecutionStatus getState() {
+    SparkJobInfo sparkJobInfo = getSparkJobInfo();
+    return sparkJobInfo != null ? sparkJobInfo.status() : JobExecutionStatus.UNKNOWN;
+  }
+
+  @Override
+  public int[] getStageIds() {
+    SparkJobInfo sparkJobInfo = getSparkJobInfo();
+    return sparkJobInfo != null ?
sparkJobInfo.stageIds() : new int[0]; + } + + @Override + public Map getSparkStageProgress() { + Map stageProgresses = new HashMap(); + for (int stageId : getStageIds()) { + SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId); + if (sparkStageInfo != null && sparkStageInfo.name() != null) { + int runningTaskCount = sparkStageInfo.numActiveTasks(); + int completedTaskCount = sparkStageInfo.numCompletedTasks(); + int failedTaskCount = sparkStageInfo.numFailedTasks(); + int totalTaskCount = sparkStageInfo.numTasks(); + SparkStageProgress sparkStageProgress = new SparkStageProgress( + totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); + stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" + + sparkStageInfo.currentAttemptId(), sparkStageProgress); + } + } + return stageProgresses; + } + + @Override + public SparkCounters getCounter() { + return null; + } + + @Override + public SparkStatistics getSparkStatistics() { + return null; + } + + @Override + public void cleanup() { + + } + + private SparkJobInfo getSparkJobInfo() { + Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 ? + jobHandle.getSparkJobIds().get(0) : null; + if (sparkJobId == null) { + return null; + } + JobHandle getJobInfo = sparkClient.submit( + new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId)); + try { + return getJobInfo.get(); + } catch (Throwable t) { + LOG.warn("Error getting job info", t); + return null; + } + } + + private SparkStageInfo getSparkStageInfo(int stageId) { + JobHandle getStageInfo = sparkClient.submit(new GetStageInfoJob(stageId)); + try { + return getStageInfo.get(); + } catch (Throwable t) { + LOG.warn("Error getting stage info", t); + return null; + } + } + + private static class GetJobInfoJob implements Job { + private final String clientJobId; + private final int sparkJobId; + + GetJobInfoJob(String clientJobId, int sparkJobId) { + this.clientJobId = clientJobId; + this.sparkJobId = sparkJobId; + } + + @Override + public HiveSparkJobInfo call(JobContext jc) throws Exception { + SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + if (jobInfo == null) { + List> list = jc.getMonitoredJobs().get(clientJobId); + if (list != null && list.size() == 1) { + JavaFutureAction futureAction = list.get(0); + if (futureAction.isDone()) { + jobInfo = new SparkJobInfo() { + @Override + public int jobId() { + return sparkJobId; + } + + @Override + public int[] stageIds() { + return new int[0]; + } + + @Override + public JobExecutionStatus status() { + return JobExecutionStatus.SUCCEEDED; + } + }; + } + } + } + if(jobInfo == null) { + jobInfo = new SparkJobInfo() { + @Override + public int jobId() { + return -1; + } + + @Override + public int[] stageIds() { + return new int[0]; + } + + @Override + public JobExecutionStatus status() { + return JobExecutionStatus.UNKNOWN; + } + }; + } + return new HiveSparkJobInfo(jobInfo); + } + } + + private static class GetStageInfoJob implements Job{ + private final int stageId; + + GetStageInfoJob(int stageId){ + this.stageId=stageId; + } + + @Override + public HiveSparkStageInfo call(JobContext jc) throws Exception { + SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageId); + return stageInfo != null ? 
new HiveSparkStageInfo(stageInfo) : new HiveSparkStageInfo(); + } + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java deleted file mode 100644 index 19fd20d..0000000 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/SimpleSparkJobStatus.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.spark.status.impl; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.google.common.collect.Maps; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatistics; -import org.apache.hadoop.hive.ql.exec.spark.Statistic.SparkStatisticsBuilder; -import org.apache.hadoop.hive.ql.exec.spark.counter.SparkCounters; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; -import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress; -import org.apache.spark.JobExecutionStatus; -import org.apache.spark.SparkJobInfo; -import org.apache.spark.SparkStageInfo; -import org.apache.spark.api.java.JavaFutureAction; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.executor.ShuffleReadMetrics; -import org.apache.spark.executor.ShuffleWriteMetrics; -import org.apache.spark.executor.TaskMetrics; - -import scala.Option; - -public class SimpleSparkJobStatus implements SparkJobStatus { - - private final JavaSparkContext sparkContext; - private int jobId; - // After SPARK-2321, we only use JobMetricsListener to get job metrics - // TODO: remove it when the new API provides equivalent functionality - private JobMetricsListener jobMetricsListener; - private SparkCounters sparkCounters; - private JavaFutureAction future; - - public SimpleSparkJobStatus(JavaSparkContext sparkContext, int jobId, - JobMetricsListener jobMetricsListener, SparkCounters sparkCounters, - JavaFutureAction future) { - this.sparkContext = sparkContext; - this.jobId = jobId; - this.jobMetricsListener = jobMetricsListener; - this.sparkCounters = sparkCounters; - this.future = future; - } - - @Override - public int getJobId() { - return jobId; - } - - @Override - public JobExecutionStatus getState() { - // For spark job with empty source data, it's not submitted actually, so we would never - // receive JobStart/JobEnd event in JobStateListener, use JavaFutureAction to get current - // job state. - if (future.isDone()) { - return JobExecutionStatus.SUCCEEDED; - } else { - // SparkJobInfo may not be available yet - SparkJobInfo sparkJobInfo = getJobInfo(); - return sparkJobInfo == null ? 
null : sparkJobInfo.status(); - } - } - - @Override - public int[] getStageIds() { - SparkJobInfo sparkJobInfo = getJobInfo(); - return sparkJobInfo == null ? new int[0] : sparkJobInfo.stageIds(); - } - - @Override - public Map getSparkStageProgress() { - Map stageProgresses = new HashMap(); - for (int stageId : getStageIds()) { - SparkStageInfo sparkStageInfo = getStageInfo(stageId); - if (sparkStageInfo != null) { - int runningTaskCount = sparkStageInfo.numActiveTasks(); - int completedTaskCount = sparkStageInfo.numCompletedTasks(); - int failedTaskCount = sparkStageInfo.numFailedTasks(); - int totalTaskCount = sparkStageInfo.numTasks(); - SparkStageProgress sparkStageProgress = new SparkStageProgress( - totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - stageProgresses.put(String.valueOf(sparkStageInfo.stageId()) + "_" + - sparkStageInfo.currentAttemptId(), sparkStageProgress); - } - } - return stageProgresses; - } - - @Override - public SparkCounters getCounter() { - return sparkCounters; - } - - @Override - public SparkStatistics getSparkStatistics() { - SparkStatisticsBuilder sparkStatisticsBuilder = new SparkStatisticsBuilder(); - // add Hive operator level statistics. - sparkStatisticsBuilder.add(sparkCounters); - // add spark job metrics. - String jobIdentifier = "Spark Job[" + jobId + "] Metrics"; - Map> jobMetric = jobMetricsListener.getJobMetric(jobId); - if (jobMetric == null) { - return null; - } - - Map flatJobMetric = combineJobLevelMetrics(jobMetric); - for (Map.Entry entry : flatJobMetric.entrySet()) { - sparkStatisticsBuilder.add(jobIdentifier, entry.getKey(), Long.toString(entry.getValue())); - } - - return sparkStatisticsBuilder.build(); - } - - @Override - public void cleanup() { - jobMetricsListener.cleanup(jobId); - } - - private Map combineJobLevelMetrics(Map> jobMetric) { - Map results = Maps.newLinkedHashMap(); - - long executorDeserializeTime = 0; - long executorRunTime = 0; - long resultSize = 0; - long jvmGCTime = 0; - long resultSerializationTime = 0; - long memoryBytesSpilled = 0; - long diskBytesSpilled = 0; - long bytesRead = 0; - long remoteBlocksFetched = 0; - long localBlocksFetched = 0; - long fetchWaitTime = 0; - long remoteBytesRead = 0; - long shuffleBytesWritten = 0; - long shuffleWriteTime = 0; - boolean inputMetricExist = false; - boolean shuffleReadMetricExist = false; - boolean shuffleWriteMetricExist = false; - - for (List stageMetric : jobMetric.values()) { - if (stageMetric != null) { - for (TaskMetrics taskMetrics : stageMetric) { - if (taskMetrics != null) { - executorDeserializeTime += taskMetrics.executorDeserializeTime(); - executorRunTime += taskMetrics.executorRunTime(); - resultSize += taskMetrics.resultSize(); - jvmGCTime += taskMetrics.jvmGCTime(); - resultSerializationTime += taskMetrics.resultSerializationTime(); - memoryBytesSpilled += taskMetrics.memoryBytesSpilled(); - diskBytesSpilled += taskMetrics.diskBytesSpilled(); - if (!taskMetrics.inputMetrics().isEmpty()) { - inputMetricExist = true; - bytesRead += taskMetrics.inputMetrics().get().bytesRead(); - } - Option shuffleReadMetricsOption = taskMetrics.shuffleReadMetrics(); - if (!shuffleReadMetricsOption.isEmpty()) { - shuffleReadMetricExist = true; - remoteBlocksFetched += shuffleReadMetricsOption.get().remoteBlocksFetched(); - localBlocksFetched += shuffleReadMetricsOption.get().localBlocksFetched(); - fetchWaitTime += shuffleReadMetricsOption.get().fetchWaitTime(); - remoteBytesRead += shuffleReadMetricsOption.get().remoteBytesRead(); - } - Option 
shuffleWriteMetricsOption = taskMetrics.shuffleWriteMetrics(); - if (!shuffleWriteMetricsOption.isEmpty()) { - shuffleWriteMetricExist = true; - shuffleBytesWritten += shuffleWriteMetricsOption.get().shuffleBytesWritten(); - shuffleWriteTime += shuffleWriteMetricsOption.get().shuffleWriteTime(); - } - } - } - } - } - - results.put("EexcutorDeserializeTime", executorDeserializeTime); - results.put("ExecutorRunTime", executorRunTime); - results.put("ResultSize", resultSize); - results.put("JvmGCTime", jvmGCTime); - results.put("ResultSerializationTime", resultSerializationTime); - results.put("MemoryBytesSpilled", memoryBytesSpilled); - results.put("DiskBytesSpilled", diskBytesSpilled); - if (inputMetricExist) { - results.put("BytesRead", bytesRead); - } - if (shuffleReadMetricExist) { - results.put("RemoteBlocksFetched", remoteBlocksFetched); - results.put("LocalBlocksFetched", localBlocksFetched); - results.put("TotalBlocksFetched", localBlocksFetched + remoteBlocksFetched); - results.put("FetchWaitTime", fetchWaitTime); - results.put("RemoteBytesRead", remoteBytesRead); - } - if (shuffleWriteMetricExist) { - results.put("ShuffleBytesWritten", shuffleBytesWritten); - results.put("ShuffleWriteTime", shuffleWriteTime); - } - return results; - } - - private SparkJobInfo getJobInfo() { - return sparkContext.statusTracker().getJobInfo(jobId); - } - - private SparkStageInfo getStageInfo(int stageId) { - return sparkContext.statusTracker().getStageInfo(stageId); - } -} diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java index 7a4b88b..0fabba4 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobContext.java @@ -22,6 +22,9 @@ import org.apache.hadoop.hive.common.classification.InterfaceAudience; +import java.util.List; +import java.util.Map; + /** * Holds runtime information about the job execution context. 
* @@ -42,4 +45,9 @@ */ JavaFutureAction monitor(JavaFutureAction job); + /** + * Return a map from client job Id to corresponding JavaFutureActions + */ + Map>> getMonitoredJobs(); + } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java index c5da817..e58191d 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobContextImpl.java @@ -20,14 +20,20 @@ import org.apache.spark.api.java.JavaFutureAction; import org.apache.spark.api.java.JavaSparkContext; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + class JobContextImpl implements JobContext { private final JavaSparkContext sc; private final ThreadLocal monitorCb; + private final Map>> monitoredJobs; public JobContextImpl(JavaSparkContext sc) { this.sc = sc; this.monitorCb = new ThreadLocal(); + monitoredJobs = new ConcurrentHashMap>>(); } @@ -42,11 +48,17 @@ public JavaSparkContext sc() { return job; } + @Override + public Map>> getMonitoredJobs() { + return monitoredJobs; + } + void setMonitorCb(MonitorCallback cb) { monitorCb.set(cb); } void stop() { + monitoredJobs.clear(); sc.stop(); } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java index 4a43a01..0a1ae7d 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandle.java @@ -18,6 +18,7 @@ package org.apache.hive.spark.client; import java.io.Serializable; +import java.util.List; import java.util.concurrent.Future; import org.apache.hadoop.hive.common.classification.InterfaceAudience; @@ -42,6 +43,11 @@ */ MetricsCollection getMetrics(); + /** + * Get corresponding spark job IDs for this job + */ + List getSparkJobIds(); + // TODO: expose job status? } diff --git spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java index 054f5ec..5f27e7e 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/JobHandleImpl.java @@ -18,6 +18,8 @@ package org.apache.hive.spark.client; import java.io.Serializable; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeUnit; @@ -38,6 +40,8 @@ private T result; private Throwable error; + private final List sparkJobIds; + JobHandleImpl(SparkClientImpl client, String jobId) { this.client = client; this.jobId = jobId; @@ -45,6 +49,7 @@ this.metrics = new MetricsCollection(); this.cancelled = new AtomicBoolean(); this.completed = false; + this.sparkJobIds = new CopyOnWriteArrayList(); } /** Requests a running job to be cancelled. 
*/ @@ -103,6 +108,11 @@ public MetricsCollection getMetrics() { return metrics; } + @Override + public List getSparkJobIds() { + return sparkJobIds; + } + private T get(long timeout) throws ExecutionException, InterruptedException, TimeoutException { long deadline = System.currentTimeMillis() + timeout; synchronized (monitor) { diff --git spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java index 4212634..ed94861 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java +++ spark-client/src/main/java/org/apache/hive/spark/client/Protocol.java @@ -125,4 +125,21 @@ } + /** + * Inform the client that a new spark job has been submitted for the client job + */ + static class JobSubmitted implements Serializable { + final String clientJobId; + final int sparkJobId; + + JobSubmitted(String clientJobId, int sparkJobId) { + this.clientJobId = clientJobId; + this.sparkJobId = sparkJobId; + } + + JobSubmitted() { + this(null, -1); + } + } + } diff --git spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java index 3de1fec..6dbe45a 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java +++ spark-client/src/main/java/org/apache/hive/spark/client/RemoteDriver.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; @@ -216,7 +217,7 @@ public void call(JavaFutureAction future) { T result = req.job.call(jc); synchronized (completed) { - while (completed.get() != jobs.size()) { + while (completed.get() < jobs.size()) { LOG.debug("Client job {} finished, {} of {} Spark jobs finished.", req.id, completed.get(), jobs.size()); completed.wait(); @@ -249,6 +250,11 @@ void jobDone() { private void monitorJob(JavaFutureAction job) { jobs.add(job); + if (!jc.getMonitoredJobs().containsKey(req.id)) { + jc.getMonitoredJobs().put(req.id, new CopyOnWriteArrayList>()); + } + jc.getMonitoredJobs().get(req.id).add(job); + client.tell(new Protocol.JobSubmitted(req.id, job.jobIds().get(0)), actor); } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java index 161182f..f1ce4c2 100644 --- spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java +++ spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java @@ -319,6 +319,17 @@ public void onReceive(Object message) throws Exception { } else { LOG.warn("Received result for unknown job {}", jr.id); } + } else if (message instanceof Protocol.JobSubmitted) { + Protocol.JobSubmitted jobSubmitted = (Protocol.JobSubmitted) message; + JobHandleImpl handle = jobs.get(jobSubmitted.clientJobId); + if (handle != null) { + LOG.info("Received spark job ID: {} for {}", + jobSubmitted.sparkJobId, jobSubmitted.clientJobId); + handle.getSparkJobIds().add(jobSubmitted.sparkJobId); + } else { + LOG.warn("Received spark job ID: {} for unknown job {}", + jobSubmitted.sparkJobId, jobSubmitted.clientJobId); + } } } diff --git spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java new 
file mode 100644 index 0000000..8ea6969 --- /dev/null +++ spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkJobInfo.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.spark.client.status; + +import org.apache.spark.JobExecutionStatus; +import org.apache.spark.SparkJobInfo; + +import java.io.Serializable; + +/** + * Wrapper of SparkJobInfo + */ +public class HiveSparkJobInfo implements SparkJobInfo, Serializable { + private final int jobId; + private final int[] stageIds; + private final JobExecutionStatus status; + + public HiveSparkJobInfo(SparkJobInfo jobInfo) { + this.jobId = jobInfo.jobId(); + this.stageIds = jobInfo.stageIds(); + this.status = jobInfo.status(); + } + + public HiveSparkJobInfo(int jobId, int[] stageIds, JobExecutionStatus status) { + this.jobId = jobId; + this.stageIds = stageIds; + this.status = status; + } + + public HiveSparkJobInfo() { + this(-1, new int[0], JobExecutionStatus.UNKNOWN); + } + + @Override + public int jobId() { + return jobId; + } + + @Override + public int[] stageIds() { + return stageIds; + } + + @Override + public JobExecutionStatus status() { + return status; + } + +} diff --git spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java new file mode 100644 index 0000000..dfbb01e --- /dev/null +++ spark-client/src/main/java/org/apache/hive/spark/client/status/HiveSparkStageInfo.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hive.spark.client.status; + +import org.apache.spark.SparkStageInfo; + +import java.io.Serializable; + +/** + * Wrapper of SparkStageInfo + */ +public class HiveSparkStageInfo implements SparkStageInfo, Serializable { + private final int stageId; + private final int currentAttemptId; + private final String name; + private final int numTasks; + private final int numActiveTasks; + private final int numCompletedTasks; + private final int numFailedTasks; + + public HiveSparkStageInfo(SparkStageInfo stageInfo) { + stageId = stageInfo.stageId(); + currentAttemptId = stageInfo.currentAttemptId(); + name = stageInfo.name(); + numTasks = stageInfo.numTasks(); + numActiveTasks = stageInfo.numActiveTasks(); + numCompletedTasks = stageInfo.numCompletedTasks(); + numFailedTasks = stageInfo.numFailedTasks(); + } + + public HiveSparkStageInfo(int stageId, int currentAttemptId, String name, + int numTasks, int numActiveTasks, int numCompletedTasks, int numFailedTasks) { + this.stageId = stageId; + this.currentAttemptId = currentAttemptId; + this.name = name; + this.numTasks = numTasks; + this.numActiveTasks = numActiveTasks; + this.numCompletedTasks = numCompletedTasks; + this.numFailedTasks = numFailedTasks; + } + + public HiveSparkStageInfo() { + this(-1, -1, null, -1, -1, -1, -1); + } + + @Override + public int stageId() { + return stageId; + } + + @Override + public int currentAttemptId() { + return currentAttemptId; + } + + @Override + public String name() { + return name; + } + + @Override + public int numTasks() { + return numTasks; + } + + @Override + public int numActiveTasks() { + return numActiveTasks; + } + + @Override + public int numCompletedTasks() { + return numCompletedTasks; + } + + @Override + public int numFailedTasks() { + return numFailedTasks; + } + +}
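For reference, the sketch below shows how the SparkJobStatus API introduced by this patch is meant to be consumed by a monitor on the Hive side. It is a minimal illustration, not the actual SparkJobMonitor code: the polling interval, the console reporting, and the Map<String, SparkStageProgress> generic signature (inferred from the two implementations above, whose type parameters were lost in this copy of the patch) are assumptions, while getState(), getSparkStageProgress(), cleanup() and JobExecutionStatus come directly from the diff.

import java.util.Map;

import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus;
import org.apache.hadoop.hive.ql.exec.spark.status.SparkStageProgress;
import org.apache.spark.JobExecutionStatus;

/** Minimal polling sketch against the new SparkJobStatus interface. */
public class SparkJobStatusPollerSketch {

  /** Blocks until the Spark job reaches a terminal state; returns 0 on success, 1 on failure. */
  public static int awaitCompletion(SparkJobStatus jobStatus) throws InterruptedException {
    try {
      while (true) {
        JobExecutionStatus state = jobStatus.getState();
        // RemoteSparkJobStatus reports UNKNOWN until the RemoteDriver has sent the
        // JobSubmitted message carrying the real Spark job id; LocalSparkJobStatus may
        // return null before the job info is available. Treat both as "not started yet".
        if (state == null || state == JobExecutionStatus.UNKNOWN) {
          Thread.sleep(1000);
          continue;
        }
        // Stage-level progress, keyed by "<stageId>_<attemptId>" as built by both
        // LocalSparkJobStatus and RemoteSparkJobStatus.
        Map<String, SparkStageProgress> progress = jobStatus.getSparkStageProgress();
        System.out.println("state=" + state + ", stages=" + progress.keySet());

        if (state == JobExecutionStatus.SUCCEEDED) {
          return 0;
        }
        if (state == JobExecutionStatus.FAILED) {
          return 1;
        }
        Thread.sleep(1000);
      }
    } finally {
      // LocalSparkJobStatus uses cleanup() to drop cached job metrics; in
      // RemoteSparkJobStatus it is currently a no-op.
      jobStatus.cleanup();
    }
  }
}

A real monitor would additionally bound this loop with a timeout and use a configurable check interval rather than a fixed one-second sleep.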