commit 487478779cdf6fcc1545a2f4e0fef5e230c1360e Author: Bharath Krishna Date: Fri Jun 22 08:40:36 2018 -0700 HIVE-19733 : Fix RemoteSparkJobStatus#getSparkStageProgress inefficient implementation diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 832832b325d7eb0e43fb34a4306d7ffa43ceaa78..8a82e403226ea8462d21fa2135f26bedbfc218c1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -45,6 +45,7 @@ import org.apache.spark.api.java.JavaFutureAction; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -103,18 +104,20 @@ public JobExecutionStatus getState() throws HiveException { @Override public Map getSparkStageProgress() throws HiveException { + List sparkStagesInfo = getSparkStagesInfo(); Map stageProgresses = new HashMap(); - for (int stageId : getStageIds()) { - SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId); - if (sparkStageInfo != null && sparkStageInfo.name() != null) { - int runningTaskCount = sparkStageInfo.numActiveTasks(); - int completedTaskCount = sparkStageInfo.numCompletedTasks(); - int failedTaskCount = sparkStageInfo.numFailedTasks(); - int totalTaskCount = sparkStageInfo.numTasks(); - SparkStageProgress sparkStageProgress = new SparkStageProgress( - totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); - stageProgresses.put(stage, sparkStageProgress); + if (sparkStagesInfo != null) { + for (SparkStageInfo sparkStageInfo : sparkStagesInfo) { + if (sparkStageInfo != null && sparkStageInfo.name() != null) { + int runningTaskCount = sparkStageInfo.numActiveTasks(); + int completedTaskCount = sparkStageInfo.numCompletedTasks(); + int failedTaskCount = sparkStageInfo.numFailedTasks(); + int totalTaskCount = sparkStageInfo.numTasks(); + SparkStageProgress sparkStageProgress = + new SparkStageProgress(totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); + SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); + stageProgresses.put(stage, sparkStageProgress); + } } } return stageProgresses; @@ -222,6 +225,27 @@ private SparkStageInfo getSparkStageInfo(int stageId) { } } + private List getSparkStagesInfo()throws HiveException { + + Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 + ? jobHandle.getSparkJobIds().get(0) : null; + if (sparkJobId == null) { + return null; + } + Future> getJobInfo = sparkClient.run( + new GetSparkStagesInfoJob(jobHandle.getClientJobId(), sparkJobId)); + try { + return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_TIMEOUT, + Long.toString(sparkClientTimeoutInSeconds)); + } catch (InterruptedException e) { + throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_INTERRUPTED); + } catch (ExecutionException e) { + throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_EXECUTIONERROR, + Throwables.getRootCause(e).getMessage()); + } + } public JobHandle.State getRemoteJobState() { if (error != null) { return JobHandle.State.FAILED; @@ -229,6 +253,51 @@ private SparkStageInfo getSparkStageInfo(int stageId) { return jobHandle.getState(); } + private static class GetSparkStagesInfoJob implements Job> { + private final String clientJobId; + private final int sparkJobId; + + private GetSparkStagesInfoJob() { + // For serialization. + this(null, -1); + } + + GetSparkStagesInfoJob(String clientJobId, int sparkJobId) { + this.clientJobId = clientJobId; + this.sparkJobId = sparkJobId; + } + @Override + public ArrayList call(JobContext jc) throws Exception { + SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + if (jobInfo == null) { + ArrayList> list = new ArrayList<>(jc.getMonitoredJobs().get(clientJobId)); + if (list != null && list.size() == 1) { + JavaFutureAction futureAction = list.get(0); + if (futureAction.isDone()) { + boolean futureSucceed = true; + try { + futureAction.get(); + } catch (Exception e) { + LOG.error("Failed to run job " + sparkJobId, e); + futureSucceed = false; + } + jobInfo = getDefaultJobInfo(sparkJobId, + futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED); + } + } + } + if (jobInfo == null) { + jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN); + } + ArrayList sparkStageInfos = new ArrayList<>(); + int[] stageIds = jobInfo.stageIds(); + for(Integer stageid : stageIds) { + SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageid); + sparkStageInfos.add(stageInfo); + } + return sparkStageInfos; + } + } private static class GetJobInfoJob implements Job { private final String clientJobId; private final int sparkJobId;