commit 070a00993ba60351612061db341ddb7a8b288829 Author: Bharath Krishna Date: Tue Jul 3 11:29:09 2018 -0700 HIVE-19733 : Fixing RemoteSparkJobStatus#getSparkStageProgress inefficient implementation diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 832832b325d7eb0e43fb34a4306d7ffa43ceaa78..3bac9fe26240a5ec211e2944f07ee15924e236e1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -45,6 +45,7 @@ import org.apache.spark.api.java.JavaFutureAction; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -103,18 +104,20 @@ public JobExecutionStatus getState() throws HiveException { @Override public Map getSparkStageProgress() throws HiveException { + List sparkStagesInfo = getSparkStagesInfo(); Map stageProgresses = new HashMap(); - for (int stageId : getStageIds()) { - SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId); - if (sparkStageInfo != null && sparkStageInfo.name() != null) { - int runningTaskCount = sparkStageInfo.numActiveTasks(); - int completedTaskCount = sparkStageInfo.numCompletedTasks(); - int failedTaskCount = sparkStageInfo.numFailedTasks(); - int totalTaskCount = sparkStageInfo.numTasks(); - SparkStageProgress sparkStageProgress = new SparkStageProgress( - totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); - SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); - stageProgresses.put(stage, sparkStageProgress); + if (sparkStagesInfo != null) { + for (SparkStageInfo sparkStageInfo : sparkStagesInfo) { + if (sparkStageInfo != null && sparkStageInfo.name() != null) { + int runningTaskCount = sparkStageInfo.numActiveTasks(); + int completedTaskCount = sparkStageInfo.numCompletedTasks(); + int failedTaskCount = sparkStageInfo.numFailedTasks(); + int totalTaskCount = sparkStageInfo.numTasks(); + SparkStageProgress sparkStageProgress = + new SparkStageProgress(totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount); + SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId()); + stageProgresses.put(stage, sparkStageProgress); + } } } return stageProgresses; @@ -191,35 +194,40 @@ public boolean isRemoteActive() { return sparkClient.isActive(); } - private SparkJobInfo getSparkJobInfo() throws HiveException { - Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1 - ? jobHandle.getSparkJobIds().get(0) : null; - if (sparkJobId == null) { - return null; - } - Future getJobInfo = sparkClient.run( - new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId)); + private T runJobOnSparkClient(Job job) throws HiveException { + Future getJob = sparkClient.run(job); try { - return getJobInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); + return getJob.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); } catch (TimeoutException e) { - throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_TIMEOUT, - Long.toString(sparkClientTimeoutInSeconds)); + throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_TIMEOUT, Long.toString(sparkClientTimeoutInSeconds)); } catch (InterruptedException e) { throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_INTERRUPTED); } catch (ExecutionException e) { - throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_EXECUTIONERROR, - Throwables.getRootCause(e).getMessage()); + throw new HiveException(e, ErrorMsg.SPARK_GET_JOB_INFO_EXECUTIONERROR, Throwables.getRootCause(e).getMessage()); } } - private SparkStageInfo getSparkStageInfo(int stageId) { - Future getStageInfo = sparkClient.run(new GetStageInfoJob(stageId)); - try { - return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); - } catch (Throwable t) { - LOG.warn("Error getting stage info", t); + private Integer getSparkJobId() { + return jobHandle.getSparkJobIds().size() == 1 + ? jobHandle.getSparkJobIds().get(0) : null; + } + + private SparkJobInfo getSparkJobInfo() throws HiveException { + + Integer sparkJobId = getSparkJobId(); + if (sparkJobId == null) { return null; } + return runJobOnSparkClient(new GetJobInfoJob(jobHandle.getClientJobId(), sparkJobId)); + } + + private List getSparkStagesInfo()throws HiveException { + + Integer sparkJobId = getSparkJobId(); + if(sparkJobId == null) { + return null; + } + return runJobOnSparkClient(new GetSparkStagesInfoJob(jobHandle.getClientJobId(), sparkJobId)); } public JobHandle.State getRemoteJobState() { @@ -229,86 +237,98 @@ private SparkStageInfo getSparkStageInfo(int stageId) { return jobHandle.getState(); } - private static class GetJobInfoJob implements Job { + private static SparkJobInfo getDefaultJobInfo(final Integer jobId, + final JobExecutionStatus status) { + return new SparkJobInfo() { + + @Override + public int jobId() { + return jobId == null ? -1 : jobId; + } + + @Override + public int[] stageIds() { + return new int[0]; + } + + @Override + public JobExecutionStatus status() { + return status; + } + }; + } + + private static SparkJobInfo getJobInfo(JobContext jc, String clientJobId, int sparkJobId) { + + SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); + if (jobInfo == null) { + List> list = new ArrayList<>(jc.getMonitoredJobs().get(clientJobId)); + if (list != null && list.size() == 1) { + JavaFutureAction futureAction = list.get(0); + if (futureAction.isDone()) { + boolean futureSucceed = true; + try { + futureAction.get(); + } catch (Exception e) { + LOG.error("Failed to run job " + sparkJobId, e); + futureSucceed = false; + } + jobInfo = getDefaultJobInfo(sparkJobId, + futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED); + } + } + } + if (jobInfo == null) { + jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN); + } + return jobInfo; + } + + private static class GetSparkStagesInfoJob implements Job> { private final String clientJobId; private final int sparkJobId; - private GetJobInfoJob() { + private GetSparkStagesInfoJob() { // For serialization. this(null, -1); } - GetJobInfoJob(String clientJobId, int sparkJobId) { + GetSparkStagesInfoJob(String clientJobId, int sparkJobId) { this.clientJobId = clientJobId; this.sparkJobId = sparkJobId; } - @Override - public SparkJobInfo call(JobContext jc) throws Exception { - SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId); - if (jobInfo == null) { - List> list = jc.getMonitoredJobs().get(clientJobId); - if (list != null && list.size() == 1) { - JavaFutureAction futureAction = list.get(0); - if (futureAction.isDone()) { - boolean futureSucceed = true; - try { - futureAction.get(); - } catch (Exception e) { - LOG.error("Failed to run job " + sparkJobId, e); - futureSucceed = false; - } - jobInfo = getDefaultJobInfo(sparkJobId, - futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED); - } - } + public ArrayList call(JobContext jc) throws Exception { + SparkJobInfo jobInfo = getJobInfo(jc, clientJobId, sparkJobId); + ArrayList sparkStageInfos = new ArrayList<>(); + int[] stageIds = jobInfo.stageIds(); + for(Integer stageid : stageIds) { + SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageid); + sparkStageInfos.add(stageInfo); } - if (jobInfo == null) { - jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN); - } - return jobInfo; + return sparkStageInfos; } } + private static class GetJobInfoJob implements Job { + private final String clientJobId; + private final int sparkJobId; - private static class GetStageInfoJob implements Job { - private final int stageId; - - private GetStageInfoJob() { + private GetJobInfoJob() { // For serialization. - this(-1); + this(null, -1); } - GetStageInfoJob(int stageId) { - this.stageId = stageId; + GetJobInfoJob(String clientJobId, int sparkJobId) { + this.clientJobId = clientJobId; + this.sparkJobId = sparkJobId; } @Override - public SparkStageInfo call(JobContext jc) throws Exception { - return jc.sc().statusTracker().getStageInfo(stageId); + public SparkJobInfo call(JobContext jc) throws Exception { + return getJobInfo(jc, clientJobId, sparkJobId); } } - private static SparkJobInfo getDefaultJobInfo(final Integer jobId, - final JobExecutionStatus status) { - return new SparkJobInfo() { - - @Override - public int jobId() { - return jobId == null ? -1 : jobId; - } - - @Override - public int[] stageIds() { - return new int[0]; - } - - @Override - public JobExecutionStatus status() { - return status; - } - }; - } - private static class GetAppIDJob implements Job { public GetAppIDJob() { @@ -334,3 +354,4 @@ public String call(JobContext jc) throws Exception { } } } +