diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java index 89fa854..39af1d1 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java @@ -29,12 +29,21 @@ import org.apache.hadoop.hive.ql.Context; import org.apache.hadoop.hive.ql.DriverContext; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobMonitor; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.JobStateListener; +import org.apache.hadoop.hive.ql.exec.spark.status.impl.SimpleSparkJobStatus; +import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.plan.BaseWork; import org.apache.hadoop.hive.ql.plan.SparkWork; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; +import org.apache.spark.FutureAction; +import org.apache.spark.SimpleFutureAction; import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.ui.jobs.JobProgressListener; import java.io.IOException; import java.io.InputStream; @@ -69,8 +78,17 @@ public static synchronized SparkClient getInstance(Configuration hiveConf) { private List localFiles = new ArrayList(); + private JobStateListener jobStateListener; + + private JobProgressListener jobProgressListener; + private SparkClient(Configuration hiveConf) { - sc = new JavaSparkContext(initiateSparkConf(hiveConf)); + SparkConf sparkConf = initiateSparkConf(hiveConf); + sc = new JavaSparkContext(sparkConf); + jobStateListener = new JobStateListener(); + jobProgressListener = new JobProgressListener(sparkConf); + sc.sc().listenerBus().addListener(jobStateListener); + sc.sc().listenerBus().addListener(jobProgressListener); } private SparkConf 
initiateSparkConf(Configuration hiveConf) { @@ -161,7 +179,15 @@ public int execute(DriverContext driverContext, SparkWork sparkWork) { // Execute generated plan. try { - plan.execute(); + JavaPairRDD finalRDD = plan.generateGraph(); + // We use Spark RDD async action to submit job as it's the only way to get jobId now. + FutureAction future = finalRDD.foreachAsync(HiveVoidFunction.getInstance()); + // An action may trigger multi jobs in Spark, we only monitor the latest job here + // until we found that Hive does trigger multi jobs. + SimpleSparkJobStatus sparkJobStatus = new SimpleSparkJobStatus( + (Integer) future.jobIds().last(), jobStateListener, jobProgressListener); + SparkJobMonitor monitor = new SparkJobMonitor(sparkJobStatus); + monitor.startMonitor(); } catch (Exception e) { LOG.error("Error executing Spark Plan", e); return 1; diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java index ecb311b..ddb9e93 100644 --- ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlan.java @@ -46,7 +46,7 @@ public void addInput(SparkTran tran, JavaPairRDD input) } } - public void execute() throws IllegalStateException { + public JavaPairRDD generateGraph() throws IllegalStateException { Map> tranToRDDMap = new HashMap>(); for (SparkTran tran : getAllTrans()) { @@ -77,7 +77,7 @@ public void execute() throws IllegalStateException { finalRDD = finalRDD.union(rdd); } } - finalRDD.foreach(HiveVoidFunction.getInstance()); + return finalRDD; } /** diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java new file mode 100644 index 0000000..b092abc --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.spark.status; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.log.PerfLogger; +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; + +/** + * SparkJobMonitor monitors a single Spark job in a loop until the job finishes, fails or is killed. + * It prints the current job status to the console and sleeps the current thread between checks. 
+ */
+public class SparkJobMonitor {
+
+  private static final String CLASS_NAME = SparkJobMonitor.class.getName();
+  private static final Log LOG = LogFactory.getLog(CLASS_NAME);
+
+  private final transient LogHelper console;
+  /** Pause between two status polls; handed to Thread.sleep, hence milliseconds. */
+  private final int checkInterval = 200;
+  /** Retry budget; divided by checkInterval it gives the number of failed polls tolerated. */
+  private final int maxRetryInterval = 2500;
+  /** An unchanged report is printed again once this many milliseconds have elapsed. */
+  private final int printInterval = 3000;
+  /** System.currentTimeMillis() value recorded when the last progress report was printed. */
+  private long lastPrintTime;
+  /** Keys of the stages whose succeeded task count has reached their total task count. */
+  private Set<String> completed;
+
+  private final SparkJobStatus sparkJobStatus;
+
+  /**
+   * Creates a monitor for one Spark job.
+   *
+   * @param sparkJobStatus source of the state and per-stage progress of the job to watch
+   */
+  public SparkJobMonitor(SparkJobStatus sparkJobStatus) {
+    this.sparkJobStatus = sparkJobStatus;
+    console = new LogHelper(LOG);
+  }
+
+  /**
+   * Polls the job status on the calling thread until the job reaches a terminal state or
+   * monitoring is given up, printing state changes and stage progress to the console.
+   *
+   * <p>A poll that throws is retried after the regular pause; monitoring is given up once
+   * {@code maxRetryInterval / checkInterval} polls have failed in total, or as soon as the
+   * thread is interrupted while pausing (the interrupt flag is restored in that case).
+   *
+   * @return 0 if the job succeeded, 1 if it was killed or monitoring was given up,
+   *         2 if it failed or ended in error
+   */
+  public int startMonitor() {
+    completed = new HashSet<String>();
+
+    boolean running = false;
+    boolean done = false;
+    int failedCounter = 0;
+    int rc = 0;
+    SparkJobState lastState = null;
+    String lastReport = null;
+    // Fallback start time: keeps the reported duration sensible when the job is first seen
+    // as SUCCEEDED without RUNNING ever having been observed.
+    long startTime = System.currentTimeMillis();
+    // '%' and '/' share one precedence level and associate left to right, so the divisor is
+    // computed on its own; written inline without parentheses the first failed poll would
+    // already give up instead of retrying.
+    final int maxFailedPolls = maxRetryInterval / checkInterval;
+
+    while (!done) {
+      try {
+        Map<String, SparkProgress> progressMap = sparkJobStatus.getSparkStageProgress();
+        SparkJobState state = sparkJobStatus.getState();
+
+        // RUNNING is handled on every poll so that progress keeps being reported.
+        if (state != lastState || state == SparkJobState.RUNNING) {
+          lastState = state;
+
+          switch (state) {
+          case SUBMITTED:
+            console.printInfo("Status: Submitted");
+            break;
+          case INITING:
+            console.printInfo("Status: Initializing");
+            break;
+          case RUNNING:
+            if (!running) {
+              // First time the job is seen running: print its stages once.
+              console.printInfo("\nQuery Hive on Spark job["
+                  + sparkJobStatus.getJobId() + "] stages:");
+              for (int stageId : sparkJobStatus.getStageIds()) {
+                console.printInfo(Integer.toString(stageId));
+              }
+
+              console.printInfo("\nStatus: Running (Hive on Spark job["
+                  + sparkJobStatus.getJobId() + "])\n");
+              startTime = System.currentTimeMillis();
+              running = true;
+            }
+
+            lastReport = printStatus(progressMap, lastReport, console);
+            break;
+          case SUCCEEDED:
+            lastReport = printStatus(progressMap, lastReport, console);
+            double duration = (System.currentTimeMillis() - startTime) / 1000.0;
+            console.printInfo("Status: Finished successfully in "
+                + String.format("%.2f seconds", duration));
+            done = true;
+            break;
+          case KILLED:
+            console.printInfo("Status: Killed");
+            done = true;
+            rc = 1;
+            break;
+          case FAILED:
+          case ERROR:
+            console.printError("Status: Failed");
+            done = true;
+            rc = 2;
+            break;
+          }
+        }
+      } catch (Exception e) {
+        console.printInfo("Exception: " + e.getMessage());
+        if (++failedCounter % maxFailedPolls == 0) {
+          reportGivingUp();
+          rc = 1;
+          done = true;
+        } else {
+          console.printInfo("Retrying...");
+        }
+      }
+
+      if (!done) {
+        // Pausing here (rather than only after a successful poll) also spaces out retries,
+        // so the retry budget is not used up in a tight loop.
+        try {
+          Thread.sleep(checkInterval);
+        } catch (InterruptedException e) {
+          // Keep the interrupt visible to the caller, then stop monitoring.
+          Thread.currentThread().interrupt();
+          console.printInfo("Exception: " + e.getMessage());
+          reportGivingUp();
+          rc = 1;
+          done = true;
+        }
+      }
+    }
+    return rc;
+  }
+
+  /** Prints the two console lines emitted whenever monitoring is given up. */
+  private void reportGivingUp() {
+    // NOTE(review): only messages are printed; nothing in this class cancels the Spark job.
+    console.printInfo("Killing Job...");
+    console.printError("Execution has failed.");
+  }
+
+  /**
+   * Builds a one-line progress report with one tab-terminated entry per stage, in the natural
+   * String order of the stage keys, and prints it when it differs from the previous report or
+   * when printInterval milliseconds have passed since the last print.
+   *
+   * <p>Entry forms: {@code key: -/-} when the total is not positive, otherwise
+   * {@code key: succeeded(+running,-failed)/total} with the parenthesised parts present only
+   * where applicable.
+   *
+   * @param progressMap progress per stage key
+   * @param lastReport report returned by the previous call, or null on the first call
+   * @param console helper used for printing
+   * @return the report built by this call, whether or not it was printed
+   */
+  private String printStatus(
+      Map<String, SparkProgress> progressMap,
+      String lastReport,
+      LogHelper console) {
+
+    StringBuilder reportBuffer = new StringBuilder();
+
+    SortedSet<String> keys = new TreeSet<String>(progressMap.keySet());
+    for (String s : keys) {
+      SparkProgress progress = progressMap.get(s);
+      final int complete = progress.getSucceededTaskCount();
+      final int total = progress.getTotalTaskCount();
+      final int running = progress.getRunningTaskCount();
+      final int failed = progress.getFailedTaskCount();
+      if (total <= 0) {
+        reportBuffer.append(String.format("%s: -/-\t", s));
+      } else {
+        if (complete == total) {
+          completed.add(s);
+        }
+        if (complete < total && (complete > 0 || running > 0 || failed > 0)) {
+          /* stage is started, but not complete */
+          if (failed > 0) {
+            reportBuffer.append(
+                String.format("%s: %d(+%d,-%d)/%d\t", s, complete, running, failed, total));
+          } else {
+            reportBuffer.append(String.format("%s: %d(+%d)/%d\t", s, complete, running, total));
+          }
+        } else {
+          /* stage is waiting for input/slots or complete */
+          if (failed > 0) {
+            /* tasks finished but some failed */
+            reportBuffer.append(String.format("%s: %d(-%d)/%d\t", s, complete, failed, total));
+          } else {
+            reportBuffer.append(String.format("%s: %d/%d\t", s, complete, total));
+          }
+        }
+      }
+    }
+
+    String report = reportBuffer.toString();
+    if (!report.equals(lastReport)
+        || System.currentTimeMillis() >= lastPrintTime + printInterval) {
+      console.printInfo(report);
+      lastPrintTime = System.currentTimeMillis();
+    }
+
+    return report;
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java
new file mode 100644
index 0000000..5f849f6
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobState.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+/**
+ * States of a Spark job as reported through SparkJobStatus.getState().
+ *
+ * SparkJobMonitor keeps polling while the state is SUBMITTED, INITING or RUNNING and stops
+ * once it sees SUCCEEDED, KILLED, FAILED or ERROR.
+ */
+public enum SparkJobState {
+
+  // Reported by the monitor as "Status: Submitted".
+  SUBMITTED,
+  // Reported by the monitor as "Status: Initializing".
+  INITING,
+  // Recorded by JobStateListener.onJobStart; the monitor prints stage progress in this state.
+  RUNNING,
+  // Terminal: the monitor returns 0.
+  SUCCEEDED,
+  // Terminal: the monitor returns 1.
+  KILLED,
+  // Terminal: the monitor returns 2.
+  FAILED,
+  // Terminal: handled by the monitor exactly like FAILED.
+  ERROR,
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
new file mode 100644
index 0000000..8717fe2
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobStatus.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+import java.util.Map;
+
+/**
+ * SparkJobStatus identifies what Hive wants to know about the status of a Spark job.
+ */
+public interface SparkJobStatus {
+
+  /**
+   * @return the id of the Spark job this status describes
+   */
+  public int getJobId();
+
+  /**
+   * @return the current state of the job
+   */
+  public SparkJobState getState();
+
+  /**
+   * @return the progress of the job as a whole; the implementation in this patch sums the
+   *         task counts of all entries of {@link #getSparkStageProgress()}
+   */
+  public SparkProgress getSparkJobProgress();
+
+  /**
+   * @return the ids of the stages of the job
+   */
+  // NOTE(review): the implementation in this patch null-checks the stage ids it gets from its
+  // listener, so callers should presumably be prepared for null here - confirm.
+  public int[] getStageIds();
+
+  /**
+   * @return progress per stage, keyed by a String identifying the stage; the implementation
+   *         in this patch uses {@code stageId + "_" + attemptId} as key
+   */
+  public Map<String, SparkProgress> getSparkStageProgress();
+
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
new file mode 100644
index 0000000..36322eb
--- /dev/null
+++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkProgress.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License. 
+ */
+package org.apache.hadoop.hive.ql.exec.spark.status;
+
+/**
+ * Immutable snapshot of task counts (total, succeeded, running, failed, killed).
+ * Two snapshots are equal when all five counts are equal.
+ */
+public class SparkProgress {
+
+  private final int totalTaskCount;
+  private final int succeededTaskCount;
+  private final int runningTaskCount;
+  private final int failedTaskCount;
+  private final int killedTaskCount;
+
+  /**
+   * @param totalTaskCount total number of tasks
+   * @param succeededTaskCount number of tasks that succeeded
+   * @param runningTaskCount number of tasks currently running
+   * @param failedTaskCount number of tasks that failed
+   * @param killedTaskCount number of tasks that were killed
+   */
+  public SparkProgress(
+      int totalTaskCount,
+      int succeededTaskCount,
+      int runningTaskCount,
+      int failedTaskCount,
+      int killedTaskCount) {
+
+    this.totalTaskCount = totalTaskCount;
+    this.succeededTaskCount = succeededTaskCount;
+    this.runningTaskCount = runningTaskCount;
+    this.failedTaskCount = failedTaskCount;
+    this.killedTaskCount = killedTaskCount;
+  }
+
+  public int getTotalTaskCount() {
+    return totalTaskCount;
+  }
+
+  public int getSucceededTaskCount() {
+    return succeededTaskCount;
+  }
+
+  public int getRunningTaskCount() {
+    return runningTaskCount;
+  }
+
+  public int getFailedTaskCount() {
+    return failedTaskCount;
+  }
+
+  public int getKilledTaskCount() {
+    return killedTaskCount;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj instanceof SparkProgress) {
+      SparkProgress other = (SparkProgress) obj;
+      return getTotalTaskCount() == other.getTotalTaskCount()
+          && getSucceededTaskCount() == other.getSucceededTaskCount()
+          && getRunningTaskCount() == other.getRunningTaskCount()
+          && getFailedTaskCount() == other.getFailedTaskCount()
+          && getKilledTaskCount() == other.getKilledTaskCount();
+    }
+    return false;
+  }
+
+  /**
+   * Combines the same five counts that equals() compares, so equal snapshots hash alike and
+   * instances behave correctly in hash-based collections.
+   */
+  @Override
+  public int hashCode() {
+    int result = getTotalTaskCount();
+    result = 31 * result + getSucceededTaskCount();
+    result = 31 * result + getRunningTaskCount();
+    result = 31 * result + getFailedTaskCount();
+    result = 31 * result + getKilledTaskCount();
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("TotalTasks: ");
+    sb.append(getTotalTaskCount());
+    sb.append(" Succeeded: ");
+    sb.append(getSucceededTaskCount());
+    sb.append(" Running: ");
+    sb.append(getRunningTaskCount());
+    sb.append(" Failed: ");
+    sb.append(getFailedTaskCount());
+    sb.append(" Killed: ");
+    sb.append(getKilledTaskCount());
+    return sb.toString();
+  }
+}
diff --git ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java 
ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java new file mode 100644 index 0000000..b4f753f --- /dev/null +++ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/JobStateListener.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.ql.exec.spark.status.impl; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.ql.exec.spark.status.SparkJobState; +import org.apache.spark.scheduler.JobSucceeded; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; +import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.SparkListenerTaskGettingResult; +import org.apache.spark.scheduler.SparkListenerTaskStart; +import org.apache.spark.scheduler.SparkListenerUnpersistRDD; + +import scala.collection.JavaConversions; + +public class JobStateListener implements SparkListener { + + private Map jobIdToStates = new HashMap(); + private Map jobIdToStageId = new HashMap(); + + @Override + public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { + + } + + @Override + public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { + + } + + @Override + public void onTaskStart(SparkListenerTaskStart taskStart) { + + } + + @Override + public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { + + } + + @Override + public void onTaskEnd(SparkListenerTaskEnd taskEnd) { + + } + + @Override + public synchronized void onJobStart(SparkListenerJobStart jobStart) { + jobIdToStates.put(jobStart.jobId(), 
SparkJobState.RUNNING); + List ids = JavaConversions.asJavaList(jobStart.stageIds()); + int[] intStageIds = new int[ids.size()]; + for(int i=0; i stageProgresses = getSparkStageProgress(); + + int totalTaskCount = 0; + int runningTaskCount = 0; + int completedTaskCount = 0; + int failedTaskCount = 0; + int killedTaskCount = 0; + + for (SparkProgress sparkProgress : stageProgresses.values()) { + totalTaskCount += sparkProgress.getTotalTaskCount(); + runningTaskCount += sparkProgress.getRunningTaskCount(); + completedTaskCount += sparkProgress.getSucceededTaskCount(); + failedTaskCount += sparkProgress.getFailedTaskCount(); + killedTaskCount += sparkProgress.getKilledTaskCount(); + } + + return new SparkProgress( + totalTaskCount, completedTaskCount, runningTaskCount, failedTaskCount, killedTaskCount); + } + + @Override + public int[] getStageIds() { + return jobStateListener.getStageIds(jobId); + } + + @Override + public Map getSparkStageProgress() { + Map stageProgresses = new HashMap(); + int[] stageIds = jobStateListener.getStageIds(jobId); + if (stageIds != null) { + for (int stageId : stageIds) { + List stageInfos = getStageInfo(stageId); + for (StageInfo stageInfo : stageInfos) { + Tuple2 tuple2 = new Tuple2(stageInfo.stageId(), + stageInfo.attemptId()); + UIData.StageUIData uiData = jobProgressListener.stageIdToData().get(tuple2).get(); + if (uiData != null) { + int runningTaskCount = uiData.numActiveTasks(); + int completedTaskCount = uiData.numCompleteTasks(); + int failedTaskCount = uiData.numFailedTasks(); + int totalTaskCount = stageInfo.numTasks(); + int killedTaskCount = 0; + SparkProgress stageProgress = new SparkProgress( + totalTaskCount, + completedTaskCount, + runningTaskCount, + failedTaskCount, + killedTaskCount); + stageProgresses.put(stageInfo.stageId() + "_" + stageInfo.attemptId(), stageProgress); + } + } + } + } + return stageProgresses; + } + + private List getStageInfo(int stageId) { + List stageInfos = new LinkedList(); + + Map 
activeStages = mutableMapAsJavaMap(jobProgressListener.activeStages()); + List completedStages = bufferAsJavaList(jobProgressListener.completedStages()); + List failedStages = bufferAsJavaList(jobProgressListener.failedStages()); + + if (activeStages.containsKey(stageId)) { + stageInfos.add(activeStages.get(stageId)); + } else { + for (StageInfo stageInfo : completedStages) { + if (stageInfo.stageId() == stageId) { + stageInfos.add(stageInfo); + } + } + + for (StageInfo stageInfo : failedStages) { + if (stageInfo.stageId() == stageId) { + stageInfos.add(stageInfo); + } + } + } + + return stageInfos; + } +}