commit 72a23ed78a3a5facc8896f8eb487fc5dfb35c767
Author: Bharath Krishna
Date:   Thu Jul 19 11:14:29 2018 -0700

    HIVE-19733 : Fix RemoteSparkJobStatus#getSparkStageProgress inefficient implementation

diff --git ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 06d0ed3c18f80df84c7d06e695c722e20957b0eb..37bc153631f9cee415338c53a27ec396781caca2 100644
--- ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -599,6 +599,9 @@
   SPARK_GET_JOB_INFO_EXECUTIONERROR(30046,
       "Spark job failed in execution while getting job info due to exception {0}"),
   REPL_FILE_SYSTEM_OPERATION_RETRY(30047, "Replication file system operation retry expired."),
+  SPARK_GET_STAGES_INFO_TIMEOUT(30048, "Spark job GetSparkStagesInfoJob timed out after {0} seconds.", true),
+  SPARK_GET_STAGES_INFO_INTERRUPTED(30049, "Spark job GetSparkStagesInfoJob was interrupted."),
+  SPARK_GET_STAGES_INFO_EXECUTIONERROR(30050, "Spark job GetSparkStagesInfoJob failed in execution while getting job info due to exception {0}", true),
 
   //========================== 40000 range starts here ========================//
 
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 832832b325d7eb0e43fb34a4306d7ffa43ceaa78..3d414430ab4061698c5c694bce0c3fca0a7cb424 100644
--- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -45,6 +45,7 @@
 import org.apache.spark.api.java.JavaFutureAction;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -103,18 +104,20 @@ public JobExecutionStatus getState() throws HiveException {
 
   @Override
   public Map<SparkStage, SparkStageProgress> getSparkStageProgress() throws HiveException {
+    List<SparkStageInfo> sparkStagesInfo = getSparkStagesInfo();
     Map<SparkStage, SparkStageProgress> stageProgresses = new HashMap<SparkStage, SparkStageProgress>();
-    for (int stageId : getStageIds()) {
-      SparkStageInfo sparkStageInfo = getSparkStageInfo(stageId);
-      if (sparkStageInfo != null && sparkStageInfo.name() != null) {
-        int runningTaskCount = sparkStageInfo.numActiveTasks();
-        int completedTaskCount = sparkStageInfo.numCompletedTasks();
-        int failedTaskCount = sparkStageInfo.numFailedTasks();
-        int totalTaskCount = sparkStageInfo.numTasks();
-        SparkStageProgress sparkStageProgress = new SparkStageProgress(
-            totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
-        SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId());
-        stageProgresses.put(stage, sparkStageProgress);
+    if (sparkStagesInfo != null) {
+      for (SparkStageInfo sparkStageInfo : sparkStagesInfo) {
+        if (sparkStageInfo != null && sparkStageInfo.name() != null) {
+          int runningTaskCount = sparkStageInfo.numActiveTasks();
+          int completedTaskCount = sparkStageInfo.numCompletedTasks();
+          int failedTaskCount = sparkStageInfo.numFailedTasks();
+          int totalTaskCount = sparkStageInfo.numTasks();
+          SparkStageProgress sparkStageProgress =
+              new SparkStageProgress(totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount);
+          SparkStage stage = new SparkStage(sparkStageInfo.stageId(), sparkStageInfo.currentAttemptId());
+          stageProgresses.put(stage, sparkStageProgress);
+        }
       }
     }
     return stageProgresses;
@@ -212,14 +215,26 @@ private SparkJobInfo getSparkJobInfo() throws HiveException {
     }
   }
 
-  private SparkStageInfo getSparkStageInfo(int stageId) {
-    Future<SparkStageInfo> getStageInfo = sparkClient.run(new GetStageInfoJob(stageId));
-    try {
-      return getStageInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
-    } catch (Throwable t) {
-      LOG.warn("Error getting stage info", t);
+  private List<SparkStageInfo> getSparkStagesInfo() throws HiveException {
+
+    Integer sparkJobId = jobHandle.getSparkJobIds().size() == 1
+        ? jobHandle.getSparkJobIds().get(0) : null;
+    if (sparkJobId == null) {
       return null;
     }
+    Future<List<SparkStageInfo>> getStagesInfo = sparkClient.run(
+        new GetSparkStagesInfoJob(jobHandle.getClientJobId(), sparkJobId));
+    try {
+      return getStagesInfo.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
+    } catch (TimeoutException e) {
+      throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_TIMEOUT,
+          Long.toString(sparkClientTimeoutInSeconds));
+    } catch (InterruptedException e) {
+      throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_INTERRUPTED);
+    } catch (ExecutionException e) {
+      throw new HiveException(e, ErrorMsg.SPARK_GET_STAGES_INFO_EXECUTIONERROR,
+          Throwables.getRootCause(e).getMessage());
+    }
   }
 
   public JobHandle.State getRemoteJobState() {
@@ -229,25 +244,24 @@ private SparkStageInfo getSparkStageInfo(int stageId) {
     return jobHandle.getState();
   }
 
-  private static class GetJobInfoJob implements Job<SparkJobInfo> {
+  private static class GetSparkStagesInfoJob implements Job<ArrayList<SparkStageInfo>> {
     private final String clientJobId;
     private final int sparkJobId;
 
-    private GetJobInfoJob() {
+    private GetSparkStagesInfoJob() {
      // For serialization.
       this(null, -1);
     }
 
-    GetJobInfoJob(String clientJobId, int sparkJobId) {
+    GetSparkStagesInfoJob(String clientJobId, int sparkJobId) {
       this.clientJobId = clientJobId;
       this.sparkJobId = sparkJobId;
     }
 
-    @Override
-    public SparkJobInfo call(JobContext jc) throws Exception {
+    public ArrayList<SparkStageInfo> call(JobContext jc) throws Exception {
       SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
       if (jobInfo == null) {
-        List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+        ArrayList<JavaFutureAction<?>> list = new ArrayList<>(jc.getMonitoredJobs().get(clientJobId));
         if (list != null && list.size() == 1) {
           JavaFutureAction<?> futureAction = list.get(0);
           if (futureAction.isDone()) {
@@ -266,25 +280,53 @@ public SparkJobInfo call(JobContext jc) throws Exception {
       if (jobInfo == null) {
         jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
       }
-      return jobInfo;
+      ArrayList<SparkStageInfo> sparkStageInfos = new ArrayList<>();
+      int[] stageIds = jobInfo.stageIds();
+      for (Integer stageid : stageIds) {
+        SparkStageInfo stageInfo = jc.sc().statusTracker().getStageInfo(stageid);
+        sparkStageInfos.add(stageInfo);
+      }
+      return sparkStageInfos;
     }
   }
 
+  private static class GetJobInfoJob implements Job<SparkJobInfo> {
+    private final String clientJobId;
+    private final int sparkJobId;
 
-  private static class GetStageInfoJob implements Job<SparkStageInfo> {
-    private final int stageId;
-
-    private GetStageInfoJob() {
+    private GetJobInfoJob() {
       // For serialization.
-      this(-1);
+      this(null, -1);
     }
 
-    GetStageInfoJob(int stageId) {
-      this.stageId = stageId;
+    GetJobInfoJob(String clientJobId, int sparkJobId) {
+      this.clientJobId = clientJobId;
+      this.sparkJobId = sparkJobId;
     }
 
     @Override
-    public SparkStageInfo call(JobContext jc) throws Exception {
-      return jc.sc().statusTracker().getStageInfo(stageId);
+    public SparkJobInfo call(JobContext jc) throws Exception {
+      SparkJobInfo jobInfo = jc.sc().statusTracker().getJobInfo(sparkJobId);
+      if (jobInfo == null) {
+        List<JavaFutureAction<?>> list = jc.getMonitoredJobs().get(clientJobId);
+        if (list != null && list.size() == 1) {
+          JavaFutureAction<?> futureAction = list.get(0);
+          if (futureAction.isDone()) {
+            boolean futureSucceed = true;
+            try {
+              futureAction.get();
+            } catch (Exception e) {
+              LOG.error("Failed to run job " + sparkJobId, e);
+              futureSucceed = false;
+            }
+            jobInfo = getDefaultJobInfo(sparkJobId,
+                futureSucceed ? JobExecutionStatus.SUCCEEDED : JobExecutionStatus.FAILED);
+          }
+        }
+      }
+      if (jobInfo == null) {
+        jobInfo = getDefaultJobInfo(sparkJobId, JobExecutionStatus.UNKNOWN);
+      }
+      return jobInfo;
     }
   }
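
Note on the change (illustrative, not part of the patch): the inefficiency is that the old getSparkStageProgress() issued one sparkClient.run(new GetStageInfoJob(stageId)) round trip per stage, while the new GetSparkStagesInfoJob gathers every stage's SparkStageInfo on the Spark driver and returns the whole list in a single call. The sketch below only illustrates that round-trip difference; RemoteCall, StageInfoFetchSketch, and the string "stage infos" are hypothetical stand-ins, not Hive or Spark APIs.

// Illustrative sketch only. RemoteCall is a hypothetical stand-in for one
// sparkClient.run(...) round trip between HiveServer2 and the remote Spark driver.
import java.util.ArrayList;
import java.util.List;

public class StageInfoFetchSketch {

  /** Each invocation of run() stands for one remote round trip. */
  interface RemoteCall<T> {
    T run();
  }

  /** Old pattern: one remote call per stage id, so N stages cost N round trips. */
  static List<String> fetchPerStage(int[] stageIds) {
    List<String> stageInfos = new ArrayList<>();
    for (int stageId : stageIds) {
      RemoteCall<String> perStage = () -> "info-for-stage-" + stageId; // analogous to GetStageInfoJob(stageId)
      stageInfos.add(perStage.run());                                  // remote round trip per stage
    }
    return stageInfos;
  }

  /** New pattern: one call returns every stage's info, so the cost is a single round trip. */
  static List<String> fetchAllStages(int[] stageIds) {
    RemoteCall<List<String>> allStages = () -> {
      // Runs on the driver side: collect all stage infos locally, ship the list back once.
      List<String> stageInfos = new ArrayList<>();
      for (int stageId : stageIds) {
        stageInfos.add("info-for-stage-" + stageId);
      }
      return stageInfos;
    };
    return allStages.run();                                            // single remote round trip
  }

  public static void main(String[] args) {
    int[] stageIds = {0, 1, 2};
    System.out.println(fetchPerStage(stageIds));   // three remote calls
    System.out.println(fetchAllStages(stageIds));  // one remote call
  }
}

For a query with many stages this turns each progress poll from a number of RPCs proportional to the stage count into a single RPC, which is the cost the commit message describes as the inefficient implementation.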