diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java index 32e5530..7e33a3f 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/LocalHiveSparkClient.java @@ -137,7 +137,7 @@ public SparkJobRef execute(DriverContext driverContext, SparkWork sparkWork) thr int jobId = future.jobIds().get(0); LocalSparkJobStatus sparkJobStatus = new LocalSparkJobStatus( sc, jobId, jobMetricsListener, sparkCounters, plan.getCachedRDDIds(), future); - return new LocalSparkJobRef(Integer.toString(jobId), sparkJobStatus, sc); + return new LocalSparkJobRef(Integer.toString(jobId), hiveConf, sparkJobStatus, sc); } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 30a00a7..a4a166a 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -77,10 +77,13 @@ private transient List localJars = new ArrayList(); private transient List localFiles = new ArrayList(); + private final transient long sparkClientTimeout; + RemoteHiveSparkClient(HiveConf hiveConf, Map conf) throws IOException, SparkException { this.hiveConf = hiveConf; sparkConf = HiveSparkClientFactory.generateSparkConf(conf); remoteClient = SparkClientFactory.createClient(conf, hiveConf); + sparkClientTimeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS); } @Override @@ -90,17 +93,14 @@ public SparkConf getSparkConf() { @Override public int getExecutorCount() throws Exception { - long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS); Future handler = remoteClient.getExecutorCount(); - return handler.get(timeout, 
TimeUnit.SECONDS).intValue(); + return handler.get(sparkClientTimeout, TimeUnit.SECONDS).intValue(); } @Override public int getDefaultParallelism() throws Exception { - long timeout = hiveConf.getTimeVar( - HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS); Future handler = remoteClient.getDefaultParallelism(); - return handler.get(timeout, TimeUnit.SECONDS); + return handler.get(sparkClientTimeout, TimeUnit.SECONDS); } @Override @@ -119,11 +119,10 @@ public SparkJobRef execute(final DriverContext driverContext, final SparkWork sp byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir); byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork); - long timeout = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_CLIENT_FUTURE_TIMEOUT, TimeUnit.SECONDS); - - JobHandle jobHandle = remoteClient.submit( - new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes)); - return new RemoteSparkJobRef(jobHandle, new RemoteSparkJobStatus(remoteClient, jobHandle, timeout)); + JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes); + JobHandle jobHandle = remoteClient.submit(job); + RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimeout); + return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus); } private void refreshLocalResources(SparkWork sparkWork, HiveConf conf) { diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index a4554ac..1342afe 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -49,7 +49,7 @@ import org.apache.hadoop.hive.ql.exec.spark.session.SparkSession; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManager; import org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl; -import 
org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor; +import org.apache.hadoop.hive.ql.exec.spark.status.LocalSparkJobMonitor; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -65,7 +65,6 @@ import org.apache.hadoop.hive.ql.plan.ReduceWork; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.plan.StatsWork; -import org.apache.hadoop.hive.ql.plan.UnionWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.stats.StatsFactory; import org.apache.hadoop.util.StringUtils; @@ -102,23 +101,20 @@ public int execute(DriverContext driverContext) { SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); + rc = jobRef.monitorJob(); SparkJobStatus sparkJobStatus = jobRef.getSparkJobStatus(); - if (sparkJobStatus != null) { - SparkJobMonitor monitor = new SparkJobMonitor(conf, sparkJobStatus); - rc = monitor.startMonitor(); + if (rc == 0) { sparkCounters = sparkJobStatus.getCounter(); - if (rc == 0 ) { - // for RSC, we should get the counters after job has finished - SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); - if (LOG.isInfoEnabled() && sparkStatistics != null) { - LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId())); - logSparkStatistic(sparkStatistics); - } - } else if (rc == 2) { // Cancel job if the monitor found job submission timeout. 
- jobRef.cancelJob(); + // for RSC, we should get the counters after job has finished + SparkStatistics sparkStatistics = sparkJobStatus.getSparkStatistics(); + if (LOG.isInfoEnabled() && sparkStatistics != null) { + LOG.info(String.format("=====Spark Job[%s] statistics=====", jobRef.getJobId())); + logSparkStatistic(sparkStatistics); } - sparkJobStatus.cleanup(); + } else if (rc == 2) { // Cancel job if the monitor found job submission timeout. + jobRef.cancelJob(); } + sparkJobStatus.cleanup(); } catch (Exception e) { String msg = "Failed to execute spark task, with exception '" + Utilities.getNameMessage(e) + "'"; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java new file mode 100644 index 0000000..a94a374 --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/LocalSparkJobMonitor.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.spark.JobExecutionStatus; + +/** + * LocalSparkJobMonitor monitor a single Spark job status in a loop until job finished/failed/killed. + * It print current job status to console and sleep current thread between monitor interval. + */ +public class LocalSparkJobMonitor extends SparkJobMonitor{ + + private SparkJobStatus sparkJobStatus; + + public LocalSparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) { + super(hiveConf); + this.sparkJobStatus = sparkJobStatus; + } + + public int startMonitor() { + boolean running = false; + boolean done = false; + int rc = 0; + JobExecutionStatus lastState = null; + Map lastProgressMap = null; + + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); + + long startTime = System.currentTimeMillis(); + + while (true) { + try { + JobExecutionStatus state = sparkJobStatus.getState(); + if (LOG.isDebugEnabled()) { + console.printInfo("state = " + state); + } + + if (state == null) { + long timeCount = (System.currentTimeMillis() - startTime)/1000; + if (timeCount > monitorTimeoutInteval) { + LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it."); + console.printError("Status: " + state); + running = false; + done = true; + rc = 2; + break; + } + } else if (state != lastState || state == JobExecutionStatus.RUNNING) { + lastState = state; + Map progressMap = sparkJobStatus.getSparkStageProgress(); + + switch (state) { + case RUNNING: + if (!running) { + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); + // print job stages. 
+ console.printInfo("\nQuery Hive on Spark job[" + + sparkJobStatus.getJobId() + "] stages:"); + for (int stageId : sparkJobStatus.getStageIds()) { + console.printInfo(Integer.toString(stageId)); + } + + console.printInfo("\nStatus: Running (Hive on Spark job[" + + sparkJobStatus.getJobId() + "])"); + running = true; + + console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " + + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]"); + } + + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; + break; + case SUCCEEDED: + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; + double duration = (System.currentTimeMillis() - startTime) / 1000.0; + console.printInfo("Status: Finished successfully in " + + String.format("%.2f seconds", duration)); + running = false; + done = true; + break; + case FAILED: + console.printError("Status: Failed"); + running = false; + done = true; + rc = 3; + break; + case UNKNOWN: + console.printError("Status: Unknown"); + running = false; + done = true; + rc = 4; + break; + } + } + if (!done) { + Thread.sleep(checkInterval); + } + } catch (Exception e) { + String msg = " with exception '" + Utilities.getNameMessage(e) + "'"; + msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg; + + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + LOG.error(msg, e); + console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + rc = 1; + done = true; + } finally { + if (done) { + break; + } + } + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); + return rc; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java new file mode 100644 index 0000000..fb0498a --- /dev/null +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.spark.status; + +import java.util.Map; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.RemoteSparkJobStatus; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hive.spark.client.JobHandle; +import org.apache.spark.JobExecutionStatus; + +/** + * RemoteSparkJobMonitor monitor a RSC remote job status in a loop until job finished/failed/killed. + * It print current job status to console and sleep current thread between monitor interval. 
+ */ +public class RemoteSparkJobMonitor extends SparkJobMonitor { + + private RemoteSparkJobStatus sparkJobStatus; + + public RemoteSparkJobMonitor(HiveConf hiveConf, RemoteSparkJobStatus sparkJobStatus) { + super(hiveConf); + this.sparkJobStatus = sparkJobStatus; + } + + @Override + public int startMonitor() { + boolean running = false; + boolean done = false; + int rc = 0; + Map lastProgressMap = null; + + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); + perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); + + long startTime = System.currentTimeMillis(); + + while (true) { + try { + JobHandle.State state = sparkJobStatus.getRemoteJobState(); + if (LOG.isDebugEnabled()) { + console.printInfo("state = " + state); + } + + switch (state) { + case SENT: + case QUEUED: + long timeCount = (System.currentTimeMillis() - startTime) / 1000; + if ((timeCount > monitorTimeoutInteval)) { + LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it."); + console.printError("Status: " + state); + running = false; + done = true; + rc = 2; + } + break; + case STARTED: + JobExecutionStatus sparkJobState = sparkJobStatus.getState(); + if (sparkJobState == JobExecutionStatus.RUNNING) { + Map progressMap = sparkJobStatus.getSparkStageProgress(); + if (!running) { + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); + // print job stages. 
+ console.printInfo("\nQuery Hive on Spark job[" + + sparkJobStatus.getJobId() + "] stages:"); + for (int stageId : sparkJobStatus.getStageIds()) { + console.printInfo(Integer.toString(stageId)); + } + + console.printInfo("\nStatus: Running (Hive on Spark job[" + + sparkJobStatus.getJobId() + "])"); + running = true; + + console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " + + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]"); + } + + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; + } + break; + case SUCCEEDED: + Map progressMap = sparkJobStatus.getSparkStageProgress(); + printStatus(progressMap, lastProgressMap); + lastProgressMap = progressMap; + double duration = (System.currentTimeMillis() - startTime) / 1000.0; + console.printInfo("Status: Finished successfully in " + + String.format("%.2f seconds", duration)); + running = false; + done = true; + break; + case FAILED: + console.printError("Status: Failed"); + running = false; + done = true; + rc = 3; + break; + } + + if (!done) { + Thread.sleep(checkInterval); + } + } catch (Exception e) { + String msg = " with exception '" + Utilities.getNameMessage(e) + "'"; + msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg; + + // Has to use full name to make sure it does not conflict with + // org.apache.commons.lang.StringUtils + LOG.error(msg, e); + console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); + rc = 1; + done = true; + } finally { + if (done) { + break; + } + } + } + + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); + return rc; + } +} diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 4f54612..b7e8142 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -15,8 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.hadoop.hive.ql.exec.spark.status; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.LocalSparkJobStatus; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState; + import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashSet; @@ -26,147 +34,27 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.log.PerfLogger; -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.apache.spark.JobExecutionStatus; +abstract class SparkJobMonitor { -/** - * SparkJobMonitor monitor a single Spark job status in a loop until job finished/failed/killed. - * It print current job status to console and sleep current thread between monitor interval. 
- */ -public class SparkJobMonitor { - - private static final String CLASS_NAME = SparkJobMonitor.class.getName(); - private static final Log LOG = LogFactory.getLog(CLASS_NAME); + protected static final String CLASS_NAME = SparkJobMonitor.class.getName(); + protected static final Log LOG = LogFactory.getLog(CLASS_NAME); + protected static SessionState.LogHelper console = new SessionState.LogHelper(LOG); + protected final PerfLogger perfLogger = PerfLogger.getPerfLogger(); + protected final int checkInterval = 1000; + protected final long monitorTimeoutInteval; - private transient LogHelper console; - private final PerfLogger perfLogger = PerfLogger.getPerfLogger(); - private final long monitorTimeoutInteval; - private final int checkInterval = 1000; + private Set completed = new HashSet(); private final int printInterval = 3000; - private final HiveConf hiveConf; private long lastPrintTime; - private Set completed; - private SparkJobStatus sparkJobStatus; - - public SparkJobMonitor(HiveConf hiveConf, SparkJobStatus sparkJobStatus) { - this.sparkJobStatus = sparkJobStatus; - this.hiveConf = hiveConf; + protected SparkJobMonitor(HiveConf hiveConf) { monitorTimeoutInteval = hiveConf.getTimeVar(HiveConf.ConfVars.SPARK_JOB_MONITOR_TIMEOUT, TimeUnit.SECONDS); - console = new LogHelper(LOG); } - public int startMonitor() { - completed = new HashSet(); - - boolean running = false; - boolean done = false; - int rc = 0; - JobExecutionStatus lastState = null; - Map lastProgressMap = null; - - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); - perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); - - long startTime = System.currentTimeMillis(); - - while (true) { - try { - JobExecutionStatus state = sparkJobStatus.getState(); - if (LOG.isDebugEnabled()) { - console.printInfo("state = " + state); - } + public abstract int startMonitor(); - if (state == null) { - long timeCount = (System.currentTimeMillis() - startTime)/1000; - if (timeCount > 
monitorTimeoutInteval) { - LOG.info("Job hasn't been submitted after " + timeCount + "s. Aborting it."); - console.printError("Status: " + state); - running = false; - done = true; - rc = 2; - break; - } - } else if (state != lastState || state == JobExecutionStatus.RUNNING) { - lastState = state; - Map progressMap = sparkJobStatus.getSparkStageProgress(); - - switch (state) { - case RUNNING: - if (!running) { - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_TO_RUNNING); - // print job stages. - console.printInfo("\nQuery Hive on Spark job[" - + sparkJobStatus.getJobId() + "] stages:"); - for (int stageId : sparkJobStatus.getStageIds()) { - console.printInfo(Integer.toString(stageId)); - } - - console.printInfo("\nStatus: Running (Hive on Spark job[" - + sparkJobStatus.getJobId() + "])"); - running = true; - - console.printInfo("Job Progress Format\nCurrentTime StageId_StageAttemptId: " - + "SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount [StageCost]"); - } - - printStatus(progressMap, lastProgressMap); - lastProgressMap = progressMap; - break; - case SUCCEEDED: - printStatus(progressMap, lastProgressMap); - lastProgressMap = progressMap; - double duration = (System.currentTimeMillis() - startTime) / 1000.0; - console.printInfo("Status: Finished successfully in " - + String.format("%.2f seconds", duration)); - running = false; - done = true; - break; - case FAILED: - console.printError("Status: Failed"); - running = false; - done = true; - rc = 3; - break; - case UNKNOWN: - console.printError("Status: Unknown"); - running = false; - done = true; - rc = 4; - break; - } - } - if (!done) { - Thread.sleep(checkInterval); - } - } catch (Exception e) { - String msg = " with exception '" + Utilities.getNameMessage(e) + "'"; - msg = "Failed to monitor Job[ " + sparkJobStatus.getJobId() + "]" + msg; - - // Has to use full name to make sure it does not conflict with - // org.apache.commons.lang.StringUtils - LOG.error(msg, e); - 
console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e)); - rc = 1; - done = true; - } finally { - if (done) { - break; - } - } - } - - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_RUN_JOB); - return rc; - } - - private void printStatus(Map progressMap, - Map lastProgressMap) { + protected void printStatus(Map progressMap, + Map lastProgressMap) { // do not print duplicate status while still in middle of print interval. boolean isDuplicateState = isSameAsPreviousProgress(progressMap, lastProgressMap); @@ -217,13 +105,13 @@ private void printStatus(Map progressMap, if (failed > 0) { /* tasks finished but some failed */ reportBuffer.append( - String.format( - "%s: %d(-%d)/%d Finished with failed tasks\t", - stageName, complete, failed, total)); + String.format( + "%s: %d(-%d)/%d Finished with failed tasks\t", + stageName, complete, failed, total)); } else { if (complete == total) { reportBuffer.append( - String.format("%s: %d/%d Finished\t", stageName, complete, total)); + String.format("%s: %d/%d Finished\t", stageName, complete, total)); } else { reportBuffer.append(String.format("%s: %d/%d\t", stageName, complete, total)); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java index fe2d9f7..fcf5368 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobRef.java @@ -25,4 +25,5 @@ public boolean cancelJob(); + public int monitorJob(); } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java index f28c02b..ce4d932 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/LocalSparkJobRef.java @@ -17,6 +17,8 @@ */ 
package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.status.LocalSparkJobMonitor; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.spark.api.java.JavaSparkContext; @@ -24,11 +26,18 @@ public class LocalSparkJobRef implements SparkJobRef { private final String jobId; - private final SparkJobStatus sparkJobStatus; + private final HiveConf hiveConf; + private final LocalSparkJobStatus sparkJobStatus; private final JavaSparkContext javaSparkContext; - public LocalSparkJobRef(String jobId, SparkJobStatus sparkJobStatus, JavaSparkContext javaSparkContext) { + public LocalSparkJobRef( + String jobId, + HiveConf hiveConf, + LocalSparkJobStatus sparkJobStatus, + JavaSparkContext javaSparkContext) { + this.jobId = jobId; + this.hiveConf = hiveConf; this.sparkJobStatus = sparkJobStatus; this.javaSparkContext = javaSparkContext; } @@ -49,4 +58,10 @@ public boolean cancelJob() { javaSparkContext.sc().cancelJob(id); return true; } + + @Override + public int monitorJob() { + LocalSparkJobMonitor localSparkJobMonitor = new LocalSparkJobMonitor(hiveConf, sparkJobStatus); + return localSparkJobMonitor.startMonitor(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java index a2707d1..4c0993c 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobRef.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.ql.exec.spark.status.impl; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.exec.spark.status.RemoteSparkJobMonitor; import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobRef; import 
org.apache.hadoop.hive.ql.exec.spark.status.SparkJobStatus; import org.apache.hive.spark.client.JobHandle; @@ -26,12 +28,14 @@ public class RemoteSparkJobRef implements SparkJobRef { private final String jobId; - private final SparkJobStatus sparkJobStatus; + private final HiveConf hiveConf; + private final RemoteSparkJobStatus sparkJobStatus; private final JobHandle jobHandler; - public RemoteSparkJobRef(JobHandle jobHandler, SparkJobStatus sparkJobStatus) { + public RemoteSparkJobRef(HiveConf hiveConf, JobHandle jobHandler, RemoteSparkJobStatus sparkJobStatus) { this.jobHandler = jobHandler; this.jobId = jobHandler.getClientJobId(); + this.hiveConf = hiveConf; this.sparkJobStatus = sparkJobStatus; } @@ -49,4 +53,10 @@ public SparkJobStatus getSparkJobStatus() { public boolean cancelJob() { return jobHandler.cancel(true); } + + @Override + public int monitorJob() { + RemoteSparkJobMonitor remoteSparkJobMonitor = new RemoteSparkJobMonitor(hiveConf, sparkJobStatus); + return remoteSparkJobMonitor.startMonitor(); + } } diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index a8ac482..d4018ae 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -153,6 +153,10 @@ private SparkStageInfo getSparkStageInfo(int stageId) { } } + public JobHandle.State getRemoteJobState() { + return jobHandle.getState(); + } + private static class GetJobInfoJob implements Job { private final String clientJobId; private final int sparkJobId;